异步化 Redis MQ 的初始化,提升启动速度
This commit is contained in:
parent
c5b547e0f3
commit
1819e0e1d6
|
@ -25,6 +25,7 @@ import org.springframework.data.redis.listener.ChannelTopic;
|
||||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||||
import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
|
import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
|
||||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||||
|
import org.springframework.scheduling.annotation.Async;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
@ -54,6 +55,7 @@ public class YudaoMQAutoConfiguration {
|
||||||
* 创建 Redis Pub/Sub 广播消费的容器
|
* 创建 Redis Pub/Sub 广播消费的容器
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
|
@Async // 异步化,可降低 2 秒左右的启动时间
|
||||||
public RedisMessageListenerContainer redisMessageListenerContainer(
|
public RedisMessageListenerContainer redisMessageListenerContainer(
|
||||||
RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) {
|
RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) {
|
||||||
// 创建 RedisMessageListenerContainer 对象
|
// 创建 RedisMessageListenerContainer 对象
|
||||||
|
@ -76,6 +78,7 @@ public class YudaoMQAutoConfiguration {
|
||||||
* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
|
* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
|
||||||
*/
|
*/
|
||||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||||
|
@Async // 异步化,可降低 5 秒左右的启动时间
|
||||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
||||||
RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) {
|
RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) {
|
||||||
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|
||||||
|
@ -95,6 +98,8 @@ public class YudaoMQAutoConfiguration {
|
||||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
// 第二步,注册监听器,消费对应的 Stream 主题
|
||||||
String consumerName = buildConsumerName();
|
String consumerName = buildConsumerName();
|
||||||
listeners.parallelStream().forEach(listener -> {
|
listeners.parallelStream().forEach(listener -> {
|
||||||
|
log.info("[redisStreamMessageListenerContainer][开始注册 StreamKey({}) 对应的监听器({})]",
|
||||||
|
listener.getStreamKey(), listener.getClass().getName());
|
||||||
// 创建 listener 对应的消费者分组
|
// 创建 listener 对应的消费者分组
|
||||||
try {
|
try {
|
||||||
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
|
||||||
|
@ -111,6 +116,8 @@ public class YudaoMQAutoConfiguration {
|
||||||
.autoAcknowledge(false) // 不自动 ack
|
.autoAcknowledge(false) // 不自动 ack
|
||||||
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
.cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
|
||||||
container.register(builder.build(), listener);
|
container.register(builder.build(), listener);
|
||||||
|
log.info("[redisStreamMessageListenerContainer][完成注册 StreamKey({}) 对应的监听器({})]",
|
||||||
|
listener.getStreamKey(), listener.getClass().getName());
|
||||||
});
|
});
|
||||||
return container;
|
return container;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ spring:
|
||||||
multi-statement-allow: true
|
multi-statement-allow: true
|
||||||
dynamic: # 多数据源配置
|
dynamic: # 多数据源配置
|
||||||
druid: # Druid 【连接池】相关的全局配置
|
druid: # Druid 【连接池】相关的全局配置
|
||||||
initial-size: 5 # 初始连接数
|
initial-size: 1 # 初始连接数
|
||||||
min-idle: 10 # 最小连接池数量
|
min-idle: 10 # 最小连接池数量
|
||||||
max-active: 20 # 最大连接池数量
|
max-active: 20 # 最大连接池数量
|
||||||
max-wait: 600000 # 配置获取连接等待超时的时间,单位:毫秒
|
max-wait: 600000 # 配置获取连接等待超时的时间,单位:毫秒
|
||||||
|
@ -171,6 +171,7 @@ logging:
|
||||||
cn.iocoder.yudao.module.member.dal.mysql: debug
|
cn.iocoder.yudao.module.member.dal.mysql: debug
|
||||||
cn.iocoder.yudao.module.trade.dal.mysql: debug
|
cn.iocoder.yudao.module.trade.dal.mysql: debug
|
||||||
cn.iocoder.yudao.module.promotion.dal.mysql: debug
|
cn.iocoder.yudao.module.promotion.dal.mysql: debug
|
||||||
|
debug: false
|
||||||
|
|
||||||
--- #################### 微信公众号、小程序相关配置 ####################
|
--- #################### 微信公众号、小程序相关配置 ####################
|
||||||
wx:
|
wx:
|
||||||
|
|
Loading…
Reference in New Issue