当前位置: 首页> 财经> 股票 > 温州网页设计前端招聘_保定最大的网络公司_怎样建立自己网站_如何做营销策划方案

温州网页设计前端招聘_保定最大的网络公司_怎样建立自己网站_如何做营销策划方案

时间:2025/8/23 21:31:18来源:https://blog.csdn.net/laohuangaa/article/details/144931572 浏览次数:1次
温州网页设计前端招聘_保定最大的网络公司_怎样建立自己网站_如何做营销策划方案

文章目录

  • 1. 前言
  • 2. 介绍
  • 3. leader-follower 模式
  • 4. 参数
  • 5. 核心方法
    • 5.1 构造器
    • 5.2 生产者
    • 5.3 消费者
      • 5.3.1 peek
      • 5.3.2 poll
      • 5.3.3 poll(long timeout, TimeUnit unit)
      • 5.3.4 take()
    • 5.4 其他方法
  • 6. 小结


1. 前言

上一篇文章已经介绍了 LinkedBlockingQueue,这篇文章就来介绍下 DelayQueue。

系列文章:

  • 【阻塞队列】- 生产者和消费者模式
  • 【阻塞队列】- ArrayBlockingQueue 的原理
  • 【阻塞队列】- ArrayBlockingQueue 的原理-迭代器
  • 【阻塞队列】- LinkedBlockingQueue 的原理
  • 【阻塞队列前置知识】- Spliterator 接口
  • 【阻塞队列】- LinkedBlockingQueue - LBQSpliterator

2. 介绍

DelayQueue 是 Java 中 java.util.concurrent 包提供的一个阻塞队列,属于 Java 并发工具包的一部分。主要用于处理带有延迟时间的任务或事件,也就是说队列中的元素只有在其指定的延迟时间到期后才能被取出。


3. leader-follower 模式

DelayQueue 里面生产者和消费者的阻塞唤醒遵循 leader-follower 模式,这个模式的核心就是 leader 先负责阻塞等待节点到期,follower 节点会一直阻塞直到 leader 线程执行完成。

Leader-Follower 模式 是一种并发编程模式主要用于处理并发任务的调度。在 Leader-Follower 模式中,一组线程(也可以称为工作线程或处理单元)在等待任务时处于“等待”状态,一旦有任务到达,其中一个线程会被选为 “Leader” 来获取该任务并处理,而其他线程则继续保持“Follower”状态。任务处理完成后,原 Leader 线程可能会重新进入 Follower 状态,等待下一个任务的到来。
Leader-Follower 模式下两种线程的职责如下:

  1. Leader 线程: 当前正在等待队列头部任务的线程, 线程会等待下一个任务的延迟时间(即 delay)结束后获取到该任务返回并交出 leader 的身份
  2. Follower 线程: 除了 leader 线程之外的其他线程被称为 follower 线程,follower 线程会无限期地等待,直到它们被选中成为新的 leader 线程

4. 参数

下面来看下里面的几个参数。

/*** 锁*/
private final transient ReentrantLock lock = new ReentrantLock();
/*** 优先队列*/
private final PriorityQueue<E> q = new PriorityQueue<E>();private Thread leader = null;private final Condition available = lock.newCondition();

上面几个就是队列里面的标准属性了

  1. lock 是消费者和生产者锁
  2. q 是优先队列,因为 DelayQueue 里面需要根据超时时间排序,优先弹出最先过期的任务,所以需要用 PriorityQueue 来存储。
  3. leader 是上面的 leader-follower 模式中的 leader 线程
  4. available 是 lock 的条件等待队列,leader 线程和 follower 线程都会使用这个条件去阻塞唤醒

最后来说下,因为这个队列里面存储节点的 PriorityQueue 是没有上限的,所以生产者和消费者并不会因为添加获取节点阻塞。但是 DelayQueue 里面使用了 leader-follower 模式,所以消费者会因为不同的身份(leader 和 follower)而阻塞,但是生产者是不会的。


5. 核心方法

5.1 构造器

/*** 构造器*/
public DelayQueue() {}/*** 添加集合* @param c*/
public DelayQueue(Collection<? extends E> c) {this.addAll(c);
}

构造器里面是没有设置容量的,所以上面的 PriorityQueue 是没有容量上限的,用的时候要注意不能一直往里面放任务,避免内存溢出。


5.2 生产者

生产者一共有四个方法:add(E)、offer(E)、put(E)、offer(E, long, TimeUnit)。但是其实这几个方法都调用的一个方法,就是 offer(E)

/*** 添加方法* @param e* @return*/
public boolean add(E e) {return offer(e);
}/*** 添加元素* @param e*/
public void put(E e) {offer(e);
}/*** 添加元素* @param e 元素* @param timeout 过期时间,没用* @param unit 过期时间单位,没用* @return*/
public boolean offer(E e, long timeout, TimeUnit unit) {return offer(e);
}

我们就来看下这个 offer 方法。

/*** 添加一个元素(任务元素)* @param e* @return*/
public boolean offer(E e) {// 加锁final ReentrantLock lock = this.lock;lock.lock();try {// 加入优先队列q.offer(e);if (q.peek() == e) {// 元素添加到队列头部,头部队列变化了,说明队列里面有新节点了// 比如原来的队列队头元素超时时间是:5s// 现在新增一个 3s 超时时间的节点,这时候就需要唤醒阻塞等待的线程了,因为有可能其他 leader 线程或者普通线程等待时间是 5sleader = null;// 唤醒阻塞等待的线程(一个)available.signal();}return true;} finally {// 解锁lock.unlock();}
}

首先加锁,然后把节点加入优先队列,接着如果元素添加到队列头部,头部队列变化了,说明队列里面有新节点了,比如原来的队列队头元素超时时间是 5 s,现在新增一个 3s 超时时间的节点,这时候就需要唤醒阻塞等待的线程了,因为有可能其他 leader 线程或者普通线程等待时间是 5s,需要唤醒来重新等待

leader 线程是阻塞等待节点过期的线程,由于现在有一个早过期的节点加入了队列,所以 leader 线程也要重新选择了,那么在唤醒阻塞线程之前需要将 leader 置空。


5.3 消费者

5.3.1 peek

/*** 获取第一个元素* @return*/
public E peek() {final ReentrantLock lock = this.lock;lock.lock();try {// 获取队头元素return q.peek();} finally {lock.unlock();}
}

从队列头部获取一个元素。


5.3.2 poll

这个方法是从队头中取出一个元素,这里面因为取出的元素需要是过期的,所以需要先使用 peek 获取队首元素看看有没有过期,如果过期了再用 poll,否则就不管。

/*** 从队头取出一个元素* @return*/
public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {// 获取首个元素E first = q.peek();// 如果为空或者延迟时间还没到, 一般来说就是这个延时时间是截止时间 - 当前时间if (first == null || first.getDelay(NANOSECONDS) > 0)// 返回nullreturn null;else// 从优先队列中获取队头元素return q.poll();} finally {lock.unlock();}
}

5.3.3 poll(long timeout, TimeUnit unit)

这个方法是带超时的 poll,先来看下整体的逻辑。

/*** 有超时时间的 poll* @param timeout 超时时间* @param unit 时间单位* @return* @throws InterruptedException*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {// 过期时间long nanos = unit.toNanos(timeout);// 加锁final ReentrantLock lock = this.lock;// 可中断锁lock.lockInterruptibly();try {for (;;) {// 第一个元素E first = q.peek();if (first == null) {// 如果队列没有元素if (nanos <= 0)// 如果过期时间是负数,那么说明阻塞了 timeout 时间还是没能获取到节点return null;else// 阻塞时间是 nanos,这里唤醒有可能就是添加了节点提前被唤醒了,但是如果是打断就直接退出执行 finally 了nanos = available.awaitNanos(nanos);} else {// 队列里面有元素,获取第一个任务剩余时间long delay = first.getDelay(NANOSECONDS);// 如果第一个节点的过期时间到了if (delay <= 0)// 直接返回结果return q.poll();if (nanos <= 0)// 到这里就是阻塞了 nanos 时间但是还是没有获取到有节点过期return null;// 设置空值,减少内存损耗first = null;// 1.如果 nanos 小于 delay,就说明阻塞了 nanos 时间之后还是没能等到第一个节点过期// 2.如果 leader 不为空,也说明了当前线程是一个 follower 线程,直接阻塞 nanos 时间// 阻塞了这么久唤醒// 如果是第一种情况,那么 nanos <= 0 之后直接返回空了// 如果是第二种情况,那么就代表有 leader 线程了,当前线程是 follower 线程,所以继续阻塞if (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);else {// leader 线程还没有设置,那么当前线程就成为 leader 线程去阻塞获取元素Thread thisThread = Thread.currentThread();leader = thisThread;try {// 然后让当前线程作为 leader 去等待 delay 时间long timeLeft = available.awaitNanos(delay);// 这是剩下的超时时间nanos -= delay - timeLeft;} finally {// 被唤醒之后释放 leader 线程// 不管是有没有阻塞到第一个节点的超时时间,这里都会去掉 leader 的身份然后继续循环if (leader == thisThread)leader = null;}}}}} finally {// 到这里 leader 线程就获取到元素退出了,然后如果队列里面还有元素if (leader == null && q.peek() != null)// 唤醒一个新的消费者去成为新的 leader 去消费available.signal();lock.unlock();}
}

然后我们来分析下里面的核心逻辑,加完锁之后在 for 循环里面会先获取第一个元素,如果第一个元素是空,就说明队列里面没有元素了,这时候直接阻塞 nanos 时间。

// 第一个元素
E first = q.peek();
if (first == null) {// 如果队列没有元素if (nanos <= 0)// 如果过期时间是负数,那么说明阻塞了 timeout 时间还是没能获取到节点return null;else// 阻塞时间是 nanos,这里唤醒有可能就是添加了节点提前被唤醒了,但是如果是打断就直接退出执行 finally 了nanos = available.awaitNanos(nanos);

那如果队列没有满,就获取下第一个任务的剩余时间,然后判断下第一个节点的过期时间有没有到了。如果到的话,直接返回结果,如果超时时间 <= 0,那么直接返回空。

// 队列里面有元素,获取第一个任务剩余时间
long delay = first.getDelay(NANOSECONDS);
// 如果第一个节点的过期时间到了
if (delay <= 0)// 直接返回结果return q.poll();
if (nanos <= 0)// 到这里就是阻塞了 nanos 时间但是还是没有获取到有节点过期return null;
// 设置空值,减少内存损耗
first = null;

下面接着看,如果 nanos 小于 delay,就说明阻塞了 nanos 时间之后还是没能等到第一个节点过期。如果 leader 不为空,也说明了当前线程是一个 follower 线程,直接阻塞 nanos 时间。

if (nanos < delay || leader != null)nanos = available.awaitNanos(nanos);

作为 follower 线程就需要一直阻塞,只是这个方法设置了一个超时时间,所以最多只能阻塞 nanos 的时间。但是如果不满足上面的 if,就说明当前还没有设置 leader 线程,同时第一个节点的过期时间 delay < 传入的超时时间 nanos,那说明了什么?leader 线程只需要阻塞 delay 就能获取到过期节点!!!

所以接下来会设置当前线程为 leader 线程,然后阻塞等待 delay 的超时时间,当 leader 被唤醒的时候,需要设置下还剩多少时间,也就是 nanos -= delay - timeLeft。最后在 finally 中设置 leader = null

else {// leader 线程还没有设置,那么当前线程就成为 leader 线程去阻塞获取元素Thread thisThread = Thread.currentThread();leader = thisThread;try {// 然后让当前线程作为 leader 去等待 delay 时间long timeLeft = available.awaitNanos(delay);// 这是剩下的超时时间nanos -= delay - timeLeft;} finally {// 被唤醒之后释放 leader 线程// 不管是有没有阻塞到第一个节点的超时时间,这里都会去掉 leader 的身份然后继续循环if (leader == thisThread)leader = null;}
}

当 leader 被唤醒之后会重新在 for 循环 里面重新开始循环,并且获取到第一个过期节点返回。最终当 leader 获取到节点之后,会唤醒一个等待的 follower 线程去成为新的 leader 去消费。

finally {// 到这里 leader 线程就获取到元素退出了,然后如果队列里面还有元素if (leader == null && q.peek() != null)// 唤醒一个新的消费者去成为新的 leader 去消费available.signal();lock.unlock();
}

所以这里面的核心逻辑就是 leader 线程去获取节点,follower 等待 leader 阻塞获取节点之后被唤醒再成为新的 leader。


5.3.4 take()

public E take() throws InterruptedException {// 加可中断锁final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {for (;;) {// 第一个元素E first = q.peek();if (first == null)// 获取不到,消费者阻塞等待available.await();else {// 能获取到,就判断下延迟时间和当前时间的比值,看看第一个元素有没有过期long delay = first.getDelay(NANOSECONDS);if (delay <= 0)// 如果过期了,就获取第一个元素return q.poll();// 没过期,先设置下 null,方便 GCfirst = null;// 当一个线程成为队列的领导者时,它会负责等待队列中的延时元素if (leader != null)// 当前线程不是 leader,那么无限等待available.await();else {// 没有线程成为 leader,那么当前线程就作为 leader 去阻塞获取第一个节点元素Thread thisThread = Thread.currentThread();leader = thisThread;try {// leader 线程去阻塞 delay 时间获取节点,follower 线程无限阻塞available.awaitNanos(delay);} finally {// 到这里就是阻塞时间够了,或者被打断了if (leader == thisThread)// leader 的作用就是阻塞等待,现在被唤醒了 leader 的作用就完成了// 如果被唤醒的时候是被打断的,那么下一次 for 循环还会继续阻塞直到有一个节点到期leader = null;}}}}} finally {// 获取到值之后,如果 leader == null && q.peek() != null// 就代表队列里面还有节点,这时候就可以唤醒下一个 follower 了,因为现在 leader = nullif (leader == null && q.peek() != null)// 唤醒其他等待线程(一个)available.signal();// 解锁lock.unlock();}
}

这个方法和上面的 poll(long timeout, TimeUnit unit) 的是一样的,逻辑都一样,不同的是 poll 最多阻塞 timeout 超时时间,但是 take 是无限阻塞的,这里就不多解释了。


5.4 其他方法

这个方法是返回第一个过期元素。

/*** 返回第一个元素,如果过期就返回,没过期就返回 null* @return*/
private E peekExpired() {// assert lock.isHeldByCurrentThread();E first = q.peek();return (first == null || first.getDelay(NANOSECONDS) > 0) ?null : first;
}

下面这个方法是清空队列,因为加锁了,所以可以直接使用 q.clear()

/*** 清除队列*/
public void clear() {final ReentrantLock lock = this.lock;lock.lock();try {q.clear();} finally {lock.unlock();}
}

下面还有两个删除的方法。

/*** 移除元素* @param o* @return*/
public boolean remove(Object o) {final ReentrantLock lock = this.lock;lock.lock();try {return q.remove(o);} finally {lock.unlock();}
}/*** 删除队列的所有元素* @param o*/
void removeEQ(Object o) {final ReentrantLock lock = this.lock;lock.lock();try {for (Iterator<E> it = q.iterator(); it.hasNext(); ) {if (o == it.next()) {it.remove();break;}}} finally {lock.unlock();}
}

6. 小结

到这里 DelayQueue 就解析完了,这个队列其实内部实现并不复杂,因为使用了现成的 PriorityQueue 优先队列,所以其他的逻辑并不是很多,但是里面关于 leader-follower 的模式使用还是值得学习的。





如有错误,欢迎指出!!!

关键字:温州网页设计前端招聘_保定最大的网络公司_怎样建立自己网站_如何做营销策划方案

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: