3

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关...

 1 year ago
source link: https://blog.51cto.com/alex4dream/7112189
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

FutureTask的基本介绍

FutureTask是Java中的一个类,它实现了Future接口和Runnable接口,并且被用作线程执行的任务。FutureTask可以在多线程环境下异步执行一个任务并获取其结果。

FutureTask的特点用法

  1. 异步执行:通过将耗时的任务交给FutureTask,在一个单独的线程中执行,当前线程可以继续执行其他任务,不会被阻塞,从而提高程序的并发性。
  2. 获取结果:FutureTask提供了get()方法,用于在任务完成后获取其执行结果。如果任务尚未完成,get()方法将阻塞当前线程,直到任务完成并返回结果。
  3. 取消任务:可以使用cancel()方法取消任务的执行。如果任务尚未开始执行,则会取消任务的执行;如果任务已经开始执行,则根据传入的参数决定是否中断正在执行的任务。
  4. 线程安全:FutureTask是线程安全的,可以在多个线程中共享使用。多个线程可以同时调用FutureTask的get()方法等待任务结果。

FutureTask常见的应用场景是在并发编程中,将计算密集型的任务交给FutureTask后台执行,并在需要时获取结果。这样可以充分利用系统资源,提高程序的性能和响应能力。

FutureTask的执行流程

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_异常处理

在之前的线程池分析中,我们介绍了AbstractExecutorService的实现。AbstractExecutorService是Java中的一个抽象类,它实现了ExecutorService接口,并提供了一些通用的逻辑和方法来简化线程池的实现。

AbstractExecutorService的主要特点和用法
  1. 线程池管理:AbstractExecutorService提供了默认的线程池管理逻辑。我们可以通过继承AbstractExecutorService并实现其中的抽象方法来定制自己的线程池。
  2. 任务执行:通过调用submit()或invokeAll()等方法,可以将任务提交给AbstractExecutorService来执行。它会在适当的时候自动创建线程并执行任务。
  3. 异常处理:AbstractExecutorService提供了一些默认的异常处理机制,比如可以捕获任务执行过程中抛出的异常,并进行相应的处理操作。我们也可以在继承AbstractExecutorService时自定义异常处理逻辑。
  4. 生命周期管理:AbstractExecutorService定义了线程池的生命周期方法,如shutdown()和isShutdown()等,用于管理线程池的启动和关闭。
对象模型转换传递流程
【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_Java_02
submit方法提交任务

AbstractExecutorService主要用于实现ExecutorService接口中的submit()和invokeAll()等方法。它提供了一些默认的实现,使得我们可以更方便地创建和管理线程池,并执行任务。

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
RunnableFuture任务模型

通过使用RunnableFuture,我们可以更加灵活地管理线程池,处理任务的提交和执行,并在需要时进行异常处理和线程池的关闭。最终进行构建一个FutureTask对象。

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
FutureTask

对于通过submit方法提交的任务,无论是Runnable还是Callable接口实现,都会被统一封装为FutureTask对象,并传递给execute方法。

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // 确保 callable 的可见性
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;      // 确保可调用的可见性
}

注意,AbstractExecutorService是一个抽象类,不能直接实例化。我们可以继承AbstractExecutorService类并实现其中的抽象方法,或者使用Java提供的具体实现类如ThreadPoolExecutor来创建并使用线程池

RunnableAdapter适配器模型

RunnableAdapter的作用是将一个Runnable对象包装成一个Callable对象。在适配器的call()方法中,会调用任务的run()方法来执行任务,并返回预先指定的结果。

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
 }

通过使用RunnableAdapter,我们可以将Runnable任务在提交给ExecutorService时,以Callable的方式进行处理和执行。

static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    
    public T call() {
        task.run();
        return result;
    }
}

该适配器的设计使得我们可以更加方便地将Runnable任务与Callable任务一同处理,充分利用ExecutorService的统一接口,并且无需对Runnable任务进行额外的修改。

FutureTask的状态定义,说明了FutureTask对象可能处于的不同状态。初始状态为NEW,后续可能会演化为COMPLETING、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED等状态。

private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_线程池_03

这些状态描述了FutureTask在执行过程中的不同情况,包括正常完成、遇到异常、被取消、被中断等。了解FutureTask的状态有助于我们理解和处理FutureTask对象在多线程环境下的行为。

首先,方法会检查FutureTask的状态是否为NEW,并尝试使用compareAndSwapObject方法将执行线程设置为当前线程。然后,方法将执行并处理相关任务,并根据任务是否成功完成设置相应的结果。最后,清除执行线程(runner)并重新获取FutureTask的状态,根据状态处理可能的取消操作。

public void run() {
    if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) {
        return;
    }
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran) {
                set(result);
            }
        }
    } finally {
        // 让runner非空直到state被稳定设置,以防止并发调用run()
        runner = null;
        // 在将runner设置为空之后必须重新读取state以防止泄漏中断
        int s = state;
        if (s >= INTERRUPTING) {
            handlePossibleCancellationInterrupt(s);
        }
    }
}
【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_异常处理_04

设置线程任务数据结果

对于Callable对象,FutureTask会直接执行它的call方法。如果call方法执行成功,FutureTask会调用set方法来设置执行结果;如果遇到异常,FutureTask会调用setException方法来设置异常。

protected void set(V v) {
    // 首先使用CAS(比较并交换)将状态设置为中间状态COMPLETING
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        // 将状态设置为正常状态
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最终状态
        finishCompletion();
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        // 将状态设置为异常状态
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // 最终状态
        finishCompletion();
    }
}

下面是对于Callable直接执行其call方法的处理过程。

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_异常处理_05
  1. 通过CAS操作将FutureTask的状态设置为中间状态COMPLETING。
  2. 根据执行的结果(成功或异常),将结果或异常存储在outcome变量中,并将状态设置为相应的最终状态(正常状态或异常状态)。
  3. 完成执行并处理相关的完成操作。

获取线程任务数据结果

下面的这两个方法都是用来对全局变量outcome进行赋值的。而当我们通过get方法在另一个线程中获取结果时,将会执行以下过程:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

如果任务尚未完成,则会等待任务完成:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    // 使用for循环阻塞当前线程
    for (;;) {
        // 响应中断
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        int s = state;
        // 任务已经完成或者已经抛出异常,直接返回
        if (s > COMPLETING) {
            // WaitNode已经创建,当前可以设置为null了
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // 不能超时
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

下面是我们通过get方法在另一个线程中获取结果的过程。

【深度挖掘Java并发编程底层源码】「底层技术原理体系」带你零基础认识和分析学习相关的异步任务提交机制FutureTask的底层原理_异常处理_06
  1. 首先,通过调用get方法获取FutureTask的状态值。
  2. 如果状态值小于或等于COMPLETING,则调用awaitDone方法来等待任务的完成。
  3. 在awaitDone方法中,根据是否设置了超时时间计算出等待的截止时间,并使用for循环来阻塞当前线程。
  4. 在循环中,对中断请求进行响应,并检查任务的状态。
  5. 如果设置了超时时间,则根据剩余的等待时间使用LockSupport.parkNanos进行阻塞。
  6. 如果没有设置超时时间,则使用LockSupport.park进行阻塞。
  7. 循环会一直执行,直到任务完成或超时,才会返回任务的状态值。
  8. 最后,通过调用report方法来返回相应的结果。总之,该过程描述了在另一个线程中通过get方法等待任务完成的过程。它通过阻塞当前线程并监测任务状态来实现等待,直到任务完成后才返回结果。

上报线程任务数据结果

如果任务已完成或者等待任务直到完成后,将会调用report方法来返回结果, report 方法,发现状态为异常的话将包装成 ExecutionException((Throwable)x); 这个异常就是我们在使用 get 的时候需要捕获的异常。

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V) x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable) x);
}

如果状态s等于NORMAL,表示任务正常完成,将会返回实际的结果。如果状态s大于等于CANCELLED,则抛出CancellationException异常。否则,抛出ExecutionException异常。通过这样的设计,无论是任务正常完成还是抛出异常,都能够在线程池中执行的任务中被感知到。

通过调用cancel方法可以取消FutureTask的执行,并在任务完成后通过finishCompletion方法来唤醒等待的线程。被唤醒的线程在awaitDone方法中继续循环,检查任务的状态以获取结果。

public boolean cancel(boolean mayInterruptIfRunning) {
    // 如果任务不是刚创建或者是刚创建但是更改为指定状态失败则返回 false
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  1. FutureTask类中有一个cancel方法,接受一个布尔类型参数mayInterruptIfRunning,用于表示在执行中的任务是否可以中断。
  2. cancel方法首先检查任务的状态,如果状态不是NEW或者尝试将状态更改为INTERRUPTING或CANCELLED失败,则返回false。
  3. 如果允许中断正在运行的任务(mayInterruptIfRunning为true),则调用Thread的interrupt方法来中断任务的线程,并将任务的状态设置为INTERRUPTED。
  4. 然后,调用finishCompletion方法来完成任务的完成操作。
  5. 在finishCompletion方法中,首先尝试将waiters设置为null,以便其他线程无法再加入等待队列。然后循环遍历等待队列,唤醒等待的线程,并将等待队列中节点的thread属性设置为null。
  6. 在唤醒线程后,被唤醒的线程会在awaitDone方法中继续循环,检查任务的状态。
  7. 如果任务的状态大于COMPLETING,则表示任务已完成或抛出异常,直接返回状态。
  • FutureTask是一个用于异步执行任务并获取结果的Java类。它提供了获取结果、取消任务等功能,帮助我们更好地处理多线程编程中的任务执行和结果获取。
  • AbstractExecutorService是Java中用于简化线程池实现的一个抽象类。通过继承并实现其中的方法,我们可以更方便地管理线程池的创建、任务的提交和执行。它提供了默认的异常处理和生命周期管理,并可以根据需要进行定制和扩展。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK