java 利用jdk延迟队列DelayQueue实现定时到期执行业务
延迟队列是指将未来某一特定时间到期的任务添加到该队列后,在任务到期时间可以重新获得该任务,用于执行相关的业务处理。
package com.demo; import java.text.SimpleDateFormat; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class DelayQueneTest { static DelayQueue<DelayTaskItem> queue = null; public static void main(String[] args) throws InterruptedException { queue = new DelayQueue<>(); System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)); ProduceThread produceThread=new ProduceThread(); produceThread.start(); ConsumeThread consumeThread=new ConsumeThread(); consumeThread.start(); } // 添加任务线程 public static class ProduceThread extends Thread { public ProduceThread() { } public void run() { DelayTaskItem item1 = new DelayTaskItem("item1", 5, TimeUnit.SECONDS); DelayTaskItem item2 = new DelayTaskItem("item2",10, TimeUnit.SECONDS); DelayTaskItem item3 = new DelayTaskItem("item3",15, TimeUnit.SECONDS); queue.put(item3); queue.put(item2); queue.put(item1); while (true) { DelayTaskItem item = new DelayTaskItem("item1", 5, TimeUnit.SECONDS); queue.put(item); try { Thread.sleep(3000l); } catch (InterruptedException e) { e.printStackTrace(); } } } } // 读取到期任务线程 public static class ConsumeThread extends Thread { public ConsumeThread() { } public void run() { while (true) { DelayTaskItem item = null; SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); try { item = queue.take(); System.out.format("name:{%s}, delayed time:{%s},system time:{%s} ", item.name, sdf.format(item.getTime()), LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)); } catch (InterruptedException e) { e.printStackTrace(); } } } } // 延迟任务需要实现Delayed接口的getDelay()和compareTo() static class DelayTaskItem implements Delayed { /* 触发时间*/ private long time; String name; public DelayTaskItem(String name, long time, TimeUnit unit) { this.name = name; this.time = System.currentTimeMillis() + (time > 0? unit.toMillis(time): 0); } @Override public long getDelay(TimeUnit unit) { return time - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { DelayTaskItem item = (DelayTaskItem) o; long diff = this.time - item.time; if (diff <= 0) {// 改成>=会造成问题 return -1; }else { return 1; } } public long getTime() { return time; } public void setTime(long time) { this.time = time; } @Override public String toString() { return "Item{" + "time=" + time + ", name=" + name + + }; } } }
上述示例代码,创建一个延迟队列,在一个线程中生成延迟任务,并添加到延迟队列中,在另外一个线程,从队列中读取到期的任务。take()方式读取是阻塞方式读取,只有读取到到期的任务,才会返回。
此外,还可以用poll()读取,有到期的任务,返回到期的任务,如果没有到期的任务返回空值,此时可以由程序主动进行逻辑处理,具有一定的灵活性。
采用DelayQueue实现的延迟队列,只限于单节点,在集群环境中,各个节点之间的延迟任务无法共享。
下一篇:
js之遍历数组、字符串