23

LockSupport源码分析

 4 years ago
source link: http://yizhanggou.top/javazhong-de/?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

concurrent包基于AQS框架,AQS框架基于两个类:

  • Unsafe(提供CAS操作)
  • LockSupport(提供park/unpark操作)

LockSupport

public static void park() {
        UNSAFE.park(false, 0L);
    }
public static void unpark(Thread thread) {
        if (thread != null)
            UNSAFE.unpark(thread);
    }

Unsafe

Unsafe类中对应的方法:
park是将当前调用Thread阻塞,unpark是将指定线程Thread唤醒。

    //park
    public native void park(boolean isAbsolute, long time);
    
    //unpack
    public native void unpark(Object var1);

与Object的wait/notify机制对比,park/unpack的优点

  • 以Thread为操作对象更符合阻塞线程的直观定义;
  • 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性。

“许可”的概念

  1. park/unpark的设计原理核心是“许可”,park是等待一个许可,unpark是为某线程提供一个许可。

如果某个线程A调用park,则需要等待另一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。

  1. unpark可以在park之前。

先提供许可,当某线程调用park时,已经有许可了,就消费这个许可,然后可以继续运行。

  1. “许可”不可叠加,是一次性的。

如果线程B运行了三次unpark,当线程A调用park消费掉这个“许可”之后,再次调用park需要进入等待状态。

mutex和condition保护了一个_counter的变量,当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。

每个Java线程都有一个Parker实例

private:
  Parker*    _parker;
public:
  Parker*     parker() { return _parker; }

void JavaThread::initialize() {
  // Initialize fields
  _parker = Parker::Allocate(this) ;
}

Park类: _counter就是“许可”

class Parker : public os::PlatformParker {  
private:  
  volatile int _Event ;  
  ...  
public:  
  void park(bool isAbsolute, jlong time);  
  void unpark();  
  ...  
}  
class PlatformParker : public CHeapObj<mtInternal> {  
  protected:  
    pthread_mutex_t _mutex [1] ;  
    pthread_cond_t  _cond  [1] ;  
    ...  
}  

先尝试能否直接获得“许可”,即_counter>0。如果成功,则把_counter设置为0并返回。如果不成功,则获取锁,等待信号发生。获取许可后,设置可用许可数为0,unlock mutex并返回。

void os::PlatformEvent::park() { 
//_Event是个int变量,如果CAS更新成功,即成功将_Event减1,则退出循环
  int v ;
  for (;;) {
      v = _Event ;
      //使用原子方法xchg将0放入寄存器,与_Event所指的内容交换,即_Event=0,然后返回_Event原先的值
      if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
  }
  guarantee (v >= 0, "invariant") ;
 //v=_Event,v=0表示无许可,则需要堵塞等待获得许可;
  if (v == 0) {
     int status = pthread_mutex_lock(_mutex);//获取锁
     assert_status(status == 0, status, "mutex_lock");
     guarantee (_nParked == 0, "invariant") ;
     ++ _nParked ;
    //等待许可,调用pthread_cond_wait进行等待
     while (_Event < 0) {
       //pthread_cond_wait会加入等待队列,同时释放_mutex锁,
      //等待signal方法唤醒,唤醒之后需要重新获取_mutex锁,方法才能返回
        status = pthread_cond_wait(_cond, _mutex);
        if (status == ETIME) { status = EINTR; }
        assert_status(status == 0 || status == EINTR, status, "cond_wait");
     }
     -- _nParked ;
     //获取许可之后,设置可用许可数为0;由此可见许可数最大为1
    _Event = 0 ;
     status = pthread_mutex_unlock(_mutex);//释放锁
     assert_status(status == 0, status, "mutex_unlock");
    OrderAccess::fence();//内存屏障语句
  }
  guarantee (_Event >= 0, "invariant") ;
}

unpark

设置_Event值后,如果原始值大于0表示之前没有park在等待,直接返回。

否则需要通知park线程:
如果WorkAroundNPTLTimedWaitHang=true,会先调用singal再释放锁。如果为false会先释放锁再调用signal。

void os::PlatformEvent::unpark() {
 //使用原子方法xchg将1放入寄存器,与_Event所指的内容交换,即_Event=1,然后返回_Event原先的值,
 //如果返回值大于等于0,表示有许可有用,方法直接返回;
  if (Atomic::xchg(1, &_Event) >= 0) return;
 //如果原来的_Event小于0,说明有park方法进入了pthread_cond_wait堵塞
  int status = pthread_mutex_lock(_mutex);//获取锁
  assert_status(status == 0, status, "mutex_lock");
  int AnyWaiters = _nParked;
  assert(AnyWaiters == 0 || AnyWaiters == 1, "invariant");
   //NPTL存在瑕疵,当pthread_cond_timedwait() 方法时间参数为过去时间,
   //会导致_cond变量被破坏或者线程被hang住;
   //WorkAroundNPTLTimedWaitHang 是JVM的运行参数,默认为1
  if (AnyWaiters != 0 && WorkAroundNPTLTimedWaitHang) {
    AnyWaiters = 0;
    //调用signal唤醒pthread_cond_wait调用线程,不判断方法执行结果
    pthread_cond_signal(_cond);
  }
  //然后释放_mutex锁
  status = pthread_mutex_unlock(_mutex);
  assert_status(status == 0, status, "mutex_unlock");
  if (AnyWaiters != 0) {
   // //调用signal唤醒pthread_cond_wait调用线程,并判断方法执行结果
    status = pthread_cond_signal(_cond);
    assert_status(status == 0, status, "cond_signal");
  }
}

Atomic方法

上面使用到了Atomic的xchg和cmpxchg方法,这两个方法是采用汇编实现的:

asm ( assembler template  
    : output operands                   (optional)  
    : input operands                    (optional)  
    : clobbered registers list          (optional)  
    );  

output operands和inpupt operands指定参数,它们从左到右依次排列,用','分割,编号从0开始。

inline jint     Atomic::xchg    (jint     exchange_value, volatile jint*     dest) {
  __asm__ volatile (  "xchgl (%2),%0"
                    : "=r" (exchange_value)
                    : "0" (exchange_value), "r" (dest)
                    : "memory");
  return exchange_value;
}

将exchange_value(%0)的值放入通用寄存器,与dest(%2)所指的内容进行交换,返回dest指针原指向内容的值;

  • %0为exchange_value,%1为exchange_value,%2为dest;
  • "r"表示将dest的值读到一个通用寄存器;
  • "0"表示和%0使用同样的通用寄存器,此处表示将exchange_value值读入通用寄存器;
  • "=r"表示将结果写入到exchange_value,而且要使用通用寄存器,由于通用寄存器的内容已经被设置为dest所指向的内容,因此exchange_value等于原dest所指向的内容;
  • asm指示编译器在此插入汇编语句;
  • volatile告诉编译器,严禁将此处的汇编语句与其它的语句重组合优化,即原原本本按原来的样子处理这里的汇编;
  • memory强制gcc编译器假设RAM所有内存单元均被汇编指令修改,这样cpu中的registers和cache中已缓存的内存单元中的数据将作废。cpu将不得不在需要的时候重新读取内存中的数据。
int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
static inline bool is_MP() {
    assert(_processor_count > 0, "invalid processor count");
    return _processor_count > 1;
  }
  • mp表示是否属于多核cpu环境,如果是则LOCK_IF_MP会插入lock指令;
  • %0为exchange_value,%1为exchange_value,%2为compare_value,%3为dest,%4为mp;
  • "a" (compare_value)表示将compare_value读入eax寄存器,与%3(dest)进行比较,如果相等则将%1(exchange_value)写入dest;
  • "=a" (exchange_value)表示将eax寄存器的内容写入到exchange_value;

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK