
Redis Source Code Reading: The Startup Process

source link: https://jiajunhuang.com/articles/2021_05_22-redis_source_code_1.md.html

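Recently I wanted to find out how Redis gets to the point of processing commands, that is, roughly what happens between startup and serving the first request. Without further ado, I forked the original repository and cloned it: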

$ git clone git@github.com:jiajunhuang/redis.git

I did it this way mainly so I can add my own comments and keep them.

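Before reading, let's guess how we would handle this if we wrote Redis ourselves. In Go, it would be roughly: listen, accept to get a connection, then start a goroutine to handle that client. C offers nothing that convenient. The simplest way to get Go-style code is to start one thread per connection, but that does not scale to many concurrent clients. Redis instead uses I/O multiplexing: epoll on Linux (its ae library falls back to kqueue, evport, or select on other platforms). The epoll pattern is roughly: listen, register the fds (the listening socket, and later each accepted connection) with epoll, then call epoll_wait in a loop, and whenever an fd becomes readable, read from it and process the data.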
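The epoll pattern guessed above can be sketched in a few lines of C. This is a minimal sketch, not Redis code: it assumes Linux, uses a pipe instead of a TCP socket so it runs without networking, and the names watch_readable and poll_and_read are hypothetical.

```c
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Register fd with the epoll instance so epoll_wait reports when it is readable.
 * Hypothetical helper; in Redis the epoll_ctl call lives in aeApiAddEvent (ae_epoll.c). */
static int watch_readable(int epfd, int fd) {
    struct epoll_event ev;
    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;
    ev.data.fd = fd;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
}

/* One turn of the loop: wait up to timeout_ms for ready fds, then read from
 * each readable one into buf. Returns total bytes read, or -1 on error. */
static long poll_and_read(int epfd, char *buf, size_t cap, int timeout_ms) {
    struct epoll_event events[16];
    int n = epoll_wait(epfd, events, 16, timeout_ms);
    if (n < 0) return -1;

    size_t total = 0;
    for (int i = 0; i < n; i++) {
        if (!(events[i].events & EPOLLIN)) continue;
        ssize_t r = read(events[i].data.fd, buf + total, cap - total);
        if (r > 0) total += (size_t)r;
    }
    return (long)total;
}
```

A real server would call poll_and_read inside a while loop and would treat the listening fd (accept) differently from client fds (read); Redis expresses that split with a separate callback per fd.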

Having made our guess, let's verify it against the Redis source.

Every C program starts executing at main, so first we need to find main. It is in server.c:

int main(int argc, char **argv) {
    struct timeval tv;
    int j;
    char config_from_stdin = 0;

    ...
    redisSetCpuAffinity(server.server_cpulist);
    setOOMScoreAdj(-1);

    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

Look closely and you will see that aeMain is exactly that loop:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

/*
#define AE_FILE_EVENTS (1<<0)
#define AE_TIME_EVENTS (1<<1)
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT (1<<2)
#define AE_CALL_BEFORE_SLEEP (1<<3)
#define AE_CALL_AFTER_SLEEP (1<<4)
*/
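The flags passed to aeProcessEvents are plain bits OR-ed together, so AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP is 3|8|16 = 27. The loop exits only when something sets eventLoop->stop, which Redis does through aeStop(). The sketch below mimics that shape. mini_loop, mini_stop, process_events, and stop_on_third_pass are hypothetical stand-ins, not the ae implementation.

```c
/* Same bit values as ae.h (copied from the comment above). */
#define AE_FILE_EVENTS (1<<0)
#define AE_TIME_EVENTS (1<<1)
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT (1<<2)
#define AE_CALL_BEFORE_SLEEP (1<<3)
#define AE_CALL_AFTER_SLEEP (1<<4)

/* Hypothetical stand-in for aeEventLoop: a stop flag, a hook, and a counter. */
typedef struct mini_loop {
    int stop;
    int iterations;
    void (*before_sleep)(struct mini_loop *loop);
} mini_loop;

/* Same idea as aeStop(): ask the loop to exit once the current pass is done. */
static void mini_stop(mini_loop *loop) { loop->stop = 1; }

/* Fake aeProcessEvents: runs the hook only if the flag bit asks for it. */
static void process_events(mini_loop *loop, int flags) {
    if ((flags & AE_CALL_BEFORE_SLEEP) && loop->before_sleep)
        loop->before_sleep(loop);
    /* ...a real loop would now block in epoll_wait and dispatch ready fds... */
    loop->iterations++;
}

/* Same shape as aeMain: keep processing until someone sets stop. */
static void mini_main(mini_loop *loop) {
    loop->stop = 0;
    while (!loop->stop)
        process_events(loop, AE_ALL_EVENTS | AE_CALL_BEFORE_SLEEP | AE_CALL_AFTER_SLEEP);
}

/* Example hook: request a stop during the third pass; that pass still completes. */
static void stop_on_third_pass(mini_loop *loop) {
    if (loop->iterations == 2) mini_stop(loop);
}
```

Because stop is only checked at the top of the while loop, the pass that requests the stop still runs to completion. That matches how aeStop takes effect.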

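ae is the I/O multiplexing library that Redis builds as its own abstraction layer. Since aeMain(server.el) is a loop, the handlers for accepting TCP connections and for reading from them must be registered before the loop starts. Let's look through main.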

Scrolling up a bit, I found initServer(). One look inside confirms it:

void initServer(void) {
    int j;

    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();
    makeThreadKillable();
    ...
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }
    if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TLS socket accept handler.");
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
    ...

}

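This is where both the time event and the accept callbacks get registered. Below we follow the accept side through createSocketAcceptHandler and aeCreateFileEvent.

As an aside, aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) is where Redis registers the jobs it runs periodically: serverCron first fires 1 ms after the loop starts.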
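A time event in ae is a callback plus a due time. The callback's return value decides what happens next. A number means "call me again in that many milliseconds", and AE_NOMORE (-1) means "delete this event". That is how serverCron keeps firing: it returns 1000/server.hz. The sketch below simulates that rule against a fake millisecond clock. mini_timer, mini_timer_after, run_due_timer, and the two example callbacks are hypothetical names, not ae's API.

```c
#define AE_NOMORE -1   /* same value as in ae.h */

/* Hypothetical timer: when it is due, and what to call. */
typedef struct {
    long long when_ms;          /* absolute due time on the fake clock */
    int (*proc)(void *data);    /* returns next delay in ms, or AE_NOMORE */
    void *data;
    int active;
} mini_timer;

/* First fire delay_ms after now_ms, like aeCreateTimeEvent(el, delay_ms, proc, data, NULL). */
static mini_timer mini_timer_after(long long now_ms, long long delay_ms,
                                   int (*proc)(void *), void *data) {
    mini_timer t = { now_ms + delay_ms, proc, data, 1 };
    return t;
}

/* Fire the timer if it is due at now_ms, then reschedule or retire it
 * based on the callback's return value. Returns 1 if it fired. */
static int run_due_timer(mini_timer *t, long long now_ms) {
    if (!t->active || now_ms < t->when_ms) return 0;
    int next = t->proc(t->data);
    if (next == AE_NOMORE) t->active = 0;
    else t->when_ms = now_ms + next;
    return 1;
}

/* Example cron: counts calls and asks to run again in 100 ms (as if hz = 10). */
static int cron_every_100ms(void *data) {
    int *calls = data;
    (*calls)++;
    return 100;
}

/* Example one-shot: runs once, then removes itself. */
static int one_shot(void *data) {
    int *calls = data;
    (*calls)++;
    return AE_NOMORE;
}
```

Driving the cron from t = 0 to t = 1000 ms makes it fire at 1, 101, ..., 901, which is 10 times. The real loop reads a monotonic clock instead of a loop counter.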

/* Create an event handler for accepting new connections in TCP or TLS domain sockets.
 * This works atomically for all socket fds */
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler) {
    int j;

    for (j = 0; j < sfd->count; j++) {
        if (aeCreateFileEvent(server.el, sfd->fd[j], AE_READABLE, accept_handler,NULL) == AE_ERR) {
            /* Rollback */
            for (j = j-1; j >= 0; j--) aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
            return C_ERR;
        }
    }
    return C_OK;
}

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    aeFileEvent *fe = &eventLoop->events[fd];

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}
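aeCreateFileEvent stores handlers in eventLoop->events, an array indexed directly by fd. That is why initServer creates the loop with a setsize of server.maxclients+CONFIG_FDSET_INCR, and why any fd >= setsize is rejected with ERANGE. The sketch below keeps only that bookkeeping (no epoll call). It also adds the all-or-nothing registration with rollback that createSocketAcceptHandler performs. Every name in it is hypothetical.

```c
#include <errno.h>
#include <stdlib.h>

#define EV_READABLE 1
#define EV_WRITABLE 2

typedef void file_proc(int fd, void *client_data, int mask);

/* Mirrors the fields of aeFileEvent that matter here. */
typedef struct {
    int mask;              /* which events are registered for this fd */
    file_proc *rproc;      /* called when readable */
    file_proc *wproc;      /* called when writable */
    void *client_data;
} file_event;

typedef struct {
    int setsize;           /* table holds fds 0 .. setsize-1 */
    int maxfd;
    file_event *events;    /* indexed directly by fd */
} event_table;

static int table_init(event_table *t, int setsize) {
    t->setsize = setsize;
    t->maxfd = -1;
    t->events = calloc((size_t)setsize, sizeof(file_event));
    return t->events ? 0 : -1;
}

/* Like aeCreateFileEvent without the aeApiAddEvent (epoll_ctl) step. */
static int add_file_event(event_table *t, int fd, int mask, file_proc *proc, void *data) {
    if (fd >= t->setsize) { errno = ERANGE; return -1; }
    file_event *fe = &t->events[fd];
    fe->mask |= mask;
    if (mask & EV_READABLE) fe->rproc = proc;
    if (mask & EV_WRITABLE) fe->wproc = proc;
    fe->client_data = data;
    if (fd > t->maxfd) t->maxfd = fd;
    return 0;
}

/* Simplified: unlike aeDeleteFileEvent, maxfd is not recomputed. */
static void del_file_event(event_table *t, int fd, int mask) {
    if (fd < t->setsize) t->events[fd].mask &= ~mask;
}

/* Like createSocketAcceptHandler: register every fd or none of them. */
static int add_all_readable(event_table *t, const int *fds, int count, file_proc *proc) {
    for (int j = 0; j < count; j++) {
        if (add_file_event(t, fds[j], EV_READABLE, proc, NULL) != 0) {
            for (j = j - 1; j >= 0; j--) del_file_event(t, fds[j], EV_READABLE);
            return -1;
        }
    }
    return 0;
}

/* What the loop does for a ready fd: look the handler up by fd and call it. */
static void dispatch(event_table *t, int fd, int fired) {
    file_event *fe = &t->events[fd];
    if ((fe->mask & fired & EV_READABLE) && fe->rproc) fe->rproc(fd, fe->client_data, fired);
    if ((fe->mask & fired & EV_WRITABLE) && fe->wproc) fe->wproc(fd, fe->client_data, fired);
}

/* Example handler for the usage below: remembers which fd it was called for. */
static int last_fd_seen = -1;
static void record_fd(int fd, void *data, int mask) {
    (void)data; (void)mask;
    last_fd_seen = fd;
}
```

The fd-indexed array makes lookup O(1) with no hashing, at the cost of sizing the table for the largest fd the server may ever see.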

Next, let's see what the accept_handler we passed in looks like:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        anetCloexec(cfd);
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    char conninfo[100];
    UNUSED(ip);
    ...
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }
    ...
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        char conninfo[100];
        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_WARNING,
                    "Error accepting a client connection: %s (conn: %s)",
                    connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn));
        return;
    }
}

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }
    ...
}
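Taken together, acceptTcpHandler drains up to MAX_ACCEPTS_PER_CALL pending connections per readable event and stops early when accept reports EWOULDBLOCK. createClient then makes each socket non-blocking, disables Nagle's algorithm, and optionally enables keepalive. A minimal sketch of that path with plain POSIX sockets follows. It assumes a Linux/POSIX host with loopback networking. accept_batch, setup_client_socket, open_listener, and connect_client are hypothetical names. Redis itself goes through anetTcpAccept, anetCloexec, and connNonBlock / connEnableTcpNoDelay / connKeepAlive, and its keepalive helper also tunes probe timing.

```c
#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <unistd.h>

/* createClient-style setup: non-blocking, TCP_NODELAY, optional SO_KEEPALIVE. */
static int setup_client_socket(int fd, int keepalive) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) return -1;
    int yes = 1;
    if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1) return -1;
    if (keepalive && setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) == -1) return -1;
    return 0;
}

/* acceptTcpHandler-style loop: accept at most max connections from a
 * non-blocking listen_fd. Accepted fds go into out[]; returns the count. */
static int accept_batch(int listen_fd, int max, int *out) {
    int n = 0;
    while (max--) {
        int cfd = accept(listen_fd, NULL, NULL);
        if (cfd == -1) break;  /* EAGAIN/EWOULDBLOCK: backlog drained (Redis logs other errors) */
        fcntl(cfd, F_SETFD, FD_CLOEXEC);          /* like anetCloexec */
        if (setup_client_socket(cfd, 1) == -1) {  /* like createClient */
            close(cfd);
            continue;
        }
        out[n++] = cfd;
    }
    return n;
}

/* Helper: non-blocking listener on 127.0.0.1 with a kernel-chosen port. */
static int open_listener(int *port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = 0;
    socklen_t len = sizeof(addr);
    if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1 ||
        listen(fd, 16) == -1 ||
        getsockname(fd, (struct sockaddr *)&addr, &len) == -1 ||
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) == -1) {
        close(fd);
        return -1;
    }
    *port = ntohs(addr.sin_port);
    return fd;
}

/* Helper: blocking connect to 127.0.0.1:port. */
static int connect_client(int port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons((unsigned short)port);
    if (connect(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { close(fd); return -1; }
    return fd;
}
```

Capping the batch keeps one busy listening socket from starving other fds in the same loop iteration. Leftover connections are picked up on the next readable event.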

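Here, connSetReadHandler(conn, readQueryFromClient) makes readQueryFromClient the function that runs whenever the socket's receive buffer has data. But when is it actually called? In other words, where does the link between conn->type and the readable event get handed to the I/O multiplexing library? connSetReadHandler itself just calls conn->type->set_read_handler. I didn't find the implementation by browsing, but searching for set_read_handler turned it up: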

ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};

/* Register a read handler, to be called when the connection is readable.
 * If NULL, the existing handler is removed.
 */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
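This indirection is a hand-written vtable. The event loop only knows one generic callback per connection type (ae_handler), and that callback forwards to whatever read_handler the connection currently holds. Below is a stripped-down sketch of the pattern. conn_type, conn, CT_SOCKET_SKETCH, and the other names are hypothetical, and "registered with epoll" is reduced to a flag so the code runs standalone.

```c
#include <stddef.h>

struct conn;
typedef void conn_callback(struct conn *c);

/* Per-transport function table, in the spirit of ConnectionType (CT_Socket, ...). */
typedef struct conn_type {
    void (*ae_handler)(struct conn *c, int mask);   /* what the event loop calls */
    int (*set_read_handler)(struct conn *c, conn_callback *fn);
} conn_type;

typedef struct conn {
    const conn_type *type;
    conn_callback *read_handler;   /* e.g. readQueryFromClient */
    int registered;                /* stands in for "fd is in the epoll set" */
    int reads_served;
} conn;

#define MASK_READABLE 1

/* Generic handler the loop calls: forward to the connection's current read handler. */
static void socket_event_handler(conn *c, int mask) {
    if ((mask & MASK_READABLE) && c->read_handler) c->read_handler(c);
}

/* Like connSocketSetReadHandler: NULL unregisters, anything else registers. */
static int socket_set_read_handler(conn *c, conn_callback *fn) {
    if (fn == c->read_handler) return 0;
    c->read_handler = fn;
    c->registered = (fn != NULL);  /* aeDeleteFileEvent / aeCreateFileEvent in Redis */
    return 0;
}

static const conn_type CT_SOCKET_SKETCH = { socket_event_handler, socket_set_read_handler };

/* Like connSetReadHandler: always go through the type table. */
static int conn_set_read_handler(conn *c, conn_callback *fn) {
    return c->type->set_read_handler(c, fn);
}

/* What the loop does when the fd becomes readable. */
static void fire_readable(conn *c) {
    if (c->registered) c->type->ae_handler(c, MASK_READABLE);
}

/* Example read handler. */
static void count_read(conn *c) { c->reads_served++; }
```

Because callers only touch the table, a TLS transport can supply a different table without changing networking.c.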

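In other words, when a readable event fires, conn->type->ae_handler is called. For a plain TCP connection (created by connCreateAcceptedSocket), conn->type is CT_Socket, so this ae_handler is: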

static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    UNUSED(el);
    UNUSED(fd);
    connection *conn = clientData;

    if (conn->state == CONN_STATE_CONNECTING &&
            (mask & AE_WRITABLE) && conn->conn_handler) {

        int conn_error = connGetSocketError(conn);
        if (conn_error) {
            conn->last_errno = conn_error;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }

        if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);

        if (!callHandler(conn, conn->conn_handler)) return;
        conn->conn_handler = NULL;
    }

    /* Normally we execute the readable event first, and the writable
     * event later. This is useful as sometimes we may be able
     * to serve the reply of a query immediately after processing the
     * query.
     *
     * However if WRITE_BARRIER is set in the mask, our application is
     * asking us to do the reverse: never fire the writable event
     * after the readable. In such a case, we invert the calls.
     * This is useful when, for instance, we want to do things
     * in the beforeSleep() hook, like fsync'ing a file to disk,
     * before replying to a client. */
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;

    /* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}
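The ordering rule at the end of connSocketEventHandler can be isolated. Readable runs first, then writable, unless CONN_FLAG_WRITE_BARRIER is set, in which case the write runs first. The sketch records the call order as a string instead of doing I/O. handler_order and the EV_* / FLAG_* names are hypothetical stand-ins, and the early return when a handler closes the connection is left out.

```c
#include <string.h>

#define EV_READ  1
#define EV_WRITE 2
#define FLAG_WRITE_BARRIER 1   /* stands in for CONN_FLAG_WRITE_BARRIER */

/* Writes 'R' / 'W' into order[] in the sequence connSocketEventHandler would
 * call read_handler / write_handler. has_read / has_write say whether each
 * handler is installed. order must hold at least 3 chars. */
static void handler_order(int mask, int flags, int has_read, int has_write, char *order) {
    int invert = flags & FLAG_WRITE_BARRIER;
    int call_read = (mask & EV_READ) && has_read;
    int call_write = (mask & EV_WRITE) && has_write;
    size_t n = 0;

    if (!invert && call_read) order[n++] = 'R';   /* normal case: read first */
    if (call_write) order[n++] = 'W';
    if (invert && call_read) order[n++] = 'R';    /* barrier: read after write */
    order[n] = '\0';
}
```

With the barrier set, a reply is never written right after reading a query in the same handler call, which gives beforeSleep (for example an fsync to disk) a chance to run before the client gets its answer.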

So this is where, when an event fires, the read and write callbacks are invoked in turn. The read callback is the readQueryFromClient we passed in at the very start:

void readQueryFromClient(connection *conn) {
    ...

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    ...

    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    processInputBuffer(c);
}

So what Redis does is read from the socket into c->querybuf, then extract commands from c->querybuf and execute them (processInputBuffer does the parsing).
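Buffering matters because TCP is a byte stream: one read may deliver half a command, or several commands at once. A minimal sketch of the accumulate-then-parse idea follows. It handles only inline commands terminated by \r\n, whereas Redis's processInputBuffer also parses the RESP multibulk format. The qbuf type and the qbuf_append and take_line functions are hypothetical.

```c
#include <string.h>

/* Hypothetical fixed-size stand-in for c->querybuf (Redis uses a growable sds). */
typedef struct {
    char data[1024];
    size_t len;
} qbuf;

/* Append bytes just read from the socket, like connRead into querybuf+qblen.
 * Returns -1 if the buffer would overflow (Redis enforces client-query-buffer-limit). */
static int qbuf_append(qbuf *q, const char *bytes, size_t n) {
    if (q->len + n > sizeof(q->data)) return -1;
    memcpy(q->data + q->len, bytes, n);
    q->len += n;
    return 0;
}

/* Extract one complete "...\r\n" line into out (without the \r\n) and drop it
 * from the buffer. Returns 1 on success, 0 if no complete line is buffered yet,
 * -1 if the line does not fit in out. */
static int take_line(qbuf *q, char *out, size_t outcap) {
    for (size_t i = 0; i + 1 < q->len; i++) {
        if (q->data[i] == '\r' && q->data[i + 1] == '\n') {
            if (i >= outcap) return -1;
            memcpy(out, q->data, i);
            out[i] = '\0';
            memmove(q->data, q->data + i + 2, q->len - (i + 2));
            q->len -= i + 2;
            return 1;
        }
    }
    return 0;
}
```

For example, if one read delivers "PING\r\nEC", PING can run immediately while "EC" waits in the buffer until the next read completes "ECHO hi\r\n".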

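In this article we looked at how Redis starts up and begins processing commands from clients. In Go this logic would be quite simple. Redis, however, is written in C on top of epoll, which only reports readiness, and ae turns that readiness into callbacks, so the logic ends up split into many fragments. Redis's own abstractions on top of that make the code even more roundabout. For Redis itself these abstractions reduce engineering complexity, but for someone reading the source they do raise the bar.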

