Redis实现异步消息队列,延时队列

异步消息队列

Redis中的 list(列表)实现异步消息队列,使用rpush / lpush 操作插入队列消息,使用 lpop 和 rpop 来出队消息。

队列空了怎么办?

如果队列空了,客户端就会陷入pop的死循环,不停地pop,没有数据,接着pop,有没有数据。这样的空轮询拉高了客户端的CPU,redis的QPS也会被拉高,Redis的慢查询可能会显著增多。

解决方案:使用命令 blpop、brpop,b(blocking,阻塞)。

阻塞读在队列没有数据的时候,会立即进入休眠状态,一旦数据到来,则立即醒来,消费的延迟几乎为零。用这两个命令代替 lpop、rpop可以解决上面的问题。

延迟队列的实现

延迟队列可以使用 zset(有序列表)实现,我们将消息序列化成一个字符串作为列表的value,这个消息的到期处理时间作为score,然后多个线程轮询zset 获取到期的任务进行执行,多线程保证了可靠性,因为多个线程,需要考虑并发执行的问题,一个任务不能被多次执行。

代码如下:

package list;

import java.lang.reflect.Type;

import java.util.Set;

import java.util.UUID;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

/**

* 延时异步消息队列的实现

*/

public class RedisDelayingQueue<T> {

static class TaskItem<T> {

public String id;

public T msg;

}

// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference

private Type TaskType = new TypeReference<TaskItem<T>>() {

}.getType();

private Jedis jedis;

private String queueKey;

public RedisDelayingQueue(Jedis jedis, String queueKey) {

this.jedis = jedis;

this.queueKey = queueKey;

}

public void delay(T msg) {

TaskItem<T> task = new TaskItem<T>();

task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid

task.msg = msg;

String s = JSON.toJSONString(task); // fastjson 序列化

jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试

}

public void loop() {

while (!Thread.interrupted()) {

// 只取一条

Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);

if (values.isEmpty()) {

try {

Thread.sleep(500); // 歇会继续

} catch (InterruptedException e) {

break;

}

continue;

}

String s = values.iterator().next();

if (jedis.zrem(queueKey, s) > 0) { // 抢到了

TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化

this.handleMsg(task.msg);

}

}

}

public void handleMsg(T msg) {

System.out.println(msg);

}

public static void main(String[] args) {

Jedis jedis = new Jedis();

final RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");

Thread producer = new Thread() {

public void run() {

for (int i = 0; i < 10; i++) {

queue.delay("codehole" + i);

}

}

};

Thread consumer = new Thread() {

public void run() {

queue.loop();

}

};

producer.start();

consumer.start();

try {

producer.join();

Thread.sleep(6000);

consumer.interrupt();

consumer.join();

} catch (InterruptedException e) {

}

}

}

优化代码如下:待续

经验分享 程序员 微信小程序 职场和发展