redis源碼筆記(一) —— 從redis的啟動到command的分發


本作品采用知識共享署名 4.0 國際許可協議進行許可。轉載聯系作者並保留聲明頭部與原文鏈接https://luzeshu.com/blog/redis1
本博客同步在http://www.cnblogs.com/papertree/p/7159802.html


這個系列博客大部分完成於一年前,基於3.0.5版本(但是代碼行數不一定完全相符,調試過程中會修改一些代碼)。

這一篇博客針對第二篇涉及到的redisClient、redisDb、redisObject(robj)等幾個結構體,以及redis程序的啟動到循環、到分發command來進行講解。

看redis源碼時有個很深的感觸就是c語言雖然不是“面向對象編程語言”,原生不支持類、繼承等面向對象編程語言的概念,但不影響在c語言上運用“面向對象編程思想”進行開發。比如很多模塊會定義一個結構體,還有相關的一系列函數,這些函數使用該結構體的指針類型作為第一個參數,實際上這是模擬了this指針的做法。可以把這些函數和結構體看成一個類的方法和成員。

1.1 redis的啟動到進入等待 / aeEventLoop結構體

我們知道redis也是一個普通的服務端程序,監聽6379(默認)端口。從main函數啟動,最終進入事件驅動庫進行循環等待。比如node的libuv。那么redis自己實現了一個簡單的事件驅動庫,放在ae.c文件。並且對系統層面支持的IO復用接口進行了封裝,比如epoll(linux)、kqueue(OS X、FreeBSD等)、evport(Solaris 10等)、select。來看下圖,可以知道當系統不支持其他IO復用接口時,默認使用了select模型。

圖1-1-1

看到這段代碼,產生一個疑問,我們都知道windows下最高效的IO復用模型應該屬IOCP了,比如node使用的libuv庫,就對IOCP進行了封裝。但ae.c里面看到,redis不支持IOCP?於是針對這個問題,筆者google了一下,發現了兩個有意思的鏈接。

http://www.oschina.net/news/23944/redis-deny-microsoft-windows-fixpack
http://oldblog.antirez.com/post/redis-win32-msft-patch.html

大體情況就是redis原生不支持IOCP,於是微軟采用libuv把redis移植到了windows,在github上給redis提交了補丁。但redis作者拒絕將此補丁加入主干代碼。第二個鏈接是redis作者對此的解釋。

回歸正題,那么可以看到redis從啟動到進入等待的過程並不復雜。redis.c/main() -> ae.c/aeMain() -> (ae_epoll.c、ae_kqueue.c、ae_select.c等)/aeApiPoll(),看下圖:

圖1-1-2

1.1.1 redisServer結構體

看一下redisServer的結構體定義:

圖1-1-3

注意幾個成員,與稍后講解有關:

aeEventLoop *el:這里表示的就是一個事件驅動庫的結構體
int ipfd[REDIS_BINDADDR_MAX]:redis服務監聽的socket fd
int ipfd_count:ipfd的計數成員

redis有一個全局的變量struct redisServer server,保存了當前redisServer的各種信息,包括aeEventLoop類型的server.el成員等。
當main函數調用了ae事件驅動庫的aeMain()時,傳了server.el,這里就是前面說的面向對象編程思想的做法了,server.el充當一個this指針。
當進入ae.c模塊時,我們來看看aeEventLoop結構體。

1.1.2 aeEventLoop結構體

看一下 aeEventLoop結構體和幾個相關的結構體、函數指針類型的定義:

圖1-1-4

來看幾個關鍵成員及相關的結構體,以及與之相關的方法:

1.1.2.1 void *apidata與aeApiState結構體與aeApiCreate()

從下圖1-1-5里的aeApiCreate()函數里面可以看到,apidata實際上放的是一個aeApiState結構體指針,可以看到ae_epoll.c(圖1-1-5左)、ae_vport.c(圖1-1-5右)分別對aeApiState有不同的結構體定義,實際上是對不同操作系統(不同復用接口)的封裝。

