zl程序教程

您现在的位置是:首页 >  数据库

当前栏目

Redis启动分析

Redis 分析 启动
2023-06-13 09:14:05 时间

文件入口:server.c##main

配置初始化

这一步表示Redis服务器基本数据结构和各种参数的初始化。在Redis源码中,Redis服务器是用一个叫做redisServer的struct来表达的,里面定义了Redis服务器赖以运行的各种参数,比如监听的端口号和文件描述符、当前连接的各个client端、Redis命令表(command table)配置、持久化相关的各种参数,以及事件循环结构。

void initServerConfig(void) {
    int j;

    ...

    /* Command table -- we initiialize it here as it is part of the
     * initial configuration, since command names may be changed via
     * redis.conf using the rename-command directive. */
    server.commands = dictCreate(&commandTableDictType,NULL);
    server.orig_commands = dictCreate(&commandTableDictType,NULL);
    populateCommandTable();
    server.delCommand = lookupCommandByCString("del");
    server.multiCommand = lookupCommandByCString("multi");
    server.lpushCommand = lookupCommandByCString("lpush");
    server.lpopCommand = lookupCommandByCString("lpop");
    server.rpopCommand = lookupCommandByCString("rpop");
    server.zpopminCommand = lookupCommandByCString("zpopmin");
    server.zpopmaxCommand = lookupCommandByCString("zpopmax");
    server.sremCommand = lookupCommandByCString("srem");
    server.execCommand = lookupCommandByCString("exec");
    server.expireCommand = lookupCommandByCString("expire");
    server.pexpireCommand = lookupCommandByCString("pexpire");
    server.xclaimCommand = lookupCommandByCString("xclaim");
    server.xgroupCommand = lookupCommandByCString("xgroup");
    server.rpoplpushCommand = lookupCommandByCString("rpoplpush");

    /* Debugging */
    server.assert_failed = "<no assertion failed>";
    server.assert_file = "<no file>";
    server.assert_line = 0;
    server.bug_report_start = 0;
    server.watchdog_period = 0;

    /* By default we want scripts to be always replicated by effects
     * (single commands executed by the script), and not by sending the
     * script to the slave / AOF. This is the new way starting from
     * Redis 5. However it is possible to revert it via redis.conf. */
    server.lua_always_replicate_commands = 1;

    initConfigValues();
}

Redis服务器在运行时就是由这个redisServer类型的全局变量来表示的(变量名就叫server),这一步的初始化主要就是对于这个全局变量进行初始化。

在整个初始化过程中,有一个需要特别关注的函数:populateCommandTable。它初始化了Redis命令表,通过它可以由任意一个Redis命令的名字查找该命令的配置信息(比如该命令接收的命令参数个数、执行函数入口等)。

void populateCommandTable(void) {
    int j;
    int numcommands = sizeof(redisCommandTable)/sizeof(struct redisCommand);

    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        int retval1, retval2;

        /* Translate the command string flags description into an actual
         * set of flags. */
        if (populateCommandTableParseFlags(c,c->sflags) == C_ERR)
            serverPanic("Unsupported command flag");

        c->id = ACLGetCommandID(c->name); /* Assign the ID used for ACL. */
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        /* Populate an additional dictionary that will be unaffected
         * by rename-command statements in redis.conf. */
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
        serverAssert(retval1 == DICT_OK && retval2 == DICT_OK);
    }
}

redis的命令是硬编码的,我们可以进入redisCommandTable看到如下:

struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},

    /* Note that we can't flag set as fast, since it may perform an
     * implicit DEL of a large key. */
    {"set",setCommand,-3,
     "write use-memory @string",
     0,NULL,1,1,1,0,0,0},

	...
}

表里包含了命令以及命令对应的函数,其中,每个命令的结构如下:

struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags;   /* Flags as string representation, one char per flag. */
    uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
    /* Use a function to determine keys arguments in a command line.
     * Used for Redis Cluster redirect. */
    redisGetKeysProc *getkeys_proc;
    /* What keys should be loaded in background when calling this command? */
    int firstkey; /* The first argument that's a key (0 = no keys) */
    int lastkey;  /* The last argument that's a key */
    int keystep;  /* The step between first and last key */
    long long microseconds, calls;
    int id;     /* Command ID. This is a progressive ID starting from 0 that
                   is assigned at runtime, and is used in order to check
                   ACLs. A connection is able to execute a given command if
                   the user associated to the connection has this command
                   bit set in the bitmap of allowed commands. */
};

读取配置文件

回到初始化server结构体代码中,我们可以看到:在对全局的redisServer结构进行了初始化之后,还需要从配置文件(redis.conf)中加载配置。这个过程可能覆盖掉之前初始化过的redisServer结构中的某些参数。换句话说,就是先经过一轮初始化,保证Redis的各个内部数据结构以及参数都有缺省值,然后再从配置文件中加载自定义的配置。

void initConfigValues() {
    for (standardConfig *config = configs; config->name != NULL; config++) {
        config->interface.init(config->data);
    }
}

创建事件循环

在Redis中,事件循环是用一个叫aeEventLoop的struct来表示的。「创建事件循环」这一步主要就是创建一个aeEventLoop结构,并存储到server全局变量(即前面提到的redisServer类型的结构)中。另外,事件循环的执行依赖系统底层的I/O多路复用机制(I/O multiplexing),比如Linux系统上的epoll机制。因此,这一步也包含对于底层I/O多路复用机制的初始化(调用系统API)。

aeEventLoop *aeCreateEventLoop(int setsize) {
    aeEventLoop *eventLoop;
    int i;

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

开启socket监听

服务器程序需要监听才能收到请求。根据配置,这一步可能会打开两种监听:对于TCP连接的监听和对于Unix domain socket的监听。Unix domain socket是一种高效的进程间通信(IPC)机制,在POSIX规范中也有明确的定义,用于在同一台主机上的两个不同进程之间进行通信,比使用TCP协议性能更高(因为省去了协议栈的开销)。当使用Redis客户端连接同一台机器上的Redis服务器时,可以选择使用「Unix domain socket」进行连接。但不管是哪一种监听,程序都会获得文件描述符,并存储到server全局变量中。对于TCP的监听来说,由于监听的IP地址和端口可以绑定多个,因此获得的用于监听TCP连接的文件描述符也可以包含多个。后面,程序就可以拿这一步获得的文件描述符去注册I/O事件回调了。

int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        if (server.bindaddr[j] == NULL) {
            int unsupported = 0;
            /* Bind * for both IPv6 and IPv4, we enter here only if
             * server.bindaddr_count == 0. */
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            } else if (errno == EAFNOSUPPORT) {
                unsupported++;
                serverLog(LL_WARNING,"Not listening to IPv6: unsupported");
            }

            if (*count == 1 || unsupported) {
                /* Bind the IPv4 address as well. */
                fds[*count] = anetTcpServer(server.neterr,port,NULL,
                    server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                    serverLog(LL_WARNING,"Not listening to IPv4: unsupported");
                }
            }
            /* Exit the loop if we were able to bind * on IPv4 and IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error. */
            if (*count + unsupported == 2) break;
        } else if (strchr(server.bindaddr[j],':')) {
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else {
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
            serverLog(LL_WARNING,
                "Could not create server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
                if (errno == ENOPROTOOPT     || errno == EPROTONOSUPPORT ||
                    errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
                    errno == EAFNOSUPPORT    || errno == EADDRNOTAVAIL)
                    continue;
            return C_ERR;
        }
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return C_OK;
}

注册timer事件回调

Redis作为一个单线程(single-threaded)的程序,它如果想调度一些异步执行的任务,比如周期性地执行过期key的回收动作,除了依赖事件循环机制,没有其它的办法。这一步就是向前面刚刚创建好的事件循环中注册一个timer事件,并配置成可以周期性地执行一个回调函数:serverCron。由于Redis只有一个主线程,因此这个函数周期性的执行也是在这个线程内,它由事件循环来驱动(即在合适的时机调用),但不影响同一个线程上其它逻辑的执行(相当于按时间分片了)。serverCron函数到底做了什么呢?实际上,它除了周期性地执行过期key的回收动作,还执行了很多其它任务,比如主从重连、Cluster节点间的重连、BGSAVE和AOF rewrite的触发执行,等等

long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    if (te->next)
        te->next->prev = te;
    eventLoop->timeEventHead = te;
    return id;
}

注册I/O事件回调

Redis服务端最主要的工作就是监听I/O事件,从中分析出来自客户端的命令请求,执行命令,然后返回响应结果。对于I/O事件的监听,自然也是依赖事件循环。前面提到过,Redis可以打开两种监听:对于TCP连接的监听和对于Unix domain socket的监听。因此,这里就包含对于这两种I/O事件的回调的注册,两个回调函数分别是acceptTcpHandler和acceptUnixHandler。对于来自Redis客户端的请求的处理,就会走到这两个函数中去。另外,其实Redis在这里还会注册一个I/O事件,用于通过管道(pipe)机制与module进行双向通信。

 /* Create an event handler for accepting new connections in TCP and Unix
 * domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
	if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
		acceptTcpHandler,NULL) == AE_ERR)
		{
			serverPanic(
				"Unrecoverable error creating server.ipfd file event.");
		}
}
for (j = 0; j < server.tlsfd_count; j++) {
	if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE,
		acceptTLSHandler,NULL) == AE_ERR)
		{
			serverPanic(
				"Unrecoverable error creating server.tlsfd file event.");
		}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
	acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");


/* Register a readable event for the pipe used to awake the event loop
 * when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
	moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
		serverPanic(
			"Error registering the readable event for the module "
			"blocked clients subsystem.");
}

接下来就是InitServerLast方法:

void InitServerLast() {
    bioInit();
    initThreadedIO();
    set_jemalloc_bg_thread(server.jemalloc_bg_thread);
    server.initial_memory_usage = zmalloc_used_memory();
}

初始化后台线程

Redis会创建一些额外的线程,在后台运行,专门用于处理一些耗时的并且可以被延迟执行的任务(一般是一些清理工作)。在Redis里面这些后台线程被称为bio(Background I/O service)。它们负责的任务包括:可以延迟执行的文件关闭操作(比如unlink命令的执行),AOF的持久化写库操作(即fsync调用,但注意只有可以被延迟执行的fsync操作才在后台线程执行),还有一些大key的清除操作(比如flushdb async命令的执行)。可见bio这个名字有点名不副实,它做的事情不一定跟I/O有关。对于这些后台线程,我们可能还会产生一个疑问:前面的初始化过程,已经注册了一个timer事件回调,即serverCron函数,按说后台线程执行的这些任务似乎也可以放在serverCron中去执行。因为serverCron函数也是可以用来执行后台任务的。实际上这样做是不行的。前面我们已经提到过,serverCron由事件循环来驱动,执行还是在Redis主线程上,相当于和主线程上执行的其它操作(主要是对于命令请求的执行)按时间进行分片了。这样的话,serverCron里面就不能执行过于耗时的操作,否则它就会影响Redis执行命令的响应时间。因此,对于耗时的、并且可以被延迟执行的任务,就只能放到单独的线程中去执行了。

void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

初始化主线程IO

前面创建好了事件循环的结构,但还没有真正进入循环的逻辑。过了这一步,事件循环就运行起来,驱动前面注册的timer事件回调和I/O事件回调不断执行。

void initThreadedIO(void) {
    io_threads_active = 0; /* We start with threads not active. */

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

还原数据库

初始化完服务器的状态后,服务器已经处于一个可启动状态,因为redis有持久化特性,服务器还需要加载相应的文件来还原之前数据库的数据。判断Redis当前开启了哪种模式,如果是AOF,则通过AOF还原数据库的数据,否则,载入RDB文件,通过RDB文件还原数据库的数据。

void loadDataFromDisk(void) {
    long long start = ustime();
    if (server.aof_state == AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == C_OK)
            serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
            serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);

            /* Restore the replication ID / offset from the RDB file. */
            if ((server.masterhost ||
                (server.cluster_enabled &&
                nodeIsSlave(server.cluster->myself))) &&
                rsi.repl_id_is_set &&
                rsi.repl_offset != -1 &&
                /* Note that older implementations may save a repl_stream_db
                 * of -1 inside the RDB file in a wrong way, see more
                 * information in function rdbPopulateSaveInfo. */
                rsi.repl_stream_db != -1)
            {
                memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
                server.master_repl_offset = rsi.repl_offset;
                /* If we are a slave, create a cached master from this
                 * information, in order to allow partial resynchronizations
                 * with masters. */
                replicationCacheMasterUsingMyself();
                selectDb(server.cached_master,rsi.repl_stream_db);
            }
        } else if (errno != ENOENT) {
            serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}

启动事件监听

main函数会设置beforeSleep和afterSleep回调函数,然后调用aeMain函数启动事件循环器,开始监听事件。aeMain函数是一个死循环,不断的监听新请求的到来。

aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);