一个简单的生产者消费者案例
生产者消费者模式
为什么要引入生产者消费者模式
- 简单来说就是为了解决多线程下程序先后执行问题 遇到的例:实际生产中出现的场景
服务D依赖于服务A、B、C产生的数据,服务D必须等待A、B、C产生结果并持久化到数据库中才能进行下一步操作,否则等待。
解决问题:
- 首先注入Blocking-queue。
package com.jiubo.cmd.config.queue; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * 注入BlockingQueue bean * * @author :Zzz * @date :Created in 2022/3/2 15:47 * @version: [1.0.0] */ @Component @Slf4j public class SipResponseQueue { private static final BlockingQueue<String> DEVICE_STATUS_QUEUE = CollUtil.newBlockingQueue(300, true); public void produceDeviceStatus(String responseInfo) { try { DEVICE_STATUS_QUEUE.put(responseInfo); } catch (InterruptedException e) { log.error("堆栈put错误: {}", e.getMessage()); e.printStackTrace(); } } public String consumeDeviceStatus() { String statusResponse = StrUtil.EMPTY; try { statusResponse = DEVICE_STATUS_QUEUE.poll(5, TimeUnit.SECONDS); } catch (InterruptedException e) { log.error("堆栈poll错误: {}", e.getMessage()); e.printStackTrace(); } return statusResponse; } }
- 一个比较简单的使用案例。
@Resource private SipResponseQueue sipResponseQueue; @Resource(name = "threadPoolTaskExecutor") private ThreadPoolTaskExecutor executor; @GetMapping("/test") public void test() { executor.execute(this::test2); while (true) { String rep = sipResponseQueue.consumeDeviceStatus(); if (StrUtil.isNotBlank(rep) && rep.length() > 1) { System.out.println("test"); return; } } } private void test2() { ThreadUtil.sleep(10, TimeUnit.SECONDS); sipResponseQueue.produceDeviceStatus("test"); }