From e1c34e912458abaae2cde27c85e0e2bb00b785a6 Mon Sep 17 00:00:00 2001 From: YunaiV Date: Wed, 1 Nov 2023 22:46:02 +0800 Subject: [PATCH] =?UTF-8?q?mq=EF=BC=9A=E4=BC=98=E5=8C=96=20redis=20stream?= =?UTF-8?q?=20=E7=9A=84=E5=91=BD=E5=90=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../config/YudaoRedisMQAutoConfiguration.java | 16 ++++++++-------- .../framework/mq/redis/core/RedisMQTemplate.java | 8 ++++---- .../core/job/RedisPendingMessageResendJob.java | 4 ++-- ...age.java => AbstractRedisChannelMessage.java} | 2 +- ... => AbstractRedisChannelMessageListener.java} | 4 ++-- ...sage.java => AbstractRedisStreamMessage.java} | 2 +- ...a => AbstractRedisStreamMessageListener.java} | 4 ++-- .../《芋道 Spring Boot 事件机制 Event 入门》.md | 1 + 8 files changed, 21 insertions(+), 20 deletions(-) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/{AbstractChannelMessage.java => AbstractRedisChannelMessage.java} (87%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/{AbstractChannelMessageListener.java => AbstractRedisChannelMessageListener.java} (94%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/{AbstractStreamMessage.java => AbstractRedisStreamMessage.java} (85%) rename yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/stream/{AbstractStreamMessageListener.java => AbstractRedisStreamMessageListener.java} (96%) create mode 100644 yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java index 7ecd879640..bbc63b719e 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java @@ -7,8 +7,8 @@ import cn.iocoder.yudao.framework.common.enums.DocumentEnum; import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob; -import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractChannelMessageListener; -import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessageListener; +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RedissonClient; @@ -57,9 +57,9 @@ public class YudaoRedisMQAutoConfiguration { * 创建 Redis Pub/Sub 广播消费的容器 */ @Bean(initMethod = "start", destroyMethod = "stop") - @ConditionalOnBean(AbstractChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 + @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public RedisMessageListenerContainer redisMessageListenerContainer( - RedisMQTemplate redisMQTemplate, List> listeners) { + RedisMQTemplate redisMQTemplate, List> listeners) { // 创建 RedisMessageListenerContainer 对象 RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 设置 RedisConnection 工厂。 @@ -78,8 +78,8 @@ public class YudaoRedisMQAutoConfiguration { * 创建 Redis Stream 重新消费的任务 */ @Bean - @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, + @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 + public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, RedisMQTemplate redisTemplate, @Value("${spring.application.name}") String groupName, RedissonClient redissonClient) { @@ -92,9 +92,9 @@ public class YudaoRedisMQAutoConfiguration { * 基础知识:Redis Stream 的 xreadgroup 命令 */ @Bean(initMethod = "start", destroyMethod = "stop") - @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 + @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public StreamMessageListenerContainer> redisStreamMessageListenerContainer( - RedisMQTemplate redisMQTemplate, List> listeners) { + RedisMQTemplate redisMQTemplate, List> listeners) { RedisTemplate redisTemplate = redisMQTemplate.getRedisTemplate(); checkRedisVersion(redisTemplate); // 第一步,创建 StreamMessageListenerContainer 容器 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java index 92a0f772ef..5755ffa517 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java @@ -3,8 +3,8 @@ package cn.iocoder.yudao.framework.mq.redis.core; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; -import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractChannelMessage; -import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessage; +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage; import lombok.AllArgsConstructor; import lombok.Getter; import org.springframework.data.redis.connection.stream.RecordId; @@ -35,7 +35,7 @@ public class RedisMQTemplate { * * @param message 消息 */ - public void send(T message) { + public void send(T message) { try { sendMessageBefore(message); // 发送消息 @@ -51,7 +51,7 @@ public class RedisMQTemplate { * @param message 消息 * @return 消息记录的编号对象 */ - public RecordId send(T message) { + public RecordId send(T message) { try { sendMessageBefore(message); // 发送消息 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java index 02ede126ee..b84f17c152 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java @@ -2,7 +2,7 @@ package cn.iocoder.yudao.framework.mq.redis.core.job; import cn.hutool.core.collection.CollUtil; import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessageListener; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; @@ -33,7 +33,7 @@ public class RedisPendingMessageResendJob { */ private static final int EXPIRE_TIME = 5 * 60; - private final List> listeners; + private final List> listeners; private final RedisMQTemplate redisTemplate; private final String groupName; private final RedissonClient redissonClient; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java similarity index 87% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessage.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java index ce03efcf39..d5ea5b9d59 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java @@ -8,7 +8,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; * * @author 芋道源码 */ -public abstract class AbstractChannelMessage extends AbstractRedisMessage { +public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage { /** * 获得 Redis Channel,默认使用类名 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java similarity index 94% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessageListener.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java index bae90a9b86..fd7c910c95 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractChannelMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java @@ -20,7 +20,7 @@ import java.util.List; * * @author 芋道源码 */ -public abstract class AbstractChannelMessageListener implements MessageListener { +public abstract class AbstractRedisChannelMessageListener implements MessageListener { /** * 消息类型 @@ -37,7 +37,7 @@ public abstract class AbstractChannelMessageListener +public abstract class AbstractRedisStreamMessageListener implements StreamListener> { /** @@ -48,7 +48,7 @@ public abstract class AbstractStreamMessageListener