Java并发系列:自定义线程池

主要组件:线程池、阻塞队列、生产者;

实现阻塞队列

class BlockingQueue<T> {
 	// 1. 任务队列
 	private Deque<T> queue = new ArrayDeque<>();
 	// 2. 锁
 	private ReentrantLock lock = new ReentrantLock();
 	// 3. 生产者条件变量
 	private Condition fullWaitSet = lock.newCondition();
 	// 4. 消费者条件变量
 	private Condition emptyWaitSet = lock.newCondition();
 	// 5. 容量
 	private int capcity;
 	
 	public BlockingQueue(int capcity) {
 		this.capcity = capcity;
 	}
	
	//阻塞获取
	public T take() {
		lock.lock();
	 	try {
 			while (queue.isEmpty()) {
 				try {
 					emptyWaitSet.await();
 				} catch (InterruptedException e) {
 					e.printStackTrace();
 				}
			 }
 			T t = queue.removeFirst();
 			fullWaitSet.signal();     //唤醒生产者
 			return t;
 		} finally {
 			lock.unlock();
		}
	}

	//阻塞添加
	public void put(T element) {
		lock.lock();
 		try {
 			while (queue.size() == capcity) {
 				try {
 					log.debug("等待加入任务队列 {} ...", task);
 					fullWaitSet.await();
 				} catch (InterruptedException e) {
 						e.printStackTrace();
 				}
 			}
 			log.debug("加入任务队列 {}", task);
 			queue.addLast(task);
 			emptyWaitSet.signal(); 	//唤醒消费者
 		} finally {
 			lock.unlock();
 		}
	}

	//获取大小
	public int size() {
 		lock.lock();
 		try {
 			return queue.size();
 		} finally {
 			lock.unlock();
 		}
 	}
 	
 }

poll增强(超时阻塞) await方法是一直等待,在这里加一个等待时间,如果超过则不再等待。

    awaitNanos()可解决虚假唤醒问题。

线程池实现

    (task = taskQueue.poll(timeout,timeUnit) ) != null 表示重复从队列中获得任务;
    这里的run方法中taskQueue.take()方法是死等的,没有设置超时。那么线程在没有任务时不会被销毁,会在这里不断take。这其实也是一种线程池的策略(线程池有很多种策略)。 可以换成非阻塞的put方法。

拒绝策略

参考资料

1、黑马JUC 2、《Java并发编程艺术》

经验分享 程序员 微信小程序 职场和发展