Spring boot 项目使用 redisson 延迟队列
背景:
有些场景下,需要延迟触发一些任务。比如:延迟几秒钟发送短信或者邮件;某些业务系统回调,需要延时几秒钟后回调。
当然,实现延时触发的方式有很多。博主这里采用 redisson 的 RDelayedQueue,一是因为接入简单,二是没有分布式的问题。毕竟现在微服务大行其道
接下来,进入正题。
使用步骤:
使用 延迟队列呢,一般就几个步骤:
- 添加消息进入延迟队列
- 从延迟队列中取出消息消费
下面贴一下代码:
1.首先定义一个消息的承载实体:
import lombok.Data; /** * @author hui * @description 使用 redisson 实现延迟触发 job 数据实体类 * @date 2020/11/13 */ @Data public class DelayJob { /** * 类名 */ private Class<?> clazz; /** * 方法名称 */ private String methodName; /** * 参数 */ private Object[] param; /** * 参数 class */ private Class<?>[] parameterTypes; }
2. 定义延迟任务的服务类
提供以下能力:1. 添加消息 2.单独线程监听消息 3.利用反射处理消息
import com.alibaba.fastjson.JSON; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.lang.reflect.Method; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * @author hui * @description 延迟job service * @date 2020/11/16 */ @Component @Slf4j public class DelayJobService { private static final String delayJobName = "delay_job"; @Autowired private RedissonClient redissonClient; @Autowired private ApplicationContext context; private ExecutorService executorService = Executors.newSingleThreadExecutor(); // /**系统可用的处理器核心数 * 2 */ // private ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2); @PostConstruct public void startJobListener() { ListenerTask task = new ListenerTask(redissonClient); executorService.submit(task); } /** * 添加延迟任务 * @param delayJob * @param delay 延迟时间 * @param timeUnit 时间单位 */ public void addDelayJob(DelayJob delayJob, long delay, TimeUnit timeUnit) { RBlockingQueue<DelayJob> blockingFairQueue = redissonClient.getBlockingQueue(delayJobName); RDelayedQueue<DelayJob> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(delayJob, delay, timeUnit); } /** * 内部类,ListenerTask,执行具体的队列消费 */ class ListenerTask implements Runnable { private RedissonClient redissonClient; private ListenerTask(RedissonClient redissonClient) { this.redissonClient = redissonClient; } @Override public void run() { RBlockingQueue<DelayJob> blockingFairQueue = redissonClient.getBlockingQueue(delayJobName); while (true) { try { DelayJob delayJob = blockingFairQueue.take(); //加载类 Class<?> delayJobClazz = delayJob.getClazz(); //获取方法 Method printMethod = delayJobClazz.getMethod(delayJob.getMethodName(), delayJob.getParameterTypes()); //调用 Object invoke = printMethod.invoke(context.getBean(delayJob.getClazz()), delayJob.getParam()); log.info("delayJob执行结果:{}", JSON.toJSONString(invoke)); } catch (Exception e) { log.warn(e.getMessage(), e); } } } } }
下一篇:
xxlJob 分布式定时任务