3

Redis(5.0.3)事件驱动与连接管理

 2 years ago
source link: http://cbsheng.github.io/posts/redis%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E4%B8%8E%E8%BF%9E%E6%8E%A5%E7%AE%A1%E7%90%86/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Redis(5.0.3)事件驱动与连接管理


事件驱动原理和设计

A simple event-driven programming library. Originally I wrote this code for the Jim’s event-loop (Jim is a Tcl interpreter) but later translated it in form of a library for easy reuse.

— redis/src/ae.c Redis的事件驱动实现采用了实现与接口分离。针对不同系统平台,使用了不同的IO多路复用函数。共四种,文件分别是:

  1. ae_evport.c
  2. ae_epoll.c
  3. ae_kqueue.c
  4. ae_select.c

平台同时支持多种IO函数呢?按照性能Redis定了一个引入顺序,按照性能从高到低include。

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

ae在Redis里是事件驱动器的名字,取自A simple event-driven?

事件驱动有个核心数据类型aeEventLoop。它就像一个管理器,所有ae对上层代码提供的接口,包括对不同IO复用函数封装的方法,都需要操作aeEventLoop。

// ae.h
// 这些就是ae上层代码提供的所有接口
// 上层代码只管调用,不用关心底层用了哪种IO多路复用方案
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
// 文件事件相关
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
// 时间事件相关
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);

int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize)

事件驱动器在Redis全局只有一个,它被创建后,放在server.el里,server也是全局唯一对象。总得有个地方放置服务器相关的所有信息。

// src/server.h
struct redisServer {
  // ...
  aeEventLoop *el
  // ...
}

// src/server.c
void initServer(void) {
  // ...
  // 传参是最大允许多少个client建连,默认是不限制
  server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
  // ...
}

不同的IO复用方案进行封装,对ae提供一套一致的接口。以ae_epoll.c举例

// 类似构造函数
static int aeApiCreate(aeEventLoop *eventLoop) {}
// 调整大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize) {}
// 类似析构函数,释放资源
static void aeApiFree(aeEventLoop *eventLoop) {}
// 针对某个文件描述符添加关注的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {}
// 删除关心的文件描述符 或 删除文件描述符上某个事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {}
// 带timeout形式,阻塞等待获取可以读/写的文件描述符
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {}
// 获取IO复用函数名称
static char *aeApiName(void) {}

就是这样:

  1. (封装好的)IO复用函数 对 ae提供服务;
  2. 而(封装好的)ae 对 server提供服务;

常见的分层设计。

aeEventLoop

开始也说到这是个核心类型。可以看到无论封装的IO复用函数还是ae,第一参数都是aeEventLoop类型。

typedef struct aeEventLoop {
    int maxfd;   // 当前注册的最大文件描述符
    int setsize; // 关注的文件描述符上限
    long long timeEventNextId; // 时间事件唯一ID递增器
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; // 监听读写事件IO事件的文件描述符列表
    aeFiredEvent *fired; // 事件就绪有可读写IO事件的文件描述符列表
    aeTimeEvent *timeEventHead; // 时间事件(定时任务)列表(链表)
    int stop; // 是否停止事件循环器
    void *apidata; // 不同的IO复用函数,poll方法需要参数类型不一样。apidata专门放置这些传参类型
    aeBeforeSleepProc *beforesleep; // 事件循环器 新一轮循环前的钩子函数
    aeBeforeSleepProc *aftersleep; // 事件循环器 一轮循环后的钩子函数
} aeEventLoop;

它实际也是一个事件循环器。Redis中支持两种事件类型,分别是IO事件和时间事件。时间事件其实就是定时任务。

这个事件循环器一旦通过aeMain()启动后,就只能调用aeStop()才能停下来

// 启动事件循环器
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 停止后就退出循环
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        // 处理IO事件与时间事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

循环器就在一个forever的while里,执行aeProcessEvents()

aeProcessEvents()干了几件事:(代码比较长就不贴了)

  1. 先检查有没有等待执行的时间事件(定时任务),离现在最近的一个时间事件还要多久才执行。
  2. 如果有这样的时间事件,记录下还要多久执行它。取为timeout。
  3. 带着这个timeout,通过aeApiPoll()阻塞等待可读写的IO事件。如果第一步找不到时间事件,这里就没有timeout了,一直阻塞直至有可读性的IO事件。
  4. aeApiPoll()返回后,如果有IO事件的话。就挨个处理。对该IO事件是读还是写,都有flag标志。并且读写的回调处理函数,也通过aeCreateFileEvent()注册进来了。
  5. 处理完所有IO事件后,就可以执行时间事件了。因为时间事件是定时任务,所有执行完毕后,还需要设置好下一次执行的时间点。
  6. 结束了。可以开始新一轮循环。

那么beforesleep和aftersleep这两个钩子在哪里被调用?beforesleep在aeMain()函数里。aftersleep在第三步,aeApiPoll()一结束就执行。

从执行逻辑可看出,定时任务不是准时执行的。可能会有一些延时。

连接的生命周期管理

一般连接生命周期,就是accept -> read/writer -> close。比较简单,redis也差不多。按这个顺序来看看redis的处理方式。

起初,redis服务器肯定监听在某个端口上。有新连接到来时,就会被accept掉。有socket编程经验不会陌生。

简单的socket编程例子一般是listen将socket绑定成功后,就有一个文件描述符。然后在这个文件描述符上阻塞地accept就行。

redis肯定不会阻塞等。而是把这个文件描述符也通过aeCreateFileEvent()放进事件循环器中。这样只要有新连接,就等于有可读事件。这时再去accept,就不用傻瓜地阻塞等待了。

void initServer(void) {
  // ...
	for (j = 0; j < server.ipfd_count; j++) {
    // 将监听client请求的IO文件描述符加到事件循环器
    if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR)
      {
        serverPanic(
          "Unrecoverable error creating server.ipfd file event.");
      }
    }
}

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    // ...

    while(max--) {
        // 通过accept系统调用获取新连接的文件描述符
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(cfd,0,cip);
    }
}

新连接的文件描述符拿到后,也将它放进事件循环器中。毕竟不能阻塞地傻等数据从client发过来。

为这个新连接创建一个对应的client时,顺便把它加到事件循环器中。

client *createClient(int fd) {
    // 为这个连接创建一个client
    client *c = zmalloc(sizeof(client));

    if (fd != -1) {
        // 将文件描述符设置为noblocking
        anetNonBlock(NULL,fd);
        // 设置TCP_NODELAY,关闭Nagle算法
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            // 开启tcp keepalive
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        // ok,可以将文件描述符加到事件循环器里了
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
  // ...
}

接下来,就正常等待client发命令到来,触发文件描述符的可读事件,在事件循环器里响应即可。

redis进行响应命令,执行对应的命令处理函数,把处理结果放在client的输出缓冲区里,同时通过aeCreateFileEvent()监听当前client对应文件描述符的写事件。只要可写了,就把输出缓冲区的内容刷给client。

在哪儿询问是否描述符可写呢?就在上面提到的钩子函数beforesleep里。

void beforeSleep(struct aeEventLoop *eventLoop) {
    // ...
    // 如果某client的输出缓冲区不为空,则监听client对应的文件描述符的可写状态
    // 等于将文件描述符注册到事件循环器中
    handleClientsWithPendingWrites();
    // ...
}

写完了也不能主动close掉,毕竟每次执行一个命令就开一个连接成本太大了。

常见策略就是定时把空闲的client连接断掉。这个工作主要放在定时任务中。

而client最近活跃的时间就是有命令到达,读取命令时。通过c->lastinteraction = server.unixtime记录。

// 注册定时任务
void initServer(void) {
  // ...
  // serverCron就是定时任务
  if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
  }
  // ...
}

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
  // ...
  // 断开空闲过久的client
  clientsCron()
  // ...
}

void clientsCron(void) {
    // ...
    while(listLength(server.clients) && iterations--) {
        // ...
        // 超出server.idletime,free掉
        if (clientsCronHandleTimeout(c,now)) continue;
    }
}

int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
  time_t now = now_ms/1000;

  if (server.maxidletime &&
      !(c->flags & CLIENT_SLAVE) &&    /* no timeout for slaves */
      !(c->flags & CLIENT_MASTER) &&   /* no timeout for masters */
      !(c->flags & CLIENT_BLOCKED) &&  /* no timeout for BLPOP */
      !(c->flags & CLIENT_PUBSUB) &&   /* no timeout for Pub/Sub clients */
      (now - c->lastinteraction > server.maxidletime))
  {
    serverLog(LL_VERBOSE,"Closing idle client");
    // freeclient里调用close
    freeClient(c);
    return 1;
  }
  // ...
  return 0;
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK