3

完整版分享-C++从0实现百万并发Reactor服务器

 7 months ago
source link: https://studygolang.com/articles/36520
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

C++从0实现百万并发Reactor服务器

//xia仔ke:百度网盘

单线程Reactor形式流程: clipboard.png

①效劳器端的Reactor是一个线程对象,该线程会启动事情循环,并运用Selector(选择器)来完成IO的多路复用。channel注册一个Acceptor事情处置器到Reactor中,Acceptor事情处置器所关注的事情是ACCEPT事情,这样Reactor会监听客户端向效劳器端发起的衔接恳求事情(ACCEPT事情)。 ②客户端向效劳器端发起一个衔接恳求,Reactor监听到了该ACCEPT事情的发作并将该ACCEPT事情派发给相应的Acceptor处置器来停止处置。Acceptor处置器经过accept()办法得到与这个客户端对应的衔接(SocketChannel),然后将该衔接所关注的READ事情以及对应的READ事情处置器注册到Reactor中,这样一来Reactor就会监听该衔接的READ事情了。 ③当Reactor监听到有读或者写事情发作时,将相关的事情派发给对应的处置器停止处置。比方,读处置器会经过SocketChannel的read()办法读取数据,此时read()操作能够直接读取到数据,而不会梗塞与等候可读的数据到来。 ④每当处置完一切就绪的感兴味的I/O事情后,Reactor线程会再次执行select()阻塞等候新的事情就绪并将其分派给对应处置器停止处置。 留意,Reactor的单线程形式的单线程主要是针关于I/O操作而言,也就是一切的I/O的accept()、read()、write()以及connect()操作都在一个线程上完成的。

基于单线程反响器形式手写一个NIO通讯 先简单引见NIO中几个重要对象: Selector

Selector的英文含义是“选择器”,也能够称为为“轮询代理器”、“事情订阅器”、“channel容器管理机”都行。 事情订阅和Channel管理: 应用程序将向Selector对象注册需求它关注的Channel,以及详细的某一个Channel会对哪些IO事情感兴味。Selector中也会维护一个“曾经注册的Channel”的容器。 Channels 通道,被树立的一个应用程序和操作系统交互事情、传送内容的渠道(留意是衔接到操作系统)。那么既然是和操作系统停止内容的传送,那么阐明应用程序能够经过通道读取数据,也能够经过通道向操作系统写数据。

一切被Selector(选择器)注册的通道,只能是继承了SelectableChannel类的子类。 ServerSocketChannel:应用效劳器程序的监听通道。只要经过这个通道,应用程序才干向操作系统注册支持“多路复用IO”的端口监听。同时支持UDP协议和TCP协议。 ScoketChannel:TCP Socket套接字的监听通道,一个Socket套接字对应了一个客户端IP:端口 到 效劳器IP:端口的通讯衔接。 DatagramChannel:UDP 数据报文的监听通道。 通道中的数据总是要先读到一个Buffer,或者总是要从一个Buffer中写入。

效劳端处置器:

  • 类阐明:nio通讯效劳端处置器 / public class NioServerHandle implements Runnable{ private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean started; /*
    • @param port 指定要监听的端口号 */ public NioServerHandle(int port) { try {
       selector = Selector.open();
       serverChannel = ServerSocketChannel.open();
       serverChannel.configureBlocking(false);
       serverChannel.socket().bind(new InetSocketAddress(port));
       serverChannel.register(selector,SelectionKey.OP_ACCEPT);
       started = true;
       System.out.println("效劳器已启动,端口号:"+port);
      
      } catch (IOException e) {
       e.printStackTrace();
      
      } } public void stop(){ started = false; } @Override public void run() { //循环遍历selector while(started){
       try{
           //阻塞,只要当至少一个注册的事情发作的时分才会继续.
           selector.select();
           Set keys = selector.selectedKeys();
           Iterator it = keys.iterator();
           SelectionKey key = null;
           while(it.hasNext()){
               key = it.next();
               it.remove();
               try{
                   handleInput(key);
               }catch(Exception e){
                   if(key != null){
                       key.cancel();
                       if(key.channel() != null){
                           key.channel().close();
                       }
                   }
               }
           }
       }catch(Throwable t){
           t.printStackTrace();
       }
      
      } //selector关闭后会自动释放里面管理的资源 if(selector != null)
       try{
           selector.close();
       }catch (Exception e) {
           e.printStackTrace();
       }
      
      } private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){
       //处置新接入的恳求音讯
       if(key.isAcceptable()){
           //取得关怀当前事情的channel
           ServerSocketChannel ssc = (ServerSocketChannel)key.channel();
           //经过ServerSocketChannel的accept创立SocketChannel实例
           //完成该操作意味着完成TCP三次握手,TCP物理链路正式树立
           SocketChannel sc = ssc.accept();
           System.out.println("======socket channel 树立衔接" );
           //设置为非阻塞的
           sc.configureBlocking(false);
           //衔接曾经完成了,能够开端关怀读事情了
           sc.register(selector,SelectionKey.OP_READ);
       }
       //读音讯
       if(key.isReadable()){
           System.out.println("======socket channel 数据准备完成," +
                   "能够去读==读取=======");
           SocketChannel sc = (SocketChannel) key.channel();
           //创立ByteBuffer,并开拓一个1M的缓冲区
           ByteBuffer buffer = ByteBuffer.allocate(1024);
           //读取恳求码流,返回读取到的字节数
           int readBytes = sc.read(buffer);
           //读取到字节,对字节停止编解码
           if(readBytes>0){
               //将缓冲区当前的limit设置为position=0,
               // 用于后续对缓冲区的读取操作
               buffer.flip();
               //依据缓冲区可读字节数创立字节数组
               byte[] bytes = new byte[buffer.remaining()];
               //将缓冲区可读字节数组复制到新建的数组中
               buffer.get(bytes);
               String message = new String(bytes,"UTF-8");
               System.out.println("效劳器收到音讯:" + message);
               //处置数据
               String result = response(message) ;
               //发送应对音讯
               doWrite(sc,result);
           }
           //链路曾经关闭,释放资源
           else if(readBytes<0){
               key.cancel();
               sc.close();
           }
       }
      
      } } //发送应对音讯 private void doWrite(SocketChannel channel,String response)
       throws IOException {
      
      //将音讯编码为字节数组 byte[] bytes = response.getBytes(); //依据数组容量创立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //发送缓冲区的字节数组 channel.write(writeBuffer); } } public class NioServer { private static NioServerHandle nioServerHandle; public static void start(){ if(nioServerHandle !=null)
       nioServerHandle.stop();
      
      nioServerHandle = new NioServerHandle(DEFAULT_PORT); new Thread(nioServerHandle,"Server").start(); } public static void main(String[] args){ start(); } } 客户端处置器:

public class NioClientHandle implements Runnable{ private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean started; public NioClientHandle(String ip, int port) { this.host = ip; this.port = port; try { //创立选择器 selector = Selector.open(); //翻开通道 socketChannel = SocketChannel.open(); //假如为 true,则此通道将被置于阻塞形式; // 假如为 false,则此通道将被置于非阻塞形式 socketChannel.configureBlocking(false); started = true; } catch (IOException e) { e.printStackTrace(); } } public void stop(){ started = false; } @Override public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } //循环遍历selector while(started){ try { //阻塞,只要当至少一个注册的事情发作的时分才会继续 selector.select(); //获取当前有哪些事情能够运用 Set keys = selector.selectedKeys(); //转换为迭代器 Iterator it = keys.iterator(); SelectionKey key = null; while(it.hasNext()){ key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { e.printStackTrace(); if(key!=null){ key.cancel(); if(key.channel()!=null){ key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } //selector关闭后会自动释放里面管理的资源 if(selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } //详细的事情处置办法 private void handleInput(SelectionKey key) throws IOException{ if(key.isValid()){ //取得关怀当前事情的channel SocketChannel sc = (SocketChannel)key.channel(); if(key.isConnectable()){//衔接事情 if(sc.finishConnect()){} else{System.exit(1);} } //有数据可读事情 if(key.isReadable()){ //创立ByteBuffer,并开拓一个1M的缓冲区 ByteBuffer buffer = ByteBuffer.allocate(1024); //读取恳求码流,返回读取到的字节数 int readBytes = sc.read(buffer); //读取到字节,对字节停止编解码 if(readBytes>0){ //将缓冲区当前的limit设置为position,position=0, // 用于后续对缓冲区的读取操作 buffer.flip(); //依据缓冲区可读字节数创立字节数组 byte[] bytes = new byte[buffer.remaining()]; //将缓冲区可读字节数组复制到新建的数组中 buffer.get(bytes); String result = new String(bytes,"UTF-8"); System.out.println("accept message:"+result); }else if(readBytes<0){ key.cancel(); sc.close(); } } } } //发送音讯 private void doWrite(SocketChannel channel,String request) throws IOException { //将音讯编码为字节数组 byte[] bytes = request.getBytes(); //依据数组容量创立ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); //将字节数组复制到缓冲区 writeBuffer.put(bytes); //flip操作 writeBuffer.flip(); //发送缓冲区的字节数组 channel.write(writeBuffer); } private void doConnect() throws IOException { /假如此通道处于非阻塞形式, 则调用此办法将启动非阻塞衔接操作。 假如立刻树立衔接,就像本地衔接可能发作的那样,则此办法返回true。 否则,此办法返回false, 稍后必需经过调用finishConnect办法完成衔接操作。/ if(socketChannel.connect(new InetSocketAddress(host,port))){} else{ //衔接还未完成,所以注册衔接就绪事情,向selector表示关注这个事情 socketChannel.register(selector,SelectionKey.OP_CONNECT); } } //写数据对外暴露的API public void sendMsg(String msg) throws Exception{ socketChannel.register(selector,SelectionKey.OP_READ); doWrite(socketChannel,msg); } } public class NioClient { private static NioClientHandle nioClientHandle; public static void start(){ if(nioClientHandle !=null) nioClientHandle.stop(); nioClientHandle = new NioClientHandle(DEFAULT_SERVER_IP,DEFAULT_PORT); new Thread(nioClientHandle,"Client").start(); } //向效劳器发送音讯 public static boolean sendMsg(String msg) throws Exception{ nioClientHandle.sendMsg(msg); return true; } public static void main(String[] args) throws Exception { start(); System.out.println("请输入恳求信息:"); Scanner scanner = new Scanner(System.in); while(NioClient.sendMsg(scanner.next())); } } 效劳端过程:

启动效劳端,完成一些初始化工作,ServerSocketChannel绑定端口并且注册承受衔接事情. 循环里selector.select()阻塞,只要当至少一个注册的事情发作的时分才会继续,循环里面处置发作的注册事情 注册事情发作时交给处置器,若为承受衔接则accept取出socketChannel并完成衔接,然后就是关注read读取事情即注册,有数据读取了则处置器读取恳求数据并返回. 客户端过程:

启动客户端,完成一些初始化工作. 依据效劳端ip及端口发起衔接. 往效劳端发送数据,并注册read读取事情 循环里selector.select()阻塞,只要当至少一个注册的事情发作的时分才会继续,循环里面处置发作的注册事情. 注册事情发作时交给处置器,若为衔接事情并且衔接胜利则跳过即不予处置等候读取事情发送. 初始化工作如翻开selector,channel,设置通道形式能否阻塞.

单线程Reactor,工作者线程池 但在单线程Reactor形式中,不只I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上停止处置了,这可能会大大延迟I/O恳求的响应。所以我们应该将非I/O的业务逻辑操作从Reactor线程上卸载,以此来加速Reactor线程对I/O恳求的响应.

clipboard.png

添加了一个工作者线程池,并将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样可以进步Reactor线程的I/O响应,不至于由于一些耗时的业务逻辑而延迟对后面I/O恳求的处置。

改良的版本中,所以的I/O操作照旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作。 关于一些小容量应用场景,能够运用单线程模型。但是关于高负载、大并发或大数据量的应用场景却不适宜,主要缘由如下:

① 一个NIO线程同时处置成百上千的链路,性能上无法支撑,即使NIO线程的CPU负荷到达100%,也无法满足海量音讯的读取和发送; ②当NIO线程负载过重之后,处置速度将变慢,这会招致大量客户端衔接超时,超时之后常常会停止重发,这愈加重了NIO线程的负载,最终会招致大量音讯积压和处置超时,成为系统的性能瓶颈; 多Reactor线程形式 clipboard.png

Reactor线程池中的每一Reactor线程都会有本人的Selector、线程和分发的事情循环逻辑。 mainReactor能够只要一个,但subReactor普通会有多个。mainReactor线程主要担任接纳客户端的衔接恳求,然后将接纳到的SocketChannel传送给subReactor,由subReactor来完成和客户端的通讯。

①注册一个Acceptor事情处置器到mainReactor中,Acceptor事情处置器所关注的事情是ACCEPT事情,这样mainReactor会监听客户端向效劳器端发起的衔接恳求事情(ACCEPT事情)。启动mainReactor的事情循环。 ②客户端向效劳器端发起一个衔接恳求,mainReactor监听到了该ACCEPT事情并将该ACCEPT事情派发给Acceptor处置器来停止处置。Acceptor处置器经过accept()办法得到与这个客户端对应的衔接(SocketChannel),然后将这个SocketChannel传送给subReactor线程池。 ③subReactor线程池分配一个subReactor线程给这个SocketChannel,即,将SocketChannel关注的READ事情以及对应的READ事情处置器注册到subReactor线程中。当然你也注册WRITE事情以及WRITE事情处置器到subReactor线程中以完成I/O写操作。Reactor线程池中的每一Reactor线程都会有本人的Selector、线程和分发的循环逻辑。 ④当有I/O事情就绪时,相关的subReactor就将事情派发给响应的处置器处置。留意,这里subReactor线程只担任完成I/O的read()操作,在读取到数据后将业务逻辑的处置放入到线程池中完成,若完成业务逻辑后需求返回数据给客户端,则相关的I/O的write操作还是会被提交回subReactor线程来完成。 留意,所以的I/O操作(包括,I/O的accept()、read()、write()以及connect()操作)照旧还是在Reactor线程(mainReactor线程 或 subReactor线程)中完成的。Thread Pool(线程池)仅用来处置非I/O操作的逻辑。 多Reactor线程形式将“承受客户端的衔接恳求”和“与该客户端的通讯”分在了两个Reactor线程来完成。mainReactor完成接纳客户端衔接恳求的操作,它不担任与客户端的通讯,而是将树立好的衔接转交给subReactor线程来完成与客户端的通讯,这样一来就不会由于read()数据量太大而招致后面的客户端衔接恳求得不到即时处置的状况。并且多Reactor线程形式在海量的客户端并发恳求的状况下,还能够经过完成subReactor线程池来将海量的衔接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK