阻塞Map BlockingMap的实现
做socket应用用到了BlockingQueue接口,可用于生产者消费者模式,多个线程阻塞着等待queue的数据到来,但是如果是该线程需要等待某个特定的数据该如何处理呢,自己写了个BlockingMap
public interface BlockingMap<V> { public void put(Integer key, V o) throws InterruptedException; public V take(Integer key) throws InterruptedException; public V poll(Integer key, long timeout) throws InterruptedException; } public class HashBlockingMap<V> implements BlockingMap<V> { private ConcurrentMap<Integer, Item<V>> map; private final ReentrantLock lock = new ReentrantLock(); public HashBlockingMap() { map = new ConcurrentHashMap<Integer, Item<V>>(); } public void put(Integer key, V o) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { if (map.containsKey(key)) { Item<V> item = map.get(key); item.put(o); } else { Item<V> item = new Item<V>(); map.put(key, item); item.put(o); } } finally { lock.unlock(); } } public V take(Integer key) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { if (!map.containsKey(key)) { map.put(key, new Item<V>()); } } finally { lock.unlock(); } Item<V> item = map.get(key); V x = item.take(); map.remove(key); return x; } public V poll(Integer key, long timeout) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { if (!map.containsKey(key)) { map.put(key, new Item<V>()); } } finally { lock.unlock(); } Item<V> item = map.get(key); V x = item.poll(timeout); map.remove(key); return x; } private static class Item<E> { private final ReentrantLock lock = new ReentrantLock(); private final Condition cond = lock.newCondition(); private E obj = null; private void put(E o) throws InterruptedException { if (o == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { obj = o; cond.signal(); } finally { lock.unlock(); } } E take() throws InterruptedException { E x; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (obj == null) { cond.await(); } } catch (InterruptedException ie) { cond.signal(); throw ie; } x = obj; } finally { lock.unlock(); } return x; } private E poll(long timeout) throws InterruptedException { timeout = TimeUnit.MILLISECONDS.toNanos(timeout); E x; final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { if (obj != null) { x = obj; break; } if (timeout <= 0) { return null; } try { timeout = cond.awaitNanos(timeout); } catch (InterruptedException ie) { cond.signal(); throw ie; } } } finally { lock.unlock(); } return x; } } } // 消费者根据sequence取得自己想要的对象 Response response = blockingMap.poll(sequence, timeout); // 生产者 blockingMap.put(response.getSequence(), response);