Java并发系列:自定义线程池
主要组件:线程池、阻塞队列、生产者;
实现阻塞队列
class BlockingQueue<T> { // 1. 任务队列 private Deque<T> queue = new ArrayDeque<>(); // 2. 锁 private ReentrantLock lock = new ReentrantLock(); // 3. 生产者条件变量 private Condition fullWaitSet = lock.newCondition(); // 4. 消费者条件变量 private Condition emptyWaitSet = lock.newCondition(); // 5. 容量 private int capcity; public BlockingQueue(int capcity) { this.capcity = capcity; } //阻塞获取 public T take() { lock.lock(); try { while (queue.isEmpty()) { try { emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); fullWaitSet.signal(); //唤醒生产者 return t; } finally { lock.unlock(); } } //阻塞添加 public void put(T element) { lock.lock(); try { while (queue.size() == capcity) { try { log.debug("等待加入任务队列 {} ...", task); fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任务队列 {}", task); queue.addLast(task); emptyWaitSet.signal(); //唤醒消费者 } finally { lock.unlock(); } } //获取大小 public int size() { lock.lock(); try { return queue.size(); } finally { lock.unlock(); } } }
poll增强(超时阻塞) await方法是一直等待,在这里加一个等待时间,如果超过则不再等待。
-
awaitNanos()可解决虚假唤醒问题。
线程池实现
-
(task = taskQueue.poll(timeout,timeUnit) ) != null 表示重复从队列中获得任务;
-
这里的run方法中taskQueue.take()方法是死等的,没有设置超时。那么线程在没有任务时不会被销毁,会在这里不断take。这其实也是一种线程池的策略(线程池有很多种策略)。 可以换成非阻塞的put方法。
拒绝策略
参考资料
1、黑马JUC 2、《Java并发编程艺术》