背景介绍介绍介绍
周末随便学习一些东西,在屈定老兄的博客上看到更新了一篇文章《Netty – Reactor模型的应用》,内容分析的很到位,对于 Reactor
模式,我了解到的主要还是在 Netty
框架中的线程模式使用的是 Reactor
模式,有时会想这个东西在在我们的业务系统中会有什么样的应用场景,是不是有机会在某些功能中落地到我们的项目中,而不是一直高高在上,为此还和屈定老兄交流了一下。
看来这个东西也只是“此曲只应天上有,人间能得几回闻”,业务系统落地机会看来不多,也可能还需要慢慢探索,不过还是想在学习一下,不要只是肤浅的理解。
如果在谷歌上搜索 Reactor
模式的知识,搜索结果里面一定会有 Doug Lea
大师《Scalable IO in Java》文档内容,看这个文档内容应该是一次知识分享的 PPT
内容,而且很多分析 Reactor
模式的博客文章大部分在引用里面都会提到《Scalable IO in Java》内容,这次的学习也集中在这个文档的内容。
可扩展的网络服务
在这一节中,Doug Lea
大师总结了在 web
应用服务、分布式服务中一般包括如下一些基本的处理流程:
Read request
(读请求,比如说 web
请求中的 HttpRequest
)
Decode request
(解析 Request
数据)
Process service
(业务逻辑处理)
Encode reply
(包装响应数据)
Send reply
(发送响应结果,比如说 web
请求中的 HttpReponse
)
这个处理流程中不同的是每次需要进行的 XML
解析,文件传输、web
网页生成,服务计算等内容… …
经典服务设计
每一个 handler
处理流程可能会启动一个自己独享的线程。
| class Server implements Runnable {
@Override public void run() { try { ServerSocket ss = new ServerSocket(PORT); while (!Thead.interrupted()) { new Thread(new Handler(ss.accept())).start(); // or, single-threaded, or a thread pool } } catch (IOException ex) { /* ... */ } }
static class Handler implements Runnable { final Socket socket; Handler(Socket s) { socket = s; }
@Override public void run() { try { byte[] input = new byte[MAX_INPUT]; socket.getInputStream().read(input); byte[] output = process(input); socket.getOutputStream().write(output); } catch (IOException ex) { /* ... */ } }
private byte[] process(byte[] cmd) { /* ... */ } } }
//注意:代码示例中的异常处理内容都被忽略掉了
|
可扩展性目标
分而治之的思想
将整个处理流程分割成为一些小的任务,每个任务执行一个动作而不会产生阻塞。
当某个时刻任务被启用时,开始执行这个任务;在整个过程中,IO
事件作为任务启用的触发器。
java.nio
实现的基本机制:
非阻塞的读操作和写操作
监测 IO
相关事件的调度任务程序
无限变化的可能
事件驱动设计
事件驱动的设计通常比替代方案更加有效:
事件驱动的设计通常比替代方案程序实现更加复杂:
必须分解成简单的非阻塞动作
类似于 GUI
事件驱动的操作
无法消除所有阻塞:GC
,页面错误等
必须跟踪逻辑服务状态
背景介绍:AWT 中的事件
事件驱动的 IO
使用相似的想法,但设计不同
Reactor 模式
Reactor
通过调度适当的处理程序来响应 IO
事件(和 AWT
中的线程作用非常类似)
Handler
用于完成非阻塞的动作(和 AWT
中 ActionListeners
作用类似)
通过将处理程序绑定到事件进行管理(和 AWT
中 addActionListener
作用类似)
参见:Schmidt et al, Pattern-Oriented Software Architecture, Volume 2 (POSA2)
或者 Richard Stevens 的网络编程书籍, Matt Welsh 的 SEDA 框架书籍等内容。
Reactor 模式基本实现
单线程版本实现
java.nio 支持
Channels
:Channel
用于实现非阻塞读操作,可以连接到文件、Socket
等;
Buffers
:Buffer
类似于对象数组,能够直接通过 Channel
进行读写操作;
Selectors
:判断一组 Channel
中的哪些发生了 IO
事件;
SelectionKeys
:维护 IO
事件状态和绑定状态
Reactor 模式第一步:启动
| class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
public Reactor(int port) throws IOException { selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT); sk.attach(new Acceptor()); }
/* Alternatively, use explicit SPI provider: SelectorProvider p = SelectorProvider.provider(); selector = p.openSelector(); serverSocket = p.openServerSocketChannel(); */ }
|
Reactor 模式第二步:循环调度
| @Override public void run() { // normally in a new Thread try { while (!Thread.interrupted()) { selector.select(); Set selected = selector.selectedKeys(); Iterator it = selected.iterator(); while (it.hasNext()) { dispatch((SelectionKey)(it.next())); selected.clear(); } } } catch (IOException ex) { /* ... */ } }
void dispatch(SelectionKey k) { Runnable r = (Runnable) (k.attachment()); if (r != null) { r.run(); } }
|
Reactor 模式第三步:Acceptor
| class Acceptor implements Runnable { // inner
@Override public void run() { try { SocketChannel c = serverSocket.accept(); if (c != null) { new Handler(selector, c); } } catch (IOException ex) { /* ... */ } } }
|
Reactor 模式第四步:Handler 启动
| final class Handler implements Runnable { final SocketChannel socket; final SelectionKey sk; ByteBuffer input = ByteBuffer.allocate(MAXIN); ByteBuffer ouput = ByteBuffer.allocate(MAXOUT); static final int READING = 0, SENDING = 1; int state = READING;
public Handler(Selector sel, SocketChannel c) throws IOException { socket = c; c.configureBlocking(false); // Optionally try first read now sk = socket.register(sel, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); sel.wakeup(); }
boolean inputIsComplete() { /* ... */ } boolean outputIsComplete() { /* ... */ } void process() { /* ... */ }
|
Reactor 模式第五步:请求处理
| @Override public void run() { try { if (state = READING) { read(); } else if (state = SENDING) { send(); } } catch (IOException ex) { /* ... */ } }
void read() throws IOException { socket.read(input); if (inputIsComplete()) { process(); state = SENDING; // Normally also do first write now sk.interestOps(SelectionKey.OP_WRITE); } }
void send() throws IOException { socket.write(ouput); if (inputIsComplete()) { sk.cancel(); } }
|
单个状态处理程序
GoF
中状态模式的简单应用:重新绑定适当的处理程序作为附件
| class Handler { // ...
public void run() { // initial state is reader socket.read(input); if (inputIsComplete()) { process(); sk.attach(new Sender()); sk.interest(SelectionKey.OP_WRITE); sk.selector().wakeup(); } }
class Sender implements Runnable {
@Override public void run(){ // ... socket.write(output); if (outputIsComplete()) sk.cancel(); } } }
|
多线程设计实现
- 卸载非
IO
处理以加速 Reactor
线程:与 POSA2 Proactor 设计类似
PS. 工作线程用于处理 IO 事件,Reactor
线程不用关心 IO
事件,这样可以提升 Reactor
线程 处理速度
比将计算绑定处理重新加工成事件驱动的形式更简单:应该仍然是纯非阻塞计算,足够的处理胜过开销
但是很难将处理与 IO
重叠:最好某个时刻可以先将所有输入读入缓冲区
使用线程池,因此可以进行调整和控制:通常需要比客户端少的线程
工作线程池
线程池处理器
| class Handler { // ...
// uses util.concurrent thread pool static PooledExecutor pool = new PooledExecutor(...); static final int PROCESSING = 3;
// ... synchronized void read() { // ... socket.read(input); if (inputIsComplete()) { state = PROCESSING; pool.execute(new Processer()); } }
synchronized void processAndHandOff() { process(); state = SENDING; // or rebind attachment sk.interest(SelectionKey.OP_WRITE); }
class Processer implements Runnable { public void run() { processAndHandOff(); } } }
|
Handoffs
每个任务都会启用,触发或调用下一个任务
通常最快,但可能很脆弱
Callbacks
:回调每个处理程序的调度程序
设置状态,附件等
GoF
Mediator
模式的变体
Queues
Futures
当每个任务产生结果时
协作位于联接或等待 / 通知之上
使用线程池执行器
多个 Reactor 线程模式
使用 Reactor
对象池
主 acceptor
负责分配到其他的 Reactor
| Selector[] selectors; // also create threads int next = 0; class Acceptor { // ... public synchronized void run() { ... Socket connection = serverSocket.accept(); if (connection != null) { new Handler(selectors[next], connection); } if (++next == selectors.length) { next = 0; } } }
|
使用多 Reactor 模式
使用其他的 java.nio 特性
每个 Reactor
支持多个 Selector
将不同的处理程序绑定到不同的 IO
事件
需要仔细处理同步操作来进行协调
内存映射文件
有时可以实现零拷贝传输
但是启动和完成会产生额外的开销
最适合连接时间较长的应用
基于连接的扩展
非单个服务请求处理
客户端连接
客户端发送一系列的消息 / 请求
断开客户端连接
可扩展的基础网络服务模式
处理许多存活时间相对较长的客户端请求
跟踪客户端和会话状态(包括丢弃)
跨多个主机分配服务
API 练习
Buffer
| abstract class Buffer {
int capacity(); int position(); Buffer position(int newPosition); int limit(); Buffer limit(int newLimit); Buffer mark(); Buffer reset(); Buffer clear(); Buffer flip(); Buffer rewind(); int remaining(); boolean hasRemaining(); boolean isReadOnly(); }
|
ByteBuffer
| abstract class ByteBuffer extends Buffer {
static ByteBuffer allocateDirect(int capacity); static ByteBuffer allocate(int capacity); static ByteBuffer wrap(byte[] src, int offset, int len); static ByteBuffer wrap(byte[] src); boolean isDirect(); ByteOrder order(); ByteBuffer order(ByteOrder bo); ByteBuffer slice(); ByteBuffer duplicate(); ByteBuffer compact(); ByteBuffer asReadOnlyBuffer(); byte get(); byte get(int index); ByteBuffer get(byte[] dst, int offset, int length); ByteBuffer get(byte[] dst); ByteBuffer put(byte b); ByteBuffer put(int index, byte b); ByteBuffer put(byte[] src, int offset, int length); ByteBuffer put(ByteBuffer src); ByteBuffer put(byte[] src); char getChar(); char getChar(int index); ByteBuffer putChar(char value); ByteBuffer putChar(int index, char value); CharBuffer asCharBuffer(); short getShort(); short getShort(int index); ByteBuffer putShort(short value); ByteBuffer putShort(int index, short value); ShortBuffer asShortBuffer(); int getInt(); int getInt(int index); ByteBuffer putInt(int value); ByteBuffer putInt(int index, int value); IntBuffer asIntBuffer(); long getLong(); long getLong(int index); ByteBuffer putLong(long value); ByteBuffer putLong(int index, long value); LongBuffer asLongBuffer(); float getFloat(); float getFloat(int index); ByteBuffer putFloat(float value); ByteBuffer putFloat(int index, float value); FloatBuffer asFloatBuffer(); double getDouble(); double getDouble(int index); ByteBuffer putDouble(double value); ByteBuffer putDouble(int index, double value); DoubleBuffer asDoubleBuffer(); }
|
Channel
| interface Channel { boolean isOpen(); void close() throws IOException; }
interface ReadableByteChannel extends Channel { int read(ByteBuffer dst) throws IOException; }
interface WritableByteChannel extends Channel { int write(ByteBuffer src) throws IOException; }
interface ScatteringByteChannel extends ReadableByteChannel { int read(ByteBuffer[] dsts, int offset, int length) throws IOException; int read(ByteBuffer[] dsts) throws IOException; }
interface GatheringByteChannel extends WritableByteChannel { int write(ByteBuffer[] srcs, int offset, int length) throws IOException; int write(ByteBuffer[] srcs) throws IOException; }
|
SelectableChannel
| abstract class SelectableChannel implements Channel { int validOps(); boolean isRegistered(); SelectionKey keyFor(Selector sel); SelectionKey register(Selector sel, int ops) throws ClosedChannelException; void configureBlocking(boolean block) throws IOException; boolean isBlocking(); Object blockingLock(); }
|
SocketChannel
| abstract class SocketChannel implements ByteChannel ... { static SocketChannel open() throws IOException; Socket socket(); int validOps(); boolean isConnected(); boolean isConnectionPending(); boolean isInputOpen(); boolean isOutputOpen(); boolean connect(SocketAddress remote) throws IOException; boolean finishConnect() throws IOException; void shutdownInput() throws IOException; void shutdownOutput() throws IOException; int read(ByteBuffer dst) throws IOException; int read(ByteBuffer[] dsts, int offset, int length) throws IOException; int read(ByteBuffer[] dsts) throws IOException; int write(ByteBuffer src) throws IOException; int write(ByteBuffer[] srcs, int offset, int length) throws IOException;int write(ByteBuffer[] srcs) throws IOException; }
|
ServerSocketChannel
| abstract class ServerSocketChannel extends ... { static ServerSocketChannel open() throws IOException; int validOps(); ServerSocket socket(); SocketChannel accept() throws IOException; }
|
FileChannel
| abstract class FileChannel implements ... { int read(ByteBuffer dst); int read(ByteBuffer dst, long position); int read(ByteBuffer[] dsts, int offset, int length); int read(ByteBuffer[] dsts); int write(ByteBuffer src); int write(ByteBuffer src, long position); int write(ByteBuffer[] srcs, int offset, int length); int write(ByteBuffer[] srcs); long position(); void position(long newPosition); long size(); void truncate(long size); void force(boolean flushMetaDataToo); int transferTo(long position, int count, WritableByteChannel dst); int transferFrom(ReadableByteChannel src, long position, int count); FileLock lock(long position, long size, boolean shared); FileLock lock(); FileLock tryLock(long pos, long size, boolean shared); FileLock tryLock(); static final int MAP_RO, MAP_RW, MAP_COW; MappedByteBuffer map(int mode, long position, int size); } NOTE: ALL methods throw IOException
|
Selector
| abstract class Selector { static Selector open() throws IOException; Set keys(); Set selectedKeys(); int selectNow() throws IOException; int select(long timeout) throws IOException; int select() throws IOException; void wakeup(); void close() throws IOException; }
|
SelectionKey
| abstract class SelectionKey { static final int OP_READ, OP_WRITE, OP_CONNECT, OP_ACCEPT; SelectableChannel channel(); Selector selector(); boolean isValid(); void cancel(); int interestOps(); void interestOps(int ops); int readyOps(); boolean isReadable(); boolean isWritable(); boolean isConnectable(); boolean isAcceptable(); Object attach(Object ob); Object attachment(); }
|