3

netty系列之:channelPipeline详解

 2 years ago
source link: https://segmentfault.com/a/1190000041473300
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

我们在介绍channel的时候提到过,几乎channel中所有的实现都是通过channelPipeline进行的,作为一个pipline,它到底是如何工作的呢?

一起来看看吧。

ChannelPipeline

ChannelPipeline是一个interface,它继承了三个接口,分别是ChannelInboundInvoker,ChannelOutboundInvoker和Iterable:

public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> 

继承自ChannelInboundInvoker,表示ChannelPipeline可以触发channel inboud的一些事件,比如:

ChannelInboundInvoker fireChannelRegistered();
ChannelInboundInvoker fireChannelUnregistered();
ChannelInboundInvoker fireChannelActive();
ChannelInboundInvoker fireChannelInactive();
ChannelInboundInvoker fireExceptionCaught(Throwable cause);
ChannelInboundInvoker fireUserEventTriggered(Object event);
ChannelInboundInvoker fireChannelRead(Object msg);
ChannelInboundInvoker fireChannelReadComplete();
ChannelInboundInvoker fireChannelWritabilityChanged();

继承自ChannelOutboundInvoker,表示ChannelPipeline可以进行一些channel的主动操作,如:bind,connect,disconnect,close,deregister,read,write,flush等操作。

继承自Iterable,表示ChannelPipeline是可遍历的,为什么ChannelPipeline是可遍历的呢?

因为ChannelPipeline中可以添加一个或者多个ChannelHandler,ChannelPipeline可以看做是一个ChannelHandler的集合。

比如ChannelPipeline提供了一系列的添加ChannelHandler的方法:

ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addFirst(ChannelHandler... handlers);

ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

可以从前面添加,也可以从后面添加,或者从特定的位置添加handler。

另外还可以从pipeline中删除特定的channelHandler,或者移出和替换特定位置的handler:

ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
ChannelHandler removeFirst();
ChannelHandler removeLast();
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

当然,更少不了对应的查询操作:

ChannelHandler first();
ChannelHandler last();
ChannelHandler get(String name);
List<String> names();

还可以根据传入的ChannelHandler获得handler对应的ChannelHandlerContext。

ChannelHandlerContext context(ChannelHandler handler);

ChannelPipeline中还有一些触发channel相关的事件,如:

    ChannelPipeline fireChannelRegistered();
    ChannelPipeline fireChannelUnregistered();
    ChannelPipeline fireChannelActive();
    ChannelPipeline fireChannelInactive();
    ChannelPipeline fireExceptionCaught(Throwable cause);
    ChannelPipeline fireUserEventTriggered(Object event);
    ChannelPipeline fireChannelRead(Object msg);
    ChannelPipeline fireChannelReadComplete();
    ChannelPipeline fireChannelWritabilityChanged();

那么有些朋友可能会问了,既然ChannelPipeline中包含了很多个handler,那么handler中的事件是怎么传递的呢?

其实这些事件是通过调用ChannelHandlerContext中的相应方法来触发的。

对于Inbound事件来说,可以调用下面的方法,进行事件的传递:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

对于Outbound事件来说,可以调用下面的方法,进行事件的传递:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
ChannelHandlerContext.deregister(ChannelPromise)

具体而言,就是在handler中调用ChannelHandlerContext中对应的方法:

   public class MyInboundHandler extends ChannelInboundHandlerAdapter {
        @Override
       public void channelActive(ChannelHandlerContext ctx) {
           System.out.println("Connected!");
           ctx.fireChannelActive();
       }
   }
  
   public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
        @Override
       public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
           System.out.println("Closing ..");
           ctx.close(promise);
       }
   }
   

DefaultChannelPipeline

ChannelPipeline有一个官方的实现叫做DefaultChannelPipeline,因为对于pipeline来说,主要的功能就是进行handler的管理和事件传递,相对于而言功能比较简单,但是他也有一些特别的实现地方,比如它有两个AbstractChannelHandlerContext类型的head和tail。

我们知道ChannelPipeline实际上是很多handler的集合,那么这些集合是怎么进行存储的呢?这种存储的数据结构就是AbstractChannelHandlerContext。每个AbstractChannelHandlerContext中都有一个next节点和一个prev节点,用来组成一个双向链表。

同样的在DefaultChannelPipeline中使用head和tail来将封装好的handler存储起来。

注意,这里的head和tail虽然都是AbstractChannelHandlerContext,但是两者有稍许不同。先看下head和tail的定义:

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

在DefaultChannelPipeline的构造函数中,对tail和head进行初始化,其中tail是TailContext,而head是HeadContext。

其中TailContext实现了ChannelInboundHandler接口:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler

而HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler接口:

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler 

下面我们以addFirst方法为例,来看一下handler是怎么被加入pipline的:

    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            name = filterName(name, handler);

            newCtx = newContext(group, name, handler);

            addFirst0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }

它的工作逻辑是首先根据传入的handler构建一个新的context,然后调用addFirst0方法,将context加入AbstractChannelHandlerContext组成的双向链表中:

    private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = head.next;
        newCtx.prev = head;
        newCtx.next = nextCtx;
        head.next = newCtx;
        nextCtx.prev = newCtx;
    }

然后调用callHandlerAdded0方法来触发context的handlerAdded方法。

channelPipeline负责管理channel的各种handler,在DefaultChannelPipeline中使用了AbstractChannelHandlerContext的head和tail来对多个handler进行存储,同时借用这个链式结构对handler进行各种管理,非常方便。

本文已收录于 http://www.flydean.com/04-3-netty-channelpipeline/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK