4

【JUC】可回调任务FutureTask原理

 2 years ago
source link: https://segmentfault.com/a/1190000041364381
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

【JUC】可回调任务FutureTask原理

发布于 2 月 4 日

上一篇观察ThreadPoolExecutor的submit方法的时候,发现了它是靠FutureTask实现结果回调的:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    // ## 声明一个可回调任务,本质是一个FutureTask
    RunnableFuture<T> ftask = newTaskFor(task);
    // 线程池篇分析过
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

一、FutureTask使用样例

// 1.声明一个可回调任务
FutureTask<String> task = new FutureTask(()->"hello world");
Thread threadA = new Thread(task);
threadA.start();

// 2.阻塞方式获取任务执行结果:threadA未执行完,当前线程TreadB会阻塞于此
System.out.println(task.get());

image.png

FutureTask实现了RunnableFuture接口,而RunnableFuture=Runnable接口+Future接口

  1. 线程A执行start方法时,会调用FutureTask的run()方法(Runnable接口)
    run()方法会触发FutureTask内部的state状态变更,并调用Callable的call()方法
  2. 此时线程B以及其它线程调用FutureTask的get()方法(Future接口),这些线程会阻塞等待run()方法完成
    源码层面会构建一个名为waiters的单项链表,以LockSupport.part的形式将线程阻塞在节点上
  3. call()方法执行完成,state状态最终变更为NORMAL,同时释放阻塞线程
// ## 状态
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

// 结果返回接口
private Callable<V> callable;
// 线程执行方法的返回结果
private Object outcome; // non-volatile, protected by state reads/writes
// 正在执行callable接口发的线程
private volatile Thread runner;
// 等待节点
private volatile WaitNode waiters;

二、run()

public void run() {
    if (state != NEW 
        // runner变量赋值
        || !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
        
    try {
        Callable<V> c = callable;
        // NEW状态下执行
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 调用Callable的call方法,获取返回值
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                // == call方法执行成功,设置结果
                set(result);
        }
    } 
}
protected void set(V v) {
    // 状态变更NEW->COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 执行结果赋值给outcome
        outcome = v;
        // 状态变更COMPLETING->NORMAL,表示执行完成
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        // == 释放等待队列中的阻塞线程
        finishCompletion();
    }
}
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        // cas方式将waiters变量设置为null
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // ## 遍历队列(单向链表)中的WaitNode节点,释放全部的等待线程
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }

    // 提供的监听方法,需用户自定义实现
    done();

    callable = null;        // to reduce footprint
}

三、get()

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // NEW和COMPLETING状态触发等待
    if (s <= COMPLETING)
        // == 等待完成
        s = awaitDone(false, 0L);
    return report(s);
}


private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        // -- 检查状态,如果此时已变成NORMAL则无需等待
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // -- 检查状态,如果此时是COMPLETING,切换其它线程执行
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // -- 新建等待节点
        else if (q == null)
            q = new WaitNode();
        // -- waiters变量赋值
        else if (!queued)
            // ## 头插
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // -- 有超时设置情况
        else if (timed) {
            nanos = deadline - System.nanoTime();
            // ## 如果已超时,移除节点
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            // ## 如果未超时,阻塞指定时间
            LockSupport.parkNanos(this, nanos);
        }
        // -- 线程阻塞
        else
            LockSupport.park(this);
    }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK