5

java | 自定义阻塞队列 - 拒绝策略阻塞队列

 1 year ago
source link: https://benpaodewoniu.github.io/2023/01/01/java155/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

java | 自定义阻塞队列 - 拒绝策略阻塞队列

java | 自定义阻塞队列 - 超时阻塞 使用策略模式进行改进。

使用接口的方式,让用户自己传入执行方法。

增加 tryPut

// 阻塞队列
@Slf4j(topic = "c.BlockQuene")
class BlockQuene<T> {
// 任务队列
private Deque<T> queue = new ArrayDeque<>();
// 锁
private ReentrantLock lock = new ReentrantLock();
// 满条件变量
private Condition fullWaitSet = lock.newCondition();
// 不满条件变量
private Condition noFullWaitSet = lock.newCondition();
// 容量
private int capcity;

public BlockQuene(int capcity) {
this.capcity = capcity;
}


// 阻塞获取
public T take() {
lock.lock();
try {
while (queue.isEmpty()) {
log.debug("队列为空");
try {
noFullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
log.debug("唤醒队列 ~ 进行添加");
fullWaitSet.signalAll();
return t;
} finally {
lock.unlock();
}
}

// 阻塞添加
public void put(T task) {
lock.lock();
try {
while (queue.size() == capcity) {
try {
log.debug("队列已满,等待...");
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("添加任务 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
} finally {
lock.unlock();
}
}

// 带超时的阻塞获取
public T poll(long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.isEmpty()) {
try {
// 返回的是剩余时间
if (nanos <= 0) {
return null;
}
nanos = noFullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
fullWaitSet.signal();
return t;
} finally {
lock.unlock();
}
}

// 带超时时间的阻塞添加
// 防止队列满了,一直死等
public boolean offer(T task, long timeout, TimeUnit unit) {
lock.lock();
try {
// 将 timeout 统一转化为纳秒
long nanos = unit.toNanos(timeout);
while (queue.size() == capcity) {
try {
log.debug("等待加入任务队列 {} ...", task);
if (nanos <= 0) {
log.debug("任务超时 {}", task);
return false;
}
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任务队列 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
return true;
} finally {
lock.unlock();
}
}

public int size() {
lock.lock();
try {
return queue.size();
} finally {
lock.unlock();
}
}

public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try {
if (queue.size() == capcity) {
log.debug("队列已满,直接执行");
rejectPolicy.reject(this, task);
} else {
log.debug("加入队列 {}", task);
queue.addLast(task);
noFullWaitSet.signalAll();
}
} finally {
lock.unlock();
}
}
}

增加函数接口

// 拒绝策略
@FunctionalInterface
interface RejectPolicy<T> {
void reject(BlockQuene<T> quene, T task);
}

用户编写策略

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(2, 500, TimeUnit.MILLISECONDS, 2, ((quene, task) -> {
// 死等
// quene.put(task);
// 带超时的等待
// quene.offer(task, 500, TimeUnit.MILLISECONDS);
// 让调用者放弃任务执行
// log.debug("放弃");
// 调用者抛出异常
// throw new RuntimeException("任务失败" + task);
// 让调用者自己执行任务
task.run();
}));
for (int i = 0; i < 10; i++) {
int j = i;
threadPool.execute(() -> {
try {
if (j > 3) {
Thread.sleep(1000);
} else {
Thread.sleep(10000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("{}", j);
});
}
}
}

用户可以自己编写策略,如上面

// 死等
// quene.put(task);
// 带超时的等待
// quene.offer(task, 500, TimeUnit.MILLISECONDS);
// 让调用者放弃任务执行
// log.debug("放弃");
// 调用者抛出异常
// throw new RuntimeException("任务失败" + task);
// 让调用者自己执行任务
task.run();

下载 JAVA 文件

22:13:55.407 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-0,5,main] com.redisc.Run$$Lambda$2/858242339@3108bc
22:13:55.415 [main] DEBUG c.ThreadPool - 新增 worker Thread[Thread-1,5,main] com.redisc.Run$$Lambda$2/858242339@7d907bac
22:13:55.416 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@3108bc
22:13:55.416 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@3a5ed7a6
22:13:55.416 [main] DEBUG c.BlockQuene - 加入队列 com.redisc.Run$$Lambda$2/858242339@3a5ed7a6
22:13:55.416 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@7d907bac
22:13:55.416 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@6325a3ee
22:13:55.416 [main] DEBUG c.BlockQuene - 加入队列 com.redisc.Run$$Lambda$2/858242339@6325a3ee
22:13:55.416 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@1d16f93d
22:13:55.416 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:13:56.417 [main] DEBUG c.Test - 4
22:13:56.417 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@67b92f0a
22:13:56.417 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:13:57.422 [main] DEBUG c.Test - 5
22:13:57.422 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@2b9627bc
22:13:57.422 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:13:58.427 [main] DEBUG c.Test - 6
22:13:58.427 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@65e2dbf3
22:13:58.427 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:13:59.428 [main] DEBUG c.Test - 7
22:13:59.428 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@4f970963
22:13:59.428 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:14:00.433 [main] DEBUG c.Test - 8
22:14:00.433 [main] DEBUG c.ThreadPool - 加入任务队列 com.redisc.Run$$Lambda$2/858242339@61f8bee4
22:14:00.434 [main] DEBUG c.BlockQuene - 队列已满,直接执行
22:14:01.439 [main] DEBUG c.Test - 9
22:14:05.421 [Thread-0] DEBUG c.Test - 0
22:14:05.421 [Thread-1] DEBUG c.Test - 1
22:14:05.421 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@3108bc
22:14:05.421 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@7d907bac
22:14:05.421 [Thread-1] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@3a5ed7a6
22:14:05.422 [Thread-0] DEBUG c.ThreadPool - 正在执行... com.redisc.Run$$Lambda$2/858242339@6325a3ee
22:14:15.422 [Thread-0] DEBUG c.Test - 3
22:14:15.422 [Thread-1] DEBUG c.Test - 2
22:14:15.422 [Thread-0] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@6325a3ee
22:14:15.422 [Thread-1] DEBUG c.ThreadPool - 任务设置为空,com.redisc.Run$$Lambda$2/858242339@3a5ed7a6

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK