5

Netty 学习(十):ChannelPipeline源码说明 - Grey Zeng

 1 year ago
source link: https://www.cnblogs.com/greyzeng/p/16780213.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Netty 学习(十):ChannelPipeline源码说明

作者: Grey

原文地址:

博客园:Netty 学习(十):ChannelPipeline源码说明

CSDN:Netty 学习(十):ChannelPipeline源码说明

ChannelPipeline可以看作一条流水线,原料(字节流)进来,经过加工,形成一个个Java对象,然后基于这些对象进行处理,最后输出二进制字节流。

ChannelPipeline 在创建 NioSocketChannel 的时候创建, 其默认实现是 DefaultChannelPipeline

    final AbstractChannelHandlerContext head;
    final AbstractChannelHandlerContext tail;
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

ChannelPipeline 中保存了 Channel 的引用,且其中每个节点都是一个 ChannelHandlerContext 对象。每个 ChannelHandlerContext 节点都保存了执行器(即:ChannelHandler)。

ChannelPipeline里有两种不同的节点,一种是 ChannelInboundHandler,处理 inbound 事件(例如:读取数据流),还有一种是 ChannelOutboundHandler,处理 Outbound 事件,比如调用writeAndFlush()类方法时,就会调用该 handler。

添加 handler 的逻辑如下

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检查是否有重复的 handler
            checkMultiplicity(handler);
            // 创建 节点
            newCtx = newContext(group, filterName(name, handler), handler);
            // 添加节点
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        // 回调用户方法
        callHandlerAdded0(newCtx);
        return this;
    }

如上代码,整个添加过程见注释说明,其实主要就是四步:

第一步:检查是否有重复的 handler,核心逻辑见

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                // 非共享的且添加过,就抛出异常,反之,如果一个 handler 支持共享,就可以无限次被添加到 ChannelPipeline 中
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

第二步:创建节点,即把 handler 包裹成 ChannelHandlerContext,核心逻辑如下

    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
            new FastThreadLocal<Map<Class<?>, String>>() {
        @Override
        protected Map<Class<?>, String> initialValue() {
            return new WeakHashMap<Class<?>, String>();
        }
    };
    private String generateName(ChannelHandler handler) {
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }

        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
        if (context0(name) != null) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }

注:Netty 使用 FastThreadLocal 变量来缓存 Handler 的类和名称的映射关系,在生成 name 的时候,首先看缓存中有没有生成过默认 name,如果没有生成,就调用generateName0()生成默认名称,加入缓存。

第三步:把 ChannelHandlerContext 作为节点添加到 pipeline 中,核心代码如下

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

其本质就是一个双向链表的插入节点过程,而且,ChannelPipeline 删除 ChannelHandler 的方法,本质就是把这个双向链表的某个节点删掉!

第四步:回调用户方法,核心代码如下

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.callHandlerAdded();
        } catch (Throwable t) {
            boolean removed = false;
            try {
                atomicRemoveFromHandlerList(ctx);
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }
    final void callHandlerRemoved() throws Exception {
        try {
            // Only call handlerRemoved(...) if we called handlerAdded(...) before.
            if (handlerState == ADD_COMPLETE) {
                handler().handlerRemoved(this);
            }
        } finally {
            // Mark the handler as removed in any case.
            setRemoved();
        }
    }

其中ctx.callHandlerAdded();就是回调用户的handlerAdded方法,然后通过 CAS 方式修改节点的状态为 REMOVE_COMPLETE (说明该节点已经被移除),或者 ADD_COMPLETE (添加完成)。

完整代码见:hello-netty

本文所有图例见:processon: Netty学习笔记

更多内容见:Netty专栏

参考资料#

跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

深度解析Netty源码


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK