6

JAVA语言异步非阻塞设计模式(原理篇)

 3 years ago
source link: https://segmentfault.com/a/1190000040347139
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

JAVA语言异步非阻塞设计模式(原理篇)

封面2.png

本系列文章共2篇,对 Java 语言的异步非阻塞模式进行科普。《原理篇》讲解异步非阻塞模型的原理,以及核心设计模式“Promise”的基本特性。《应用篇》会展示更加丰富的应用场景,介绍 Promise 的变体,如异常处理、调度策略等,并将 Promise 和现有工具进行对比。

限于个人水平和篇幅,本系列以科普为主,内容更偏重于原理、API 设计、应用实践,但是不会深入讲解并发优化的具体细节。

异步非阻塞[A]是一种高性能的线程模型,在 IO 密集型系统中得到广泛应用。在该模型下,系统发起耗时请求后不需要等待响应,期间可以执行其他操作;当收到响应后,系统收到通知并执行后续处理。由于消除了不必要的等待,这种模型能够充分利用 cpu、线程等资源,提高资源利用率。

然而,异步非阻塞模式在提升性能的同时,也带来了编码实现上的复杂性。请求和响应可能分离到不同线程中,需要编写额外代码完成响应结果的传递。Promise 设计模式可以降低这种复杂性,封装数据传递、时序控制、线程安全等实现细节,从而提供简洁的 API 形式。

本文首先介绍异步非阻塞模式,从线程模型的角度分析阻塞和非阻塞模式的区别。之后介绍 Promise 设计模式的应用场景及工作流程。最后,提供一种简易的 Java 实现,能够实现基本的功能需求,并做到线程安全。

在正式探索技术问题之前,我们先来看看什么是异步非阻塞模型。如图1-1所示,展示了两个小人通信的场景:

  1. 两个小人代表互相通信的两个线程,如数据库的客户端和服务端;他们可以部署在不同的机器上。
  2. 小人之间互相投递苹果,代表要传递的消息。根据具体业务场景,这些消息可能会称为 request、response、packet、document、record 等。
  3. 小人之间需要建立信道,消息才得以传递。根据场景,信道称为 channel、connection 等。

假设左侧小人发起请求,而右侧小人处理请求并发送响应:左侧小人先投出一个苹果request,被右侧小人接收到;右侧小人进行处理后,再投出苹果 response,被左侧小人接收到。我们考察左侧小人在等待响应期间的行为,根据他在等待 response 期间是否能处理其他工作,将其归纳为“同步阻塞”和“异步非阻塞”两种模式。

1-1.png
图1-1 两个小人通信

首先我们看看同步阻塞式通信的流程,如图1-2a所示。

  1. 投递。左侧小人投递 request,并等待接收 response。
  2. 等待。在等待接收 response 期间,左侧小人休息。不论是否还有其他 request需要投递、是否还有其他工作需要处理,他都视若无睹,绝对不会因此打断休息。
  3. 响应。在收到 response 后,小人从休息中唤醒并处理 response。

1-2.png
图1-2a 同步阻塞式通信

接下来我们看看异步非阻塞式通信的流程,如图1-2b所示。

  1. 缓存。左侧小人投递 request,并等待接收 response。和同步阻塞模式不同,小人并不需要亲手接住苹果 response,而是在地上放置一个盘子称为“buffer”;如果小人暂时不在场,那么所收到的苹果可以先存在盘子里,稍后再处理。
  2. 暂离。由于有盘子 buffer 的存在,小人投递 request 后就可以暂时离开,去处理其他工作,当然也可以去投递下一个 request;如果需要向不同的channel投递request,那么小人可以多摆放几个盘子,和 channel 一一对应。
  3. 响应。小人离开后,一旦某个盘子收到了 response,一只“大喇叭”就会响起,发出“channelRead”通知,呼唤小人回来处理 response 。如果要处理多个response 或多个 channel,那么 channelRead 通知还需要携带参数,以说明从哪个 channel 上收到了哪个 response。

这里的大喇叭可以用 NIO 或 AIO 来实现。简单来说,NIO 是指不停地轮询每个盘子,一旦看到苹果就发出通知;AIO 是指在收到苹果时直接触发通知,而没有轮询的过程。当然,本系列文章的读者并不需要了解更多实现细节,只需知道异步非阻塞模式依赖于“大喇叭”来实现,它替代小人等待接收 response,从而解放小人去处理其他工作。

1-2b.png
图1-2b 异步非阻塞式通信

根据上面的分析,同步模式具有下列严重缺点

  1. 同步阻塞模式的工作效率十分低下。小人大部分时间都在休息,仅当投递请求、处理响应时,才偶尔醒来工作一小会;而在异步非阻塞模式下,小人从不休息,马不停蹄地投递请求、处理响应,或处理其他工作。
  2. 同步阻塞模式会带来延迟

我们考虑下面两种情况,如图1-3所示。

  • channel 复用,即左侧小人在一个 channel 上连续发送多条消息。在同步阻塞模式下,一轮(即请求+响应)只能投递一个请求(苹果1),而后续请求(苹果2-4)都只能排队等待,右侧小人需要等待很多轮才能收到所期望的全部消息。此外,左侧小人在等待接收某个 response 期间,没有机会处理收到的其他消息,造成了数据处理的延迟。不得不感慨,左侧小人太懒惰了!
  • 线程复用,即一个线程(小人)向多个 channel 发送消息(苹果1-3,分别发向不同 channel)。左侧小人同一时刻只能做一件事,要么在工作,要么在休息;他投递了苹果1后就躺下休息,等待响应,全然不顾右侧小人2、3还在等待他们想要的苹果2、3。

1-3a.png
图1-3a channel复用

1-3b.png
图1-3b 线程复用

在这一章里我们用漫画的形式,初步体验了同步阻塞模式与异步非阻塞模式,并分析了两种模式的区别。接下来我们从Java线程入手,对两种模式进行更加正式、更加贴近实际的分析。

2.异步非阻塞模型

2.1 Java 线程状态

在 Java 程序中,线程是调度执行的单元。线程可以获得 CPU 使用权来执行代码,从而完成有意义的工作。工作进行期间,有时会因为等待获取锁、等待网络 IO 等原因而暂停,通称“同步”或“阻塞”;如果多项工作能够同时进行,之间不存在约束、不需要互相等待,这种情况就称为“异步”或“非阻塞”。
受限于内存、系统线程数、上下文切换开销,Java 程序并不能无限创建线程;因此,我们只能创建有限个线程,并尽量提高线程的利用率,即增加其工作时长、降低阻塞时长。异步非阻塞模型是减少阻塞、提高线程利用率的有效手段。当然,这种模型并不能消除所有的阻塞。我们首先来看看 Java 线程有哪些状态,其中哪些阻塞是必要的,哪些阻塞可以避免。

Java 线程状态包括:

  • RUNNABLE:线程在执行有意义的工作
    如图2-1a,线程如果在执行纯内存运算,那么处于RUNNABLE状态
    根据是否获得cpu使用权,又分为两个子状态:READY、RUNNING
  • BLOCKED/WAITING/TIMED_WAITING:线程正在阻塞
    如图2-1b、2-1c、2-1d,根据阻塞原因,线程处于下列状态之一
    BLOCKED:synchronized等待获取锁
    WAITING/TIMED_WAITING:Lock等待获取锁。两种状态的区别为是否设置超时时长

图2-1.png
图2-1 Java 线程状态

此外,如果 Java 线程正在进行网络 IO,则线程状态为 RUNNABLE,但是实际上也发生了阻塞。以 socket 编程为例,如图2-2所示,在收到数据之前InputStream.read() 会阻塞,此时线程状态为RUNNABLE。

图2-2.png
图2-2 网络IO

综上,Java 线程状态包括:RUNNABLE、BLOCKED、WAITING、TIMED_WAITING。其中,RUNNABLE 状态又分为内存计算(非阻塞)、网络IO(阻塞)两种情况,而其余状态都是阻塞的。
根据阻塞原因,本文将 Java 线程状态归纳为以下3类:RUNNABLE、IO、BLOCKED

  1. RUNNABLE:Java 线程状态为 RUNNABLE,并且在执行有用的内存计算,无阻塞
  2. IO:Java线程状态为RUNNABLE,但是正在进行网络IO,发生阻塞
  3. BLOCKED:Java线程状态为BLOCKED/WAITING/TIMED_WAITING,在并发工具的控制下,线程等待获取某一种锁,发生阻塞

要提高线程利用率,就要增加线程处于 RUNNABLE 状态的时长,降低处于 IO 和BLOCKED状态的时长。BLOCKED 状态一般是不可避免的,因为线程间需要通信,需要对临界区进行并发控制;但是,如果采用适当的线程模型,那么 IO 状态的时长就可以得到降低,而这就是异步非阻塞模型。

2.2 线程模型:阻塞 vs 非阻塞

异步非阻塞模型能够降低 IO 阻塞时长,提高线程利用率。下面以数据库访问为例,分析同步和异步 API 的线程模型。如图3所示,过程中涉及3个函数:

  1. writeSync()或writeAsync():数据库访问,发送请求
  2. process(result):处理服务器响应(以 result 表示)
  3. doOtherThings():任意其他操作,逻辑上不依赖服务器响应

同步 API 如图3-a 所示:调用者首先发送请求,然后在网络连接上等待来自服务器的响应数据。API 会一直阻塞,直至收到响应才返回;期间调用者线程无法执行其他操作,即使该操作并不依赖服务器响应。实际的执行顺序为:

  1. writeSync()
  2. process(result)
  3. doOtherThings() // 直至收到结果,当前线程才能执行其他操作

异步 API 如图2-3b所示:调用者发送请求并注册回调,然后 API 立刻返回,接下来调用者可以执行任意操作。稍后底层网络连接收到响应数据,触发调用者所注册的回调。实际的执行顺序为:

  1. writeAsync()
  2. doOtherThings() // 已经可以执行其他操作,并不需要等待响应
  3. process(result)

图2-3.png
图2-3 同步API & 异步API

在上述过程中,函数 doOtherThings() 并不依赖服务器响应,原则上可以和数据库访问同时执行。然而对于同步 API,调用者被迫等待服务器响应,然后才可以执行 doOtherThings();即数据库访问期间线程阻塞于 IO 状态,无法执行其他有用的操作,利用率十分低下。而异步 API 就没有这个限制,显得更加紧凑、高效。

在 IO 密集型系统中,适当使用异步非阻塞模型,可以提升数据库访问吞吐量。考虑这样一个场景:需要执行多条数据库访问请求,且请求之间互相独立,无依赖关系。使用同步 API 和异步 API,线程状态随时间变化的过程如图2-4所示。
线程交替处于 RUNNABLE 和 IO 状态。在 RUNNABLE 状态下,线程执行内存计算,如提交请求、处理响应。在 IO 状态下,线程在网络连接上等待响应数据。在实际系统中,内存计算的速度非常快,RUNNABLE 状态的时长基本可忽略;而网络传输的耗时会相对更长(几十到几百毫秒),IO 状态的时长更加可观。

a.同步 API:调用者线程一次只能提交一个请求;直到请求返回后,才能再提交下一个请求。线程利用率很低,大部分时间消耗在 IO 状态上。

b.异步 API:调用者线程可以连续提交多个请求,而之前提交的请求都还没有收到响应。调用者线程会注册一些回调,这些回调存储在内存中;稍后网络连接上收到响应数据,某个接收线程被通知处理响应数据,从内存中取出所注册的回调,并触发回调。这种模型下,请求可以连续地提交、连续的响应,从而节约 IO 状态的耗时。

图2-4.png
图2-4 线程时间线:数据库访问

异步非阻塞模式在IO密集型系统中应用非常广泛。常用的中间件,如 http 请求[D]、redis[E]、mongo DB[F]、elasticsearch[G]、influx DB[H],都支持异步 API。各位读者可以在参考文献中,查阅这些异步 API的样例代码。关于中间件的异步 API,下面有几个注意事项:

  1. redis 的常见客户端有 jedis 和 lettuce [E]。其中 lettuce 提供了异步 API,而 jedis 只能提供同步 API;二者对比参见文章[I]。
  2. kafka producer[J]的send()方法也支持异步API,但是该API实际上不是纯异步的[K]:当底层缓存满,或者无法获取服务器(broker)信息时,send()方法会发生阻塞。个人认为这是一个非常严重的设计缺陷。kafka 常用于低延迟日志采集场景,系统会将日志通过网络写入到 kafka 服务器,以减少线程内的阻塞,提升线程吞吐量;稍后其他进程会从 kafka 消费所写入的日志,进行持久存储。设想一个实时通信系统,单条线程每秒需要处理几万到几十万条消息,响应时间一般为几毫秒到几十毫秒。系统在处理期间需要经常调用 send() 来上报日志,如果每次调用都发生哪怕1秒的延迟(实际有可能达几十秒),延迟积累起来也会严重劣化吞吐量和延迟。

最后,异步 API 有多种实现,包括线程池、select(如netty 4.x[L])、epoll 等。其共同点是调用者不需要在某一条网络连接上阻塞,以等待接收数据;相反,API 底层常驻有限数目的线程,当收到数据后,某一线程得到通知并触发回调。这种模型也称为“响应式”模型,非常贴切。限于篇幅原因,本文主要关注异步 API 设计,而不深入讲解异步 API 的实现原理。

3.Promise设计模式

3.1 API形式:同步、异步 listener、异步 Promise

上一章介绍了异步非阻塞模式和异步 API 的函数形式。异步 API 具有以下特征:

  1. 在提交请求时注册回调;
  2. 提交请求后,函数立刻返回,不需要等待收到响应;
  3. 收到响应后,触发所注册的回调;根据底层实现,可以利用有限数目的线程来接收响应数据,并在这些线程中执行回调。

在保留异步特性的基础上,异步 API 的形式可以进一步优化。上一章图2-3b展示了异步 API 的 listener 版本,特点是在提交请求时必须注册恰好一个回调;因而在下列场景下,listener API 会难以满足功能需求,需要调用者做进一步处理:

  1. 多个对象都关注响应数据,即需要注册多个回调;但是 listener 只支持注册一个回调。
  2. 需要将异步调用转为同步调用。例如某些框架(如spring)需要同步返回,或者我们希望主线程阻塞直至操作完成,然后主线程结束、进程退出;但是 listener 只支持纯异步,调用者需要重复编写异步转同步的代码。

为了应对上述场景,我们可以使用 Promise 设计模式来重构异步 API,以支持多个回调和同步调用。下面对同步 API、异步listener API、异步Promise API 的函数形式进行对比,如图3-1所示:

  • a.同步:调用 writeSync() 方法并阻塞;收到响应后函数停止阻塞,并返回响应数据
  • b.异步listener:调用 writeAsync() 方法并注册 listener,函数立刻返回;收到响应后,在其他线程触发所注册的 listener;
  • c.异步Promise:调用 writeAsync(),但不需要在函数中注册 listener,函数立刻返回 Promise 对象。调用者可以调用异步的 Promise.await(listener),注册任意数目的 listener,收到响应后会按顺序触发;此外,也可以调用同步的 Promise.await(),阻塞直至收到响应。

图3-1.png
图3-1 API形式:同步、异步listener、异步Promise

综上,Promise API 在保持异步特性的前提下,提供了更高的灵活性。调用者可以自由选择函数是否阻塞,以及注册任意数目的回调。

3.2 Promise的特性与实现

上一节介绍了 Promise API 的使用样例,其核心是一个 Promise 对象,支持注册 listener,以及同步获取响应 result;而本节将对 Promise 的功能进行更加详细的定义。注意,本节并不限定 Promise 的某一具体实现(例:jdk CompletableFuture、netty DefaultPromise),只展示共有的、必须具备的特性;缺少这些特性,Promise 将无法完成异步传递响应数据的工作。

3.2.1 功能特性

  • Promise的基本方法

Promise的基本功能是传递响应数据,需要支持下列方法,如表3-1所示:

表格.jpg

下面以上一小节的数据库访问 API 为例,演示 Promise 的工作流程,如图3-2所示:

  • a.调用者调用 writeAsync() API,提交数据库访问请求并获取 Promise 对象;然后调用 Promise.await(listener),注册对响应数据的 listener。Promise 对象也可以传递给程序中其他地方,使得关心响应数据的其他代码,各自注册更多listener。
  • b.writeAsync()内部,创建 Promise 对象并和这次请求关联起来,假设以requestId 标识。
  • c.writeAsync()底层常驻有限数目的线程,用于发送请求和接收响应。以 netty为例,当从网络上收到响应据后,其中一个线程得到通知,执行 channelRead() 函数进行处理;函数取出响应数据和对应的 Promise 对象,并调用Promise.signalAll() 进行通知。注意这里是伪代码,和 netty 中回调函数的实际签名略有区别。

图3-2a.png
图3-2a 提交数据库访问请求

图3-2b.png
图3-2b 创建 Promise 对象

图3-2c.png
图3-2c 通知 Promise 对象

- Promise 的时序

Promise 的方法需要保证以下时序。此处以“A对B可见”来描述时序,即:如果先执行操作A(注册 listener )就会产生某种永久效应(永久记录这个 listener ),之后再执行操作B(通知 result )就必须考虑到这种效应,执行相应的处理(触发之前记录的listener)。

  1. await(listener) 对 signalAll(result) 可见:注册若干 listener 后,通知result 时必须触发每一个 listener,不允许遗漏。
  2. signalAll(result) 对 await(listener) 可见:通知 result 后,再注册listener 就会立刻触发。
  3. 首次 signalAll(result) 对后续 signalAll(result) 可见。首次通知 result后,result 即唯一确定,永不改变。之后再通知 result 就会忽略,不产生任何副作用。请求超时是该特性一种典型应用:在提交请求的同时创建一个定时任务;如果能在超时时长内正确收到响应数据,则通知 Promise 正常结束;否则定时任务超时,通知Promise 异常结束。不论上述事件哪个先发生,都保证只采纳首次通知,使得请求结果唯一确定。

此外,某次 await(listener) 最好对后续 await(listener) 可见,以保证listener 严格按照注册顺序来触发。

- Promise 的非线程安全实现

如不考虑线程安全,那么下列代码清单可以实现 Promise 的基本特性;线程安全的实现见下一小节。代码清单依次展示了 await(listener): void、signalAll(result)、await(): result 的实现。这里有几个注意事项

  1. 字段 listeners 存储 await(listener) 所注册的 listener。字段类型为LinkedList,以存储任意数目的 listener,同时维护 listener 的触发顺序。
  2. 字段 isSignaled 记录是否通知过 result。如果 isSignaled=true,则后续调用 await(listener) 时立刻触发 listener ,且后续调用 signalAll(result) 时直接忽略。此外,我们以 isSignaled=true 而不是 result=null 来判断是否通知过 result,因为某些情况下 null 本身也可以作为响应数据。例如,我们以Promise<Exception>表示数据库写入的结果,通知 null 表示写入成功,通知Exception 对象(或某一子类)表示失败原因。
  3. signalAll(T result)在末尾处调用 listeners.clear() 以释放内存,因为listeners 已经触发过,不再需要在内存中存储。
public class Promise<T> {

    private boolean isSignaled = false;
    private T result;

    private final List<Consumer<T>> listeners = new LinkedList<>();

    public void await(Consumer<T> listener) {
        if (isSignaled) {
            listener.accept(result);
            return;
        }

        listeners.add(listener);
    }

    public void signalAll(T result) {
        if (isSignaled) {
            return;
        }

        this.result = result;
        isSignaled = true;
        for (Consumer<T> listener : listeners) {
            listener.accept(result);
        }
        listeners.clear();
    }

    public T await() {
        // 适当阻塞,直至signalAll()被调用;实际实现见3.3节
        return result;
    }
}

3.2.2 线程安全特性

上一章3.2.1节讲解了 Promise 的功能,并提供了非线程安全的实现。本节展示如何使用并发工具,实现线程安全的 Promise,如下所示。有下列几个注意事项:

  1. 线程安全。各个字段均被多个线程访问,因此都属于临界区,需要使用适当的线程安全工具进行上锁,如 synchronized、Lock。一种最简单的实现,是将全部代码纳入临界区内,进入方法时上锁,离开方法时放锁。注意在使用 return 进行提前返回时,不要忘记放锁。
  2. 在临界区外触发 listener ,以减少在临界区内停留的时长,并减少潜在的死锁风险。
  3. 同步 await()。可以使用任何一种同步等待的工具来实现,如 CountDownLatch、Condition 。此处使用 Condition 实现,注意根据 java 语法,操作 Condition 时必须先获取 Condition 所关联的锁。
public class Promise<T> {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition resultCondition = lock.newCondition();

    private boolean isSignaled = false;
    private T result;

    private final List<Consumer<T>> listeners = new LinkedList<>();

    public void await(Consumer<T> listener) {
        lock.lock();
        if (isSignaled) {
            lock.unlock(); // 不要忘记放锁
            listener.accept(result); // 在临界区外触发listener
            return;
        }

        listeners.add(listener);
        lock.unlock();
    }

    public void signalAll(T result) {
        lock.lock();
        if (isSignaled) {
            lock.unlock(); // 不要忘记放锁
            return;
        }

        this.result = result;
        isSignaled = true;

        // this.listeners的副本
        List<Consumer<T>> listeners = new ArrayList<>(this.listeners);
        this.listeners.clear();
        lock.unlock();

        for (Consumer<T> listener : listeners) {
            listener.accept(result); // 在临界区外触发listener
        }

/* 操作Condition时须上锁*/
        lock.lock();
        resultCondition.signalAll();
        lock.unlock();
    }

    public T await() {
        lock.lock();
        if (isSignaled) {
            lock.unlock(); // 不要忘记放锁
            return result;
        }

        while (!isSignaled) {
            resultCondition.awaitUninterruptibly();
        }
        lock.unlock();

        return result;
    }
}

上述实现仅做演示使用,仍有较大的改进空间。生产环境的实现原理,读者可以参考jdk CompletableFutre、netty DefaultPromise。可以改进的地方包括:

  1. 使用 CAS 设置响应数据。字段 isSignaled、result 可以合并为一个数据对象,然后使用 CAS 进行设值,从而进一步降低阻塞时长。
  2. 触发 listener 的时序。在上述代码中,Promise.signalAll() 会依次触发listener;在此期间,如果其他线程调用了异步 await(listener),由于 Promise 的响应数据已通知,该线程也会触发 listener 。上述过程中,两个线程同时触发listener ,因此没有严格保证触发顺序。作为改进,类似于 netty DefaultPromise,Promise.signalAll() 内部可以设置一个循环,不断触发 listener 直至 listeners 排空,以防期间注册了新的 listener;在此期间,新注册的 listener 可以直接加入到 listeners 中,而不是立刻触发。
  3. listener 的移除。在通知响应数据之前,Promise 长期持有 listener 的引用,导致 listener 对象无法被 gc。可以添加 remove(listener) 方法,或者允许仅持有 listener 的弱引用。

3.2.3 须避免的特性

前面的小节展示了 Promise 的特性与实现原理。纯正的 Promise 是异步传递响应数据的工具,其应当只实现必要的数据传递特性,而不应当夹杂请求提交、数据处理等逻辑。接下来我们来看一看,Promise 在实现时应避免哪些特性,以防限制调用者所能做出的决策。

1.异步 await() 发生阻塞;该规则不仅适用于 Promise,也适用于任何异步 API 。异步API常用于实时通信等延时敏感的场景,作用是减少线程阻塞,避免推迟后续其他操作。一旦发生阻塞,系统的响应速度和吞吐量就会受到严重冲击。

以连续提交数据库请求为例。如图3-3a 所示,调用者调用了一个异步 API,连续提交3次写入请求,并在所返回的 Promise 上注册回调。

我们考察 writeAsync()与await() 如发生阻塞阻塞,将会对调用者造成什么影响,如图3-3b 所示。提交请求是纯内存操作,线程处于 RUNNABLE 状态;writeAsync() 或 await() 如果发生阻塞,则线程处于 BLOCKED 状态,暂停工作而无法执行后续操作。当发生阻塞时,调用者每提交一个请求就不得不等待一段时间,从而降低了提交请求的频率,进而推迟了服务器对这些请求的响应,使得系统的吞吐量降低、延迟上升。特别地,如果系统采用了多路复用机制,即一个线程可以处理多个网络连接或多个请求,那么线程阻塞将会严重拖慢后续请求的处理,造成比较难排查的故障。

常见的阻塞原因包括:

  • Thread.sleep()
  • 向队列提交任务,调用了BlockingQueue.put()和take();应改为非阻塞的offer()和poll()
  • 向线程池提交任务,ExecutorService.submit(),如果线程池拒绝策略为CallerRunsPolicy,而任务本身又是耗时的。
  • 调用了阻塞的函数,包括:InputStream.read()、同步的 Promise.await()、KafkaProducer.send()。注意 KafkaProducer.send() 虽然形式上是异步 API,但是在底层缓存满或者无法获取服务器(broker)信息时,send()方法仍会发生阻塞。

图2-1a.png
图3-3a 连续提交请求

图3-3b.png
图3-3b 请求处理时间线

2.绑定线程池(ExecutorService),用于执行请求。如图3-4所示,线程池是异步API的一种可选模型,但并不是唯一实现。

  • 线程池模型。为了不阻塞调用者,API 内置了线程池来提交请求、处理响应;调用者可以向线程池连续提交多个请求,但是不需要等待响应。调用者提交一条请求后,线程池中的某条线程就会被独占,等待接收响应并进行处理,但在此之前无法再处理其他请求;完成处理后,该条线程重新变为空闲,可以继续处理后续请求。
  • 响应式模型。类似地,API 内置了发送和接收线程来提交请求、处理响应,调用者也不需要同步等待。调用者提交一条请求后,发送线程向网络发送请求;完成发送后,线程立刻变为空闲,可以发送后续请求。当收到响应数据时,接收线程得到通知以处理响应;完成处理后,线程立刻变为空闲,可以处理后续响应数据。上述过程中,任何一条线程都不会被某一请求独占,即线程随时都可以处理请求,而不需要等待之前的请求被响应。

综上,如果绑定了线程池,Promise 就实现了对其他模型(如响应式模型)的兼容性。

图3-4.png

图3-4 线程时间线:线程池 vs select

3.在构造方法创建 Promise 对象时,定义如何提交请求。这种方式只能定义如何处理单条请求,而无法实现请求的批量处理。

以数据库访问为例,现代数据库一般支持批量读写,以略微提升单次访问的延迟为代价,换来吞吐量显著提升;如果吞吐量得到提升,那么平均延迟反而会下降。下面的代码片段展示了一个批量请求 API:数据对象 BulkRequest 可以携带多条普通请求 Request,从而实现批量提交。

/* 提交单条请求*/
client.submit(new Request(1));
client.submit(new Request(2));
client.submit(new Request(3));

/* 提交批量请求*/
client.submit(new BulkRequest(
        new Request(1),
        new Request(2),
        new Request(3)
));

为了充分利用“批量请求”的特性,调用者需要进行跨越多条请求的“宏观调控”。请求产生后可以先缓存起来;等待一段时间后,取出所缓存的多条请求,组装一个批量请求来一并提交。因此,如下面的代码片段所示,在构造 Promise 时指定如何提交单条请求是没有意义的,这部分代码(client.submit(new Request(...)))并不会被执行;而实际希望执行的代码,其实是提交批量请求(client.submit(new BulkRequest(...)))。

/* Promise:提交单条请求*/
new Promise<>(() -> client.submit(new Request(1)));
new Promise<>(() -> client.submit(new Request(2)));
new Promise<>(() -> client.submit(new Request(3)));

4.在构造方法创建 Promise 对象时,定义如何处理响应数据,而不允许后续对响应数据注册回调。如下面的代码片段所示,在构造 Promise 对象时,注册了对响应数据的处理 process(result);但是除此以外,其他代码也有可能关心响应数据,需要注册回调 process1(result)、process2(result)。如果 Promise 只能在构造时注册唯一回调,那么其他关注者就无法注册所需回调函数,即 Promise API 退化回listener API。

/* 定义如何处理响应数据*/
Promise<String> promise = new Promise<>(result -> process(result));

/* 其他代码也关心响应数据*/
promise.await(result -> process1(result));
promise.await(result -> process2(result));

综上,Promise 应该是一个纯粹的数据对象,其职责是存储回调函数、存储响应数据;同时做好时序控制,保证触发回调函数无遗漏、保证触发顺序。除此以外,Promise 不应该和任何实现策略相耦合,不应该杂糅提交请求、处理响应的逻辑。

本文讲解了异步非阻塞设计模式,并对同步 API、异步 listener API、异步Promise API进行了对比。相比于其他两种API,Promise API 具有无可比拟的灵活性,调用者可以自由决定同步返回还是异步返回,并允许对响应数据注册多个回调函数。最后,本文讲解了Promise基本功能的实现,并初步实现了线程安全特性。

本系列共2篇文章,本文为第1篇《原理篇》。在下一篇《应用篇》中,我们将看到Promise 设计模式丰富的应用场景,将其和现有工具进行结合或对比,以及对Promise API 进行进一步变形和封装,提供异常处理、调度策略等特性。

[A] 异步非阻塞IO
https://en.wikipedia.org/wiki...

[B] Promise
https://en.wikipedia.org/wiki...

[C] java线程状态
https://segmentfault.com/a/11...

[D] http异步API样例:apache HttpAsyncClient
https://hc.apache.org/httpcom...

[E] redis异步API样例:lettuce
https://github.com/lettuce-io...

[F] mongo DB异步API样例:AsyncMongoClient
https://mongodb.github.io/mon...

[G] elasticsearch异步API样例:RestHighLevelClient
https://www.elastic.co/guide/...

[H] influx DB异步API样例:influxdb-java
https://github.com/influxdata...

[I] jedis vs lettuce
https://redislabs.com/blog/je...

[J] kafka
http://cloudurable.com/blog/k...

[K] KafkaProducer.send()阻塞
https://stackoverflow.com/que...

[L] netty
https://netty.io/wiki/user-gu...


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK