文章導航
Redis源碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎么才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,后續可以自己閱讀源碼,或者跟着我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。
曹工說Redis源碼(1)-- redis debug環境搭建,使用clion,達到和調試java一樣的效果
曹工說Redis源碼(2)-- redis server 啟動過程解析及簡單c語言基礎知識補充
本講主題
首先,會再補充一點c語言中,指針的相關知識;接下來,開始接着昨天的那篇,講redis的啟動過程,由大到小來講,避免迅速陷入到細節中。
關於指針的理解
指針,其實就是指向一個內存地址,在知道這個地址前后存儲的內容的前提下,這個指針可以被你任意解釋。我舉個例子:
typedef struct Test_Struct{
int a;
int b;
}Test_Struct;
int main() {
// 1
void *pVoid = malloc(4);
// 2
memset(pVoid,0x01,4);
// 3
int *pInt = pVoid;
// 4
char *pChar = pVoid;
// 5
short *pShort = pVoid;
// 6
Test_Struct *pTestStruct = pVoid;
// 7
printf("address:%p, point to %d\n", pChar, *pChar);
printf("address:%p, point to %d\n", pShort, *pShort);
printf("address:%p, point to %d\n", pInt, *pInt);
printf("address:%p, point to %d\n", pTestStruct, pTestStruct->a);
}
-
1處,分配一片內存,4個字節,32位;返回一個指針,指向這片內存區域,准確地說,指向第一個字節,因為分配的內存是連續的,你可以理解為數組。
The malloc() function allocates size bytes and returns a pointer to the allocated memory.
-
2處,調用memset,將這個pVoid 指向的內存開始的4個字節,設置為0x01,其實就是把每個字節設置為00000001。
這個memset的注釋如下:
NAME memset - fill memory with a constant byte SYNOPSIS #include <string.h> void *memset(void *s, int c, size_t n); DESCRIPTION The memset() function fills the first n bytes of the memory area pointed to by s with the constant byte c.
參考資料: https://www.cnblogs.com/yhlboke-1992/p/9292877.html
這里我們把每個字節,設為0x01,最終的二進制,其實就是如下這樣:
-
3處,定義int類型的指針,將pVoid賦值給它,int占4字節
-
4處,定義char類型的指針,將pVoid賦值給它,char占1字節
-
5處,定義short類型的指針,將pVoid賦值給它,short占2字節
-
6處,定義Test_Struct類型的指針,這是個結構體,類似於高級語言的類,這個結構體的結構如下:
typedef struct Test_Struct{ int a; int b; }Test_Struct;
同樣,我們將pVoid賦值給它。
-
7處,分別打印各類指針的地址,和對其解引用后的值。
輸出如下:
257的二進制就是:0000 0001 0000 0001
16843009的二進制就是:0000 0001 0000 0001 0000 0001 0000 0001
結構體那個,也好理解,因為這個結構體,第一個屬性a,就是int類型的,占4個字節。
另外,大家要注意,上面輸出的指針地址都是一模一樣的。
如果大家能理解這個demo,再看看這個鏈接,相信會更加理解指針:
redis server大致的啟動過程
int main(int argc, char **argv) {
struct timeval tv;
/**
* 1 設置時區等等
*/
setlocale(LC_COLLATE,"");
...
// 2 檢查服務器是否以 Sentinel 模式啟動
server.sentinel_mode = checkForSentinelMode(argc,argv);
// 3 初始化服務器配置
initServerConfig();
// 4
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
// 5 檢查用戶是否指定了配置文件,或者配置選項
if (argc >= 2) {
...
// 載入配置文件, options 是前面分析出的給定選項
loadServerConfig(configfile,options);
sdsfree(options);
}
// 6 將服務器設置為守護進程
if (server.daemonize) daemonize();
// 7 創建並初始化服務器數據結構
initServer();
// 8 如果服務器是守護進程,那么創建 PID 文件
if (server.daemonize) createPidFile();
// 9 為服務器進程設置名字
redisSetProcTitle(argv[0]);
// 10 打印 ASCII LOGO
redisAsciiArt();
// 11 如果服務器不是運行在 SENTINEL 模式,那么執行以下代碼
if (!server.sentinel_mode) {
// 從 AOF 文件或者 RDB 文件中載入數據
loadDataFromDisk();
// 啟動集群
if (server.cluster_enabled) {
if (verifyClusterConfigWithData() == REDIS_ERR) {
redisLog(REDIS_WARNING,
"You can't have keys in a DB different than DB 0 when in "
"Cluster mode. Exiting.");
exit(1);
}
}
// 打印 TCP 端口
if (server.ipfd_count > 0)
redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
} else {
sentinelIsRunning();
}
// 12 運行事件處理器,一直到服務器關閉為止
aeSetBeforeSleepProc(server.el,beforeSleep);
aeMain(server.el);
// 13 服務器關閉,停止事件循環
aeDeleteEventLoop(server.el);
return 0;
}
-
1,2,3處,在前面那篇中已經講過,主要是初始化各種配置參數,比如socket相關的;redis.conf中涉及的,aof,rdb,replication,sentinel等;redis server自己內部的數據結構等,如runid,配置文件地址,服務器的相關信息(32位還是64位,因為redis直接運行在操作系統上,而不是像高級語言有虛擬機,32位和64位下,不同數據的長度是不同的),日志級別,最大客戶端數量,客戶端最大idle時間等等
-
4處,因為sentinel和普通的redis server其實是共用同一份代碼,所以這里啟動時,要看是啟動sentinel,還是啟動普通的redis server,如果是啟動sentinel,則進行sentinel相關配置
-
5處,檢查啟動時的命令行參數中,是否指定了配置文件,如果指定了,要使用配置文件的配置為准
-
6處,設置為守護進程
-
7處,根據前面的配置,初始化redis server
-
8處,創建pid文件,一般默認路徑:/var/run/redis.pid,這個可以在redis.conf進行配置,如:
pidfile "/var/run/redis_6379.pid"
-
9處,為服務器進程設置名字
-
10處,打印logo
-
11處,如果不是sentinel模式啟動的話,加載aof或rdb文件
-
12處,跳入死循環,開始等待接收連接,處理客戶端的請求;同時,周期執行后台任務,比如刪除過期key等
-
13處,服務器關閉,一般來說,走不到這里,一般都是陷入在12處的死循環中;只有在某些場景下,將一個全局變量stop修改為true后,程序會從12處跳出死循環,然后走到這里。
初始化redis server的過程
這一節,主要是細化前面的第7步操作,即初始化redis server。這一個函數,位於redis.c中,名為initServer,做的事情很多,接下來會順序講解。
設置全局的信號處理函數
// 設置信號處理函數
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
最重要的是最后一行:
void setupSignalHandlers(void) {
// 1
struct sigaction act;
/* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
* Otherwise, sa_handler is used. */
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
// 2
act.sa_handler = sigtermHandler;
// 3
sigaction(SIGTERM, &act, NULL);
return;
}
3處,設置了:接收到SIGTERM信號時,使用act
來處理信號,act在1處定義,是一個局部變量,它有一個字段,在2處被賦值,這是一個函數指針。函數指針類似於java中的一個static方法的引用,為什么是static,因為執行這類方法不需要new一個對象;在c語言中,所有的方法都是最頂級的,調用時,不需要new一個對象;所以,從這點來說,c語言的函數指針類似java中的static方法的引用。
我們可以看看2處,
act.sa_handler = sigtermHandler;
這個sigtermHandler,應該就是一個全局函數了,看看其怎么被定義的:
// SIGTERM 信號的處理器
static void sigtermHandler(int sig) {
REDIS_NOTUSED(sig);
redisLogFromHandler(REDIS_WARNING,"Received SIGTERM, scheduling shutdown...");
// 打開關閉標識
server.shutdown_asap = 1;
}
這個函數就是打開server這個全局變量的shutdown_asap。這個字段在以下地方被使用:
serverCron in redis.c
/* We received a SIGTERM, shutting down here in a safe way, as it is
* not ok doing so inside the signal handler. */
// 服務器進程收到 SIGTERM 信號,關閉服務器
if (server.shutdown_asap) {
// 嘗試關閉服務器
if (prepareForShutdown(0) == REDIS_OK) exit(0);
// 如果關閉失敗,那么打印 LOG ,並移除關閉標識
redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
server.shutdown_asap = 0;
}
以上這段代碼的第一行,標識了這段代碼所處的位置,為redis.c中的serverCron函數,這個函數,就是redis server的周期執行函數,類似於java中的ScheduledThreadPoolExecutor,當這個周期任務,檢測到server.shutdown_asap打開后,就會去關閉服務器。
那,上面這個接收到信號,要執行的動作說完了,那么,什么是信號,信號其實是linux下進程間通訊的一種手段,比如kill -9 ,就會給對應的pid,發送一個SIGKILL 命令;在redis前台運行時,你按下ctrl + c,其實也是發送了一個信號,信號為SIGINT,值為2。大家可以看下圖:
那么,前面我們注冊的信號是哪個呢,是:SIGTERM,15。也就是我們按下kill -15時,會觸發這個信號。
關於kill 9 和kill 15的差別,可以看這篇博客:
開啟syslog
// 設置 syslog
if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
這個就是發送日志到linux系統的syslog,可以看看openlog函數的說明:
send messages to the system logger
這個感覺用得不多,可以查閱:
初始化當前redisServer的部分屬性
// 初始化並創建數據結構
server.current_client = NULL;
// 1
server.clients = listCreate();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
這個其實沒啥說的,大家看到,比如1處,這個server.clients,server是一個全局變量,維護當前redis server的各種狀態,clients呢,是用來保存當前連接到redis server的客戶端,類型為鏈表:
// 一個鏈表,保存了所有客戶端狀態結構
list *clients; /* List of active clients */
所以,這里其實就是調用listCreate()
,創建了一個空鏈表,然后賦值給clients。
其他屬性,類似。
創建常量字符串池,供復用
大家知道,redis在返回響應的時候,通常就是一句:"+OK"之類的。這個字符串,如果每次響應的時候,再去new一個,也太浪費了,所以,干脆,redis自己把這些常用的字符串,緩存了起來。
void createSharedObjects(void) {
int j;
// 常用回復
shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
...
// 常用錯誤回復
shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
"-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"));
...
}
這個和java中,把字符串字面量緩存起來,是一樣的,都是為了提高性能;java里,不是還把128以內的整數也緩存了嗎,對吧。
調整進程可以打開的最大文件數
服務器一般在真實線上環境,如果是需要應對高並發的話,可能會有幾十上百萬的客戶端,和服務器上的某個進程,建立tcp連接,而這時候,一般就需要調整進程可以打開的最大文件數(socket也是文件)。
在閱讀redis源碼之前,我知道的,修改進程可以打開的最大文件數的方式是通過ulimit,具體的,大家可以看下面這兩個鏈接:
但是,在這個源碼中,發現了另外一種方式:
- 獲取當前的指定資源的限制值的api
#define RLIMIT_NOFILE 5 /* max number of open files */
struct rlimit {
rlim_t rlim_cur;
rlim_t rlim_max;
};
struct rlimit limit;
getrlimit(RLIMIT_NOFILE,&limit)
上面這個代碼,獲取當前系統中,NOFILE(進程最大文件數)這個值的資源限制大小。
通過man getrlimit(需要先安裝,安裝方式:yum install man-pages.noarch
),可以看到:
-
setrlimit則可以設置資源的相關限制
limit.rlim_cur = f; limit.rlim_max = f; setrlimit(RLIMIT_NOFILE,&limit)
創建事件循環相關數據結構
事件循環器的結構如下:
/*
* State of an event based program
*
* 事件處理器的狀態
*/
typedef struct aeEventLoop {
// 目前已注冊的最大描述符
int maxfd; /* highest file descriptor currently registered */
// 目前已追蹤的最大描述符
int setsize; /* max number of file descriptors tracked */
// 用於生成時間事件 id
long long timeEventNextId;
// 最后一次執行時間事件的時間
time_t lastTime; /* Used to detect system clock skew */
// 已注冊的文件事件
aeFileEvent *events; /* Registered events */
// 已就緒的文件事件
aeFiredEvent *fired; /* Fired events */
// 時間事件
aeTimeEvent *timeEventHead;
// 事件處理器的開關
int stop;
// 多路復用庫的私有數據
void *apidata; /* This is used for polling API specific data */
// 在處理事件前要執行的函數
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
初始化上面這個數據結構的代碼在:aeCreateEventLoop in redis.c
上面這個結構中,主要就是:
-
apidata中,主要用於存儲多路復用庫的相關數據,每次調用多路復用庫,去進行select時,如果發現有就緒的io事件發生,就會存放到 fired 屬性中。
比如,select就是linux下,老版本的linux內核中,多路復用的一種實現,redis中,其代碼如下:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { ... // 1 retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { for (j = 0; j <= eventLoop->maxfd; j++) { ... // 2 eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }
省略了部分代碼,其中,1處,進行select,這一步類似於java中nio的select操作;2處,將select返回的,已就緒的文件描述符,填充到fired 屬性。
-
另外,我們提到過,redis有一些后台任務,比如清理過期key,這個不是一蹴而就的;每次周期運行后台任務時,就會去清理一部分,而這里的后台任務,其實就是上面這個數據結構中的時間事件。
// 時間事件 aeTimeEvent *timeEventHead;
分配16個數據庫的內存空間
server.db = zmalloc(sizeof(redisDb) * server.dbnum);
打開listen端口,監聽請求
/* Open the TCP listening socket for the user commands. */
// 打開 TCP 監聽端口,用於等待客戶端的命令請求
listenToPort(server.port, server.ipfd, &server.ipfd_count)
這里就是打開平時的6379端口的地方。
初始化16個數據庫對應的數據結構
/* Create the Redis databases, and initialize other internal state. */
// 創建並初始化數據庫結構
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType, NULL);
server.db[j].expires = dictCreate(&keyptrDictType, NULL);
server.db[j].blocking_keys = dictCreate(&keylistDictType, NULL);
server.db[j].ready_keys = dictCreate(&setDictType, NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType, NULL);
server.db[j].eviction_pool = evictionPoolAlloc();
server.db[j].id = j;
server.db[j].avg_ttl = 0;
}
db的數據結構如下:
typedef struct redisDb {
// 數據庫鍵空間,保存着數據庫中的所有鍵值對
dict *dict; /* The keyspace for this DB */
// 鍵的過期時間,字典的鍵為鍵,字典的值為過期事件 UNIX 時間戳
dict *expires; /* Timeout of keys with a timeout set */
// 正處於阻塞狀態的鍵
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
// 可以解除阻塞的鍵
dict *ready_keys; /* Blocked keys that received a PUSH */
// 正在被 WATCH 命令監視的鍵
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */
// 數據庫號碼
int id; /* Database ID */
// 數據庫的鍵的平均 TTL ,統計信息
long long avg_ttl; /* Average TTL, just for stats */
} redisDb;
這里可以看到,設置了過期時間的key,除了會在 dict 屬性存儲,還會新增一條記錄到 expires 字典。
expires字典的key:執行鍵的指針;value:過期時間。
創建pub/sub相關數據結構並初始化
// 創建 PUBSUB 相關結構
server.pubsub_channels = dictCreate(&keylistDictType, NULL);
server.pubsub_patterns = listCreate();
初始化部分統計屬性
// serverCron() 函數的運行次數計數器
server.cronloops = 0;
// 負責執行 BGSAVE 的子進程的 ID
server.rdb_child_pid = -1;
// 負責進行 AOF 重寫的子進程 ID
server.aof_child_pid = -1;
aofRewriteBufferReset();
// AOF 緩沖區
server.aof_buf = sdsempty();
// 最后一次完成 SAVE 的時間
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
// 最后一次嘗試執行 BGSAVE 的時間
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
// 服務器啟動時間
server.stat_starttime = time(NULL);
// 已使用內存峰值
server.stat_peak_memory = 0;
server.resident_set_size = 0;
// 最后一次執行 SAVE 的狀態
server.lastbgsave_status = REDIS_OK;
server.aof_last_write_status = REDIS_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
updateCachedTime();
設置時間事件對應的函數指針
/* Create the serverCron() time event, that's our main way to process
* background operations. */
// 為 serverCron() 創建時間事件
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
}
這里的serverCron就是一個函數,后續每次周期觸發時間事件時,就會運行這個serverCron。
可以看這里的英文注釋,作者也提到,這是主要的處理后台任務的方式。
這塊以后也會重點分析。
設置connect事件對應的連接處理器
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler, NULL)
這里的acceptTcpHandler就是處理新連接的函數:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[REDIS_IP_STR_LEN];
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask);
REDIS_NOTUSED(privdata);
while (max--) {
// accept 客戶端連接
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
// 為客戶端創建客戶端狀態(redisClient)
acceptCommonHandler(cfd, 0);
}
}
創建aof文件
如果aof打開了,就需要創建aof文件。
if (server.aof_state == REDIS_AOF_ON) {
server.aof_fd = open(server.aof_filename,
O_WRONLY | O_APPEND | O_CREAT, 0644);
}
剩下的幾個,暫時不涉及的任務
// 如果服務器以 cluster 模式打開,那么初始化 cluster
if (server.cluster_enabled) clusterInit();
// 初始化復制功能有關的腳本緩存
replicationScriptCacheInit();
// 初始化腳本系統
scriptingInit();
// 初始化慢查詢功能
slowlogInit();
// 初始化 BIO 系統
bioInit();
上面的幾個,我們暫時還講解不到,先看看就行。
到此,初始化redis server,就基本結束了。
總結
本講內容較多,主要是redis啟動過程中,要做的事,也太多了。希望我已經大致講清楚了,其中,連接處理器那些都只是大致講了,后面會繼續。謝謝大家。