5

java | nio-selector read 事件 消息边界

 1 year ago
source link: https://benpaodewoniu.github.io/2023/01/08/java182/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

java | nio-selector read 事件 消息边界

如果,ByteBuffer 的长度小于客户端发送数据长度,就会出现消息边界问题。

引入自写的方法类。

下载 JAVA 文件

netty 包

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>
package com.redisc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8090));
System.out.println("waiting...");
}
}
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugRead;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
} else if (key.isReadable()) {
try {
// 读取事件
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件
ByteBuffer buffer = ByteBuffer.allocate(4);
int read = channel.read(buffer);
if (read > 0) {
buffer.flip();
System.out.println(Charset.defaultCharset().decode(buffer));
debugRead(buffer);
} else {
log.debug("客户端关闭");
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

}

}

}
}

}
  • 服务端开启
  • 客户端,第 11 行断点,debug 开启
    • sc 变量执行
    • sc.write(Charset.defaultCharset().encode("中国"))
  • 服务端 乱码

这是因为 jvm 默认为 UTF-8 编码,中文字每个字是 3 字节,所以,4 字节下国字解析不出来。

处理消息边界

  • 消息长度和 ByteBuffer 一样大
    • 客户端和服务端约定同一大小,大小不够的,客户端补齐
      • 所以,可能造成空间的浪费
  • 按分隔符拆分
  • 协议方式
    • 数据由两部分组成,比如前 4 个字节存储数据长度,后面就是数据本身

这里举例是根据 「处理消息边界的第二种方式:按分隔符拆分」。

package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugAll;
import static com.redisc.ByteBufferUtil.debugRead;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
SelectionKey scKey = sc.register(selector, 0, null);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
} else if (key.isReadable()) {
try {
// 读取事件
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件
ByteBuffer buffer = ByteBuffer.allocate(16);
int read = channel.read(buffer);
if (read > 0) {
split(buffer);
} else {
log.debug("客户端关闭");
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

}

}

}
}

private static void split(ByteBuffer source) {
// 读模式
source.flip();
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n') {
int length = i + 1 - source.position();// 一条消息的长度
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
// 写模式
source.compact();
}

}
  • 开启服务端
  • 开启 debug11 行断点的客户端
    • sc 变量发送
    • sc.write(Charset.defaultCharset().encode("hello\nworld\n"))

如过 sc 变量发送

sc.write(Charset.defaultCharset().encode("123456789012345qwert\n"))
+--------+-------------------- all ------------------------+----------------+
position: [5], limit: [5]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 77 65 72 74 0a |wert. |
+--------+-------------------------------------------------+----------------+

前面的 16 字符没有了。

解决方案: 附件

之前的代码

SelectionKey scKey = sc.register(selector, 0, null);

代码中,那个 null 就是附件,这个附件可以是一个 bytebuff,每一个 SelectKey 对应自己的附件。

如果数据过大,那么,可以进行附件扩容,直到把所有的数据读取完毕。

修改服务器代码

package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

import static com.redisc.ByteBufferUtil.debugAll;
import static com.redisc.ByteBufferUtil.debugRead;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 附件
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
} else if (key.isReadable()) {
try {
// 读取事件
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件
// 获取 selectionkey 关联附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer);
if (read > 0) {
split(buffer);
// 如果 position 和 limit 相同说明已经执行了 compact 了
// 说明没有遇到 '\n'
if (buffer.position() == buffer.limit()) {
// 进行扩容
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
// 重新绑定 bytebuff
key.attach(newBuffer);
}
} else {
log.debug("客户端关闭");
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要把 key 取消
}

}

}

}
}

private static void split(ByteBuffer source) {
// 读模式
source.flip();
for (int i = 0; i < source.limit(); i++) {
if (source.get(i) == '\n') {
int length = i + 1 - source.position();// 一条消息的长度
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
// 写模式
source.compact();
}

}

客户端再次执行相同的动作。

+--------+-------------------- all ------------------------+----------------+
position: [21], limit: [21]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 35 36 37 38 39 30 31 32 33 34 35 71 |123456789012345q|
|00000010| 77 65 72 74 0a |wert. |
+--------+-------------------------------------------------+----------------+

bytebuffer 大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 Bytebuff 是线程不安全的,所以,不能被多个 channel 使用,因为需要为每个 channel 维护一个 ByteBuffer「附件」
  • Bytebuffer 不能太大
    • 一种思路是,Bytebuffer 默认为 4K,不够尽兴扩容,但是,拷贝数据耗费时间
    • 一个思路是,多个数组组成 buffer,一个数组不够,把多出的内容写到数组中,不需要拷贝,但是,数据不连续,解析复杂

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK