7

Go 中 TCP 连接读写 Deadline 实现

 8 months ago
source link: https://liqiang.io/post/how-go-implement-readwrite-deadline-with-tcp
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

在写 Go 网络程序的时候,有个 TCP 连接的方法系列是我很常用的:SetDeadlineSetReadDeadlineSetWriteDeadline,他们的作用就是等待 TCP 连接的读写操作,如果在预设的时间点还没有读或者写操作的话,读的方法就会直接返回,并且返回一个 os.ErrDeadlineExceeded 类型的错误,这通常用于在判定连接是否已经失联的情况。

为什么要用 Deadline

对 TCP 有一些了解的同学可能都会问,TCP 不是有 Keepalive 么,为什么还需要单独用 Connection 的 Deadline。很明显,我们知道 TCP 的 keepalive 是系统级的,它的配置路径在 /proc/sys/net/ipv4/ 目录下:



  1. [[email protected]]# ls -al /proc/sys/net/ipv4/tcp_keepalive_*
  2. -rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_intvl
  3. -rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_probes
  4. -rw-r--r-- 1 root root 0 Oct 29 11:56 /proc/sys/net/ipv4/tcp_keepalive_time

如果选择用 TCP 的 keepalive 配置,那么你只能通过系统的全局配置来生效,但是这在企业级网络编程中都是不太实际的,每个应用可能都有自己的独特需求,所以不能一概而论使用同一份配置。

如何使用 Deadline

一个简单的使用 Deadline 的 demo 为:



  1. [[email protected]]# cat test.go
  2. func (c *ApplicationConnection) readLoop() {
  3. var err error
  4. c.conn, err = net.Dial("tcp", c.GetDialerAddr())
  5. if err != nil {
  6. c.logger.Error(c.ctx, "connect to %s: %v", d.GetDialerAddr(), err)
  7. return
  8. }
  9. c.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
  10. for {
  11. bytes, err := io.ReadAll(c.conn)
  12. if err != nil {
  13. if errors.Is(err, os.ErrDeadlineExceeded) {
  14. c.logger.Trace(c.ctx, "Read timeout, send a heartbeat message")
  15. c.heartbeat()
  16. continue
  17. }
  18. c.logger.Error(c.ctx, "Failed to copy data from listener to Dialer: %v", err)
  19. return
  20. }
  21. c.conn.SetReadDeadline(time.Now().Add(time.Second * 10))
  22. c.process(bytes)
  23. }
  24. }

这里我们可以设置一个等待时间,如果在预期时间内连接没有数据可以读取,那么就会返回一个错误,我们即可根据需要对连接进行处理;如果在这期间有数据可以读取,那么我们需要注意要重置一下 Deadline 的值,不然,这个值还是会有效的。

如何实现 Deadline

在知道如何使用之后,接下来的问题就是 Go 又是如何实现 Connection 的 Deadline 的呢?在之前 Go 的源码分析中,我们知道了在网络上 Go 底层还是使用的 epoll,那么丢与 Deadline ,Go runtime 又是如何做到的?

我可以想象的一个解决方法是将 Connection 包装成一个 struct 结构,struct 结构里面包含原始的连接信息,deadline 信息(一个 timer),以及 timer 处理函数,然后在 epoll 中添加对应的 timer,这样,当 timer 被 trigger 的时候,就表示这期间都没有可读的事件,所以可以直接调用处理函数,从而达到 deadline 的效果。

但是,Go 如何实现的,还是要具体地看代码(这里我依照之前的 Go 源码分析逻辑,看的是 1.12 的代码):



  1. [[email protected]]# internal/poll/fd_poll_runtime.go
  2. func (fd *FD) SetReadDeadline(t time.Time) error {
  3. return setDeadlineImpl(fd, t, 'r')
  4. ---> internal/poll/fd_poll_runtime.go
  5. func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
  6. ... ...
  7. runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
  8. ---> runtime/netpoll.go
  9. func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
  10. ... ...
  11. netpollgoready(rg, 3)
  12. ---> runtime/netpoll.go
  13. ... ...
  14. pd.rt.f = rtf
  15. pd.rt.when = pd.rd
  16. pd.rt.arg = pd
  17. pd.rt.seq = pd.rseq
  18. addtimer(&pd.rt)

可以看到,Go 的实现大体上和我的设想差不多,只不过多了更多的条件保障之类的。

C++ 如何实现

最近在尝试 C++ 编写一些网络程序,所以自然而然我也想看下 C++ 是否支持类似的功能,但是好像没有直接支持的方式,于是我就基于 libevent 自己实现了一下,代码主要分为几部分:

  • 连接封装结构


  1. [[email protected]]# cat echo_server.cpp
  2. struct TimeoutConnection {
  3. long int last_read_ts;
  4. std::string name;
  5. evutil_socket_t fd;
  6. struct event_base *base;
  7. struct bufferevent *bev;
  8. struct event *timeout_event;
  9. };
  • 连接建立部分


  1. static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) {
  2. std::cout << get_current_time() << " listener_cb" << std::endl;
  3. struct event_base *base = (event_base *) user_data;
  4. struct bufferevent *bev;
  5. bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
  6. if (!bev) {
  7. std::cerr << get_current_time() << " failed to constructing bufferevent!" << std::endl;
  8. event_base_loopbreak(base);
  9. return;
  10. }
  11. struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) malloc(sizeof(struct TimeoutConnection));
  12. timeoutConn->last_read_ts = std::time(nullptr);
  13. timeoutConn->name = "test";
  14. timeoutConn->fd = fd;
  15. timeoutConn->base = base;
  16. timeoutConn->bev = bev;
  17. timeoutConn->timeout_event = event_new(base, -1, EV_PERSIST, timeout_cb, timeoutConn);
  18. bufferevent_setcb(bev, conn_readcb, NULL, conn_eventcb, timeoutConn);
  19. bufferevent_enable(bev, EV_READ);
  20. struct timeval tv = {.tv_sec = 5, .tv_usec = 0};
  21. auto result = event_add(timeoutConn->timeout_event, &tv);
  22. if (result != 0) {
  23. std::cout << "event_add failed" << std::endl;
  24. }
  25. }
  • 数据读取部分


  1. [[email protected]]# cat echo_server.cpp
  2. static void conn_readcb(struct bufferevent *bev, void *ptr) {
  3. struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) ptr;
  4. timeoutConn->last_read_ts = std::time(nullptr);
  5. char buf[1024];
  6. int n;
  7. struct evbuffer *input = bufferevent_get_input(bev);
  8. while ((n = evbuffer_remove(input, buf, sizeof(buf))) > 0) {
  9. std::cout << get_current_time() << " connection " << timeoutConn->name << " recv: " << buf << std::endl;
  10. bufferevent_write(bev, buf, n);
  11. }
  12. }
  • timer 实现部分


  1. [[email protected]]# cat echo_server.cpp
  2. static void timeout_cb(evutil_socket_t fd, short what, void *arg) {
  3. std::cout << get_current_time() << " connection timeout invoked" << std::endl;
  4. auto currTs = std::time(nullptr);
  5. struct TimeoutConnection *timeoutConn = (struct TimeoutConnection *) arg;
  6. if (currTs - timeoutConn->last_read_ts > 10) {
  7. std::cout << get_current_time() << " connection " << timeoutConn->name << " timeout" << std::endl;
  8. std::cout << get_current_time() << " connection " << timeoutConn->name << " last read at: " << timeoutConn->last_read_ts << std::endl;
  9. bufferevent_free(timeoutConn->bev);
  10. event_free(timeoutConn->timeout_event);
  11. free(timeoutConn);
  12. return;
  13. }
  14. std::cout << get_current_time() << " connection " << timeoutConn->name << " last read at: " << timeoutConn->last_read_ts << std::endl;
  15. }

到这里差不多就了解了这部分的实现了,倒是也不怎么复杂。但是,可以深究的事情也很多,例如 Go 底层的 timer 是如何实现的这些,但是这些我以前也写过类似的(Linux 实现定时器),所以就不过多深究了。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK