27

7. 彤哥说netty系列之Java NIO核心组件之Selector

 4 years ago
source link: https://segmentfault.com/a/1190000021210716
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

<p align="right">——日拱一卒,不期而至!</p>

E3mem2J.png!web

你好,我是彤哥,本篇是netty系列的第七篇。

简介

上一章我们一起学习了Java NIO的核心组件Buffer,它通常跟Channel一起使用,但是它们在网络IO中又该如何使用呢,今天我们将一起学习另一个NIO核心组件—— Selector ,没有它可以说就干不起来网络IO。

概念

我们先来看两段Selector的注释,见类 java.nio.channels.Selector

注释I

A multiplexor of {@link SelectableChannel} objects.

它是 SelectableChannel 对象的多路复用器 ,从这里我们也可以知道Java NIO实际上是多路复用IO。

SelectableChannel 有几个子类,你会非常熟悉:

  • DatagramChannel,UDP协议连接
  • SocketChannel,TCP协议连接
  • ServerSocketChannel,专门处理TCP协议Accept事件

我们有必要复习一下 多路复用IO的流程

AFJzy2B.png!web

第一阶段通过select去轮询检查有没有连接准备好数据,第二阶段把数据从内核空间拷贝到用户空间。

在Java中,就是通过 Selector 这个多路复用器来实现第一阶段的。

注释II

A selector may be created by invoking the {@link #open open} method of this class, which will use the system's default {@link java.nio.channels.spi.SelectorProvider selector provider} to create a new selector. A selector may also be created by invoking the {@link java.nio.channels.spi.SelectorProvider#openSelector openSelector} method of a custom selector provider. A selector remains open until it is closed via its {@link #close close} method.

Selector 可以通过它自己的 open() 方法创建,它将通过默认的 java.nio.channels.spi.SelectorProvider 类创建一个新的Selector。也可以通过实现 java.nio.channels.spi.SelectorProvider 类的抽象方法 openSelector() 来自定义实现一个Selector。Selector一旦创建将会一直处于open状态直到调用了 close() 方法为止。

那么,默认使用的Selector究竟是哪个呢?

通过跟踪源码:

> java.nio.channels.Selector#open()
  1> java.nio.channels.spi.SelectorProvider#provider()
    1.1> sun.nio.ch.DefaultSelectorProvider#create() // 返回WindowsSelectorProvider
  2> sun.nio.ch.WindowsSelectorProvider#openSelector() // 返回WindowsSelectorImpl

可以看到,在Windows平台下,默认实现的Provider是 WindowsSelectorProvider ,它的 openSelector() 方法返回的是 WindowsSelectorImpl ,它就是Windows平台默认的Selector实现。

为什么要提到在Windows平台呢,难道在Linux下面实现不一样?

是滴,因为网络IO是跟操作系统息息相关的,不同的操作系统的实现可能都不一样,Linux下面JDK的实现完全不一样,那么我们为什么没有感知到呢?我的代码在Windows下面写的,拿到Linux下面不是一样运行?那是Java虚拟机(或者说Java运行时环境)帮我们把这个事干了,它屏蔽了跟操作系统相关的细节,这也是Java代码可以“Write Once, Run Anywhere”的精髓所在。

Selector与Channel的关系

上面我们说了selector是多路复用器,它是在网络IO的第一阶段用来轮询检查有没有连接准备好数据的,那么它和Channel是什么关系呢?

AZr2emY.png!web

Selector通过不断轮询的方式同时监听多个Channel的事件,注意,这里是 同时监听 ,一旦有Channel准备好了,它就会返回这些准备好了的Channel,交给处理线程去处理。

所以,在NIO编程中,通过Selector我们就实现了一个线程同时处理多个连接请求的目标,也可以一定程序降低服务器资源的消耗。

基本用法

创建Selector

通过调用 Selector.open() 方法是我们常用的方式:

Selector selector = Selector.open();

当然,也可以通过实现 java.nio.channels.spi.SelectorProvider.openSelector() 抽象方法自定义一个Selector。

将Channel注册到Selector上

为了将Channel跟Selector绑定在一起,需要将Channel注册到Selector上,调用Channel的 register() 方法即可:

channel.configureBlocking(false);

SelectionKey key = channel.register(selector, SelectionKey.OP_READ);

Channel必须是非阻塞模式才能注册到Selector上,所以,无法将一个FileChannel注册到Selector,因为FileChannel没有所谓的阻塞还是非阻塞模式,本文来源于工从号彤哥读源码。

注册的时候第二个参数传入的是监听的事件,一共有四种事件:

  • Connect
  • Accept
  • Read
  • Write

当Channel触发了某个事件,通常也叫作那个事件就绪了。比如,数据准备好可以读取了就叫作读就绪了,同样地,还有写就绪、连接就绪、接受就绪,当然后面两个不常听到。

在Java中,这四种监听事件是定义在 SelectionKey 中的:

  • SelectionKey.OP_READ,值为 1 << 0 = 0000 0001
  • SelectionKey.OP_WRITE,值 为 1 << 2 = 0000 0100
  • SelectionKey.OP_CONNECT,值为 1 << 3 = 0000 1000
  • SelectionKey.OP_ACCEPT,值为 1 << 4 = 0001 0000

所以,也可以通过 位或 命令监听多个感兴趣的事件:

int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

SelectionKey

正如上面所看到的,Channel注册到Selector后返回的是一个 SelectionKey ,所以 SelectionKey 又可以看作是Channel和Selector之间的一座桥梁,把两者绑定在了一起。

SelectionKey 具有以下几个重要属性:

  • interest set,感兴趣的事件集
  • ready set,就绪的事件集
  • 保存着的Channel
  • 保存着的Selector
  • attached object,附件

interest set

里面保存了注册Channel到Selector时传入的第二个参数,即感兴趣的事件集。

int interestSet = selectionKey.interestOps();

boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;

可以通过 位与 运算查看是否注册了相应的事件。

ready set

里面保存了就绪了的事件集。

int readySet = selectionKey.readyOps();
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();

可以通过 readyOps() 方法获取所有就绪了的事件,也可以通过 isXxxable() 方法检查某个事件是否就绪。

保存的Channel和Selector

Channel  channel  = selectionKey.channel();

Selector selector = selectionKey.selector();

通过 channel()selector() 方法可以获取绑定的Channel和Selector。

attachment

可以调用 attach(obj) 方法绑定一个对象到 SelectionKey 上,并在后面需要用到的时候通过 attachment() 方法取出绑定的对象,也可以翻译为 附件 ,它可以看作是数据传递的一种媒介,跟ThreadLocal有点类似,在前面绑定数据,在后面使用。

selectionKey.attach(theObject);

Object attachedObj = selectionKey.attachment();

当然,也可以在注册Channel到Selector的时候就绑定附件:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

Selector.select()

一旦将一个或多个Channel注册到Selector上了,我们就可以调用它的 select() 方法了,它会返回注册时感兴趣的事件中就绪的事件,本文来源于工从号彤哥读源码。

select()方法有三种变体:

  • select(),无参数,阻塞直到某个Channel有就绪的事件了才返回(当然是我们注册的感兴趣的事件)
  • select(timeout),带超时,阻塞直到某个Channel有就绪的事件了,或者超时了才返回
  • selectNow(),立即返回,不会阻塞,不管有没有就绪的Channel都立即返回

select()的返回值为int类型,表示两次select()之间就绪的Channel,即使上一次调用select()时返回的就绪Channel没有被处理,下一次调用select()也不会再返回上一次就绪的Channel。比如,第一次调用select()返回了一个就绪的Channel,但是没有处理它,第二次调用select()时又有一个Channel就绪了,那也只会返回1,而不是2。

Selector.selectedKeys()

一旦调用select()方法返回了有就绪的Channel,我们就可以使用 selectedKeys() 方法来获取就绪的Channel了。

Set<SelectionKey> selectedKeys = selector.selectedKeys();

然后,就可以遍历这些SelectionKey来查看感兴趣的事件是否就绪了:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {
    
    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
}

最后,一定要记得调用 keyIterator.remove(); 移除已经处理的SelectionKey。

Selector.wakeup()

前面我们说了调用select()方法时,调用者线程会进入阻塞状态,直到有就绪的Channel才会返回。其实也不一定,wakeup()就是用来破坏规则的,可以在另外一个线程调用wakeup()方法强行唤醒这个阻塞的线程,这样select()方法也会立即返回。

如果调用wakeup()时并没有线程阻塞在select()上,那么,下一次调用select()将立即返回,不会进入阻塞状态。这跟LockSupport.unpark()方法是比较类似的。

Selector.close()

调用close()方法将会关闭Selector,同时也会将关联的SelectionKey失效,但不会关闭Channel。

举个栗子

UviuIjJ.jpg!web

public class EchoServer {
    public static void main(String[] args) throws IOException {
        // 创建一个Selector
        Selector selector = Selector.open();
        // 创建ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        // 绑定8080端口
        serverSocketChannel.bind(new InetSocketAddress(8080));
        // 设置为非阻塞模式,本文来源于工从号彤哥读源码
        serverSocketChannel.configureBlocking(false);
        // 将Channel注册到selector上,并注册Accept事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            // 阻塞在select上
            selector.select();

            // 如果使用的是select(timeout)或selectNow()需要判断返回值是否大于0

            // 有就绪的Channel
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            // 遍历selectKeys
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                // 如果是accept事件
                if (selectionKey.isAcceptable()) {
                    // 强制转换为ServerSocketChannel
                    ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
                    SocketChannel socketChannel = ssc.accept();
                    System.out.println("accept new conn: " + socketChannel.getRemoteAddress());
                    socketChannel.configureBlocking(false);
                    // 将SocketChannel注册到Selector上,并注册读事件
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (selectionKey.isReadable()) {
                    // 如果是读取事件
                    // 强制转换为SocketChannel
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    // 创建Buffer用于读取数据
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    // 将数据读入到buffer中
                    int length = socketChannel.read(buffer);
                    if (length > 0) {
                        buffer.flip();
                        byte[] bytes = new byte[buffer.remaining()];
                        // 将数据读入到byte数组中
                        buffer.get(bytes);

                        // 换行符会跟着消息一起传过来
                        String content = new String(bytes, "UTF-8").replace("\r\n", "");
                        if (content.equalsIgnoreCase("quit")) {
                            selectionKey.cancel();
                            socketChannel.close();
                        } else {
                            System.out.println("receive msg: " + content);
                        }
                    }
                }
                iterator.remove();
            }
        }
    }
}

总结

今天我们学习了Java NIO核心组件Selector,到这里,NIO的三个最重要的核心组件我们就学习完毕了,说实话,NIO这块最重要的还是思维的问题,时刻记着在NIO中一个线程是可以处理多个连接的。

看着Java原生NIO实现网络编程似乎也没什么困难的吗?那么为什么还要有Netty呢?下一章我们将正式进入Netty的学习之中,我们将在其中寻找答案。

最后,也欢迎来我的工从号 彤哥读源码 系统地学习 源码&架构 的知识。

uEv2IvV.png!web


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK