3

Disruptor-简单使用 - 王谷雨

 1 year ago
source link: https://www.cnblogs.com/konghuanxi/p/17303118.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Disruptor-简单使用

Disruptor是一个高性能的无锁并发框架,其主要应用场景是在高并发、低延迟的系统中,如金融领域的交易系统,游戏服务器等。其优点就是非常快,号称能支撑每秒600万订单。需要注意的是,Disruptor是单机框架,对标JDK中的Queue,而非可用于分布式系统的MQ

本文基于Disruptor v3.4.*版本

既然是简单使用,这阶段只需要关注:

  • 消费者:EventHandler
  • 消息的传递:消息的载体Event

首先,我们定义消息的载体Event,生产者向消费者传递的消息通过Event承载

class LongEvent { private long value; public void set(long value) { this.value = value; } @Override public String toString() { return "LongEvent{" + "value=" + value + '}'; }}

然后定义Event生产工厂,这用于初始化Event

EventFactory<LongEvent> factory = new EventFactory<LongEvent>() { @Override public LongEvent newInstance() { return new LongEvent(); }};

接下来就可以构建Disruptor了,以下是完整代码

// 消息载体(event)static class LongEvent { private long value; public void set(long value) { this.value = value; } @Override public String toString() { return "LongEvent{" + "value=" + value + '}'; }} // 发布消息的转换器public static void translate(LongEvent event, long sequence, ByteBuffer buffer){ event.set(buffer.getLong(0));} public static void main(String[] args) throws Exception { // event生产工厂,初始化RingBuffer的时候使用 EventFactory<LongEvent> factory = new EventFactory<LongEvent>() { @Override public LongEvent newInstance() { return new LongEvent(); } }; // 指定RingBuffer的大小(必须是2的n次方) int bufferSize = 1024; // 构造Disruptor(默认使用多生产者模式、BlockingWaitStrategy阻塞策略) Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE); // Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new BlockingWaitStrategy()); // 设置消费者 EventHandler<LongEvent> handler = (event, sequence, endOfBatch) -> { System.out.println("Event: " + event); }; disruptor.handleEventsWith(handler); // 启动disruptor,启动所有需要运行的线程 disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); ByteBuffer bb = ByteBuffer.allocate(8); for (long i = 0; i < 100; i++) { bb.putLong(i); // 发布事件 ringBuffer.publishEvent(LongEventMain::translate, bb); }}

消费者组合(多使用场景)

Disruptor不仅可以当高性能的队列使用,还支持消费者的串行、并行消费等

以下只展示关键代码(设置消费者),其余部分参考上一节的简单demo

  1. disruptor.handleEventsWith(handlerA).then(handlerB);
  2. disruptor.handleEventsWith(handlerA, handlerB);
  3. 链内串行,多链并行

    disruptor.handleEventsWith(handlerA).then(handlerC);disruptor.handleEventsWith(handlerB).then(handlerD);
  4. 菱形(C、D都执行完才到E)

    disruptor.handleEventsWith(handlerA).then(handlerC);disruptor.handleEventsWith(handlerB).then(handlerD);disruptor.after(handlerC, handlerD).then(handlerE);
  5. 分组(AB都执行完才到CD)

    disruptor.handleEventsWith(handlerA, handlerB).then(handlerC, handlerD);
  6. 分组不重复消费

    组内竞争,组外串行:每个消息在每个分组中只有一个消费者能消费成功,如果就是分组A中只有HandlerA2能得到数据,分组B中只有HandlerB1获得

    // 注意:此处的handler实现的是WorkHandler接口disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3) .then(handlerB1, handlerB2, handlerB3);
  7. 分组不重复消费(菱形)

    // handlerA、handlerB实现WorkHandler接口// handlerC 实现EventHandler或WorkHandler接口均可disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3) .then(handlerB1, handlerB2, handlerB3) .then(handlerC);

    消费者速度比生产者快时,需要等待。因此就有了不同的等待策略以适应不同场景

    • BlockingWaitStrategy

      默认策略。使用锁和 Condition 的等待、唤醒机制。速度慢,但节省CPU资源并且在不同部署环境中能提供更加一致的性能表现。

    • YieldingWaitStrategy

      二段式,一阶段自旋100次,二阶段执行Thread.yield,需要低延迟的场景可使用此策略

    • SleepingWaitStrategy

      三段式,一阶段自旋,二阶段执行Thread.yield,三阶段睡眠

    • BusySpinWaitStrategy

      性能最高的策略,与 YieldingWaitStrategy 一样在低延迟场景使用,但是此策略要求消费者数量低于 CPU 逻辑内核总数

    其他小技巧

    1. 清除消息载体 Event 中的数据

      如果 Event 中存在大对象,应该在消费者链的末尾,添加一个清除数据的消费者,以帮助jvm垃圾回收。demo中的 LongEvent 是 private long value; 所以没必要添加。

本文介绍了 Disruptor 的简单使用,以及复杂场景下消费者的配置。下篇开坑 Disruptor 源码解析。


参考资料

Disruptor官方文档


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK