6

java | 自定义阻塞队列 - 超时阻塞

 1 year ago
source link: https://benpaodewoniu.github.io/2023/01/01/java154/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

java | 线程池 自定义阻塞队列 - 基本阻塞队列 完成了基本的阻塞队列,但是,其还有不足。

这一章节,将加上超时逻辑。

超时逻辑,分别是

主要针对的是阻塞队列。

超时阻塞队列

// 阻塞队列
@Slf4j(topic = "c.BlockQuene")
class BlockQuene<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 满条件变量
private Condition fullWaitSet = lock.newCondition();
// 不满条件变量
private Condition noFullWaitSet = lock.newCondition();
// 容量
private int capcity;

public BlockQuene(int capcity) {
this.capcity = capcity;
}


// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
log.debug("队列为空");
try {
noFullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
log.debug("唤醒队列 ~ 进行添加");
fullWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}

// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("队列已满,等待...");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("添加任务 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
} finally {
lock.unlock();
}
}

// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回的是剩余时间
if (nanos <= 0) {
return null;
}
nanos = noFullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

// 带超时时间的阻塞添加
// 防止队列满了,一直死等
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
if (nanos <= 0) {
log.debug("任务超时 {}", task);
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
return true;
} finally {
lock.unlock();
}
}

public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}
}

里面主要增加了逻辑

  • poll(long timeout, TimeUnit unit)
  • offer(T task, long timeout, TimeUnit unit)

下载 JAVA 文件

日志输出如下

14:40:12.691 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-0,5,main] com.redisc.Run$$Lambda$1/836514715@2286778
14:40:12.702 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-1,5,main] com.redisc.Run$$Lambda$1/836514715@5a8e6209
14:40:12.702 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@2286778
14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c
14:40:12.702 [main] DEBUG c.BlockQuene - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@731a74c
14:40:12.702 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@5a8e6209
14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:12.702 [main] DEBUG c.BlockQuene - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:12.702 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152
14:40:12.702 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 ...
14:40:13.208 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1f28c152 ...
14:40:13.208 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@1f28c152
14:40:13.208 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895
14:40:13.208 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 ...
14:40:13.709 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@7791a895 ...
14:40:13.709 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@7791a895
14:40:13.709 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
14:40:13.709 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 ...
14:40:14.213 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6 ...
14:40:14.213 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@3a5ed7a6
14:40:14.213 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee
14:40:14.213 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee ...
14:40:14.719 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@6325a3ee ...
14:40:14.719 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@6325a3ee
14:40:14.719 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d
14:40:14.719 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d ...
14:40:15.221 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@1d16f93d ...
14:40:15.221 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@1d16f93d
14:40:15.221 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a
14:40:15.221 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a ...
14:40:15.723 [main] DEBUG c.BlockQuene - 等待加入任务队列 com.redisc.Run$$Lambda$1/836514715@67b92f0a ...
14:40:15.723 [main] DEBUG c.BlockQuene - 任务超时 com.redisc.Run$$Lambda$1/836514715@67b92f0a
14:40:22.702 [Thread-0] DEBUG c.Test - 0
14:40:22.702 [Thread-1] DEBUG c.Test - 1
14:40:22.703 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@5a8e6209
14:40:22.703 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@2286778
14:40:22.703 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@731a74c
14:40:22.703 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:32.706 [Thread-1] DEBUG c.Test - 2
14:40:32.706 [Thread-0] DEBUG c.Test - 3
14:40:32.706 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@369f73a2
14:40:32.706 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$1/836514715@731a74c

可以发现很明显的发现就执行了 4 个任务。

虽然增加了超时逻辑,但是也丧失了灵活性。

比如,有的任务比较不重要,我希望超时放弃,有的任务很重要,我希望一直等待,显然,这套框架不满足灵活性。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK