messages = context.getMsgList();
+ Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size());
+ // 设置租户编号
+ String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID);
+ if (StrUtil.isNotEmpty(tenantId)) {
+ TenantContextHolder.setTenantId(Long.parseLong(tenantId));
+ }
+ }
+
+ @Override
+ public void consumeMessageAfter(ConsumeMessageContext context) {
+ TenantContextHolder.clear();
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java
new file mode 100644
index 0000000000..7f12ac5205
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java
@@ -0,0 +1,53 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
+import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+
+/**
+ * 多租户的 RocketMQ 初始化器
+ *
+ * @author 芋道源码
+ */
+public class TenantRocketMQInitializer implements BeanPostProcessor {
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ if (bean instanceof DefaultRocketMQListenerContainer) {
+ DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
+ initTenantConsumer(container.getConsumer());
+ } else if (bean instanceof RocketMQTemplate) {
+ RocketMQTemplate template = (RocketMQTemplate) bean;
+ initTenantProducer(template.getProducer());
+ }
+ return bean;
+ }
+
+ private void initTenantProducer(DefaultMQProducer producer) {
+ if (producer == null) {
+ return;
+ }
+ DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl();
+ if (producerImpl == null) {
+ return;
+ }
+ producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook());
+ }
+
+ private void initTenantConsumer(DefaultMQPushConsumer consumer) {
+ if (consumer == null) {
+ return;
+ }
+ DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl();
+ if (consumerImpl == null) {
+ return;
+ }
+ consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook());
+ }
+
+}
\ No newline at end of file
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java
new file mode 100644
index 0000000000..4f0307465f
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java
@@ -0,0 +1,36 @@
+package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq;
+
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * RocketMQ 消息队列的多租户 {@link SendMessageHook} 实现类
+ *
+ * Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中
+ *
+ * @author 芋道源码
+ */
+public class TenantRocketMQSendMessageHook implements SendMessageHook {
+
+ @Override
+ public String hookName() {
+ return getClass().getSimpleName();
+ }
+
+ @Override
+ public void sendMessageBefore(SendMessageContext sendMessageContext) {
+ Long tenantId = TenantContextHolder.getTenantId();
+ if (tenantId == null) {
+ return;
+ }
+ sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString());
+ }
+
+ @Override
+ public void sendMessageAfter(SendMessageContext sendMessageContext) {
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java
new file mode 100644
index 0000000000..059d8f97fe
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2002-2021 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.messaging.handler.invocation;
+
+import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
+import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils;
+import org.springframework.core.DefaultParameterNameDiscoverer;
+import org.springframework.core.MethodParameter;
+import org.springframework.core.ParameterNameDiscoverer;
+import org.springframework.core.ResolvableType;
+import org.springframework.lang.Nullable;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.handler.HandlerMethod;
+import org.springframework.util.ObjectUtils;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+import java.util.Arrays;
+
+import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
+
+/**
+ * Extension of {@link HandlerMethod} that invokes the underlying method with
+ * argument values resolved from the current HTTP request through a list of
+ * {@link HandlerMethodArgumentResolver}.
+ *
+ * 针对 rabbitmq-spring 和 kafka-spring,不存在合适的拓展点,可以实现 Consumer 消费前,读取 Header 中的 tenant-id 设置到 {@link TenantContextHolder} 中
+ * TODO 芋艿:持续跟进,看看有没新的拓展点
+ *
+ * @author Rossen Stoyanchev
+ * @author Juergen Hoeller
+ * @since 4.0
+ */
+public class InvocableHandlerMethod extends HandlerMethod {
+
+ private static final Object[] EMPTY_ARGS = new Object[0];
+
+ private HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite();
+
+ private ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer();
+
+ /**
+ * Create an instance from a {@code HandlerMethod}.
+ */
+ public InvocableHandlerMethod(HandlerMethod handlerMethod) {
+ super(handlerMethod);
+ }
+
+ /**
+ * Create an instance from a bean instance and a method.
+ */
+ public InvocableHandlerMethod(Object bean, Method method) {
+ super(bean, method);
+ }
+
+ /**
+ * Construct a new handler method with the given bean instance, method name and parameters.
+ * @param bean the object bean
+ * @param methodName the method name
+ * @param parameterTypes the method parameter types
+ * @throws NoSuchMethodException when the method cannot be found
+ */
+ public InvocableHandlerMethod(Object bean, String methodName, Class>... parameterTypes)
+ throws NoSuchMethodException {
+
+ super(bean, methodName, parameterTypes);
+ }
+
+ /**
+ * Set {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers} to use for resolving method argument values.
+ */
+ public void setMessageMethodArgumentResolvers(HandlerMethodArgumentResolverComposite argumentResolvers) {
+ this.resolvers = argumentResolvers;
+ }
+
+ /**
+ * Set the ParameterNameDiscoverer for resolving parameter names when needed
+ * (e.g. default request attribute name).
+ * Default is a {@link DefaultParameterNameDiscoverer}.
+ */
+ public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDiscoverer) {
+ this.parameterNameDiscoverer = parameterNameDiscoverer;
+ }
+
+ /**
+ * Invoke the method after resolving its argument values in the context of the given message.
+ *
Argument values are commonly resolved through
+ * {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers}.
+ * The {@code providedArgs} parameter however may supply argument values to be used directly,
+ * i.e. without argument resolution.
+ *
Delegates to {@link #getMethodArgumentValues} and calls {@link #doInvoke} with the
+ * resolved arguments.
+ * @param message the current message being processed
+ * @param providedArgs "given" arguments matched by type, not resolved
+ * @return the raw value returned by the invoked method
+ * @throws Exception raised if no suitable argument resolver can be found,
+ * or if the method raised an exception
+ * @see #getMethodArgumentValues
+ * @see #doInvoke
+ */
+ @Nullable
+ public Object invoke(Message> message, Object... providedArgs) throws Exception {
+ Object[] args = getMethodArgumentValues(message, providedArgs);
+ if (logger.isTraceEnabled()) {
+ logger.trace("Arguments: " + Arrays.toString(args));
+ }
+ // 注意:如下是本类的改动点!!!
+ // 情况一:无租户编号的情况
+ Long tenantId= parseTenantId(message);
+ if (tenantId == null) {
+ return doInvoke(args);
+ }
+ // 情况二:有租户的情况下
+ return TenantUtils.execute(tenantId, () -> doInvoke(args));
+ }
+
+ private Long parseTenantId(Message> message) {
+ Object tenantId = message.getHeaders().get(HEADER_TENANT_ID);
+ if (tenantId == null) {
+ return null;
+ }
+ if (tenantId instanceof Long) {
+ return (Long) tenantId;
+ }
+ if (tenantId instanceof Number) {
+ return ((Number) tenantId).longValue();
+ }
+ if (tenantId instanceof String) {
+ return Long.parseLong((String) tenantId);
+ }
+ if (tenantId instanceof byte[]) {
+ return Long.parseLong(new String((byte[]) tenantId));
+ }
+ throw new IllegalArgumentException("未知的数据类型:" + tenantId);
+ }
+
+ /**
+ * Get the method argument values for the current message, checking the provided
+ * argument values and falling back to the configured argument resolvers.
+ *
The resulting array will be passed into {@link #doInvoke}.
+ * @since 5.1.2
+ */
+ protected Object[] getMethodArgumentValues(Message> message, Object... providedArgs) throws Exception {
+ MethodParameter[] parameters = getMethodParameters();
+ if (ObjectUtils.isEmpty(parameters)) {
+ return EMPTY_ARGS;
+ }
+
+ Object[] args = new Object[parameters.length];
+ for (int i = 0; i < parameters.length; i++) {
+ MethodParameter parameter = parameters[i];
+ parameter.initParameterNameDiscovery(this.parameterNameDiscoverer);
+ args[i] = findProvidedArgument(parameter, providedArgs);
+ if (args[i] != null) {
+ continue;
+ }
+ if (!this.resolvers.supportsParameter(parameter)) {
+ throw new MethodArgumentResolutionException(
+ message, parameter, formatArgumentError(parameter, "No suitable resolver"));
+ }
+ try {
+ args[i] = this.resolvers.resolveArgument(parameter, message);
+ }
+ catch (Exception ex) {
+ // Leave stack trace for later, exception may actually be resolved and handled...
+ if (logger.isDebugEnabled()) {
+ String exMsg = ex.getMessage();
+ if (exMsg != null && !exMsg.contains(parameter.getExecutable().toGenericString())) {
+ logger.debug(formatArgumentError(parameter, exMsg));
+ }
+ }
+ throw ex;
+ }
+ }
+ return args;
+ }
+
+ /**
+ * Invoke the handler method with the given argument values.
+ */
+ @Nullable
+ protected Object doInvoke(Object... args) throws Exception {
+ try {
+ return getBridgedMethod().invoke(getBean(), args);
+ }
+ catch (IllegalArgumentException ex) {
+ assertTargetBean(getBridgedMethod(), getBean(), args);
+ String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument");
+ throw new IllegalStateException(formatInvokeError(text, args), ex);
+ }
+ catch (InvocationTargetException ex) {
+ // Unwrap for HandlerExceptionResolvers ...
+ Throwable targetException = ex.getTargetException();
+ if (targetException instanceof RuntimeException) {
+ throw (RuntimeException) targetException;
+ }
+ else if (targetException instanceof Error) {
+ throw (Error) targetException;
+ }
+ else if (targetException instanceof Exception) {
+ throw (Exception) targetException;
+ }
+ else {
+ throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException);
+ }
+ }
+ }
+
+ MethodParameter getAsyncReturnValueType(@Nullable Object returnValue) {
+ return new AsyncResultMethodParameter(returnValue);
+ }
+
+ private class AsyncResultMethodParameter extends HandlerMethodParameter {
+
+ @Nullable
+ private final Object returnValue;
+
+ private final ResolvableType returnType;
+
+ public AsyncResultMethodParameter(@Nullable Object returnValue) {
+ super(-1);
+ this.returnValue = returnValue;
+ this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric();
+ }
+
+ protected AsyncResultMethodParameter(AsyncResultMethodParameter original) {
+ super(original);
+ this.returnValue = original.returnValue;
+ this.returnType = original.returnType;
+ }
+
+ @Override
+ public Class> getParameterType() {
+ if (this.returnValue != null) {
+ return this.returnValue.getClass();
+ }
+ if (!ResolvableType.NONE.equals(this.returnType)) {
+ return this.returnType.toClass();
+ }
+ return super.getParameterType();
+ }
+
+ @Override
+ public Type getGenericParameterType() {
+ return this.returnType.getType();
+ }
+
+ @Override
+ public AsyncResultMethodParameter clone() {
+ return new AsyncResultMethodParameter(this);
+ }
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000000..a495842a0a
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,2 @@
+org.springframework.boot.env.EnvironmentPostProcessor=\
+ cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor
diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/test/java/cn/iocoder/yudao/framework/tenant/core/job/TestJob.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/test/java/cn/iocoder/yudao/framework/tenant/core/job/TestJob.java
deleted file mode 100644
index 2a6d200c4a..0000000000
--- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/test/java/cn/iocoder/yudao/framework/tenant/core/job/TestJob.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package cn.iocoder.yudao.framework.tenant.core.job;
-
-import cn.hutool.core.collection.CollUtil;
-import cn.iocoder.yudao.framework.quartz.core.handler.JobHandler;
-import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-@Component
-public class TestJob implements JobHandler {
-
- private final List tenantIds = new CopyOnWriteArrayList<>();
-
- @Override
- @TenantJob // 标记多租户
- public String execute(String param) throws Exception {
- tenantIds.add(TenantContextHolder.getTenantId());
- return "success";
- }
-
- public List getTenantIds() {
- CollUtil.sort(tenantIds, Long::compareTo);
- return tenantIds;
- }
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-job/src/main/java/cn/iocoder/yudao/framework/quartz/core/service/JobLogFrameworkService.java b/yudao-framework/yudao-spring-boot-starter-job/src/main/java/cn/iocoder/yudao/framework/quartz/core/service/JobLogFrameworkService.java
index 889921dfd9..418dbfcd62 100644
--- a/yudao-framework/yudao-spring-boot-starter-job/src/main/java/cn/iocoder/yudao/framework/quartz/core/service/JobLogFrameworkService.java
+++ b/yudao-framework/yudao-spring-boot-starter-job/src/main/java/cn/iocoder/yudao/framework/quartz/core/service/JobLogFrameworkService.java
@@ -40,5 +40,4 @@ public interface JobLogFrameworkService {
@NotNull(message = "结束时间不能为空") LocalDateTime endTime,
@NotNull(message = "运行时长不能为空") Integer duration,
boolean success, String result);
-
}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
index 75303d4e35..c8972f16b2 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
+++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml
@@ -12,7 +12,7 @@
jar
${project.artifactId}
- 消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费
+ 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种
https://github.com/YunaiV/ruoyi-vue-pro
@@ -21,6 +21,23 @@
cn.iocoder.boot
yudao-spring-boot-starter-redis
+
+
+
+ org.springframework.kafka
+ spring-kafka
+ true
+
+
+ org.springframework.amqp
+ spring-rabbit
+ true
+
+
+ org.apache.rocketmq
+ rocketmq-spring-boot-starter
+ true
+
-
+
\ No newline at end of file
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java
deleted file mode 100644
index fbc2a2826d..0000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.pubsub;
-
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Redis Channel Message 抽象类
- *
- * @author 芋道源码
- */
-public abstract class AbstractChannelMessage extends AbstractRedisMessage {
-
- /**
- * 获得 Redis Channel
- *
- * @return Channel
- */
- @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
- public abstract String getChannel();
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java
deleted file mode 100644
index 29ea833f34..0000000000
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package cn.iocoder.yudao.framework.mq.core.stream;
-
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-/**
- * Redis Stream Message 抽象类
- *
- * @author 芋道源码
- */
-public abstract class AbstractStreamMessage extends AbstractRedisMessage {
-
- /**
- * 获得 Redis Stream Key
- *
- * @return Channel
- */
- @JsonIgnore // 避免序列化
- public abstract String getStreamKey();
-
-}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java
index 48eaf23861..3b716cb774 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java
@@ -1,6 +1,4 @@
/**
- * 消息队列,基于 Redis 提供:
- * 1. 基于 Pub/Sub 实现广播消费
- * 2. 基于 Stream 实现集群消费
+ * 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种
*/
package cn.iocoder.yudao.framework.mq;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java
new file mode 100644
index 0000000000..770c50ff7d
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java
@@ -0,0 +1,29 @@
+package cn.iocoder.yudao.framework.mq.rabbitmq.config;
+
+import cn.hutool.core.util.ReflectUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.utils.SerializationUtils;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+
+import java.lang.reflect.Field;
+
+/**
+ * RabbitMQ 消息队列配置类
+ *
+ * @author 芋道源码
+ */
+@AutoConfiguration
+@Slf4j
+@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate")
+public class YudaoRabbitMQAutoConfiguration {
+
+ static {
+ // 强制设置 SerializationUtils 的 TRUST_ALL 为 true,避免 RabbitMQ Consumer 反序列化消息报错
+ // 为什么不通过设置 spring.amqp.deserialization.trust.all 呢?因为可能在 SerializationUtils static 初始化后
+ Field trustAllField = ReflectUtil.getField(SerializationUtils.class, "TRUST_ALL");
+ ReflectUtil.removeFinalModify(trustAllField);
+ ReflectUtil.setFieldValue(SerializationUtils.class, trustAllField, true);
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java
new file mode 100644
index 0000000000..2773b58281
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * 占位符,无特殊逻辑
+ */
+package cn.iocoder.yudao.framework.mq.rabbitmq.core;
\ No newline at end of file
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java
new file mode 100644
index 0000000000..9f6032c925
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * 消息队列,基于 RabbitMQ 提供
+ */
+package cn.iocoder.yudao.framework.mq.rabbitmq;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java
similarity index 77%
rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java
index e300b1ad52..bbc63b719e 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java
@@ -1,21 +1,20 @@
-package cn.iocoder.yudao.framework.mq.config;
+package cn.iocoder.yudao.framework.mq.redis.config;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.system.SystemUtil;
import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
-import cn.iocoder.yudao.framework.mq.job.RedisPendingMessageResendJob;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
+import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisServerCommands;
import org.springframework.data.redis.connection.stream.Consumer;
@@ -27,7 +26,6 @@ import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
-import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.annotation.EnableScheduling;
@@ -42,7 +40,7 @@ import java.util.Properties;
@Slf4j
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
-public class YudaoMQAutoConfiguration {
+public class YudaoRedisMQAutoConfiguration {
@Bean
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
@@ -59,10 +57,9 @@ public class YudaoMQAutoConfiguration {
* 创建 Redis Pub/Sub 广播消费的容器
*/
@Bean(initMethod = "start", destroyMethod = "stop")
- @ConditionalOnBean(AbstractChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
- @ConditionalOnProperty(prefix = "yudao.mq.redis.pubsub", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.pubsub.enable=false 禁用多租户
+ @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public RedisMessageListenerContainer redisMessageListenerContainer(
- RedisMQTemplate redisMQTemplate, List> listeners) {
+ RedisMQTemplate redisMQTemplate, List> listeners) {
// 创建 RedisMessageListenerContainer 对象
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 设置 RedisConnection 工厂。
@@ -81,9 +78,8 @@ public class YudaoMQAutoConfiguration {
* 创建 Redis Stream 重新消费的任务
*/
@Bean
- @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
- @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户
- public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners,
+ @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
+ public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners,
RedisMQTemplate redisTemplate,
@Value("${spring.application.name}") String groupName,
RedissonClient redissonClient) {
@@ -92,14 +88,13 @@ public class YudaoMQAutoConfiguration {
/**
* 创建 Redis Stream 集群消费的容器
- *
- * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
+ *
+ * 基础知识:Redis Stream 的 xreadgroup 命令
*/
@Bean(initMethod = "start", destroyMethod = "stop")
- @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
- @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户
+ @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
public StreamMessageListenerContainer> redisStreamMessageListenerContainer(
- RedisMQTemplate redisMQTemplate, List> listeners) {
+ RedisMQTemplate redisMQTemplate, List> listeners) {
RedisTemplate redisTemplate = redisMQTemplate.getRedisTemplate();
checkRedisVersion(redisTemplate);
// 第一步,创建 StreamMessageListenerContainer 容器
@@ -111,8 +106,7 @@ public class YudaoMQAutoConfiguration {
.build();
// 创建 container 对象
StreamMessageListenerContainer> container =
-// StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions);
- DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
+ StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
// 第二步,注册监听器,消费对应的 Stream 主题
String consumerName = buildConsumerName();
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java
similarity index 80%
rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java
index 8a31feda7e..5755ffa517 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java
@@ -1,10 +1,10 @@
-package cn.iocoder.yudao.framework.mq.core;
+package cn.iocoder.yudao.framework.mq.redis.core;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
-import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.springframework.data.redis.connection.stream.RecordId;
@@ -35,7 +35,7 @@ public class RedisMQTemplate {
*
* @param message 消息
*/
- public void send(T message) {
+ public void send(T message) {
try {
sendMessageBefore(message);
// 发送消息
@@ -51,7 +51,7 @@ public class RedisMQTemplate {
* @param message 消息
* @return 消息记录的编号对象
*/
- public RecordId send(T message) {
+ public RecordId send(T message) {
try {
sendMessageBefore(message);
// 发送消息
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java
similarity index 79%
rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java
index 11d8e1337e..dbcee7fe25 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java
@@ -1,6 +1,6 @@
-package cn.iocoder.yudao.framework.mq.core.interceptor;
+package cn.iocoder.yudao.framework.mq.redis.core.interceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
/**
* {@link AbstractRedisMessage} 消息拦截器
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java
similarity index 93%
rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java
index ea0f53d192..b84f17c152 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java
@@ -1,8 +1,8 @@
-package cn.iocoder.yudao.framework.mq.job;
+package cn.iocoder.yudao.framework.mq.redis.core.job;
import cn.hutool.core.collection.CollUtil;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
@@ -33,7 +33,7 @@ public class RedisPendingMessageResendJob {
*/
private static final int EXPIRE_TIME = 5 * 60;
- private final List> listeners;
+ private final List> listeners;
private final RedisMQTemplate redisTemplate;
private final String groupName;
private final RedissonClient redissonClient;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java
similarity index 88%
rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java
index f02e89d6f9..ee40814ddd 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java
@@ -1,4 +1,4 @@
-package cn.iocoder.yudao.framework.mq.core.message;
+package cn.iocoder.yudao.framework.mq.redis.core.message;
import lombok.Data;
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java
new file mode 100644
index 0000000000..d5ea5b9d59
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java
@@ -0,0 +1,23 @@
+package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
+
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+/**
+ * Redis Channel Message 抽象类
+ *
+ * @author 芋道源码
+ */
+public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage {
+
+ /**
+ * 获得 Redis Channel,默认使用类名
+ *
+ * @return Channel
+ */
+ @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
+ public String getChannel() {
+ return getClass().getSimpleName();
+ }
+
+}
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java
similarity index 85%
rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java
rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java
index e7d737d1b6..fd7c910c95 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java
@@ -1,10 +1,10 @@
-package cn.iocoder.yudao.framework.mq.core.pubsub;
+package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
import cn.hutool.core.util.TypeUtil;
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
-import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
-import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
-import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
+import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
+import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
+import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
import lombok.Setter;
import lombok.SneakyThrows;
import org.springframework.data.redis.connection.Message;
@@ -20,7 +20,7 @@ import java.util.List;
*
* @author 芋道源码
*/
-public abstract class AbstractChannelMessageListener implements MessageListener {
+public abstract class AbstractRedisChannelMessageListener implements MessageListener {
/**
* 消息类型
@@ -37,7 +37,7 @@ public abstract class AbstractChannelMessageListener
+public abstract class AbstractRedisStreamMessageListener
implements StreamListener> {
/**
@@ -48,7 +48,7 @@ public abstract class AbstractStreamMessageListener> extends DefaultStreamMessageListenerContainer {
-
- /**
- * 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现
- */
- public static > StreamMessageListenerContainer create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions options) {
- Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!");
- Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!");
- return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options);
- }
-
- public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions containerOptions) {
- super(connectionFactory, containerOptions);
- }
-
- /**
- * 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现
- */
- @Override
- public Subscription register(StreamReadRequest streamRequest, StreamListener listener) {
- return this.doRegisterX(getReadTaskX(streamRequest, listener));
- }
-
- @SuppressWarnings("unchecked")
- private StreamPollTask getReadTaskX(StreamReadRequest streamRequest, StreamListener listener) {
- StreamPollTask task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener);
- // 修改 readFunction 方法
- Function> readFunction = (Function>) ReflectUtil.getFieldValue(task, "readFunction");
- ReflectUtil.setFieldValue(task, "readFunction", (Function>) readOffset -> {
- List records = readFunction.apply(readOffset);
- //【重点】保证 records 不是空,避免 NPE 的问题!!!
- return records != null ? records : Collections.emptyList();
- });
- return task;
- }
-
- private Subscription doRegisterX(Task task) {
- return ReflectUtil.invoke(this, "doRegister", task);
- }
-
-}
-
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
index c47aa4d7b2..6608654531 100644
--- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
+++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports
@@ -1 +1,2 @@
-cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration
\ No newline at end of file
+cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration
+cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration
\ No newline at end of file
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md
new file mode 100644
index 0000000000..08586b3794
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md
@@ -0,0 +1 @@
+
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md
new file mode 100644
index 0000000000..b66d6334c9
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md
@@ -0,0 +1 @@
+
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md
new file mode 100644
index 0000000000..eff46e2f75
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md
@@ -0,0 +1 @@
+
diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md
new file mode 100644
index 0000000000..08586b3794
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md
@@ -0,0 +1 @@
+
diff --git a/yudao-framework/yudao-spring-boot-starter-mybatis/src/main/java/cn/iocoder/yudao/framework/mybatis/core/mapper/BaseMapperX.java b/yudao-framework/yudao-spring-boot-starter-mybatis/src/main/java/cn/iocoder/yudao/framework/mybatis/core/mapper/BaseMapperX.java
index 4811147b8a..d70c216260 100644
--- a/yudao-framework/yudao-spring-boot-starter-mybatis/src/main/java/cn/iocoder/yudao/framework/mybatis/core/mapper/BaseMapperX.java
+++ b/yudao-framework/yudao-spring-boot-starter-mybatis/src/main/java/cn/iocoder/yudao/framework/mybatis/core/mapper/BaseMapperX.java
@@ -7,6 +7,7 @@ import cn.iocoder.yudao.framework.mybatis.core.util.MyBatisUtils;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.toolkit.Db;
@@ -55,7 +56,7 @@ public interface BaseMapperX extends MPJBaseMapper {
}
default Long selectCount() {
- return selectCount(new QueryWrapper());
+ return selectCount(new QueryWrapper<>());
}
default Long selectCount(String field, Object value) {
diff --git a/yudao-framework/yudao-spring-boot-starter-mybatis/src/main/java/cn/iocoder/yudao/framework/mybatis/core/query/MPJLambdaWrapperX.java b/yudao-framework/yudao-spring-boot-starter-mybatis/src/main/java/cn/iocoder/yudao/framework/mybatis/core/query/MPJLambdaWrapperX.java
new file mode 100644
index 0000000000..7950a2f96f
--- /dev/null
+++ b/yudao-framework/yudao-spring-boot-starter-mybatis/src/main/java/cn/iocoder/yudao/framework/mybatis/core/query/MPJLambdaWrapperX.java
@@ -0,0 +1,313 @@
+package cn.iocoder.yudao.framework.mybatis.core.query;
+
+import cn.hutool.core.util.ArrayUtil;
+import cn.hutool.core.util.ObjectUtil;
+import cn.iocoder.yudao.framework.common.util.collection.ArrayUtils;
+import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
+import com.github.yulichang.toolkit.MPJWrappers;
+import com.github.yulichang.wrapper.MPJLambdaWrapper;
+import org.springframework.util.StringUtils;
+
+import java.util.Collection;
+import java.util.function.Consumer;
+
+/**
+ * 拓展 MyBatis Plus Join QueryWrapper 类,主要增加如下功能:
+ *
+ * 1. 拼接条件的方法,增加 xxxIfPresent 方法,用于判断值不存在的时候,不要拼接到条件中。
+ *
+ * @param 数据类型
+ */
+public class MPJLambdaWrapperX extends MPJLambdaWrapper {
+
+ public MPJLambdaWrapperX likeIfPresent(SFunction column, String val) {
+ MPJWrappers.lambdaJoin().like(column, val);
+ if (StringUtils.hasText(val)) {
+ return (MPJLambdaWrapperX) super.like(column, val);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX inIfPresent(SFunction column, Collection> values) {
+ if (ObjectUtil.isAllNotEmpty(values) && !ArrayUtil.isEmpty(values)) {
+ return (MPJLambdaWrapperX) super.in(column, values);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX inIfPresent(SFunction column, Object... values) {
+ if (ObjectUtil.isAllNotEmpty(values) && !ArrayUtil.isEmpty(values)) {
+ return (MPJLambdaWrapperX) super.in(column, values);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX eqIfPresent(SFunction column, Object val) {
+ if (ObjectUtil.isNotEmpty(val)) {
+ return (MPJLambdaWrapperX) super.eq(column, val);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX neIfPresent(SFunction column, Object val) {
+ if (ObjectUtil.isNotEmpty(val)) {
+ return (MPJLambdaWrapperX) super.ne(column, val);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX gtIfPresent(SFunction column, Object val) {
+ if (val != null) {
+ return (MPJLambdaWrapperX) super.gt(column, val);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX geIfPresent(SFunction column, Object val) {
+ if (val != null) {
+ return (MPJLambdaWrapperX) super.ge(column, val);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX ltIfPresent(SFunction column, Object val) {
+ if (val != null) {
+ return (MPJLambdaWrapperX) super.lt(column, val);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX leIfPresent(SFunction column, Object val) {
+ if (val != null) {
+ return (MPJLambdaWrapperX) super.le(column, val);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX betweenIfPresent(SFunction column, Object val1, Object val2) {
+ if (val1 != null && val2 != null) {
+ return (MPJLambdaWrapperX) super.between(column, val1, val2);
+ }
+ if (val1 != null) {
+ return (MPJLambdaWrapperX) ge(column, val1);
+ }
+ if (val2 != null) {
+ return (MPJLambdaWrapperX) le(column, val2);
+ }
+ return this;
+ }
+
+ public MPJLambdaWrapperX betweenIfPresent(SFunction column, Object[] values) {
+ Object val1 = ArrayUtils.get(values, 0);
+ Object val2 = ArrayUtils.get(values, 1);
+ return betweenIfPresent(column, val1, val2);
+ }
+
+ // ========== 重写父类方法,方便链式调用 ==========
+
+ @Override
+ public MPJLambdaWrapperX eq(boolean condition, SFunction column, Object val) {
+ super.eq(condition, column, val);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX eq(SFunction column, Object val) {
+ super.eq(column, val);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX orderByDesc(SFunction column) {
+ //noinspection unchecked
+ super.orderByDesc(true, column);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX last(String lastSql) {
+ super.last(lastSql);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX in(SFunction column, Collection> coll) {
+ super.in(column, coll);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAll(Class> clazz) {
+ super.selectAll(clazz);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAll(Class> clazz, String prefix) {
+ super.selectAll(clazz, prefix);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAs(SFunction column, String alias) {
+ super.selectAs(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAs(String column, SFunction alias) {
+ super.selectAs(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAs(SFunction column, SFunction alias) {
+ super.selectAs(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAs(String index, SFunction column, SFunction alias) {
+ super.selectAs(index, column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAsClass(Class source, Class> tag) {
+ super.selectAsClass(source, tag);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectSub(Class clazz, Consumer> consumer, SFunction alias) {
+ super.selectSub(clazz, consumer, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectSub(Class clazz, String st, Consumer> consumer, SFunction alias) {
+ super.selectSub(clazz, st, consumer, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectCount(SFunction column) {
+ super.selectCount(column);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectCount(Object column, String alias) {
+ super.selectCount(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectCount(Object column, SFunction alias) {
+ super.selectCount(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectCount(SFunction column, String alias) {
+ super.selectCount(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectCount(SFunction column, SFunction alias) {
+ super.selectCount(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectSum(SFunction column) {
+ super.selectSum(column);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectSum(SFunction column, String alias) {
+ super.selectSum(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectSum(SFunction column, SFunction alias) {
+ super.selectSum(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectMax(SFunction column) {
+ super.selectMax(column);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectMax(SFunction column, String alias) {
+ super.selectMax(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectMax(SFunction column, SFunction alias) {
+ super.selectMax(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectMin(SFunction column) {
+ super.selectMin(column);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectMin(SFunction column, String alias) {
+ super.selectMin(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectMin(SFunction column, SFunction alias) {
+ super.selectMin(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAvg(SFunction column) {
+ super.selectAvg(column);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAvg(SFunction column, String alias) {
+ super.selectAvg(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectAvg(SFunction column, SFunction alias) {
+ super.selectAvg(column, alias);
+ return this;
+ }
+
+ @Override
+ public MPJLambdaWrapperX selectLen(SFunction