zl程序教程

您现在的位置是:首页 >  后端

当前栏目

java多线程系列(九)—ArrayBlockingQueue源码分析详解编程语言

JAVA多线程源码编程语言 详解 分析 &# 系列
2023-06-13 09:20:44 时间
 /** items index for next take, poll, peek or remove */ 

 int takeIndex;


 /** items index for next put, offer, or add */ 

 int putIndex;


 /** Number of elements in the queue */ 

 int count;


 /* 

 * Concurrency control uses the classic two-condition algorithm 

 * found in any textbook. 

 */ 

 

 /** Main lock guarding all access */ 

 final ReentrantLock lock;


 /** Condition for waiting takes */ 

 private final Condition notEmpty;


 /** Condition for waiting puts */   private final Condition notFull;


 public ArrayBlockingQueue(int capacity, boolean fair) { 

 if (capacity  = 0) 

 throw new IllegalArgumentException(); 

 this.items = new Object[capacity]; 

 lock = new ReentrantLock(fair); 

 notEmpty = lock.newCondition(); 

 notFull = lock.newCondition(); 

 }


从源码可以看到,创建一个object数组,然后创建一个公平或非公平锁,然后创建出队条件和入队条件


 checkNotNull(e);   final ReentrantLock lock = this.lock;   lock.lock();   try {   if (count == items.length)   return false;   else {   enqueue(e);   return true;   }   } finally {   lock.unlock();   }   }


private void enqueue(E x) { 

 // assert lock.getHoldCount() == 1; 

 // assert items[putIndex] == null; 

 final Object[] items = this.items; 

 items[putIndex] = x; 

 if (++putIndex == items.length) 

 putIndex = 0; 

 count++; 

 notEmpty.signal(); 

 }


指定时间的offer方法
 public boolean offer(E e, long timeout, TimeUnit unit) 

 throws InterruptedException { 

 

 checkNotNull(e); 

 long nanos = unit.toNanos(timeout); 

 final ReentrantLock lock = this.lock; 

 lock.lockInterruptibly(); 

 try { 

 while (count == items.length) { 

 if (nanos  = 0) 

 return false; 

 nanos = notFull.awaitNanos(nanos); 

 } 

 enqueue(e); 

 return true; 

 } finally { 

 lock.unlock(); 

 } 

 }


方法大致和前面的一致,不同的时候是当队列满的时候,会等待一段时间,此时入队条件等待一段时间,一段时间后继续进入循环进行判断队列还满


public boolean add(E e) { 

 return super.add(e); 

 }


 else   throw new IllegalStateException( Queue full    }


执行offer方法,这个时候可以对比上面直接调用offer,offer方法如果入队失败会直接返回false,而add方法会抛出异常


put方法
public void put(E e) throws InterruptedException { 

 checkNotNull(e); 

 final ReentrantLock lock = this.lock; 

 lock.lockInterruptibly(); 

 try { 

 while (count == items.length) 

 notFull.await(); 

 enqueue(e); 

 } finally { 

 lock.unlock(); 

 } 

 }


public E poll() { 

 final ReentrantLock lock = this.lock; 

 lock.lock(); 

 try { 

 return (count == 0) ? null : dequeue(); 

 } finally { 

 lock.unlock(); 

 } 

 }


 private E dequeue() { 

 // assert lock.getHoldCount() == 1; 

 // assert items[takeIndex] != null; 

 final Object[] items = this.items; 

 @SuppressWarnings( unchecked ) 

 E x = (E) items[takeIndex]; 

 items[takeIndex] = null; 

 if (++takeIndex == items.length) 

 takeIndex = 0; 

 count--; 

 if (itrs != null) 

 itrs.elementDequeued(); 

 notFull.signal(); 

 return x; 

 }


指定时间的poll方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException { 

 long nanos = unit.toNanos(timeout); 

 final ReentrantLock lock = this.lock; 

 lock.lockInterruptibly(); 

 try { 

 while (count == 0) { 

 if (nanos  = 0) 

 return null; 

 nanos = notEmpty.awaitNanos(nanos); 

 } 

 return dequeue(); 

 } finally { 

 lock.unlock(); 

 } 

 }


与offer的指定时间和没有指定时间类似,poll指定时间的方法和没有指定时间的poll思路大致是一样的


当此时队列为空的,为等待一段时间,然后自动唤醒,继续进入循环,直到队列中有元素,然后执行dequeue方法


take方法
 public E take() throws InterruptedException { 

 final ReentrantLock lock = this.lock; 

 lock.lockInterruptibly(); 

 try { 

 while (count == 0) 

 notEmpty.await(); 

 return dequeue(); 

 } finally { 

 lock.unlock(); 

 } 

 }


offer(E e, long timeout, TimeUnit unit) 队列满的时候,等待一段时间,释放锁,一段时间后,进入就绪状态
poll(long timeout, TimeUnit unit) 队列为空的时候,等待一段时间,释放锁,一段时候后,进入就绪状态

总体的设计思路,通过一个数组来模拟一个数组,出队和入队都是同步的,也就是同一时间只能有一个入队或者出队操作,然后在入队的时候,如果队列已满的话,根据方法的不同有不同的策略,可以直接返回或者抛出异常,也可以阻塞一段时间,等会在尝试入队,或者直接阻塞,直到有人唤醒。而出队的时候,如果为空可以直接返回,也可以等待一段时间然后再次尝试,也可以阻塞,直到有人唤醒