1

Netty 学习(五):服务端启动核心流程源码说明 - Grey Zeng

 1 year ago
source link: https://www.cnblogs.com/greyzeng/p/16745560.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Netty 学习(五):服务端启动核心流程源码说明

作者: Grey

原文地址:

博客园:Netty 学习(五):服务端启动核心流程源码说明

CSDN:Netty 学习(五):服务端启动核心流程源码说明

说明#

本文使用的 Netty 版本是 4.1.82.Final,

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.82.Final</version>
        </dependency>

服务端在启动的时候,主要流程有如下几个

  1. 创建服务端的 Channel

  2. 初始化服务端的 Channel

  3. 注册 Selector

我们可以写一个简单的服务端代码,通过 Debug 的方式查看这几个关键流程的核心代码。

package source;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * 代码阅读
 *
 * @author <a href="mailto:[email protected]">Grey</a>
 * @date 2022/9/12
 * @since
 */
public final class SimpleServer {
    public static void main(String[] args) throws InterruptedException {
        // EventLoopGroup: 服务端的线程模型外观类。这个线程要做的事情
        // 就是不停地检测IO事件,处理IO事件,执行任务。
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // 服务端的一个启动辅助类。通过给它设置一系列参数来绑定端口启动服务。
            ServerBootstrap b = new ServerBootstrap();
            b
                    // 设置服务端的线程模型。
                    // bossGroup 负责不断接收新的连接,将新的连接交给 workerGroup 来处理。
                    .group(bossGroup, workerGroup)
                    // 设置服务端的 IO 类型是 NIO。Netty 通过指定 Channel 的类型来指定 IO 类型。
                    .channel(NioServerSocketChannel.class)
                    // 服务端启动过程中,需要经过哪些流程。
                    .handler(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) {
                            System.out.println("channelActive");
                        }

                        @Override
                        public void channelRegistered(ChannelHandlerContext ctx) {
                            System.out.println("channelRegistered");
                        }

                        @Override
                        public void handlerAdded(ChannelHandlerContext ctx) {
                            System.out.println("handlerAdded");
                        }
                    })
                    // 用于设置一系列 Handler 来处理每个连接的数据
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {

                        }
                    });
            // 绑定端口同步等待。等服务端启动完毕,才会进入下一行代码
            ChannelFuture f = b.bind(8888).sync();
            // 等待服务端关闭端口绑定,这里的作用是让程序不会退出
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
ChannelFuture f = b.bind(8888).sync();

bind方法,进入源码进行查看。

首先,进入的是AbstractBootstrap中,调用的最关键的方法是如下两个:

……
    private ChannelFuture doBind(final SocketAddress localAddress) {
        ……
        final ChannelFuture regFuture = initAndRegister();
        ……
        doBind0(regFuture, channel, localAddress, promise);
        ……
    }
……

进入initAndResgister()方法中

……
    final ChannelFuture initAndRegister() {
        ……
        // channel 的新建
        channel = channelFactory.newChannel();
        // channel 的初始化
        init(channel);
        ……
    }
……

这里完成了 Channel 的新建和初始化,Debug 进去,发现channelFactory.newChannel()实际上是调用了ReflectiveChannelFactorynewChannel方法,

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
……
    private final Constructor<? extends T> constructor;

    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        ……
        this.constructor = clazz.getConstructor();
        ……
    }

    @Override
    public T newChannel() {
        ……
        return constructor.newInstance();
        ……
    }
……
}

这里调用了反射方法,其实就是将服务端代码中的这一行.channel(NioServerSocketChannel.class)中的NioServerSocketChannel.class传入进行对象创建,创建一个NioServerSocketChannel实例。

在创建NioServerSocketChannel的时候,调用了NioServerSocketChannel的构造方法,构造方法的主要逻辑如下

……
    public NioServerSocketChannel(SelectorProvider provider, InternetProtocolFamily family) {
        this(newChannel(provider, family));
    }
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    private static ServerSocketChannel newChannel(SelectorProvider provider, InternetProtocolFamily family) {
        ……
            ServerSocketChannel channel =
                    SelectorProviderUtil.newChannel(OPEN_SERVER_SOCKET_CHANNEL_WITH_FAMILY, provider, family);
            return channel == null ? provider.openServerSocketChannel() : channel;
        ……
    }
……

其中provider.openServerSocketChannel()就是调用底层 JDK 的 API,获取了 JDK 底层的java.nio.channels.ServerSocketChannel

通过super(null, channel, SelectionKey.OP_ACCEPT);一路跟踪进去,进入AbstractNioChannel中,

   protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        ……
        ch.configureBlocking(false);
        ……
    }

关键代码是ch.configureBlocking(false),设置 I/O 模型为非阻塞模式。

通过super(parent)跟踪上去,

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

其中 id 是 Netty 中每条 Channel 的唯一标识。

以上就是服务端 Channel 的创建过程。

接下来是服务端 Channel 的初始化过程,回到AbstractBootstrap.initAndResgister()方法

……
    final ChannelFuture initAndRegister() {
        ……
        // channel 的新建
        channel = channelFactory.newChannel();
        // channel 的初始化
        init(channel);
        ……
    }
……

其中的init(channel)方法就是服务端的 Channel 的初始化过程,Debug 进入,发现是调用了ServerBootstrap.init(channel)方法,


    @Override
    void init(Channel channel) {
        ……
        // 设置一些 Channel 的属性和配置信息
        ……
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

其核心代码如上,主要用于定义服务端启动过程中需要执行哪些逻辑。主要分为两块:

  1. 一块是添加用户自定义的处理逻辑到服务端启动流程。

  2. 另一块是添加一个特殊的处理逻辑,ServerBootstrapAcceptor 是一个接入器,接受新请求,把新的请求传递给某个事件循环器。

以上就是服务端的 Channel 的初始化过程。接下来是服务端 Channel 的注册 Selector 的过程。

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

在这个步骤中,我们可以看到关于 JDK 底层的操作

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

首先拿到在前面过程中创建的 JDK 底层的 Channel,然后调用 JDK 的 register() 方法,将 this 也即 NioServerSocketChannel 对象当作 attachment 绑定到 JDK 的 Selector 上,这样后续从 Selector 拿到对应的事件之后,就可以把 Netty 领域的 Channel 拿出来。

接下来是服务端绑定端口的逻辑,见AbstractBootstrap中的doBind0方法

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

图例#

本文所有图例见:processon: Netty学习笔记

代码#

hello-netty

更多内容见:Netty专栏

参考资料#

跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

深度解析Netty源码


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK