3

各种各样的I/O

 9 months ago
source link: https://www.ttalk.im/2023/11/all-about-io.html?amp%3Butm_medium=Atom
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

介绍常见的I/O,根据操作的阻塞或非阻塞类型,以及IO的准备就绪、完成事件通知的同步和异步类型,一共有四种不同方式的IO。同时介绍IO常见的设计模式

根据操作的阻塞或非阻塞类型,以及IO的准备就绪、完成事件通知的同步和异步类型,一共有四种不同方式的IO。

同步阻塞IO

在许多web server上,典型的一个连接一个thread的基础,这种类型是IO操作阻塞着应用程序直到完成。 当阻塞式的read方法或write方法被调用时,将有一次上下文切换至kernel中,IO操作会发生,数据会被复制进kernel的 buffer中。然后,kernel buffer会把数据转给用户空间里的应用程序级别的buffer,并且应用程序的thread会被标识为runnable的,此时应用程序会解锁可以从 用户空间的buffer中读取数据。 一个连接一个thread的模型想尝试减少强制一个连接给一个thread的阻塞影响,需要掌握剩下的并发连接不再被IO操作在同一个连接上阻塞。 当连接些都很短且数据延迟都不是很坏的时候这工作得很好。尽管如此,一旦连接变长且数据连接高延迟,可能性就是,线程些被连接长时间抓住不放,因为新连接 的饥饿。如果使用定长的线程池,直到阻塞的线程在阻塞状态中不能被重用以服务新的连接,如果每个新的连接用一个新的线程服务,或者会导致大量的线程会在系 统中被产生,这会演变为漂亮的资源争抢,为了完成高并发的负载,而高上下切换消费。

ServerSocket server = new ServerSocket(port);
while(true) {
    Socket connection = server.accept();
    //spawn-Thread-and-process(connection);
}

代码结构简单,很容易就实现了一个连接一个线程的服务器。

同步非阻塞IO

这个模型下,设备(网卡)或者连接被设置为非阻塞的,read()和write()操作将不会被阻塞。通常意味着,如果操作不能立即得到结论,将会 返回,带一个error code以指出操作会阻塞(POSIX标准是EWOULDBLOCK)或者是设备临时不可用(POSIX标准是EAGAIN)。由应用程序去检测,直到设 备准备好了并且所有数据被读到。尽量如此,这也不是非常高效,因为每次调用都会激起一次上下文切换给kernel,并且不会考虑数据有没被读到。

带就绪事件的异步非阻塞IO

前面的模型的问题在于,应用程序不得不检测,会忙于等待任务完成。当设备准备好被读写时,有更好的办法通知应用程序吗?这的确就是本模型所提供的好 处。使用特殊的系统调用(因平台而变-linux下使用select()/poll()/epoll(),BSD使用kqueue(),Solaris使 用/dev/poll),应用程序注册感兴趣的点收集IO就绪的信息,从特定的设备(在Linux下使用文件描述符,所有的sockets都被抽象使用了 文件描述符),特定的IO操作(读或写)。然后,这个系统调用被调用,至少其中一个被注册的文件描述符变成ready之前,这调用会被阻塞。一旦这个文件 描述符准备好做IO操作了,就会被取来当作系统调用的返回,然后系统调用就可以在应用程序的loop中被顺序地调用。 准备好的连接处理逻辑经常包括一个用户提供的事件handler,此handler会一起发起非阻塞的read()/write()调用,目的是从 设备取数据给kernel,最终给用户空间的buffer,这会激起上下文切换到kernel。无论如何,通常没有绝对保证,有可能会发生,设备上预期的 由操作系统提供的IO,只是一个指示,设备有可能准备好感兴趣的IO操作了,但read()或write()却不行。尽管如此,与标准情况相比这应该算异 常了。 所以,总结的办法就是,在异步流中获取就绪事件,注册一些事件处理器,当有类似的事件通知被触发的时候抓住他们。正如你所见,所有的事情都可以在一 个单独的线程中完成,即便从多个不同连接过来的多路传输,主要因为select()(这里我选择了典型的系统调用),已经是可以同一时间返回多个 sockets准备就绪的类型。同一时间在多个sockets上返回就绪,这只是一部分好处。这种类型就是经常没提供的非阻塞IO模型。 Java已经抽象出来平台特殊性系统调用的不同,实现了NIO API。Socket 文件描述符被用Channels和Selector抽象,封装到selection系统调用中。应用程序感兴趣的收集就绪事件,注册到Channel(通 常在ServerSocketChannel上accept()就得到一个SocketChannel),注册的内容是Selector,会得到 SelectionKey,这个SelectionKey就是作为一个handle,这个handle的作用是hold住Channel和注册信息。然后 阻塞的select()调用被设置在Selector,它会返回一系列的SelectionKey,然后一个接一个地被程序所指定的事件处理器所处理。

    Selector selector = Selector.open();
	channel.configureBlocking(false);
	SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
	while(true) {
	  int readyChannels = selector.select();
	  if(readyChannels == 0) continue;

	  Set<SelectionKey> selectedKeys = selector.selectedKeys();
	  Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

	  while(keyIterator.hasNext()) {
	    SelectionKey key = keyIterator.next();

	    if(key.isAcceptable()) {
	        // a connection was accepted by a ServerSocketChannel.
	    } else if (key.isConnectable()) {
	        // a connection was established with a remote server.
	    } else if (key.isReadable()) {
	        // a channel is ready for reading
	    } else if (key.isWritable()) {
	        // a channel is ready for writing
	    }
	    keyIterator.remove();
	  }
	}

带完成事件的异步非阻塞IO

就绪事件只能做到通知你设备\socket准备好做事情的程度。应用程序依然不得不做脏活,为了从设备/socket中读数据(更准确地说是通过系 统调用指示操作系统),通过设备的各种思路将数据扔到用户空间的buffer。把任务代理给操作系统在后台运行,一旦完成了让它再通知你,包括从设备到 kernel的buffer再最终到应用程序级别的buffer传送所有的数据,这样岂不是很爽?这就是经常被提到的异步IO模型背后的基础想法。所以需 要操作系统层支持AIO操作。在Linux下从2.6开始在aio POSIX API中被支持,Windows下用I/O Completion Ports支持。 JAVA NIO2在AsynchronousChannel API中一点点支持此模型。

操作系统支持

为了支持就绪和完成事件通知,不同的操作系统提供了各种各样的系统调用。就绪事件 select()和poll()可以在Linux类的系统中使用。尽管如此,更新的epoll()变种更好,因为它比select()和poll()更有效率。当监控的文件描述符增长时,选择时间在线性增长,这一点上导致了select()不行。在复写文件描述符数组这事上已经臭名昭著。所以每次一被调 用,描述符数组就需要从一个单独的拷贝上重新构建。无论如何这都不是一个优雅的解决方案。 epoll()变体可以按两种办法被配置,边沿触发和层级触发。在边沿触发情况下,只有在相关的描述符上事件被检测到才会发出通知。说了在一个事件 触发通知期间,你的应用程序触发器只会读一半kernel的输入buffer。现在在这个描述符上不会得到通知,甚至到下一个时间周期,除非设备准备好发 更多的数据,否则有一点点数据可读的时候也不会有通知,有足够的数据的时候会导致一次文件描述符的事件。层级触发用另一方式配置,每次数据可读了都会触发 通知。 相比的系统调用还有BSD口味的kqueue,Solaris由于版本不同有/dev/poll或者”Event Completion”。Windows下等价的是“I/O Completion Ports”。尽管如此,Windows看上去通过“I/O Completion Ports”支持这个得了第一名。

IO模式101

在软件开发中到处是设计模式。I/O不一样。只有两种I/O模式,NIO和AIO,下面进行介绍。

Reactor模式

Reactor模式,是我们最常用的IO设计模式。其中

Reactor启动器:这是会初始化非阻塞服务器的组件,主要是配置和初始化分配器(dispatcher)。首先,它会bind出服务器的 socket,并且通过事件分派器(demultiplexer)注册,事件分派器作用是客户端连接接收就绪事件。然后就绪事件(读写接收等)的每种类型的事件处 理器实现会被注册到分配器(dispatcher)。下一次分配器事件loop过程会被调起来,以处理事件通知。

dispatcher:为注册、删除定义接口,分发事件处理器起作用,作用是响应连接事件,包括连接被接受、数据输入输出、一组连接上的超时事件。 为了服务一个客户端连接,相关的事件处理器(比如接受事件处理器)会被注册给被接受的客户端通道(在client socket其下包装),注册内容是事件分派器(demultiplexer),就绪事件类的都被会注册,以监听此特定的channel。然后,分配器线程会 调出阻塞的就绪选择操作,这些操作在demultiplexer之上,主要为剩下的注册通道。一旦一个或多个被注册的通道准备好IO,分配器会服务给相关 的每个准备好的通道一对一的用注册的事件处理器返回“Handle”。很重要的是,这些事件处理器不会hold住分配器线程,但是会延迟分配器服务其他准 备好的连接。因为常见的在事件处理器里的逻辑,包括传送数据从/去准备好的连接,这些连接会阻塞,一直到所有的数据在用户空间和内核数据缓存中被送完,一 般情况下,这些处理器跑在一个线程池的不同的线程里。

Handle:当一个channel被注册了事件分派器(demultiplexer)就会返回一个handle,handle概括了连接通道和就绪信息。靠事件分派器就绪选择操作,一系列的准备好的Handle会被返回。Java NIO里对等的叫SelectionKey。

Demultiplexer:等待在一个或多个注册的连接通道里的就绪事件。Java NIO里叫Selector。

Event Handler:指接口具有的hook方法,以分配连接事件。这些方法需要被应用程序指定的事件处理器所实现。

Concrete Event Handler:(具体的事件处理器)包括从连接中读写数据的逻辑,并且要做一些必须的过程,或者初始化客户端连接传过的接收协议,这些协议来自通过的Handle。

all-about-io-1.jpg

事件处理器典型地跑在一个线程池的单独的线程中,下面的图片中显示了这一过程。

all-about-io-2.jpg

让我们使用这个模式实现一个简单的echo server(没有事件处理器线程池)。

    public class ReactorInitiator {
        private static final int NIO_SERVER_PORT = 9993;

        public void initiateReactiveServer(int port) throws Exception {
	        ServerSocketChannel server = ServerSocketChannel.open();
	        server.socket().bind(new InetSocketAddress(port));
	        server.configureBlocking(false);

            Dispatcher dispatcher = new Dispatcher();
            dispatcher.registerChannel(SelectionKey.OP_ACCEPT, server);
	        dispatcher.registerEventHandler(SelectionKey.OP_ACCEPT,
                new AcceptEventHandler(dispatcher.getDemultiplexer()));
            dispatcher.registerEventHandler(SelectionKey.OP_READ, 
                new ReadEventHandler(dispatcher.getDemultiplexer()));
            dispatcher.registerEventHandler(SelectionKey.OP_WRITE,
                new WriteEventHandler());

            dispatcher.run(); // Run the dispatcher loop
        }

        public static void main(String[] args) throws Exception {
            System.out.println("Starting NIO server at port : " + NIO_SERVER_PORT);
	        new ReactorInitiator().initiateReactiveServer(NIO_SERVER_PORT);
        }
	}

    public class Dispatcher {

      private Map<Integer, EventHandler> registeredHandlers = new ConcurrentHashMap<Integer, EventHandler>();
	  private Selector demultiplexer;

	  public Dispatcher() throws Exception {
	    demultiplexer = Selector.open();
	  }

	  public Selector getDemultiplexer() {
	    return demultiplexer;
	  }

	  public void registerEventHandler(
	    int eventType, EventHandler eventHandler) {
	    registeredHandlers.put(eventType, eventHandler);
	  }

	  // Used to register ServerSocketChannel with the
	  // selector to accept incoming client connections
	  public void registerChannel(
	    int eventType, SelectableChannel channel) throws Exception {
	    channel.register(demultiplexer, eventType);
	  }

	  public void run() {
	    try {
	      while (true) { // Loop indefinitely
	        demultiplexer.select();

	        Set<SelectionKey> readyHandles = demultiplexer.selectedKeys();
	        Iterator<SelectionKey> handleIterator = readyHandles.iterator();

	        while (handleIterator.hasNext()) {
	          SelectionKey handle = handleIterator.next();

	          if (handle.isAcceptable()) {
	            EventHandler handler = registeredHandlers.get(SelectionKey.OP_ACCEPT);
                handler.handleEvent(handle);
	           // Note : Here we don't remove this handle from
	           // selector since we want to keep listening to
	           // new client connections
	          }

	          if (handle.isReadable()) {
	            EventHandler handler = registeredHandlers.get(SelectionKey.OP_READ);
	            handler.handleEvent(handle);
	            handleIterator.remove();
	          }

	          if (handle.isWritable()) {
	            EventHandler handler = registeredHandlers.get(SelectionKey.OP_WRITE);
	            handler.handleEvent(handle);
	            handleIterator.remove();
	          }
	        }
	      }
	    } catch (Exception e) {
	      e.printStackTrace();
	    }
	  }
	}

	public interface EventHandler {
        public void handleEvent(SelectionKey handle) throws Exception;
	}

	public class AcceptEventHandler implements EventHandler {
	  private Selector demultiplexer;
	  public AcceptEventHandler(Selector demultiplexer) {
	    this.demultiplexer = demultiplexer;
	  }

	  @Override
	  public void handleEvent(SelectionKey handle) throws Exception {
	    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) handle.channel();
	    SocketChannel socketChannel = serverSocketChannel.accept();
	    if (socketChannel != null) {
	      socketChannel.configureBlocking(false);
	      socketChannel.register(demultiplexer, SelectionKey.OP_READ);
	    }
	  }
	}

	public class ReadEventHandler implements EventHandler {

	  private Selector demultiplexer;
	  private ByteBuffer inputBuffer = ByteBuffer.allocate(2048);

	  public ReadEventHandler(Selector demultiplexer) {
	    this.demultiplexer = demultiplexer;
	  }

	  @Override
	  public void handleEvent(SelectionKey handle) throws Exception {
	    SocketChannel socketChannel = (SocketChannel) handle.channel();
	    socketChannel.read(inputBuffer); // Read data from client
	    inputBuffer.flip();
	    // Rewind the buffer to start reading from the beginning
	    byte[] buffer = new byte[inputBuffer.limit()];
	    inputBuffer.get(buffer);
	    System.out.println("Received message from client : " + new String(buffer));
	    inputBuffer.flip();
	    // Rewind the buffer to start reading from the beginning
	    // Register the interest for writable readiness event for
	    // this channel in order to echo back the message
	    socketChannel.register(demultiplexer, SelectionKey.OP_WRITE, inputBuffer);
	  }
	}

	public class WriteEventHandler implements EventHandler {
	  @Override
	  public void handleEvent(SelectionKey handle) throws Exception {
	    SocketChannel socketChannel =
	      (SocketChannel) handle.channel();
	    ByteBuffer inputBuffer = (ByteBuffer) handle.attachment();
	    socketChannel.write(inputBuffer);
	    socketChannel.close(); // Close connection
	  }
	}

Proactor模式

此模式基于异步IO模型。主要的组件如下。

Proactive启动器:这是初始化异步操作接收客户端连接的实体。经常是服务器应用程序的主线程。注册一个完成处理器,附着在完成分发器上,以逮到连接接收时的异步事件通知。

Asynchronous Operation Processor:异步操作处理器。其职责是异步地抓出IO操作,提供完成事件通知给应用层的完成处理器。操作系统通常会暴露异步IO接口。

Asynchronous Operation:异步操作的运行在独立的内核线程中,靠异步操作处理器来完成。

Completion Dispatcher:其职责是在异步操作完成时,唤回应用程序的完成处理器。当异步操作处理器完成了一次异步初始化操作,完成分发器会进行应用程序自行维护的回调。通常,委派事件通知处理给相对的事件合适的完成处理器。

Completion Handler:这是被实用程序实现的接口,用于处理异步事件完成events。

all-about-io-3.jpg

同样,让我们使用Java 7中的NIO2 API使用该模式实现一个echo server。

public class ProactorInitiator {
	  static int ASYNC_SERVER_PORT = 4333;

	  public void initiateProactiveServer(int port) throws IOException {

	    final AsynchronousServerSocketChannel listener =
            AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));

	     AcceptCompletionHandler acceptCompletionHandler = new AcceptCompletionHandler(listener);

	     SessionState state = new SessionState();
	     listener.accept(state, acceptCompletionHandler);
	  }

	  public static void main(String[] args) {
	    try {
            System.out.println("Async server listening on port : " + ASYNC_SERVER_PORT);
            new ProactorInitiator().initiateProactiveServer(ASYNC_SERVER_PORT);
	    } catch (IOException e) {
            e.printStackTrace();
	    }

	    // Sleep indefinitely since otherwise the JVM would terminate
	    while (true) {
	      try {
	        Thread.sleep(Long.MAX_VALUE);
	      } catch (InterruptedException e) {
	        e.printStackTrace();
	      }
	    }
	  }
	}

	public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, SessionState> {

	  private AsynchronousServerSocketChannel listener;

	  public AcceptCompletionHandler(AsynchronousServerSocketChannel listener) {
	    this.listener = listener;
	  }

	  @Override
	  public void completed(AsynchronousSocketChannel socketChannel,SessionState sessionState) {
	   // accept the next connection
       SessionState newSessionState = new SessionState();
	   listener.accept(newSessionState, this);

	   // handle this connection
	   ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
	   ReadCompletionHandler readCompletionHandler = new ReadCompletionHandler(socketChannel, inputBuffer);
	   socketChannel.read(inputBuffer, sessionState, readCompletionHandler);
	  }

	  @Override
	  public void failed(Throwable exc, SessionState sessionState) {
	   // Handle connection failure...
	  }

	}

	public class ReadCompletionHandler implements
	  CompletionHandler<Integer, SessionState> {

	   private AsynchronousSocketChannel socketChannel;
	   private ByteBuffer inputBuffer;

	   public ReadCompletionHandler(AsynchronousSocketChannel socketChannel,ByteBuffer inputBuffer) {
	     this.socketChannel = socketChannel;
	     this.inputBuffer = inputBuffer;
	   }

	   @Override
	   public void completed(
	     Integer bytesRead, SessionState sessionState) {

	     byte[] buffer = new byte[bytesRead];
	     inputBuffer.rewind();
	     // Rewind the input buffer to read from the beginning

	     inputBuffer.get(buffer);
	     String message = new String(buffer);

	     System.out.println("Received message from client : " + message);

	     // Echo the message back to client
	     WriteCompletionHandler writeCompletionHandler = new WriteCompletionHandler(socketChannel);

	     ByteBuffer outputBuffer = ByteBuffer.wrap(buffer);

	     socketChannel.write(
	       outputBuffer, sessionState, writeCompletionHandler);
	  }

	  @Override
	  public void failed(Throwable exc, SessionState attachment) {
	    //Handle read failure.....
	   }

	}

	public class WriteCompletionHandler implements CompletionHandler<Integer, SessionState> {

	  private AsynchronousSocketChannel socketChannel;

	  public WriteCompletionHandler(AsynchronousSocketChannel socketChannel) {
	    this.socketChannel = socketChannel;
	  }

	  @Override
	  public void completed(Integer bytesWritten, SessionState attachment) {
	    try {
	      socketChannel.close();
	    } catch (IOException e) {
	      e.printStackTrace();
	    }
	  }

	  @Override
	  public void failed(Throwable exc, SessionState attachment) {
	   // Handle write failure.....
	  }

	}

	public class SessionState {

	  private Map<String, String> sessionProps = new ConcurrentHashMap<String, String>();

	   public String getProperty(String key) {
	     return sessionProps.get(key);
	   }

	   public void setProperty(String key, String value) {
	     sessionProps.put(key, value);
	   }
	}

每种类型的事件完成(接受、读、写)都会被一个单独的完成处理器handle,这个处理器实现了CompletionHandler接口(Accept/Read/WriteCompletionHandler等)。状态过渡被管理在这些连接处理器中。额外SessionState参数可以被用于hold客户端的session,待定的状态就可以跨这一系列的完成事件了。以NIO为基础实现的服务器,可以承载大量的客户端,但是相应的也会有一定的内存消耗,因此在使用的时候需要作好内存控制。

IO上有许多的选项可以做,可影响到服务器的扩展性和性能。上面的每种IO都有利有弊,做决定时要考虑扩展性和性能特征,以及利于管理。得到结论,长篇一张关于IO。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK