Java并发系列:自定义线程池
主要组件:线程池、阻塞队列、生产者;
实现阻塞队列
class BlockingQueue<T> {
// 1. 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
//阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal(); //唤醒生产者
return t;
} finally {
lock.unlock();
}
}
//阻塞添加
public void put(T element) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
emptyWaitSet.signal(); //唤醒消费者
} finally {
lock.unlock();
}
}
//获取大小
public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}
poll增强(超时阻塞) await方法是一直等待,在这里加一个等待时间,如果超过则不再等待。
-
awaitNanos()可解决虚假唤醒问题。
线程池实现
-
(task = taskQueue.poll(timeout,timeUnit) ) != null 表示重复从队列中获得任务;
-
这里的run方法中taskQueue.take()方法是死等的,没有设置超时。那么线程在没有任务时不会被销毁,会在这里不断take。这其实也是一种线程池的策略(线程池有很多种策略)。 可以换成非阻塞的put方法。
拒绝策略
参考资料
1、黑马JUC 2、《Java并发编程艺术》
