JAVA 多线程(生产者和消费者模式)

在生产-消费模式中:通常由两类线程,即若干个生产者的线程和若干个消费者的线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。

主要构成:

provider:生产者线程负责生产数据

consumer:消费者线程负责消费生产的数据

示例:

provider类:生产者

/** * 生产者 * @author Administrator * */ public class Provider implements Runnable{ //共享缓存区 private BlockingQueue<Data> queue; //标识线程是否运行的变量,volatile外部主线程可见性 private volatile boolean isRunning=true; //原子类 id生成器 private static AtomicInteger count=new AtomicInteger(0); //随机对象 private Random r=new Random(); public Provider(BlockingQueue<Data> queue) { this.queue=queue; } @Override public void run() { while(isRunning) { try { //随机休眠0-1000毫秒 表示获取数据(产生数据的耗时) Thread.sleep(r.nextInt(1000)); //获取id数据累计 int id=count.incrementAndGet(); Data data=new Data(); data.setId(id); data.setName("数据"+id); System.out.println(Thread.currentThread().getName()+"生产数据:"+data+",进行装载到缓冲区"); if(!this.queue.offer(data,2,TimeUnit.SECONDS)) { System.out.println("提交缓冲区数据失败。。。。"); } } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop() { this.isRunning=false; } }

consumer类: 消费者

/** * 消费者 * @author Administrator * */ public class Consumer implements Runnable{ private BlockingQueue<Data> queue; private static Random r=new Random(); public Consumer(BlockingQueue<Data> queue) { this.queue=queue; } @Override public void run() { while (true) { try { //获取数据 Data data=this.queue.take(); //随机休眠0-1000毫秒 表示数据处理(消费耗时) Thread.sleep(r.nextInt(1000)); System.out.println(Thread.currentThread().getName()+"消费:"+data); } catch (InterruptedException e) { e.printStackTrace(); } } } }

Data类:数据封装类

public class Data { private int id; private String name; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "Data [id=" + id + ", name=" + name + "]"; } }

Main:测试类

public class Main { public static void main(String[] args) { //缓冲区容器 BlockingQueue<Data> queue=new LinkedBlockingQueue<>(10); //生产者 Provider p1=new Provider(queue); Provider p2=new Provider(queue); Provider p3=new Provider(queue); //消费者 Consumer c1=new Consumer(queue); Consumer c2=new Consumer(queue); Consumer c3=new Consumer(queue); //创建线程池运行,这是一个缓存线程池,可以创建无穷大的线程,没有任务的时候不创建线程,空闲线程存活时间为60s(默认) ExecutorService es=Executors.newCachedThreadPool(); es.execute(p1); es.execute(p2); es.execute(p3); es.execute(c1); es.execute(c2); es.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } p1.stop(); p2.stop(); p3.stop(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }
经验分享 程序员 微信小程序 职场和发展