1

java | nio-selector 写入事件

 1 year ago
source link: https://benpaodewoniu.github.io/2023/01/08/java183/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

java | nio-selector 写入事件

可写事件。向客户端发送数据。

引入自写的方法类。

下载 JAVA 文件

netty 包

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.86.Final</version>
</dependency>
package com.redisc;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8090));

//接收数据
int count = 0;
while (true) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
System.out.println(count);
buffer.clear();
}
}
}
package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);

// 想客户端发送数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 返回值代表实际写入的字节数
// 因为有时候数据太大,不能一次性写入
while (buffer.hasRemaining()) {
// 返回值代表实际写入的字节数
int wirte = sc.write(buffer);
System.out.println(wirte);
}
}

}

}
}

}
  • 开启服务端
  • 开启客户端

服务端输出

14:17:05.106 [main] DEBUG c.Test - register key:sun.nio.ch.SelectionKeyImpl@497470ed
14:17:09.349 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@497470ed
261676
1136400
0
0
1080096
0
0
0
0
0
0
0
0
0
0
521828

之所以有这么多 0,是因为缓冲区已经写满了,所以需要等待。

所以,这里有一个改进的地方,就是缓冲区满了,应该去干别的,而不是一直等待。

如果数据量太大,增加一个可写事件,进行后续触发。

package com.redisc;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

@Slf4j(topic = "c.Test")
public class Run {

public static void main(String[] args) throws IOException {
// 创建 selector,管理多个 channel
Selector selector = Selector.open();
// 创建服务器
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将阻塞模式切换为非阻塞模式

// 建立 selector 和 channel 的联系「注册」
// SelectionKey 事件发生后,通过它知道事件和哪个 channel 事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("register key:{}", sscKey);

//绑定端口
ssc.bind(new InetSocketAddress(8090));
while (true) {
// selector 方法,没有事件发生,线程阻塞,有事件发生,线程才会恢复运行
// selector 在事件未处理的时候,不会阻塞
selector.select();
// 处理事件,selectorKeys 内部包含了所有发生事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理 key 时,要从 selectKeys 集合中删除,否则下次处理就会出问题
iter.remove();
log.debug("key:{}", key);
// 区分事件类型
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, 0, null);

// 想客户端发送数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
// 返回值代表实际写入的字节数
int wirte = sc.write(buffer);
System.out.println(wirte);

// 判断是否有剩余内容
if (buffer.hasRemaining()) {
// 关注可写事件
// sckey.interestOps() 之前的事件,防止 OP_WRITE 覆盖了之前的事件
// 类似于文件的权限,多少代表读、写、读写
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
//将未写完的数据挂到 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println(write);

// buffer 清理,如果不清理,数据太大的话,一直占据内存
if (!buffer.hasRemaining()) {
key.attach(null);
// 不需要关注可写事件
// 因为只有数据量非常大的时候才会关注这个
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
}
}

}

}
}

}
15:23:52.898 [main] DEBUG c.Test - register key:sun.nio.ch.SelectionKeyImpl@497470ed
15:23:57.351 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@497470ed
261676
15:23:57.425 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@61e4705b
1120068
15:23:57.426 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@61e4705b
1071020
15:23:57.427 [main] DEBUG c.Test - key:sun.nio.ch.SelectionKeyImpl@61e4705b
547236

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK