1

netty系列之:channelPipeline详解 - InfoQ 写作平台

 2 years ago
source link: https://xie.infoq.cn/article/48d9138eba2f6982af757ce4b
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

我们在介绍 channel 的时候提到过,几乎 channel 中所有的实现都是通过 channelPipeline 进行的,作为一个 pipline,它到底是如何工作的呢?

一起来看看吧。

ChannelPipeline

ChannelPipeline 是一个 interface,它继承了三个接口,分别是 ChannelInboundInvoker,ChannelOutboundInvoker 和 Iterable:

public interface ChannelPipeline        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> 

继承自 ChannelInboundInvoker,表示 ChannelPipeline 可以触发 channel inboud 的一些事件,比如:

ChannelInboundInvoker fireChannelRegistered();ChannelInboundInvoker fireChannelUnregistered();ChannelInboundInvoker fireChannelActive();ChannelInboundInvoker fireChannelInactive();ChannelInboundInvoker fireExceptionCaught(Throwable cause);ChannelInboundInvoker fireUserEventTriggered(Object event);ChannelInboundInvoker fireChannelRead(Object msg);ChannelInboundInvoker fireChannelReadComplete();ChannelInboundInvoker fireChannelWritabilityChanged();

继承自 ChannelOutboundInvoker,表示 ChannelPipeline 可以进行一些 channel 的主动操作,如:bind,connect,disconnect,close,deregister,read,write,flush 等操作。

继承自 Iterable,表示 ChannelPipeline 是可遍历的,为什么 ChannelPipeline 是可遍历的呢?

因为 ChannelPipeline 中可以添加一个或者多个 ChannelHandler,ChannelPipeline 可以看做是一个 ChannelHandler 的集合。

比如 ChannelPipeline 提供了一系列的添加 ChannelHandler 的方法:

ChannelPipeline addFirst(String name, ChannelHandler handler);ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);ChannelPipeline addFirst(ChannelHandler... handlers);ChannelPipeline addLast(String name, ChannelHandler handler);ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);ChannelPipeline addLast(ChannelHandler... handlers);ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

可以从前面添加,也可以从后面添加,或者从特定的位置添加 handler。

另外还可以从 pipeline 中删除特定的 channelHandler,或者移出和替换特定位置的 handler:

ChannelPipeline remove(ChannelHandler handler);ChannelHandler remove(String name);ChannelHandler removeFirst();ChannelHandler removeLast();ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

当然,更少不了对应的查询操作:

ChannelHandler first();ChannelHandler last();ChannelHandler get(String name);List<String> names();

还可以根据传入的 ChannelHandler 获得 handler 对应的 ChannelHandlerContext。

ChannelHandlerContext context(ChannelHandler handler);

ChannelPipeline 中还有一些触发 channel 相关的事件,如:

    ChannelPipeline fireChannelRegistered();    ChannelPipeline fireChannelUnregistered();    ChannelPipeline fireChannelActive();    ChannelPipeline fireChannelInactive();    ChannelPipeline fireExceptionCaught(Throwable cause);    ChannelPipeline fireUserEventTriggered(Object event);    ChannelPipeline fireChannelRead(Object msg);    ChannelPipeline fireChannelReadComplete();    ChannelPipeline fireChannelWritabilityChanged();

那么有些朋友可能会问了,既然 ChannelPipeline 中包含了很多个 handler,那么 handler 中的事件是怎么传递的呢?

其实这些事件是通过调用 ChannelHandlerContext 中的相应方法来触发的。

对于 Inbound 事件来说,可以调用下面的方法,进行事件的传递:

ChannelHandlerContext.fireChannelRegistered()ChannelHandlerContext.fireChannelActive()ChannelHandlerContext.fireChannelRead(Object)ChannelHandlerContext.fireChannelReadComplete()ChannelHandlerContext.fireExceptionCaught(Throwable)ChannelHandlerContext.fireUserEventTriggered(Object)ChannelHandlerContext.fireChannelWritabilityChanged()ChannelHandlerContext.fireChannelInactive()ChannelHandlerContext.fireChannelUnregistered()

对于 Outbound 事件来说,可以调用下面的方法,进行事件的传递:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)ChannelHandlerContext.write(Object, ChannelPromise)ChannelHandlerContext.flush()ChannelHandlerContext.read()ChannelHandlerContext.disconnect(ChannelPromise)ChannelHandlerContext.close(ChannelPromise)ChannelHandlerContext.deregister(ChannelPromise)

具体而言,就是在 handler 中调用 ChannelHandlerContext 中对应的方法:

   public class MyInboundHandler extends ChannelInboundHandlerAdapter {        @Override       public void channelActive(ChannelHandlerContext ctx) {           System.out.println("Connected!");           ctx.fireChannelActive();       }   }   public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {        @Override       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {           System.out.println("Closing ..");           ctx.close(promise);       }   }

DefaultChannelPipeline

ChannelPipeline 有一个官方的实现叫做 DefaultChannelPipeline,因为对于 pipeline 来说,主要的功能就是进行 handler 的管理和事件传递,相对于而言功能比较简单,但是他也有一些特别的实现地方,比如它有两个 AbstractChannelHandlerContext 类型的 head 和 tail。

我们知道 ChannelPipeline 实际上是很多 handler 的集合,那么这些集合是怎么进行存储的呢?这种存储的数据结构就是 AbstractChannelHandlerContext。每个 AbstractChannelHandlerContext 中都有一个 next 节点和一个 prev 节点,用来组成一个双向链表。

同样的在 DefaultChannelPipeline 中使用 head 和 tail 来将封装好的 handler 存储起来。

注意,这里的 head 和 tail 虽然都是 AbstractChannelHandlerContext,但是两者有稍许不同。先看下 head 和 tail 的定义:

    protected DefaultChannelPipeline(Channel channel) {        this.channel = ObjectUtil.checkNotNull(channel, "channel");        succeededFuture = new SucceededChannelFuture(channel, null);        voidPromise =  new VoidChannelPromise(channel, true);        tail = new TailContext(this);        head = new HeadContext(this);        head.next = tail;        tail.prev = head;    }

在 DefaultChannelPipeline 的构造函数中,对 tail 和 head 进行初始化,其中 tail 是 TailContext,而 head 是 HeadContext。

其中 TailContext 实现了 ChannelInboundHandler 接口:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler

而 HeadContext 实现了 ChannelOutboundHandler 和 ChannelInboundHandler 接口:

final class HeadContext extends AbstractChannelHandlerContext            implements ChannelOutboundHandler, ChannelInboundHandler 

下面我们以 addFirst 方法为例,来看一下 handler 是怎么被加入 pipline 的:

    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {        final AbstractChannelHandlerContext newCtx;        synchronized (this) {            checkMultiplicity(handler);            name = filterName(name, handler);            newCtx = newContext(group, name, handler);            addFirst0(newCtx);            // If the registered is false it means that the channel was not registered on an eventLoop yet.            // In this case we add the context to the pipeline and add a task that will call            // ChannelHandler.handlerAdded(...) once the channel is registered.            if (!registered) {                newCtx.setAddPending();                callHandlerCallbackLater(newCtx, true);                return this;            }            EventExecutor executor = newCtx.executor();            if (!executor.inEventLoop()) {                callHandlerAddedInEventLoop(newCtx, executor);                return this;            }        }        callHandlerAdded0(newCtx);        return this;    }

它的工作逻辑是首先根据传入的 handler 构建一个新的 context,然后调用 addFirst0 方法,将 context 加入 AbstractChannelHandlerContext 组成的双向链表中:

    private void addFirst0(AbstractChannelHandlerContext newCtx) {        AbstractChannelHandlerContext nextCtx = head.next;        newCtx.prev = head;        newCtx.next = nextCtx;        head.next = newCtx;        nextCtx.prev = newCtx;    }

然后调用 callHandlerAdded0 方法来触发 context 的 handlerAdded 方法。

channelPipeline 负责管理 channel 的各种 handler,在 DefaultChannelPipeline 中使用了 AbstractChannelHandlerContext 的 head 和 tail 来对多个 handler 进行存储,同时借用这个链式结构对 handler 进行各种管理,非常方便。

本文已收录于 http://www.flydean.com/04-3-netty-channelpipeline/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK