redis实现延时队列(附完整代码)
最近在复习所学过的队列的知识,像什么LinkedBlockingDeque。ArrayBlockingQueue,还有ribbitmq里的乱七八糟的,其本质我感觉啊这些技术就是一些队列,只不过大体上分为单机队列和分布式队列而已,当然本文的重点在于redis实现延时队列啊,可能有人会说,用ribbitmq这个专门的消息中间件实现延时队列不香么,给消息设置个ttl,失效了放入死信队列进行监听,不就行了吗。可是面试的时候或者有些公司就是不想用消息中间件,你难不成还去劝说老总吗?显然这不现实,我们要做的就是尽量能多学点就多学点吧,保不齐以后就有这么刻薄的需求要我们来实现呢。
思路
先介绍下redis的5大数据类型吧,string hash set sortset list,其中与本文有关的就是sortset,大白话来说就是sortset可以进行排序,可以进行排序就意味这是不是与队列搭边了。其实大体上的思路就是存数据时以时间戳为排序规则存入,取数据时:当前时间戳>存入数据的时间戳则消费。
生产者实现
实际开发中可以根据需求封装一个vo类作为消息存入redis,我这里直接简单写了把i作为value存入QUEUENAME集合中,设置scroe为(当前时间+i分钟)的时间戳
@Autowired RedisTemplate redisTemplate; String QUEUENAME = "zzh:queue"; SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Test public void pr() { for (int i = 1; i <= 1; i++) { Calendar now = Calendar.getInstance(); now.setTime(new Date()); now.set(Calendar.MINUTE, now.get(Calendar.MINUTE) + i); System.out.println("生产了:" + i + "当前时间为:" + simpleDateFormat.format(System.currentTimeMillis()) + "消费时间为:" + simpleDateFormat.format(now.getTime())); //往QUEUENAME这个集合中放入i,设置scorce为排序规则 redisTemplate.opsForZSet().add(QUEUENAME, i, now.getTime().getTime()); } }
消费者实现
拿出score范围在0-当前时间戳的每条数据,遍历比较是否时间戳<当前时间戳,如果小于表示没有到达可以消费的时间,直至>了才可以消费
/** * @param * @method 每间隔1秒执行一次 */ @Scheduled(cron = "*/1 * * * * * ") public void cs() { System.out.println("------------等待消费--------------"); //取出QUEUENAME集合中的score在0-当前时间戳这个范围的所有值 Set<Integer> set = redisTemplate.opsForZSet().rangeByScore(QUEUENAME, 0, System.currentTimeMillis()); Iterator<Integer> iterator = set.iterator(); while (iterator.hasNext()) { Integer value = iterator.next(); //遍历取出每一个score Double score = redisTemplate.opsForZSet().score(QUEUENAME, value); //达到了时间就进行消费 if (System.currentTimeMillis() > score) { System.out.println("消费了:" + value + "消费时间为:" + simpleDateFormat.format(System.currentTimeMillis())); redisTemplate.opsForZSet().remove(QUEUENAME, value); } } }
测试
可以看到只有达到时间才进行消费
完整代码链接
觉得不错的点个小星星吧
上一篇:
通过多线程提高代码的执行效率例子