按照上面說的“面向對象編程思想”,aeApiState結構體相關的方法的“this指針”應該是aeApiState指針。
可以從圖1-1-5中看到aeApiCreate()、aeApiResize()等幾個跟aeApiState結構體相關的方法的定義,發現他們的“this指針”都是aeEventLoop*類型,而不是aeApiState*,當方法內部訪問aeApiState時,通過eventLoop->apidata去訪問。
注意到這幾個方法內部(比如aeApiAddEvent),並不都僅僅只是使用了eventLoop->apidata,同時也訪問了eventLoop的其他成員,所以這里使用aeEventLoop*作為“this指針”是合理的。

圖1-1-5

1.1.2.2 events成員(aeFileEvent結構體的動態數組,以fd為索引)與aeCreateFileEvent()

aeCreateFileEvent()是aeEventLoop的一個方法成員,通過該方法,往aeEventLoop的events里添加一個aeFileEvent對象,可以看到圖1-1-4的定義。可以看出aeFileEvent實際上代表的是一個事件handler,封裝了事件的回調函數,以及對應的clientData。當epoll_wait()監聽的fd有事件到來時,該對象被取出,回調函數被執行,clientData被回傳。圖1-1-4中的兩個函數指針定義,就是該回調函數的類型。

這里舉兩個關鍵的使用位置:

1. 監聽socket的回調函數

在main函數開始后,initServer的時候,會調用aeCreateFileEvent(),把server.ipfd[]中監聽的fd依次創建一個aeFileEvent對象,響應函數為(aeFileProc*) acceptTcpHandler,加進事件驅動庫,並添加到 server.el->events 成員里面,以fd為數組索引下標。注意了此時的clientData是NULL的,看一下此處的代碼:

圖1-1-6

2. 連接socket的回調函數

當有連接到來的時候,acceptTcpHandler被觸發,此時redis創建了一個redisClient的對象,並同樣調用了aeCreateFileEvent(),把相應的回調函數(aeFileProc*) readQueryFromClient同樣封裝成aeFileEvent對象,加進事件驅動庫,添加到server.el->events成員里面,以fd為索引下標,此時的clientData是對應的redisClient對象,這個redisClient標識了一個客戶端的連接,redisClient結構體、以及readQeuryFromClient如何分發處理command的詳細介紹在1.2.3節。

來看一下對應的調用代碼:

圖1-1-7

可以看到networking.c文件里面,acceptTcpHandler、readQueryFromClient都是aeFileProc類型的回調函數。

*1.1.2.3 aeCreateTimeEvent()方法與timeEventHead成員(aeTimeEvent結構體的鏈表頭,所以aeTimeEvent存在next成員)

這里額外講多一個結構體類型,不在本篇博客“從啟動到進入等待、從接收連接到分發命令”的主線,但是在第三篇博客《redis源碼筆記(三) —— redis的哨兵模式以及高可用性》的3.2節里面會用到。

我們知道server進入epoll_wait()之后會進入等待,但是事實上redis-server是不斷被定時喚醒的,因為它后台有一個定時任務函數 —— serverCron。這個后台執行任務被封裝在aeTimeEvent對象里面,aeEventLoop對象(server.el)通過自身的aeCreateTimeEvent()方法去往自身的timeEventHead鏈表添加這樣一個對象。在圖1-1-6中可以看到initServer里面有aeCreateTimeEvent這么一個過程。

這里需要講的是:這個后台任務是如何被周期性執行的,還有執行周期是什么。

看到圖1-1-8中aeCreateTimeEvent的定義,看到第二個參數milliseconds,再看到圖1-1-6里面initServer添加serverCron時該參數為1,不要誤以為這個后台任務就是執行周期為1ms。

圖1-1-8

先看到aeProcessEvents,每次進入aeApiPoll()前,aeProcessEvents都會調用aeSearchNearestTimer從eventLoop->timeEventHead 去找到第一個aeTimeEvent對象,通過該對象的when_sec和when_ms去計算下一次監聽中斷的時長。

圖1-1-9

那么當上面根據eventLoop->timeEventHead計算的最短時長到達后,aeApiPoll返回,執行processTimeEvents,對eventLoop->timeEventHead里面所有過時了的aeTimeEvent對象進行“執行回調”,看代碼:

圖1-1-10

那么可以看到這個回調函數,大多數情況下就是上面的后台任務函數serverCron。根據該函數返回的retval,加上當前時間並更新到當前的aeTimeEvent對象的時間成員上面,那么就是說,這個后台任務的執行周期,是由該后台任務的返回值決定的,如果該函數返回了AE_NOMORE,那么這個aeTimeEvent對象就會從eventLoop->timeEventHead鏈表里面刪除。

來看看serverCron函數的返回值:

圖1-1-11

可以看出serverCron返回的是一個變量,1000/server.hz,這個hz就是頻率的意思(還記得物理里面的單位嗎,時間的倒數就是頻率),比如頻率為10,那么1000ms里面10次的間隔就是100ms。這個server.hz 可以通過配置文件redis.conf里面的hz 選項進行設置。

另外注意到上面的run_with_period這個宏定義。這個比如run_with_period(100) {} 限制了該代碼塊的“最小周期”是100ms,比如說,你的server.hz 是2,那么你的serverCron周期是500ms,那么周期大於100ms可以接受,每次執行serverCron時run_with_period(100)的代碼塊都會被執行。如果server.hz 是20,那么serverCron周期是50ms,那么周期小於100ms了,run_with_period(100){}的代碼塊會根據server.cronloops的計數來判斷,每兩次serverCron執行一次,如果server.hz是100,serverCron周期是10ms,那么每10次serverCron執行一次代碼塊,保證run_with_period(100)里面的代碼真的是每100ms執行一次。

那么回到上面aeCreateFileEvent的第二個參數milliseconds、以及圖1-1-6在initServer時調用這個時候傳的“1”是指什么呢?其實跟processTimeEvents每次執行serverCron后拿到下一個周期的監聽時長、添加到當前的aeTimeEvent對象上一樣,這個initServer調用aeCreateFileEvent時傳的“1”也表示下一個周期的監聽時長,也就是這個aeTimeEvent封裝的serverCron第一次被執行應該是在當前時間的1ms之后,而隨后的周期性執行才是根據serverCron本身返回的值去決定下一個周期監聽時長。

但是這里注意的是,serverCron第一次被執行也往往不是在1ms之后,我們看到圖1-1-9的378到385這幾行代碼。在進入aeApiPoll前會進行計算下一個周期監聽時長,計算方式就是從eventLoop->timeEventHead取出最近的那個aeTimeEvent,減去當前時間。但是當initServer執行aeCreateFileEvent()到這幾行代碼的時候,往往歷經了幾毫秒,那么這個最近的aeTimeEvent的時間已經過期了幾毫秒。那么從上面的計算方式可以發現,tvp表示的應該是{900+毫秒,-1秒},但是第384行代碼會把負值的秒清零。所以往往第一次serverCron的調用會是在900+毫秒之后。

小結:
通過上面幾個結構體和相關方法的講解,我們大概知道了從main函數啟動,到進入監聽等待的過程中,涉及到的相關結構體及方法。

下面來看一下從接收到客戶端的連接請求、到command的分發過程。



1.2 從客戶端的連接請求到command的分發

從1.1節看到,與“對客戶端的連接請求處理”相關的是aeFileEvent結構體,當tcp連接請求到來時,acceptTcpHandler被調用,並針對該tcp連接創建一個新的aeFileEvent對象,用於處理后續到來的command,這個新建的aeFileEvent對象的回調函數是readQueryFromClient。

1.2.1 接收客戶端連接請求 / acceptTcpHandler()

上面1.1.2.2節對acceptTcpHandler里面如何創建一個aeFileEvent對象(clientData*為redisClient指針,回調函數為readQueryFromClient)講的很清楚。

1.2.2 接收客戶端的命令 / readQueryFromClient()

上面1.1.2.2節也說了,當epoll_wait()監聽的fd有事件到來時,該對象被取出,回調函數被執行,clientData被回傳。
當建立的tcp連接有數據到來時,調用回調函數readQueryFromClient(),並把clientData回傳(即privdata參數),實際上clientData就是針對該連接的redisClient對象。

可以看到這里,幾乎都是對redisClient的操作。這里結合redisClient的結構體及相關的方法,來對這個流程進行講解。

首先看源碼:

1154 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1155     redisClient *c = (redisClient*) privdata;
1156     int nread, readlen;
1157     size_t qblen;
1158     REDIS_NOTUSED(el);
1159     REDIS_NOTUSED(mask);
1160
1161     server.current_client = c;
1162     readlen = REDIS_IOBUF_LEN;
1163     /* If this is a multi bulk request, and we are processing a bulk reply
1164      * that is large enough, try to maximize the probability that the query
1165      * buffer contains exactly the SDS string representing the object, even
1166      * at the risk of requiring more read(2) calls. This way the function
1167      * processMultiBulkBuffer() can avoid copying buffers to create the
1168      * Redis Object representing the argument. */
1169     if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
1170         && c->bulklen >= REDIS_MBULK_BIG_ARG)
1171     {
1172         int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
1173
1174         if (remaining < readlen) readlen = remaining;
1175     }
1176
1177     qblen = sdslen(c->querybuf);
1178     if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
1179     c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
1180     nread = read(fd, c->querybuf+qblen, readlen);
1181     if (nread == -1) {
1182         if (errno == EAGAIN) {
1183             nread = 0;
1184         } else {
1185             redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
1186             freeClient(c);
1187             return;
1188         }
1189     } else if (nread == 0) {
1190         redisLog(REDIS_VERBOSE, "Client closed connection");
1191         freeClient(c);
1192         return;
1193     }
1194     if (nread) {
1195         sdsIncrLen(c->querybuf,nread);
1196         c->lastinteraction = server.unixtime;
1197         if (c->flags & REDIS_MASTER) c->reploff += nread;
1198         server.stat_net_input_bytes += nread;
1199     } else {
1200         server.current_client = NULL;
1201         return;
1202     }
1203     if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
1204         sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
1205
1206         bytes = sdscatrepr(bytes,c->querybuf,64);
1207         redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
1208         sdsfree(ci);
1209         sdsfree(bytes);
1210         freeClient(c);
1211         return;
1212     }
1213     processInputBuffer(c);
1214     server.current_client = NULL;
1215 }

第1180代碼讀取當前TCP連接收到的數據,這些數據正是redis命令行的TCP數據格式。

在一個窗口“gdb src/redis-server”,並且“break networking.c:1180”,然后“run”。
tmux開另一個pane,運行“src/redis-cli”,然后輸入“keys *”命令。

此時gdb會在斷點的地方停下,然后“next”,查看 redisClient的querybuf成員。

gdb$ p c->querybuf
$8 = (sds) 0x7ffff0121008 "*2\r\n$4\r\nkeys\r\n$1\r\n*\r\n"

這便是redis-server接收到客戶端的命令的最原始的數據(當然還有更原始的mac層、ip層的數據包是由系統處理的)。
關於redis命令交互的協議,文檔上有詳細介紹: https://redis.io/topics/protocol

最后readQeuryFromClient()->processInputBuffer(c)->processCommand() 進行command的分發和處理。

processCommand() 在src/redis.c 里面。同時,該文件里面有一個全局表維護着命令與對應的處理函數:

 123 struct redisCommand redisCommandTable[] = {
 124     {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},
 125     {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},
 126     {"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},
 127     {"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},
 128     {"psetex",psetexCommand,4,"wm",0,NULL,1,1,1,0,0},
 129     {"append",appendCommand,3,"wm",0,NULL,1,1,1,0,0},
 130     {"strlen",strlenCommand,2,"rF",0,NULL,1,1,1,0,0},
 131     {"del",delCommand,-2,"w",0,NULL,1,-1,1,0,0},
 132     {"exists",existsCommand,-2,"rF",0,NULL,1,-1,1,0,0},
 133     {"setbit",setbitCommand,4,"wm",0,NULL,1,1,1,0,0},
 134     {"getbit",getbitCommand,3,"rF",0,NULL,1,1,1,0,0},
 135     {"setrange",setrangeCommand,4,"wm",0,NULL,1,1,1,0,0},
 136     {"getrange",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
 137     {"substr",getrangeCommand,4,"r",0,NULL,1,1,1,0,0},
 138     {"incr",incrCommand,2,"wmF",0,NULL,1,1,1,0,0},
 139     {"decr",decrCommand,2,"wmF",0,NULL,1,1,1,0,0},
 140     {"mget",mgetCommand,-2,"r",0,NULL,1,-1,1,0,0},
 141     {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
 142     {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
 143     {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
 144     {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
 145     {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
 146     {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
 147     {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
 148     {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
 149     {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
 150     {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
 151     {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
 152     {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
 153     {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
 154     {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
 155     {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
 156     {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
 157     {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
 158     {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
 159     {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
 160     {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
 161     {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
......
 287 };


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM