ArrayBlockingQueue 源码

📅 2026/7/2 1:42:23
ArrayBlockingQueue 源码
核心变量publicclassArrayBlockingQueueEextendsAbstractQueueEimplementsBlockingQueueE,java.io.Serializable{// 底层存储定长数组一旦初始化容量不可扩容finalObject[]items;// 出队索引take/poll 取元素位置inttakeIndex;// 入队索引put/offer 添加元素位置intputIndex;// 当前队列元素总数intcount;// 全局独占锁所有入队、出队操作共用一把锁核心特性单锁finalReentrantLocklock;// 非空条件队列有元素时唤醒消费者privatefinalConditionnotEmpty;// 非满条件队列有空位时唤醒生产者privatefinalConditionnotFull;}构造方法数组固定容量无扩容机制满了就阻塞 / 抛异常入队、出队、查询全部竞争同一把 ReentrantLock// 1. 基础构造指定容量默认非公平锁publicArrayBlockingQueue(intcapacity){this(capacity,false);}// 2. 完整构造容量 是否公平锁publicArrayBlockingQueue(intcapacity,booleanfair){if(capacity0)thrownewIllegalArgumentException();// 初始化定长数组容量固定itemsnewObject[capacity];// 创建可重入锁fairtrue公平false非公平默认locknewReentrantLock(fair);// 绑定两个条件变量共用同一把locknotEmptylock.newCondition();notFulllock.newCondition();}// 3. 带集合初始化的构造publicArrayBlockingQueue(intcapacity,booleanfair,Collection?extendsEc){this(capacity,fair);// 加锁批量导入集合元素finalReentrantLocklockthis.lock;lock.lock();try{inti0;for(Ee:c){checkNotNull(e);items[i]e;}counti;putIndex(icapacity)?0:i;}finally{lock.unlock();}}入队方法add调用 offer失败直接抛队列满异常不阻塞publicbooleanadd(Ee){// 调用父类AbstractQueue的addreturnsuper.add(e);}// AbstractQueue.addpublicbooleanadd(Ee){if(offer(e))returntrue;elsethrownewIllegalStateException(Queue full);}offer不超时加锁队列满直接返回 false不阻塞publicbooleanoffer(Ee){checkNotNull(e);// 不允许null元素finalReentrantLocklockthis.lock;lock.lock();try{// 判断队列未满才入队if(countitems.length)returnfalse;else{enqueue(e);returntrue;}}finally{lock.unlock();}}offer超时publicbooleanoffer(Ee,longtimeout,TimeUnitunit)throwsInterruptedException{checkNotNull(e);longnanosunit.toNanos(timeout);finalReentrantLocklockthis.lock;// 可中断加锁lock.lockInterruptibly();try{// 队列满循环等待超时while(countitems.length){if(nanos0)returnfalse;// 生产者等待notFull释放锁超时唤醒nanosnotFull.awaitNanos(nanos);}enqueue(e);returntrue;}finally{lock.unlock();}}putpublicvoidput(Ee)throwsInterruptedException{checkNotNull(e);finalReentrantLocklockthis.lock;lock.lockInterruptibly();try{// 队列满无限阻塞等待notFullwhile(countitems.length)notFull.await();enqueue(e);}finally{lock.unlock();}}enqueue环形数组putIndex 循环 0 ~ (length - 1)privatevoidenqueue(Ex){// putIndex位置放入元素finalObject[]itemsthis.items;items[putIndex]x;// 环形数组到末尾重置索引为0if(putIndexitems.length)putIndex0;count;// 放入元素后唤醒一个阻塞的消费者notEmpty.signal();}对比方法队列满时的动作返回值是否阻塞add(e)直接失败抛 IllegalStateException不阻塞offer(e)直接失败return false不阻塞put(e)阻塞等待空位无返回无限阻塞可中断notFull.awaitoffer(e,time,unit)限时阻塞超时返回 false限时阻塞notFull.awaitNanos