9

springboot~disruptor异步队列

 3 years ago
source link: https://www.cnblogs.com/lori/p/14853661.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Disruptor

Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。

Java内置队列的问题

介绍Disruptor之前,我们先来看一看常用的线程安全的内置队列有什么问题。Java的内置队列如下表所示。

队列的底层一般分成三种:数组、链表和堆。其中,堆一般情况下是为了实现带有优先级特性的队列,暂且不考虑。

从数组和链表两种数据结构来看,基于数组线程安全的队列,比较典型的是ArrayBlockingQueue,它主要通过加锁的方式来保证线程安全;基于链表的线程安全队列分成LinkedBlockingQueue和ConcurrentLinkedQueue两大类,前者也通过锁的方式来实现线程安全,而后者以及上面表格中的LinkedTransferQueue都是通过原子变量compare and swap(以下简称“CAS”)这种不加锁的方式来实现的。

但是对 volatile类型的变量进行 CAS 操作,存在伪共享问题,下面介绍一下

CPU的缓存系统是以缓存行(cache line)为单位存储的,一般的大小为64bytes。在多线程程序的执行过程中,存在着一种情况,多个需要频繁修改的变量存在同一个缓存行当中。

假设:有两个线程分别访问并修改X和Y这两个变量,X和Y恰好在同一个缓存行上,这两个线程分别在不同的CPU上执行。那么每个CPU分别更新好X和Y时将缓存行刷入内存时,发现有别的修改了各自缓存行内的数据,这时缓存行会失效,从L3中重新获取。这样的话,程序执行效率明显下降。为了减少这种情况的发生,其实就是避免X和Y在同一个缓存行中,可以主动添加一些无关变量将缓存行填充满,比如在X对象中添加一些变量,让它有64 Byte那么大,正好占满一个缓存行。

伪共享问题 的解决方案

简单的说,就是 以空间换时间: 使用占位字节,将变量的所在的 缓冲行 塞满。
disruptor 无锁框架就是这么干的。

Disruptor框架是如何解决伪共享问题的?

在Disruptor中有一个重要的类Sequence,该类包装了一个volatile修饰的long类型数据value,无论是Disruptor中的基于数组实现的缓冲区RingBuffer,还是生产者,消费者,都有各自独立的Sequence,RingBuffer缓冲区中,Sequence标示着写入进度,例如每次生产者要写入数据进缓冲区时,都要调用RingBuffer.next()来获得下一个可使用的相对位置。对于生产者和消费者来说,Sequence标示着它们的事件序号。

/**
 * 停车场问题.
 * 1) 事件对象Event
 * 2)三个消费者Handler
 * 3)一个生产者Processer
 * 4)执行Main方法
 */
public class DisruptorCar {
    private static final Integer NUM = 1; // 1,10,100,1000

    /**
     * 测试入口 ,
     * 一个生产者(汽车进入停车场);
     * 三个消费者(一个记录汽车信息,一个发送消息给系统,一个发送消息告知司机)
     * 前两个消费者同步执行,都有结果了再执行第三个消费者
     */
    @Test
     public  void main() throws InterruptedException {
        long beginTime = System.currentTimeMillis();
        int bufferSize = 2048; // 2的N次方
        try {
            // 创建线程池,负责处理Disruptor的四个消费者
            ExecutorService executor = Executors.newFixedThreadPool(4);

            // 初始化一个 Disruptor
            Disruptor<MyInParkingDataEvent> disruptor = new Disruptor<MyInParkingDataEvent>(new EventFactory<MyInParkingDataEvent>() {
                @Override
                public MyInParkingDataEvent newInstance() {
                    return new MyInParkingDataEvent(); // Event 初始化工厂
                }
            }, bufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());

            // 使用disruptor创建消费者组 MyParkingDataInDbHandler 和 MyParkingDataToKafkaHandler
            EventHandlerGroup<MyInParkingDataEvent> handlerGroup = disruptor.handleEventsWith(
                    new MyParkingDataInDbHandler(), new MyParkingDataToKafkaHandler());

            // 当上面两个消费者处理结束后在消耗 smsHandler
            MyParkingDataSmsHandler myParkingDataSmsHandler = new MyParkingDataSmsHandler();
            handlerGroup.then(myParkingDataSmsHandler);

            // 启动Disruptor
            disruptor.start();

            CountDownLatch countDownLatch = new CountDownLatch(1); // 一个生产者线程准备好了就可以通知主线程继续工作了
            // 生产者生成数据
            executor.submit(new MyInParkingDataEventPublisher(countDownLatch, disruptor));
            countDownLatch.await(); // 等待生产者结束

            disruptor.shutdown();
            executor.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("总耗时:" + (System.currentTimeMillis() - beginTime));
    }

    public class MyInParkingDataEvent {

        private String carLicense; // 车牌号

        public String getCarLicense() {
            return carLicense;
        }

        public void setCarLicense(String carLicense) {
            this.carLicense = carLicense;
        }

    }

    /**
     * Handler 第一个消费者,负责保存进场汽车的信息
     */
    public class MyParkingDataInDbHandler implements EventHandler<MyInParkingDataEvent>, WorkHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent) throws Exception {
            long threadId = Thread.currentThread().getId(); // 获取当前线程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
            System.out.println(String.format("Thread Id %s 保存 %s 到数据库中 ....", threadId, carLicense));
        }

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            this.onEvent(myInParkingDataEvent);
        }

    }

    /**
     * 第二个消费者,负责发送通知告知工作人员(Kafka是一种高吞吐量的分布式发布订阅消息系统)
     */
    public class MyParkingDataToKafkaHandler implements EventHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            long threadId = Thread.currentThread().getId(); // 获取当前线程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
            System.out.println(String.format("Thread Id %s 发送 %s 进入停车场信息给 kafka系统...", threadId, carLicense));
        }

    }

    /**
     * 第三个消费者,sms短信服务,告知司机你已经进入停车场,计费开始。
     */
    public class MyParkingDataSmsHandler implements EventHandler<MyInParkingDataEvent> {

        @Override
        public void onEvent(MyInParkingDataEvent myInParkingDataEvent, long sequence, boolean endOfBatch)
                throws Exception {
            long threadId = Thread.currentThread().getId(); // 获取当前线程id
            String carLicense = myInParkingDataEvent.getCarLicense(); // 获取车牌号
            System.out.println(String.format("Thread Id %s 给  %s 的车主发送一条短信,并告知他计费开始了 ....", threadId, carLicense));
        }

    }

    /**
     * 生产者,进入停车场的车辆
     */
    public class MyInParkingDataEventPublisher implements Runnable {

        private CountDownLatch countDownLatch; // 用于监听初始化操作,等初始化执行完毕后,通知主线程继续工作
        private Disruptor<MyInParkingDataEvent> disruptor;

        public MyInParkingDataEventPublisher(CountDownLatch countDownLatch,
                                             Disruptor<MyInParkingDataEvent> disruptor) {
            this.countDownLatch = countDownLatch;
            this.disruptor = disruptor;
        }

        @Override
        public void run() {
            MyInParkingDataEventTranslator eventTranslator = new MyInParkingDataEventTranslator();
            try {
                for (int i = 0; i < NUM; i++) {
                    disruptor.publishEvent(eventTranslator);
                    Thread.sleep(1000); // 假设一秒钟进一辆车
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                countDownLatch.countDown(); // 执行完毕后通知 await()方法
                System.out.println(NUM + "辆车已经全部进入进入停车场!");
            }
        }

    }

    class MyInParkingDataEventTranslator implements EventTranslator<MyInParkingDataEvent> {

        @Override
        public void translateTo(MyInParkingDataEvent myInParkingDataEvent, long sequence) {
            this.generateData(myInParkingDataEvent);
        }

        private MyInParkingDataEvent generateData(MyInParkingDataEvent myInParkingDataEvent) {
            myInParkingDataEvent.setCarLicense("车牌号: 鄂A-" + (int) (Math.random() * 100000)); // 随机生成一个车牌号
            System.out.println("Thread Id " + Thread.currentThread().getId() + " 写完一个event");
            return myInParkingDataEvent;
        }

    }

}


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK