6

JUC源码学习笔记7——FutureTask源码解析,人生亦如是,run起来才有结果 - Cuzzz

 1 year ago
source link: https://www.cnblogs.com/cuzzz/p/17021025.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

系列文章目录和关于我

一丶我们在哪里会使用到FutureTask#

基本上工作中和Future接口 打交道比较多,比如线程池ThreadPoolExecutor#sumbit方法,返回值就是一个Future(实际上基本上就是一个FutureTask)。ThreadPoolExecutor#sumbit需要传入一个Callable,我们作为调用方法,在其中的call方法编写业务逻辑,然后ThreadPoolExecutor会将其包装为一个FutureTask 提交到线程池中(异步),并立马返回FutureTask,从而让调用方可以取消任务,查看任务是否运行结束,获取异步任务结果
FutureTask就如同一个纽带,连接了任务 和 任务的结果

二丶何为FutureTask#

FutureTask源码注释中写到:

FutureTask是可取消的异步任务。此类提供Future的基本实现,包括启动,取消、查询以查看任务是否完成以及获取任务结果的方法。只有在任务完成后才能检索结果,如果尚未完成,get方法将阻塞。任务完成后,无法重新启动或取消任务(除非使用runAndReset调用任务)。

FutureTask可用于包装Callable或Runnable对象。因为FutureTask实现了Runnable,所以FutureTask可以提交给Executor执行。

三丶FutureTask继承关系#

image-20230102213501620

1.Future#

Future 表示异步任务的运行结果,它定义了取消、查询以查看任务是否完成以及获取任务结果的方法。只有在任务完成后才能检索结果,如果尚未完成,get方法将阻塞。任务完成后,无法重新启动或取消任务(除非使用runAndReset调用任务)

方法 解释
boolean cancel(boolean mayInterruptIfRunning) 尝试取消此任务的执行。如果任务已完成、已被取消或由于某些其他原因无法取消,则此尝试将失败。如果成功且在调用取消时此任务尚未启动,则任务不会运行。如果任务已经开始,则 mayInterruptIfRunning 参数确定是否应该中断执行该任务的线程以尝试停止该任务。此方法返回后,对 isDone 的后续调用将始终返回 true。如果此方法返回 true,则对 isCancelled 的后续调用将始终返回 true。
boolean isCancelled() 如果此任务在正常完成之前被取消,则返回 true
boolean isDone() 如果此任务完成,则返回 true。完成可能是由于正常终止、异常或取消——在这些情况下,此方法都将返回 true。
V get() throws InterruptedException, ExecutionException 如果任务没有执行完,那么一直阻塞直到任务完成,直到任务运行成功,或者运行失败,或者运行任务线程被中断
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException get()的超时等待版本,指定当前线程等待时间,如果超时任然没有执行结束,那么抛出TimeoutException

2.RunnableFuture#

"可以运行的未来",即表示一个可以运行的任务(是一个Runnable),也表示异步任务的结果(是一个Future)

四丶带着问题阅读源码#

小小FutureTask牛在哪儿昵,为什么是doug lea写的,我不行?

  • get方法调用的时候,如果任务没有成功,怎么实现调用线程的阻塞
  • get方法出现异常,或者正常结束的时候,怎么唤醒调用get方法的线程
  • 在任务运行之前,如果任务被取消那么将无法运行,已经开始运行的任务只能尝试中断运行任务的线程,无法取消,那么doug lea 怎么判断运行和取消的先后顺序(注意运行和取消可以是多个线程调用)怎么处理这里的线程不安全问题

五丶FutureTask的属性和内部类#

1.状态#

为多个线程对FutureTask运行状态的可见性,FutureTask具备属性private volatile int state,使用volatile修饰,可取以下值(Future定义了对应的常量):

常量 解释
NEW 任务处于新建状态
COMPLETING 任务正在完成,意味着异步计算结束,但是结果没有写回到 outcome属性上
NORMAL 任务正常结束,说明程序员定义的业务逻辑,没有抛出异常
EXCEPTIONAL 任务运行失败,说明程序员定义的业务逻辑抛出异常
CANCELLED 任务被取消,说明调用方法调用了cancel方法,在任务运行之前取消了任务
INTERRUPTING 任务正在被中断,是一个瞬态,使用cancel(true),调用方正在中断运行任务的线程
INTERRUPTED 任务已经被中断

1.1什么是COMPLETING,正在完成是什么鬼#

FutureTask使用outcome属性记录任务运行结果,或者运行失败的抛出的异常,在我们定义的业务逻辑(`ThreadPoolExecutor#sumbit传入的Callable)成功运行结束,到运行的结果写回到outcome,并不是一个瞬间动作,在运行结果赋值到outcome的时候,会先将FutureTask状态修改为COMPLETING

这样做的目的是,任务处于COMPLETING,这是一个线程调用get获取任务的时候,不会让调用线程阻塞而是让这个线程yield放弃cpu稍等片刻,任务结果马上写回了。

1.2什么是INTERRUPTING,正在中断是什么鬼#

考虑一个情况,线程A正在执行任务,线程BCD都调用了get,线程E调用了FutureTask#cancel(true),这时候需要中断线程A,并且我们在这个FutureTask的业务逻辑中定义了,如果被中断,那么任务结束,抛出异常等逻辑

这时候FutureTask#run会抛出一个异常(什么异常取决于FutureTask的业务逻辑抛出什么),那么线程BCD调用get造成的阻塞如何处理,需要去唤醒BCD,在修改状态到中断其实是存在短暂的时间的,在这短暂的时间内线程A会yield,放弃CPU,等待线程E中断方法调用结束,然后线程E会唤醒BCD。

2.任务,运行任务的线程,任务执行结果#

image-20230102222555133

任务被包装为Callable对象,任务结果使用outcome属性记录,运行任务的线程使用runner属性记录。

需要注意的是这里的outcome,没有使用volatile,后面的注释写到non-volatile, protected by state reads/writes(不使用volatile修饰,可见性由state属性的写入和读取保证),为什么不需要volatile关键字修饰,我们看源码的时候解释

3.WaitNode ——等待任务运行结束的线程#

我们上面说到,调用get方法的时候,如果任务没有结束,那么调用线程,将阻塞到任务结束。这意味着任务结束的时候,调用线程将被唤醒,那么哪些线程需要唤醒?FutureTask使用WaitNode类型的属性记录

image-20230102223036435

通过WaitNode属性记录调用线程,并且使用next属性串联起其他等待的线程,使用调用线程的记录。

image-20230102223650870

看完这些属性,我们来拜读doug lea大师的代码

六丶源码解读#

1.构造方法#

image-20230102224012912

可以看到如果传入Runnable,那么将被使用RunnableAdapter包装成Callable,典型的适配器模式,通过RunnableAdapter适配RunnableCallable

需要注意callable没有使用volatile修饰,doug lea 不担心重排序的问题么

如果执行FutureTask的构造方法的时候,发生重排序,this.callable的赋值重排序到外部获取到构造方法生成的FutureTask的后面,并且立马有另外一个线程调用了FutureTask的任务执行方法,这时候this.callable还来不及赋值,调用执行方法抛出空指针异常。那么为什么不用volatile修饰callable还能保证其可见性昵,能让源码写上// ensure visibility of callable这行注释昵?

在《JUC源码学习笔记4——原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法》的学习笔记中,我们说过volatile变量写具备如下内存屏障

img

这里的store store屏障防止了this.callable的赋值重排序到this.state = NEW之后,且后续的store屏障会保证当前线程(构造FutureTask的线程)工作内存会立马写回到主内存,并让其他线程关于此FutureTask的缓存无效,从而保证了callable的线程可见性。

2.run#

我们从run方法看起,run方法中的许多逻辑,牵扯到其他的方法,可能需要总览全局才能彻底理解

public void run() {
    //如果不是初始,说明有其他线程启动了,或者说其他线程取消了任务 那么不需要运行
    //如果是new 但是cas runner失败了,说明同时有多个线程执行此cas,当前线程没有抢过,那么不能执行此任务,
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
	
        //再次校验下是否为初始状态 如果不是 说明在当前线从第一个if到此存在其他线程取消任务
        //任务启动之前可以取消任务的运行
        if (c != null && state == NEW) {
            V result;
	        //记录当前任务是否成功执行,如果Callable代码写错了,
            //或者说Callable响应中断,执行的途中被中断那么为false
            boolean ran;
            try {
                //业务逻辑执行
                result = c.call();
                //成功执行
                ran = true;
            } catch (Throwable ex) {
                //这里可能是Callable本身代码逻辑错误异常 也可能是响应中断抛出异常
                result = null;
                ran = false;
           		//记录异常
                setException(ex);
            }
            if (ran)
                //设置任务正常执行结果
                set(result);
        }
    } finally {
        runner = null;
        int s = state;
        //处理中断
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
  • if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))

    任务不是new,或者cas设置当前线程为runner失败,那么直接返回false

    • 如果任务当前状态不是new,说明有其他线程运行了,或者说其他线程取消了任务
    • 如果是new 但是cas runner失败了,说明同时有多个线程执行此cas,当前线程没有抢过,那么不能执行此任务,这里使用CAS确保任务不会被其他线程再次执行
  • if (c != null && state == NEW) 为什么上面state!=NEW都false了还要再次判断这里state==NEW

    因为state != NEWUNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) 并非是一个原子性操作,这里是为了确保前线程从第一个if到第二个if,这一段代码执行时间内,没有其他线程取消任务。如果存在其他线程取消了任务,那么state == NEW就不成立——任务执行前可以取消任务

2.1记录任务执行结果setExceptionset方法#

  • 如果运行出现异常

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    
  • 正常运行结束

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    

这两个方法都差不多,都是上来一个CAS将state从new转变为COMPLETING,然后用outcome记录异常或者记录成功返回值,然后使用UNSAFE.putOrderedInt改变state,如果是出现异常,那么设置状态为EXCEPTIONAL,如果正常结束设置为NORMAL

  • 为什么使用UNSAFE.putOrderedInt 为什么outcome没有使用volatile修饰,doug lea都不担心可见性的问题么

    UNSAFE.putOrderedInt这个方法我在《JUC源码学习笔记4——原子类,CAS,Volatile内存屏障,缓存伪共享与UnSafe相关方法》中关于AtomicInteger lazySet中说过,Store load屏障可以让后续的load指令对其他处理器可见,但是需要将其他处理器的缓存设置成无效让它们重新从主内存读取,putOrderedInt提供一个store store屏障,然后写数据,store store是保证putOrderedInt之前的普通写入和putOrderedInt的写入不会重排序,但是不保证下面的volatile读写不被重排序,省去了store load内存屏障,提高了性能,但是后续的读可能存在可见性的问题。putOrderedInt的store store屏障保证了outcome回立即刷新回主存

    //也就是说
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
    		//outcome是刷新回主存的,且不会重排序到putOrderedInt后面
            //这也是outcome没有使用volatile修饰的原因之一,
            //有后续调用putOrderedInt方法保证其对其他线程的可见性
            outcome = v;
            
            //state字段使用putOrderedInt写入 其他线程存在可见性的问题
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            
            finishCompletion();
        }
    }
    

    那么state的可见性问题doug lea 如何解决?接着看下去

2.2finishCompletion 完成对等待线程的唤醒#

private void finishCompletion() {
    // 等待的节点
    for (WaitNode q; (q = waiters) != null;) {
        
        //将当前futureTask的waiters属性cas为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            //唤醒所有get方法阻塞的线程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //唤醒
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                //直到一个为null的节点,意味着遍历结束
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            //结束
            break;
        }
    }
	//钩子方法 留给我们自己扩展
    done();
	
    //将任务置为null
    callable = null;        // to reduce footprint
}

这里拿到waiters然后进行自旋遍历所有等待的节点线程,然后唤醒它们,有意思的点在UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)为何这里要使用CAS更新waiters为null昵?

因为这里存在线程A执行完FutureTask调用finishCompletion的同时线程B调用get进行等待,调用get方法进行排队(排队时也是CAS设置自己为waiters)这两个CAS必定有一个成功,有一个失败

  • 如果A失败,说明B在A唤醒之前进行排队,挂起自己,那么A在自旋唤醒的时候会唤醒B
  • 如果B失败,那么说明B在A唤醒之后进行排队,那么这时候不需要排队了,因为任务已经完成了,B只需要进行自旋获取返回结果即可

这一点我们在get方法的源码分析的时候,会深有体会

其次在取消任务的时候也会调用finishCompletion唤醒等待的线程,所有finishCompletion的调用存在线程安全问题,需要使用cas保证线程安全

2.3处理因为cancel造成的中断#handlePossibleCancellationInterrupt#

在run方法的finally块中存在

//运行完设置runner为空
runner = null;
//重新获取状态
int s = state;
 //如果是INTERRUPTING 或者INTERRUPTED
if (s >= INTERRUPTING)
    //handlePossibleCancellationInterrupt
    handlePossibleCancellationInterrupt(s);

private void handlePossibleCancellationInterrupt(int s) {
    if (s == INTERRUPTING)
        //如果是打断中 那么等待直到结束打断
        while (state == INTERRUPTING)
            Thread.yield(); 

cancel方法可以选择传入true表示,如果任务还在运行那么调用运行任务线程的interrupt方法进行中断,如果是调用cancel的线程还没有完成中断那么当前运行的线程会让步,为什么这么做,我们上面说到过,A线程运行任务,B线程cancel任务,B中断线程A其实是需要时间的,B会先修改任务状态为INTERRUPTING,然后中断线程A,然后修改状态为INTERRUPTED并唤醒等待的线程,从INTERRUPTING - > INTERRUPTED 这段时间,线程A只需要让出cpu等待即可

3.get 获取任务执行结果#

  • get()无限等待任务执行完
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    //任务为NEW 和 COMPLETING 那么调用那么会调用awaitDone
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    //此方法如果发现FuturetTask调用异常那么抛出异常
    return report(s);
}
  • get(long timeout, TimeUnit unit)超时等待任务完成
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    //如果状态小于等于COMPLETING ( NEW 和 COMPLETING) 那么会调用awaitDone
    //如果awaitDone结束的时候返回的状态还是 NEW or COMPLETING 抛出超时异常
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    //此方法如果发现FuturetTask调用异常那么抛出异常
    return report(s);
}

二者最终都调用了awaitDone(是否超时等待,等待时长)

3.1如果状态小于等于COMPLETING#

这意味着 状态为new,任务都运行完业务逻辑,或者状态为COMPLETING,业务逻辑运行完了但是outcome正在赋值为执行结果,对outcome赋值后,会修改状态为NORMAL(任务正常完成),或者EXCEPTIONAL(任务执行抛出异常),所有get方法只有状态小于等于COMPLETING,才会调用awaitDone挂起线程进行等待

3.2 awaitDone等待直到任务完成或者超时#

调用此方法的前提是,任务逻辑没有执行完,或者逻辑执行完但是结果还没有赋值给outcome,那么这两种情况doug lea如何处理

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    //等待结束时间,如果非超时等待,那么为0
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
	//当前线程如果进入等待任务完成队列,此变量记录等待节点
    WaitNode q = null;
    //是否入队(等待任务完成队列)Waiters 使用next组成的队列
    boolean queued = false;
    for (;;) {
        //如果等待的过程中被中断,
        //那么把自己从等待waiters中删除
        //并且抛出中断异常
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        //读取state,volatile保证可见性
        int s = state;
        //如果当前大于COMPLETING 说明任务执行完成 outcome已经赋值了,
        //或者取消了,或者由于取消而被中断 直接返回当前状态,不需要再等了
        if (s > COMPLETING) {
            //节点线程置为null,后续执行任务线程唤醒等待线程的时候不会唤醒到此线程
            if (q != null)
                q.thread = null;
            return s;
        }
        //如果任务正在完成,进行线程让步
        //后续FutureTask执行的线程写回outcome改变状态为NORMAL或者EXCEPTIONAL是很快的,
        //也许修改状态为NORMAL或者EXCEPTIONAL会导致线程A多yield几下(使用的是UNSAFE.putOrderedInt存在线程可见性问题)
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        //当前线程的节点
        else if (q == null)
            q = new WaitNode();
        //如果没有入队(等待任务完成的队列)那么入队(等待任务完成的队列)
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        //如果是超时等待
        else if (timed) {
            nanos = deadline - System.nanoTime();
			//等待超时 把自己从等待任务完成队列中移除
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            //等待指定时间
            LockSupport.parkNanos(this, nanos);
        }
        else
            //无限期等待
            LockSupport.park(this);
    }
}

此方法分支很多,我们慢慢看

  1. 如果线程执行awaitDone之前就被中断,那么会直接抛出中断异常

  2. 如果线程执行awaitDone并且成功使用LockSupport.parkNanos(this, nanos)或者LockSupport.park(this)挂起,这时候被中断会从这两个方法中返回,继续自旋,Thread.interrupted()为true,会重置中断标识并且抛出中断异常

    1和2其实都是FutureTask#get对于中断的响应,get方法如果被中断是会抛出中断异常并且重置中断标识的

  3. 如果自旋的时候发现任务状态大于COMPLETING

    说明当前任务执行完成了,或者说任务被取消,或者由于取消已经中断了,那么直接返回即可,从这里返回有三种情况第一次自旋发现任务完成了超时等待指定时间结束发现任务完成了,任务完成的时候被任务执行线程唤醒,继续自旋发现任务完成了

  4. 如果自旋的时候发现任务状态等于COMPLETING

    那么调用yield让出cpu,而不是挂起自己,因为后续FutureTask执行的线程写回outcome改变状态为NORMAL或者EXCEPTIONAL是很快的,也许修改状态为NORMAL或者EXCEPTIONAL会导致线程A多yield几下(使用的是UNSAFE.putOrderedInt存在线程可见性问题)(这里就是我们说的为什么COMPLETING不担心可见性问题)

    注意如果超时等待指定时间结束,继续自旋,如果进入此分支,那么让出cpu,再次获得时间片后,继续执行,下一次自旋,而不会进入到下面的超时异常分支,也就是说COMPLETING意味着任务执行完了,但是在做一些善后工作(写入任务返回值,唤醒等待线程)不会由于此状态导致超时

  5. q == null

    意味着当前线程没有被包装成WaitNode,当前线程也没有被中断,任务没有完成也不是Completing状态,这时候调用构造方法然后继续自旋

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
    
  6. !queued

    这是在5的基础上,当前q已经有了节点,但是还没有进入等待任务完成队列,下面通过CAS让当前线程入队

    else if (!queued)
        queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                             q.next = waiters, q);
    

    这里首先q.next = waiters让当前节点的next指向waiters,然后CAS设置waiters为当前节点,也就是说最后入队的节点使用waiters记录,使用next串联每一个等待的线程节点,q.next = waiters不需要考虑线程安全,和AQS中的入队类似,这是改变当前节点的next引用指向,但是修改waiters需要考虑线程安全问题,如果这里CAS失败了(意味着存在其他线程调用get入队等待),那么queued为false 继续自旋尝试CAS自己为waiters

  7. 挂起当前线程

    //超时等待
    else if (timed) {
        //需要等待的时间
        nanos = deadline - System.nanoTime();
        //已经超时
        if (nanos <= 0L) {
            //把自己从waiters等待任务完成队列中移除
            removeWaiter(q);
            return state;
        }
        //挂起指定时间
        LockSupport.parkNanos(this, nanos);
    }
    //等待直到被中断或者唤醒
    else
        LockSupport.park(this);
    

    这里超时部分多一个removeWaiter,将自己从等待任务完成队列中移除,这个方法的执行需要考虑线程安全问题,同样使用自旋+CAS保证线程安全,这里不做过多分析。

4 report 如果任务执行失败抛出异常,如果成功返回执行结果#

private V report(int s) throws ExecutionException {
    Object x = outcome;
    //如果任务正常结束
    if (s == NORMAL)
        //强转
        return (V)x;
    //如果任务取消了 或者由于取消被中断了,抛出取消异常
    if (s >= CANCELLED)
        throw new CancellationException();
    //反之抛出ExecutionException 包装 原始的异常
    throw new ExecutionException((Throwable)x);
}

只有任务正常执行的时候,才会返回结果,如果被取消那么抛出取消异常。

5 取消任务#

取消有一个比较有趣的点,如果取消在任务开始之前,那么说明取消成功,后续任务完成调用set或者setException应该是什么都不做。如果取消在任务执行之后,那么取消的这个动作应该失败,下面我们看下doug lea如果处理这个细节。

//mayInterruptIfRunning 表示需要中断任务执行线程
public boolean cancel(boolean mayInterruptIfRunning) {
    //任务不是初始,或者CAS修改状态从new 到INTERRUPTING 或者CANCELLED 失败
    //直接返回false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {   
        //如果需要中断
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                //执行中断
                if (t != null)
                    t.interrupt();
            } finally { // final state
                //修改状态为INTERRUPTED
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        //唤醒所有等待任务执行的线程
        finishCompletion();
    }
    return true;
}
  1. 第一个if

    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    
    • 如果 state == NEW不成立,说明任务在执行此判断之前已经结束了(Completing,或者已经到了NORMAL,或者EXCEPTIONAL)说明取消在任务结束之前,那么直接返回false。或者说当前线程A对任务的取消在其他线程B取消任务之后,这时候就A线程取消方法返回false

    • 如果UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))不成立

      意味着state == NEW判读的时候成立,但是执行这句CAS的时候之前有老六线程抢先一步,或者说存在并发当前线程没有抢过,那么也直接返回false,这里保证了cancel的执行是串行的,不存在线程安全问题。注意这里如果需要中断任务执行线程那么CAS修改状态到INTERRUPTING ,反之直接修改到CANCELLED

  2. 尝试中断任务执行线程

    if (mayInterruptIfRunning) {
        try {
            Thread t = runner;
            if (t != null)
                t.interrupt();
        } finally { // final state
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
        }
    }
    

    调用interrupt具体如何响应中断,得看程序员定义的业务逻辑是啥样的。调用putOrderedInt修改状态为INTERRUPTED,表示已经完成了中断

  3. 唤醒等待任务结束的线程

    直接调用finishCompletion,这个方法前面分析过。这里假如线程A取消了任务,那么线程B任务执行完后调用set或者setException 会如何昵——什么都不做

    protected void set(V v) {
        //此时state 是INTERRUPTING 或者INTERRUPTED if为false
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
                outcome = v;
                UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
                finishCompletion();
            }
        }
    
  4. run方法对中断的处理

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            //省略任务的执行
        } finally {
            runner = null;
            int s = state;
            //进入这个if 必须是INTERRUPTING,或者INTERRUPTED
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    private void handlePossibleCancellationInterrupt(int s) {
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); 
    }
    

这里为INTERRUPTING ,可能是取消任务的线程还没来得及执行UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED),也可能是可见性导致运行任务的线程没有读取到最新的state,handlePossibleCancellationInterrupt会让运行任务的线程等待

七丶问题解答#

  • get方法调用的时候,如果任务没有成功,怎么实现调用线程的阻塞

    等待的线程修改Waiter next指向 CAS自己为waiters,线程安全的入队,并进行park 挂起实现阻塞

  • get方法出现异常,或者正常结束的时候,怎么唤醒调用get方法的线程

    在finishCompletion中进行唤醒,如果任务没有被取消,运行完,那么运行任务的线程进行唤醒,如果任务被取消那么cancel任务的线程进行唤醒

  • 在任务运行之前,如果任务被取消那么将无法运行,已经开始运行的任务只能尝试中断运行任务的线程,无法取消,那么doug lea 怎么判断运行和取消的先后顺序(注意运行和取消可以是多个线程调用)怎么处理这里的线程不安全问题

    使用CAS,运行前保证为new,取消的时候也要保证为new,且运行完成修改new使用cas,取消也是使用cas,保证线程安全


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK