5

一文学习JUC

 2 years ago
source link: https://haojunsheng.github.io/2022/03/juc/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

什么是并发

CPU,内存和IO设备三者之间的速度是不匹配的,更好的利用CPU资源。

为什么要并发

CPU,内存和IO设备三者之间的速度是不匹配的,为了解决这个问题,做了下面的优化:

  • CPU增加了缓存,以均衡与内存的速度差异
  • 操作系统增加了进程、线程,以分时复用CPU,进而均衡CPU与I/O设备的速度差异;
  • 编译程序优化指令执行次序,使得缓存能够得到更加合理地利用。
    • 重排序包括编译器重排序和处理器重排序。对于后者,需要插入内存屏障才可以进行解决。

最终期望可以提高程序的性能,包括响应速度和吞吐量。

可以使用Lmbench测量上下文切换的时长,使用vmstat测量上下文切换的次数,我们可以关注CS字段。

并发三大问题

优化之后,虽然性能变好,但是却引来了三大问题,让我们程序的正确性出现了问题。

可见性一般是缓存导致的。指的是一个线程对共享变量的修改,另外一个线程能够立刻看到。

如果只有一个CPU,则不存在可见性问题。

img

如果是多个CPU,可见性问题并不好解决。

img

原子性是线程切换导致的。把一个或者多个操作在CPU执行的过程中不被中断的特性称为原子性

img
img

有序性是编译优化带来的。

如双重校验锁获取单例:

public class Singleton {
  static Singleton instance;
  static Singleton getInstance(){
    if (instance == null) {
      synchronized(Singleton.class) {
        if (instance == null)
          instance = new Singleton();
        }
    }
    return instance;
  }
}

我们以为的顺序:

  1. 分配一块内存M;
  2. 在内存M上初始化Singleton对象;
  3. 然后M的地址赋值给instance变量。

实际上的顺序:

  1. 分配一块内存M;
  2. 将M的地址赋值给instance变量;
  3. 最后在内存M上初始化Singleton对象。

这样就可能引发NPE问题。

img

Java内存模型JMM:解决可见性和有序性

前面提到:导致可见性的原因是缓存,导致有序性的原因是编译优化,那解决可见性、有序性最直接的办法就是禁用缓存和编译优化,但是这样问题虽然解决了,我们程序的性能可就堪忧了。因此,我们只能按需禁用缓存和编译优化。

因此,Java为我们提供了volatilesynchronizedfinal 三个关键字,以及六项 Happens-Before 规则

synchronized

volatile

语义是:禁用CPU缓存,必须从内从中读取或者写入。

final

表示一个变量是常量。

Happens-Before 规则

含义:前面一个操作的结果对后续操作是可见的

Happens-Before 约束了编译器的优化行为,虽允许编译器优化,但是要求编译器优化后一定遵守 Happens-Before 规则。

程序的顺序性规则

在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作。

如下面的,第5行 Happens-Before 第6行。

class VolatileExample {
  int x = 0;
  volatile boolean v = false;
  public void writer() {
    x = 42;
    v = true;
  }
  public void reader() {
    if (v == true) {
      // 这里x会是多少呢?
    }
  }
}

volatile变量规则

对一个volatile变量的写操作, Happens-Before 于后续对这个volatile变量的读操作。

这条规则是指如果A Happens-Before B,且B Happens-Before C,那么A Happens-Before C。

img
  1. “x=42” Happens-Before 写变量 “v=true” ,这是规则1的内容;
  2. 写变量“v=true” Happens-Before 读变量 “v=true”,这是规则2的内容 。

再根据这个传递性规则,我们得到结果:“x=42” Happens-Before 读变量“v=true”。

管程中锁的规则

对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。

线程 start() 规则

指主线程A启动子线程B后,子线程B能够看到主线程在启动子线程B前的操作。

Thread B = new Thread(()->{
  // 主线程调用B.start()之前
  // 所有对共享变量的修改,此处皆可见
  // 此例中,var==77
});
// 此处对共享变量var修改
var = 77;
// 主线程启动子线程
B.start();

线程 join() 规则

指主线程A等待子线程B完成,当子线程B完成后,主线程能够看到子线程的操作。

如果在线程A中,调用线程B的 join() 并成功返回,那么线程B中的任意操作Happens-Before 于该 join() 操作的返回。

Thread B = new Thread(()->{
  // 此处对共享变量var修改
  var = 66;
});
// 例如此处对共享变量修改,
// 则这个修改结果对线程B可见
// 主线程启动子线程
B.start();
B.join()
// 子线程所有对共享变量的修改
// 在主线程调用B.join()之后皆可见
// 此例中,var==66

线程中断规则interrupt

对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生。

对象终结规则finalize

一个对象的初始化完成(构造函数执行结束)先行发生于它的finalize()方法的开始。

重排序问题

数据依赖性

image-20220308225710487

这种关系,如果发生了重排序,程序的执行结果就会被改变。

as-if-serial语义

该语义的意思是:不管如何进行重排序,单线程情况下程序的执行结果不能被改变。因此存在数据依赖的不能重排序。

double pi = 3.14; // A 
double r = 1.0; // B 
doublearea=pi*r*r; //C

可见,A和B没有依赖关系,但是C和A、B都有关系,所以C必须最后,A和B可以重新排序。

锁:解决原子性问题synchronized

原子性问题的源头是线程切换。在单核时代,可以通过禁用CPU中断来避免线程切换,在多核时代,即使禁止中断也无法解决原子性问题。因此,我们需要保证同一时刻只有一个线程操作共享变量。

img

我们需要注意锁的范围:

  • 当修饰非静态方法的时候,锁定的是当前实例对象 this。
  • 当修饰静态方法的时候,锁定的是当前类的 Class 对象;
  • 同步方法块,锁的是Synchonized括号里配置的对象;

上面的图是一把锁保护一个资源。

事实上,一把锁可以保护多个资源。

  • 保护没有关联关系的多个资源;
  • 保护有关联关系的多个资源;

我们还需要了解锁的实现原理,这个需要了解JVM的相关知识,如Java对象头。

此外,锁的升级也需要我们了解。

img

更多详情参考:不可不说的Java“锁”事

死锁及解决方案

img

粗粒度的锁浪费性能,可以使用细粒度的锁提高并行度,进行性能优化。但是可能导致死锁:

一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。

img

死锁产生条件

下面4个条件同时发生才会死锁。

  • 互斥,共享资源 X 和 Y 只能被一个线程占用;(不能避免)
  • 占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
  • 不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
  • 循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。

互斥这一条不可避免。其他三条任意破坏一个即可。

破坏占有且等待

一次性申请所有资源。

img

class Allocator {
  private List<Object> als =
    new ArrayList<>();
  // 一次性申请所有资源
  synchronized boolean apply(
    Object from, Object to){
    if(als.contains(from) ||
         als.contains(to)){
      return false;  
    } else {
      als.add(from);
      als.add(to);  
    }
    return true;
  }
  // 归还资源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
  }
}

class Account {
  // actr应该为单例
  private Allocator actr;
  private int balance;
  // 转账
  void transfer(Account target, int amt){
    // 一次性申请转出账户和转入账户,直到成功
    while(!actr.apply(this, target))
      ;
    try{
      // 锁定转出账户
      synchronized(this){              
        // 锁定转入账户
        synchronized(target){           
          if (this.balance > amt){
            this.balance -= amt;
            target.balance += amt;
          }
        }
      }
    } finally {
      actr.free(this, target)
    }
  } 
}

用“等待-通知”机制优化循环等待

在上面的代码第32行中,我们用死循环的方式。这种方式当apply耗时长,或者并发冲突量大时就不太适用。更好的方式应该是:

如果线程要求的条件(转出账本和转入账本同在文件架上)不满足,则线程阻塞自己,进入等待状态;当线程要求的条件(转出账本和转入账本同在文件架上)满足后,通知等待的线程重新执行。其中,使用线程阻塞的方式就能避免循环等待消耗CPU的问题。

总结一下:完整的等待-通知机制:线程首先获取互斥锁,当线程要求的条件不满足时,释放互斥锁,进入等待状态;当要求的条件满足时,通知等待的线程,重新获取互斥锁

class Allocator {
  private List als;
  // 一次性申请所有资源
  synchronized void apply(
    Object from, Object to){
    // 经典写法
    // 这里必须用while
    // 当线程被唤醒后,是从wait命令后开始执行的,
    // 而执行时间点往往跟唤醒时间点不一致,所以条件变量此时不一定满足了。
    // 所以通过while循环可以再验证,
    // 而if条件却做不到,它只能从wait命令后开始执行,所以要用while
    while(als.contains(from) ||
         als.contains(to)){
      try{
        wait();
      }catch(Exception e){
      }   
    } 
    als.add(from);
    als.add(to);  
  }
  // 归还资源
  synchronized void free(
    Object from, Object to){
    als.remove(from);
    als.remove(to);
    notifyAll();
  }
}

破坏不可抢占条件

这个synchronized做不到,原因是 synchronized 申请资源的时候,如果申请不到,线程直接进入阻塞状态了,而线程进入阻塞状态,啥都干不了,也释放不了线程已经占有的资源。

只能使用java.util.concurrent.Lock工具类。

破坏循环等待条件

对资源进行编号。

管程Monitor

可以解决并发编程中的两大核心问题:同步和互斥。前者是指同一时刻只允许一个线程访问共享资源,后者是指多个线程之间如何通信、协作。

管程指的是管理共享变量以及对共享变量的操作过程,让他们支持并发。具体来讲是管理类的成员变量和成员方法,让这个类是线程安全的。

解决互斥:将线程不安全的队列封装起来,对外提供线程安全的操作方法,例如入队操作和出队操作。

img

解决同步:在管程模型里,共享变量和对共享变量的操作是被封装起来的。框的上面只有一个入口,并且在入口旁边还有一个入口等待队列。当多个线程同时试图进入管程内部时,只允许一个线程进入,其他线程则在入口等待队列中等待。

img

条件变量条件变量等待队列的作用是什么呢?其实就是解决线程同步问题。你可以结合上面提到的阻塞队列的例子加深一下理解(阻塞队列的例子,是用管程来实现线程安全的阻塞队列,这个阻塞队列和管程内部的等待队列没有关系,本文中一定要注意阻塞队列和等待队列是不同的)。

假设有个线程T1执行阻塞队列的出队操作,执行出队操作,需要注意有个前提条件,就是阻塞队列不能是空的(空队列只能出Null值,是不允许的),阻塞队列不空这个前提条件对应的就是管程里的条件变量。 如果线程T1进入管程后恰好发现阻塞队列是空的,那怎么办呢?等待啊,去哪里等呢?就去条件变量对应的等待队列里面等。此时线程T1就去“队列不空”这个条件变量的等待队列中等待。

再假设之后另外一个线程T2执行阻塞队列的入队操作,入队操作执行成功之后,“阻塞队列不空”*这个条件对于线程T1来说已经满足了,此时线程T2要通知T1,告诉它需要的条件已经满足了。当线程T1得到通知后,会从*等待队列里面出来,但是出来之后不是马上执行,而是重新进入到入口等待队列里面。

Java内置的管程方案synchronized,只有一个条件变量。

img

wait操作是把当前线程放入条件变量的等待队列中,notifyall。

Java线程相关知识

通用线程生命周期

img
  1. 初始状态,指的是线程已经被创建,但是还不允许分配CPU执行。这个状态属于编程语言特有的,不过这里所谓的被创建,仅仅是在编程语言层面被创建,而在操作系统层面,真正的线程还没有创建。
  2. 可运行状态,指的是线程可以分配CPU执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配CPU执行。
  3. 当有空闲的CPU时,操作系统会将其分配给一个处于可运行状态的线程,被分配到CPU的线程的状态就转换成了运行状态
  4. 运行状态的线程如果调用一个阻塞的API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放CPU使用权,休眠状态的线程永远没有机会获得CPU使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
  5. 线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。

Java线程生命周期

  1. NEW(初始化状态)
  2. RUNNABLE(可运行/运行状态)
  3. BLOCKED(阻塞状态)
  4. WAITING(无时限等待)
  5. TIMED_WAITING(有时限等待)
  6. TERMINATED(终止状态)

在操作系统层面,Java线程中的BLOCKED、WAITING、TIMED_WAITING是一种状态,即前面我们提到的休眠状态。也就是说只要Java线程处于这三种状态之一,那么这个线程就永远没有CPU的使用权

img

CPU密集型

IO密集型

并发工具类

Lock和Condition

class X {
  private final Lock rtl = new ReentrantLock();
  int value;
  public void addOne() {
    // 获取锁
    rtl.lock();  
    try {
      value+=1;
    } finally {
      // 保证锁能释放
      rtl.unlock();
    }
  }
}

为什么设计Lock和Condition

我们知道synchronized实现了管程,但是SDK层面又定义了Lock和Condition两个接口,前者可以解决互斥问题,后者可以解决同步问题,同样可以实现管程。那为什么又定义新的接口呢?

解决死锁问题,有一个方案是【破坏不可抢占条件】。但是synchronized并不能解决这个问题。这是因为synchronized申请资源的时候,如果申请不到,线程直接进入阻塞状态了。但是线程一旦进入阻塞,就无法释放线程已经占有的资源。但是我们期望的是:对于“不可抢占”这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏掉了。

如果新设计一把锁,那应该怎么做呢?

  1. 能够响应中断。synchronized的问题是,持有锁A后,如果尝试获取锁B失败,那么线程就进入阻塞状态,一旦发生死锁,就没有任何机会来唤醒阻塞的线程。但如果阻塞状态的线程能够响应中断信号,也就是说当我们给阻塞的线程发送中断信号的时候,能够唤醒它,那它就有机会释放曾经持有的锁A。这样就破坏了不可抢占条件了。
  2. 支持超时。如果线程在一段时间之内没有获取到锁,不是进入阻塞状态,而是返回一个错误,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。
  3. 非阻塞地获取锁。如果尝试获取锁失败,并不进入阻塞状态,而是直接返回,那这个线程也有机会释放曾经持有的锁。这样也能破坏不可抢占条件。

相应的,lock接口提供了三个接口:

// 支持中断的API
void lockInterruptibly() 
  throws InterruptedException;
// 支持超时的API
boolean tryLock(long time, TimeUnit unit) 
  throws InterruptedException;
// 支持非阻塞获取锁的API
boolean tryLock();

此外,synchronized只支持一个条件变量,而Condition可以支持多个条件变量。

可见性原理

synchronized的解锁 Happens-Before 于后续对这个锁的加锁。

那么lock呢?以上面的程序为例。利用了volatile相关的Happens-Before规则:

  1. 顺序性规则:对于线程T1,value+=1 Happens-Before 释放锁的操作unlock();
  2. volatile变量规则:由于state = 1会先读取state,所以线程T1的unlock()操作Happens-Before线程T2的lock()操作;
  3. 传递性规则:线程 T1的value+=1 Happens-Before 线程 T2 的 lock() 操作。

两个条件变量实现阻塞队列

一个阻塞队列,需要两个条件变量,一个是队列不空(空队列不允许出队),另一个是队列不满(队列已满不允许入队)。

public class BlockedQueue{
  final Lock lock = new ReentrantLock();
  // 条件变量:队列不满  
  final Condition notFull = lock.newCondition();
  // 条件变量:队列不空  
  final Condition notEmpty = lock.newCondition();

  // 入队
  void enq(T x) {
    lock.lock();
    try {
      while (队列已满){
        // 等待队列不满
        notFull.await();
      }  
      // 省略入队操作...
      //入队后,通知可出队
      notEmpty.signal();
    }finally {
      lock.unlock();
    }
  }
  // 出队
  void deq(){
    lock.lock();
    try {
      while (队列已空){
        // 等待队列不空
        notEmpty.await();
      }  
      // 省略出队操作...
      //出队后,通知可入队
      notFull.signal();
    }finally {
      lock.unlock();
    }  
  }
}

synchronized不需要释放锁,lock必须要释放。

Semaphore信号量

信号量模型

在信号量模型里,计数器和等待队列对外是透明的,所以只能通过信号量模型提供的三个方法来访问。

img

  • init():设置计数器的初始值。
  • down():计数器的值减1;如果此时计数器的值小于0,则当前线程将被阻塞,否则当前线程可以继续执行。
  • up():计数器的值加1;如果此时计数器的值小于或者等于0,则唤醒等待队列中的一个线程,并将其从等待队列中移除。

而且,这三个方法都是原子性的,原子性由信号量模型来实现的。

class Semaphore{
  // 计数器
  int count;
  // 等待队列
  Queue queue;
  // 初始化操作
  Semaphore(int c){
    this.count=c;
  }
  // 
  void down(){
    this.count--;
    if(this.count<0){
      //将当前线程插入等待队列
      //阻塞当前线程
    }
  }
  void up(){
    this.count++;
    if(this.count<=0) {
      //移除等待队列中的某个线程T
      //唤醒线程T
    }
  }
}
static int count;
//初始化信号量
static final Semaphore s = new Semaphore(1);
//用信号量保证互斥    
static void addOne() {
  s.acquire();
  try {
    count+=1;
  } finally {
    s.release();
  }
}

假设两个线程T1和T2同时访问addOne()方法,当它们同时调用acquire()的时候,由于acquire()是一个原子操作,所以只能有一个线程(假设T1)把信号量里的计数器减为0,另外一个线程(T2)则是将计数器减为-1。对于线程T1,信号量里面的计数器的值是0,大于等于0,所以线程T1会继续执行;对于线程T2,信号量里面的计数器的值是-1,小于0,按照信号量模型里对down()操作的描述,线程T2将被阻塞。所以此时只有线程T1会进入临界区执行count+=1;

当线程T1执行release()操作,也就是up()操作的时候,信号量里计数器的值是-1,加1之后的值是0,小于等于0,按照信号量模型里对up()操作的描述,此时等待队列中的T2将会被唤醒。于是T2在T1执行完临界区代码之后才获得了进入临界区执行的机会,从而保证了互斥性。

为什么需要信号量

上面例子,我们用Semaphore实现了互斥锁。那Semaphore还有什么功能呢?Semaphore可以允许多个线程访问一个临界区

应用:限流器

如数据库连接池,同一时刻,允许多个线程使用连接池。对象池也是如此,指的是一次性创建出N个对象,之后所有的线程重复利用这N个对象,当然对象在被释放前,也是不允许其他线程使用的。

class ObjPool<T, R> {

  final List<T> pool;

  // 用信号量实现限流器
  final Semaphore sem;

  // 构造函数
  ObjPool(int size, T t){
    pool = new Vector<T>(){};
    for(int i=0; i<size; i++){
      pool.add(t);
    }
    sem = new Semaphore(size);
  }

  // 利用对象池的对象,调用func
  R exec(Function<T,R> func) {
    T t = null;
    sem.acquire();
    try {
      t = pool.remove(0);
      return func.apply(t);
    } finally {
      pool.add(t);
      sem.release();
    }
  }
}

// 创建对象池

ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通过对象池获取t,之后执行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

我们用一个List来保存对象实例,用Semaphore实现限流器。关键的代码是ObjPool里面的exec()方法,这个方法里面实现了限流的功能。在这个方法里面,我们首先调用acquire()方法(与之匹配的是在finally里面调用release()方法),假设对象池的大小是10,信号量的计数器初始化为10,那么前10个线程调用acquire()方法,都能继续执行,相当于通过了信号灯,而其他线程则会阻塞在acquire()方法上。对于通过信号灯的线程,我们为每个线程分配了一个对象 t(这个分配工作是通过pool.remove(0)实现的),分配完之后会执行一个回调函数func,而函数的参数正是前面分配的对象 t ;执行完回调函数之后,它们就会释放对象(这个释放工作是通过pool.add(t)实现的),同时调用release()方法来更新信号量的计数器。如果此时信号量里计数器的值小于等于0,那么说明有线程在等待,此时会自动唤醒等待的线程。

ReadWriteLock读写锁

读写锁,适用于读多写少的场景。

读写锁三大原则:

  1. 允许多个线程同时读共享变量;
  2. 只允许一个线程写共享变量;
  3. 如果一个写线程正在执行写操作,此时禁止读线程读共享变量。

读写锁和互斥锁的区别是:前者允许多个线程同时读共享变量,后者不允许。

读写锁实现缓存

class Cache {
  final Map m = new HashMap<>();
  final ReadWriteLock rwl = new ReentrantReadWriteLock();
  // 读锁
  final Lock r = rwl.readLock();
  // 写锁
  final Lock w = rwl.writeLock();
  // 读缓存
  V get(K key) {
    r.lock();
    try { 
      return m.get(key); 
    }
    finally { 
      r.unlock(); 
    }
  }
  // 写缓存
  V put(K key, V value) {
    w.lock();
    try { 
      return m.put(key, v);
    }
    finally { 
      w.unlock(); 
    }
  }
}

我们声明了一个Cache类,其中类型参数K代表缓存里key的类型,V代表缓存里value的类型。缓存的数据保存在Cache类内部的HashMap里面,HashMap不是线程安全的,这里我们使用读写锁ReadWriteLock 来保证其线程安全。ReadWriteLock 是一个接口,它的实现类是ReentrantReadWriteLock,通过名字你应该就能判断出来,它是支持可重入的。下面我们通过rwl创建了一把读锁和一把写锁。

StampedLock性能更高的读写锁

final StampedLock sl = new StampedLock();
  
// 获取/释放悲观读锁示意代码
long stamp = sl.readLock();
try {
  //省略业务相关代码
} finally {
  sl.unlockRead(stamp);
}

// 获取/释放写锁示意代码
long stamp = sl.writeLock();
try {
  //省略业务相关代码
} finally {
  sl.unlockWrite(stamp);
}

比ReadWriteLock更快的读写锁,始于Java8。

StampedLock和ReadWriteLock的区别:

  • StampedLock支持3种模式:写锁,悲观读和乐观读;支持多个读;乐观读的时候支持写;
  • ReadWriteLock支持2种模式:写锁,悲观读;支持多个读;读的时候不支持写;
class Point {
  private int x, y;
  final StampedLock sl = new StampedLock();
  //计算到原点的距离  
  int distanceFromOrigin() {
    // 乐观读
    long stamp = sl.tryOptimisticRead();
    // 读入局部变量,
    // 读的过程数据可能被修改
    int curX = x, curY = y;
    //判断执行读操作期间,
    //是否存在写操作,如果存在,
    //则sl.validate返回false
    if (!sl.validate(stamp)){
      // 升级为悲观读锁
      stamp = sl.readLock();
      try {
        curX = x;
        curY = y;
      } finally {
        //释放悲观读锁
        sl.unlockRead(stamp);
      }
    }
    return Math.sqrt(
      curX * curX + curY * curY);
  }
}

StampedLock不支持重入;悲观读锁、写锁都不支持条件变量;不支持中断,会造成CPU飙升;

final StampedLock lock= new StampedLock();
Thread T1 = new Thread(()->{
  // 获取写锁
  lock.writeLock();
  // 永远阻塞在此处,不释放写锁
  LockSupport.park();
});
T1.start();
// 保证T1获取写锁
Thread.sleep(100);
Thread T2 = new Thread(()->
  //阻塞在悲观读锁
  lock.readLock()
);
T2.start();
// 保证T2阻塞在读锁
Thread.sleep(100);
//中断线程T2
//会导致线程T2所在CPU飙升
T2.interrupt();
T2.join();

CountDownLatch和CyclicBarrier:线程同步工具类

我们有一个对账系统:

img

可以使用CountDownLatch让主线程等待2个子线程。

// 创建2个线程的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
  // 计数器初始化为2
  CountDownLatch latch = new CountDownLatch(2);
  // 查询未对账订单
  executor.execute(()-> {
    pos = getPOrders();
    latch.countDown();
  });
  // 查询派送单
  executor.execute(()-> {
    dos = getDOrders();
    latch.countDown();
  });
  
  // 等待两个查询操作结束
  latch.await();
  
  // 执行对账操作
  diff = check(pos, dos);
  // 差异写入差异库
  save(diff);
}

前面我们实现了getPOrders()和getDOrders()的并行;事实上,对账操作也可以和查询进行并行;

img

这个方案的难点有2个:一个是线程T1和T2要做到步调一致,另一个是要能够通知到线程T3。

// 订单队列
Vector pos;
// 派送单队列
Vector dos;
// 执行回调的线程池
// 1.使用线程池是为了异步操作,否则回掉函数是同步调用的,也就是本次对账操作执行完才能进行下一轮的检查。
// 2.线程数量固定为1,防止了多线程并发导致的数据不一致,因为订单和派送单是两个队列,只有单线程去两个队列中取消息才不会出现消息不匹配的问题。
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, ()->{
    executor.execute(()->check());
  });
void check(){
  P p = pos.remove(0);
  D d = dos.remove(0);
  // 执行对账操作
  diff = check(p, d);
  // 差异写入差异库
  save(diff);
}
void checkAll(){
  // 循环查询订单库
  Thread T1 = new Thread(()->{
    while(存在未对账订单){
      // 查询订单库
      pos.add(getPOrders());
      // 等待
      barrier.await();
    }
  });
  T1.start();  
  // 循环查询运单库
  Thread T2 = new Thread(()->{
    while(存在未对账订单){
      // 查询运单库
      dos.add(getDOrders());
      // 等待
      barrier.await();
    }
  });
  T2.start();
}

总结:CountDownLatch主要用来解决一个线程等待多个线程的场景,而CyclicBarrier是一组线程之间互相等待。CountDownLatch的计数器是不能循环利用的,也就是说一旦计数器减到0,再有线程调用await(),该线程会直接通过。但CyclicBarrier的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到0会自动重置到你设置的初始值。除此之外,CyclicBarrier还可以设置回调函数,可以说是功能丰富。

img

List里面只有一个实现类就是CopyOnWriteArrayList。写的时候会将共享变量新复制一份出来,这样做的好处是读操作完全无锁。

Map接口的两个实现是ConcurrentHashMap和ConcurrentSkipListMap,它们从应用的角度来看,主要区别在于ConcurrentHashMap的key是无序的,而ConcurrentSkipListMap的key是有序的。所以如果你需要保证key的顺序,就只能使用ConcurrentSkipListMap。

img

Set接口的两个实现是CopyOnWriteArraySet和ConcurrentSkipListSet,使用场景可以参考前面讲述的CopyOnWriteArrayList和ConcurrentSkipListMap。

Queue

  • 单端阻塞:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue和DelayQueue
  • 双端阻塞:LinkedBlockingDeque
  • 单端非阻塞:ConcurrentLinkedQueue
  • 双端非阻塞:ConcurrentLinkedDeque

原子类Unsafe

img

无锁方案的性能比较好,基本没有死锁问题,但是可能会有饥饿和活锁问题。此外,原子类只能解决单个共享变量的问题,多个共享变量的原子性问题,最好还是采用互斥锁方案。

Java魔法类:Unsafe应用解析

Executor与线程池

Java线程池实现原理及其在美团业务中的实践

Future:获取线程的执行结果

CompletableFuture回调机制的设计与实现

CPU相关并发能力

image-20220307230748609

原子操作实现原理

image-20220307232347611

Java实现原子操作

  1. CAS:Compare And Swap
  2. CAS三大问题
    1. ABA问题
    2. 循环时间开销大
    3. 只能保证一个共享变量的原子操作
  3. 使用锁机制实现原子操作

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK