初始化服務器狀態結構
redis中一個最重要的數據結構是redis_server,會創建一個這個結構的全局變量server,初始化服務器的第一步就是創建一個struct redisServer類型的實例變量server作為服務器的狀態,並為結構中的各個屬性設置默認值。初始化server變量的工作由redis.c/initServerConfig函數完成,initServerConfig函數中,大部分是對server的屬性設置默認值,還有一部分是調用populateCommandTable函數對redis的命令表初始化。全局變量redisCommandTable是redisCommand類型的數組,保存redis支持的所有命令。server.commands是一個dict,保存命令名到redisCommand的映射。populateCommandTable函數會遍歷全局redisCommandTable表,把每條命令插入到server.commands中,根據每個命令的屬性設置其flags。以下是這個函數的部分代碼:
void initServerConfig(void){
//
設置服務器的運行id
getRandomHexChars(server.runid,REDIS_RUN_ID_SIZE);
//
為運行id
加上結尾字符
server.runid[REDIS_RUN_ID_SIZE] = '\0';
//
設置默認配置文件路徑
server.configfile = NULL;
//
設置默認服務器頻率
server.hz = REDIS_DEFAULT_HZ;
//
設置服務器的運行架構
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
//
設置默認服務器端口號
server.port = REDIS_SERVERPORT;
// ...
/* Command table -- we initiialize it here as it is part of the
* initial configuration, since command names may be changed via
* redis.conf using the rename-command directive. */
// 初始化命令表
// 在這里初始化是因為接下來讀取 .conf 文件時可能會用到這些命令
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
...
}
以下是initServerConfig函數完成的主要工作:
·設置服務器的運行ID。
·設置服務器的默認運行頻率。
·設置服務器的默認配置文件路徑。
·設置服務器的運行架構。
·設置服務器的默認端口號。
·設置服務器的默認RDB持久化條件和AOF持久化條件。
·初始化服務器的LRU時鍾。
·創建命令表。
載入配置選項
在啟動服務器時,用戶可以通過給定配置參數或者指定配置文件來修改服務器的默認配置。舉個例子,如果我們在終端中輸入:
$ redis-server --port 10086
那么我們就通過給定配置參數的方式,修改了服務器的運行端口號。另外,如果我們在終端中輸入:
$ redis-server redis.conf
那么我們就通過指定配置文件的方式修改了服務器的數據庫數量,以及RDB持久化模塊的壓縮功能。
服務器在用initServerConfig函數初始化完server變量之后,就會開始載入用戶給定的配置參數和配置文件,並根據用戶設定的配置,對server變量相關屬性的值進行修改。
這一部分是在main()函數中實現的,下面是源代碼:
// 檢查用戶是否指定了配置文件,或者配置選項 if (argc >= 2) { int j = 1; /* First option to parse in argv[] */ sds options = sdsempty(); char *configfile = NULL; /* Handle special options --help and --version */ // 處理特殊選項 -h 、-v 和 --test-memory if (strcmp(argv[1], "-v") == 0 || strcmp(argv[1], "--version") == 0) version(); if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-h") == 0) usage(); if (strcmp(argv[1], "--test-memory") == 0) { if (argc == 3) { memtest(atoi(argv[2]),50); exit(0); } else { fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n"); exit(1); } } /* First argument is the config file name? */ // 如果第一個參數(argv[1])不是以 "--" 開頭 // 那么它應該是一個配置文件 if (argv[j][0] != '-' || argv[j][1] != '-') configfile = argv[j++]; /* All the other options are parsed and conceptually appended to the * configuration file. For instance --port 6380 will generate the * string "port 6380\n" to be parsed after the actual file name * is parsed, if any. */ // 對用戶給定的其余選項進行分析,並將分析所得的字符串追加稍后載入的配置文件的內容之后 // 比如 --port 6380 會被分析為 "port 6380\n" while(j != argc) { if (argv[j][0] == '-' && argv[j][1] == '-') { /* Option name */ if (sdslen(options)) options = sdscat(options,"\n"); options = sdscat(options,argv[j]+2); options = sdscat(options," "); } else { /* Option argument */ options = sdscatrepr(options,argv[j],strlen(argv[j])); options = sdscat(options," "); } j++; } //getAbsolutePath()函數用於得到一直文件名的絕對路徑 if (configfile) server.configfile = getAbsolutePath(configfile); // 重置保存條件 resetServerSaveParams(); // 載入配置文件, options 是前面分析出的給定選項 loadServerConfig(configfile,options); sdsfree(options); // 獲取配置文件的絕對路徑 if (configfile) server.configfile = getAbsolutePath(configfile); } else { redisLog(REDIS_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis"); }
其中 loadServerConfig(char *filename,char *option)函數主要用於從給定文件中載入服務器配置。
loadServerConfig:完成的功能很簡單,就是將文件內容讀到字符串中。並將通過命令行傳入的配置項追加到該字符串后。后面loadServerConfig會調用loadServerConfigFromString函數:從字符串中解析出配置項,並設置server的相關屬性(可參照源代碼)。
此步完成后,server中的簡單屬性(整數、字符串)基本都設置完成。
初始化服務器數據結構
在之前執行initServerConfig函數初始化server狀態時,程序只創建了命令表一個數據結構,不過除了命令表之外,服務器狀態還包含其他數據結構,比如:
- server.clients鏈表,這個鏈表記錄了所有與服務器相連的客戶端的狀態結構,鏈表的每個節點都包含了一個redisClient結構實例;
- server.db數組,數組中包含了服務器的所有數據庫;
- 用於保存頻道訂閱信息的server.pubsub_channels字典,以及用於保存模式訂閱信息的server.pubsub_patterns鏈表;
- 用於執行Lua腳本的Lua環境server.lua;
- 用於保存慢查詢日志的server.slowlog屬性。
此時服務器將調用initServer函數為上面提到的這些數據結構進行分配內存,並在需要的時候為其關聯初始化值。
除了上面這些外,initServer還進行了一些非常重要的設置操作,其中包括:
- 為服務器設置進程信號處理器;
- 創建共享對象:這些對象包含Redis服務器經常用到的一些值,比如包含"OK"回復的字符串對象,包含"ERR"回復的字符串對象,包含整數1到10000的字符串對象等等,服務器通過重用這些共享對象來避免反復創建相同的對象;
- 打開服務器的監聽端口,並為監聽套接字關聯連接應答事件處理器,等待服務器正式運行時接受客戶端的連接;
- 為serverCron函數創建時間事件,等待服務器正式運行時執行serverCron函數;
- 如果AOF持久化功能已經打開,那么打開現有的AOF文件,如果AOF文件不存在,那么創建並打開一個新的AOF文件,為AOF寫入做好准備;
- 初始化服務器的后台I/O模塊(bio),為將來的I/O操作做好准備。
void initServer() { int j; // 設置信號處理函數 signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); // 如果設置開啟syslog(系統日志記錄器),則初始化 if (server.syslog_enabled) { openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, server.syslog_facility); } // 初始化並創建數據結構 server.current_client = NULL; server.clients = listCreate(); server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.clients_paused = 0; // 創建共享對象 createSharedObjects(); adjustOpenFilesLimit(); server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR); server.db = zmalloc(sizeof(redisDb)*server.dbnum); /* Open the TCP listening socket for the user commands. */ // 打開 TCP 監聽端口,用於等待客戶端的命令請求 if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR) exit(1); /* Open the listening Unix domain socket. */ // 打開 UNIX 本地端口
//初始化監聽socket,就是調用socket、bind、listen等,並將socket設置為非阻塞。如果配置了unix domain socket,也會進行相應的初始化 if (server.unixsocket != NULL) { unlink(server.unixsocket); /* don't care if this fails */ server.sofd = anetUnixServer(server.neterr,server.unixsocket, server.unixsocketperm, server.tcp_backlog); if (server.sofd == ANET_ERR) { redisLog(REDIS_WARNING, "Opening socket: %s", server.neterr); exit(1); } anetNonBlock(NULL,server.sofd); } /* Abort if there are no listening sockets at all. */ if (server.ipfd_count == 0 && server.sofd < 0) { redisLog(REDIS_WARNING, "Configured to not listen anywhere, exiting."); exit(1); } /* Create the Redis databases, and initialize other internal state. */ // 創建並初始化數據庫結構 for (j = 0; j < server.dbnum; j++) { server.db[j].dict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); server.db[j].ready_keys = dictCreate(&setDictType,NULL); server.db[j].watched_keys = dictCreate(&keylistDictType,NULL); server.db[j].eviction_pool = evictionPoolAlloc(); server.db[j].id = j; server.db[j].avg_ttl = 0; } // 創建 PUBSUB 相關結構 server.pubsub_channels = dictCreate(&keylistDictType,NULL); server.pubsub_patterns = listCreate(); listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); server.cronloops = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; aofRewriteBufferReset(); server.aof_buf = sdsempty(); server.lastsave = time(NULL); /* At startup we consider the DB saved. */ server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */ server.rdb_save_time_last = -1; server.rdb_save_time_start = -1; server.dirty = 0; resetServerStats(); /* A few stats we don't want to reset: server startup time, and peak mem. */ server.stat_starttime = time(NULL); server.stat_peak_memory = 0; server.resident_set_size = 0; server.lastbgsave_status = REDIS_OK; server.aof_last_write_status = REDIS_OK; server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0; updateCachedTime(); /* Create the serverCron() time event, that's our main way to process * background operations. */ // 為 serverCron() 創建時間事件 if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("Can't create the serverCron time event."); exit(1); } /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ // 為 TCP 連接關聯連接應答(accept)處理器 // 用於接受並應答客戶端的 connect() 調用 for (j = 0; j < server.ipfd_count; j++) { if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR) { redisPanic( "Unrecoverable error creating server.ipfd file event."); } } // 為本地套接字關聯應答處理器 if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) redisPanic("Unrecoverable error creating server.sofd file event."); /* Open the AOF file if needed. */ // 如果 AOF 持久化功能已經打開,那么打開或創建一個 AOF 文件 if (server.aof_state == REDIS_AOF_ON) { server.aof_fd = open(server.aof_filename, O_WRONLY|O_APPEND|O_CREAT,0644); if (server.aof_fd == -1) { redisLog(REDIS_WARNING, "Can't open the append-only file: %s", strerror(errno)); exit(1); } } /* 32 bit instances are limited to 4GB of address space, so if there is * no explicit limit in the user provided configuration we set a limit * at 3 GB using maxmemory with 'noeviction' policy'. This avoids * useless crashes of the Redis instance for out of memory. */ // 對於 32 位實例來說,默認將最大可用內存限制在 3 GB if (server.arch_bits == 32 && server.maxmemory == 0) { redisLog(REDIS_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now."); server.maxmemory = 3072LL*(1024*1024); /* 3 GB */ server.maxmemory_policy = REDIS_MAXMEMORY_NO_EVICTION; } // 如果服務器以 cluster 模式打開,那么初始化 cluster if (server.cluster_enabled) clusterInit(); // 初始化復制功能有關的腳本緩存 replicationScriptCacheInit(); // 初始化腳本系統 scriptingInit(); // 初始化慢查詢功能 slowlogInit(); // 初始化 BIO 系統 bioInit(); }
還原數據庫狀態
緊接着,如果在完成了對服務器狀態server變量的初始化之后,服務器需要載入RDB文件或者AOF文件,並根據文件記錄的內容來還原服務器的數據庫狀態。
根據服務器是否啟用了AOF持久化功能,服務器載入數據時所使用的目標文件會有所不同:
- 如果服務器啟用了AOF持久化功能,那么服務器使用AOF文件來還原數據庫狀態;
- 相反地,如果服務器沒有啟用AOF持久化功能,那么服務器使用RDB文件來還原數據庫狀態。
當服務器完成數據庫狀態還原工作之后,服務器將在日志中打印出載入文件並還原數據庫狀態所耗費的時長:
[5244] 21 Nov 22:43:49.084 * DB loaded from disk: 0.068 seconds
在main函數中導入RDB或者AOF的函數源碼如下:
// 如果服務器不是運行在 SENTINEL 模式,那么執行以下代碼 if (!server.sentinel_mode) { /* Things not needed when running in Sentinel mode. */ // 打印問候語 redisLog(REDIS_WARNING,"Server started, Redis version " REDIS_VERSION); #ifdef __linux__ // 打印內存警告 linuxOvercommitMemoryWarning(); #endif // 從 AOF 文件或者 RDB 文件中載入數據 loadDataFromDisk(); // 啟動集群? if (server.cluster_enabled) { if (verifyClusterConfigWithData() == REDIS_ERR) { redisLog(REDIS_WARNING, "You can't have keys in a DB different than DB 0 when in " "Cluster mode. Exiting."); exit(1); } } // 打印 TCP 端口 if (server.ipfd_count > 0) redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); // 打印本地套接字端口 if (server.sofd > 0) redisLog(REDIS_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); } else { sentinelIsRunning(); }
main函數的最后,是啟動事件循環。在事件循環的每次迭代,sleep之前會調用beforeSleep函數,進行一些異步處理。此處首先設置beforeSleep函數,然后啟動aeMain事件循環。當從事件循環退出后,清理事件循環,然后退出。
// 運行事件處理器,一直到服務器關閉為止 aeSetBeforeSleepProc(server.el,beforeSleep);
// 開啟事件循環 aeMain(server.el); // 服務器關閉,停止事件循環 aeDeleteEventLoop(server.el); return 0; }