5

CountDownLatch和CyclicBarrier:如何让多线程步调一致? - Love&Share

 2 years ago
source link: https://www.cnblogs.com/YXBLOGXYY/p/16079520.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

CountDownLatch和CyclicBarrier:如何让多线程步调一致?

案例:对账系统的业务是这样的,用户通过在线商城下单,会生成电子订单,保存在订单库;之后物流会生成派送单给用户发货,派送单保存在派送单库。为了防止漏派送或者重复派送,对账系统每天还会校验是否存在异常订单。对账系统的处理逻辑很简单,系统流程图如下。目前对账系统的处理逻辑是首先查询订单,然后查询派送单,之后对比订单和派送单,将差异写入差异库。

对上面的代码抽象就是这样的,就是在一个单线程里面循环查询订单、派送单,然后执行对账,最后将写入差异库。

 while(存在未对账订单){
   // 查询未对账订单
   pos = getPOrders();
   // 查询派送单
   dos = getDOrders();
   // 执行对账操作
   diff = check(pos, dos);
   // 差异写入差异库
   save(diff);
 }

1)上面的系统现在执行很慢,该怎样优化来执行速度呢?

  • 目前是单线程的,那单线程的话我们就考虑是否可以用多线程来做。查询未对账订单和查询派送单这两个操作是可以并行处理的。

2)实现查询对账订单和查询派送单并行执行的代码应该是怎样的?

 ​
 while(存在未对账订单){
   // 查询未对账订单
   Thread T1 = new Thread(()->{
     pos = getPOrders();
  });
   T1.start();
   // 查询派送单
   Thread T2 = new Thread(()->{
     dos = getDOrders();
  });
   T2.start();
   // 等待T1、T2结束
   T1.join();
   T2.join();
   // 执行对账操作
   diff = check(pos, dos);
   // 差异写入差异库
   save(diff);
 }
  • 我们在主线程中开了两个插队的线程,等这两个查询的插队线程执行完了,阻塞的主线程被唤醒,那么就可以执行对账还有写入差异库的操作了。

3)思考一下,我们上面的代码还有没有优化的空间呢?

  • 我们每次进行新的查询的对账的时候,都要创建两个新的线程出来,我们知道创建线程是比较好费时间的。那思考一下可不可以用线程池来减少创建线程的开销呢。

     ​
     // 创建2个线程的线程池
     Executor executor =
       Executors.newFixedThreadPool(2);
     while(存在未对账订单){
       // 查询未对账订单
       executor.execute(()-> {
         pos = getPOrders();
      });
       // 查询派送单
       executor.execute(()-> {
         dos = getDOrders();
      });

    /* ??如何实现等待??*/

    // 执行对账操作
       diff = check(pos, dos);
       // 差异写入差异库
       save(diff);
     }  

4)使用上面线程池的代码的话,我的join就不能调用了,那我的主线程就不知道什么时候两个查询操作执行完了,这个时候该怎么办?

  • 我们可以使用一个计数器,初始值呢设置为2,查询一次就减1,当两个查询执行完,那计数器就是0了,我们的主线程也就能被唤醒执行了。

5)上面的方案是我们自己想出来的,那java其实提供了一个非常方便的实现我们上面方案的工具类CountDownLatch,那使用CountDownLatch怎样优化我们的代码呢?

  • 主线程的话我们调用await方法来阻塞,两个查询线程我们执行countDown方法,会减1。主线程当检测到为0时就可以执行了

 ​
 // 创建2个线程的线程池
 Executor executor =
   Executors.newFixedThreadPool(2);
 while(存在未对账订单){
   // 计数器初始化为2
   CountDownLatch latch =
     new CountDownLatch(2);
   // 查询未对账订单
   executor.execute(()-> {
     pos = getPOrders();
     latch.countDown();
  });
   // 查询派送单
   executor.execute(()-> {
     dos = getDOrders();
     latch.countDown();
  });

// 等待两个查询操作结束
   latch.await();

// 执行对账操作
   diff = check(pos, dos);
   // 差异写入差异库
   save(diff);
 }

6)上面使用CountDownLatch和线程池的方案已经很不错了,在思考一下,我们的这个程序还能不能优化一下?

  • 我的对账操作和下一次查询操作其实是不影响的,那么他们之间是可以并发执行的。也就是我在进行本次对账的同时,是可以执行下一次的查询操作的。

7)对账需要查询出数据来才可以执行,这种的话对应什么模型?

  • 生产者-消费者模型

8)既然看出来了是生产者消费者模型,那就需要一个队列,生产者生产出来东西放到队列,消费者去队列当中取。但是针对我们上面的案例,一个队列的话肯会造成数据混乱,我们应该怎样设计?

  • 使用两个队列,两个队列间的元素还有对应关系。订单查询操作将订单查询结果插入订单队列,派送单查询操作将派送单插入派送单队列,这两个队列的元素之间是有一一对应的关系的。我们的对账操作每次从两个队列当中各取一个,这样数据肯定不会发生混乱。

9)我们如何用代码来实现查询和对账之间的并行呢?

  • 使用三个线程,一个线程 T1 执行订单的查询工作,一个线程 T2 执行派送单的查询工作,当线程 T1 和 T2 都各自生产完 1 条数据的时候,通知线程 T3 执行对账操作。这个想法虽看上去简单,但其实还隐藏着一个条件,那就是线程 T1 和线程 T2 的工作要步调一致,不能一个跑得太快,一个跑得太慢,只有这样才能做到各自生产完 1 条数据的时候,通知线程 T3。

10)我们上面的方案有哪些要解决的问题?怎样解决?

  • T1和T2要走的齐

  • 他们执行完之后要能通知到T3

  • 解决这两个问题的方案也很简单,还是搞一个计数器,初始化为2,T1执行完减1,T2执行完减1。当计数器值为0时,T3就可以执行了,T3执行的时候把我们计数器又重置为2,此时T1,T2又可以执行了。

11)实际项目中java其实给了我们现成的实现上面方案的工具类CyclicBarrier,代码实现的怎样的?

  • CyclicBarrier 的计数器有自动重置的功能,当减到 0 的时候,会自动重置你设置的初始值,所以他要带个循环

 ​
 // 订单队列
 Vector<P> pos;
 // 派送单队列
 Vector<D> dos;
 // 执行回调的线程池
 Executor executor =
   Executors.newFixedThreadPool(1);
 final CyclicBarrier barrier =
   new CyclicBarrier(2, ()->{
     executor.execute(()->check());
  });

void check(){
   P p = pos.remove(0);
   D d = dos.remove(0);
   // 执行对账操作
   diff = check(p, d);
   // 差异写入差异库
   save(diff);
 }

void checkAll(){
   // 循环查询订单库
   Thread T1 = new Thread(()->{
     while(存在未对账订单){
       // 查询订单库
       pos.add(getPOrders());
       // 等待
       barrier.await();
    }
  });
   T1.start();  
   // 循环查询运单库
   Thread T2 = new Thread(()->{
     while(存在未对账订单){
       // 查询运单库
       dos.add(getDOrders());
       // 等待
       barrier.await();
    }
  });
   T2.start();
 }

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK