Redisson的DelayedQueue最佳实践
build.gradle
compile org.redisson:redisson:3.11.4
RedissonConfig.java
@Configuration public class RedissonConfig { @Value("${spring.redis.host}") private String host; @Value("${spring.redis.port}") private String port; @Value("${spring.redis.password}") private String password; /** * 单机模式 * @return */ @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { Config config = new Config(); SingleServerConfig singleServerConfig = config.useSingleServer(); singleServerConfig.setAddress("redis://" + host + ":" + port); if (StringUtils.isNotBlank(password)) { singleServerConfig.setPassword(password); } return Redisson.create(config); } }
RedissonService.java
@Service public class RedissonService { @Autowired private RedissonClient redissonClient; public void addDelay() { RBlockingQueue<CallDTO> blockingFairQueue = redissonClient.getBlockingQueue("delay_queue_call"); RDelayedQueue<CallDTO> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(new CallDTO(), 5, TimeUnit.SECONDS); // 不要调用下面的方法,否者会导致消费不及时 // delayedQueue.destroy(); } }
说明:delayedQueue.destroy()方法是取消客户端定时任务,调用之后不能主动获取到监听的数据,建议不要调用,让它跟redisson实例一起销毁(spring容器关闭的时候)
AppStartupRunner.java
/** * spring-boot项目启动完成运行 */ @Log4j2 @Component public class AppStartupRunner implements CommandLineRunner { @Autowired private RedissonClient redissonClient; @Override public void run(String... args) { new Thread(()->{ RBlockingQueue<CallDTO> blockingFairQueue = redissonClient.getBlockingQueue("delay_queue_call"); // 开启客户端监听(必须调用),否者系统重启时拿不到已过期数据,要等到系统第一次调用getDelayedQueue方法时才能开启监听 redissonClient.getDelayedQueue(blockingFairQueue); while (true){ CallDTO dto = null; try { dto = blockingFairQueue.take(); } catch (Exception e) { continue; } if (Objects.isNull(dto)) { continue; } System.out.println(String.format("receive1=======dto:%s", JSON.toJSONString(dto))); } }).start(); } }
注意:延时队列的过期机制是靠客户端的监听(定时任务+redis订阅)实现的,所以在消费数据的时候要先开启监听,监听的开启逻辑在redissonClient.getDelayedQueue()里面(会触发一次任务执行,并开启定时任务)
延时队列的具体实现最好查看源码,涉及三个队列和一个发布订阅通道 参考: 阻塞队列 List:KEY = queueName,执行 BLPOP 命令从左端弹出元素,右端插入元素。当一条数据到达过期时间的时候,会从redisson_delay_queue:{DelayMessage}中移除,加入到这个队列,客户端监听的就是这个队列,这个队列里面的全都是已经过期的数据。 有序集合 Sorted Set:KEY = redisson_delay_queue_timeout:{queueName},score 是元素的过期时间,按从小到大排序,过期时间小于当前时间表示已过期,删除集合中的元素,同时将普通集合 List中对应的元素删除,并将元素添加到阻塞队列 List等待客户端消费。 普通集合 List:KEY = redisson_delay_queue:{DelayMessage},按顺序从右端添加元素,元素过期会被删除。 发布/订阅通道:redisson_delay_queue_channel,往延时队列中放入一个数据时,会将延时时间publish出去,客户端收到之后会按这个时间延时之后再执行定时任务。
上一篇:
通过多线程提高代码的执行效率例子