曹工說Redis源碼(5)-- redis server 啟動過程解析,以及EventLoop每次處理事件前的前置工作解析(下)


文章導航

Redis源碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎么才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,后續可以自己閱讀源碼,或者跟着我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。

曹工說Redis源碼(1)-- redis debug環境搭建,使用clion,達到和調試java一樣的效果

曹工說Redis源碼(2)-- redis server 啟動過程解析及簡單c語言基礎知識補充

曹工說Redis源碼(3)-- redis server 啟動過程完整解析(中)

曹工說Redis源碼(4)-- 通過redis server源碼來理解 listen 函數中的 backlog 參數

本講主題

本講將延續第三講的主題,將啟動過程的主體講完。為了保證閱讀體驗,避免過於突兀,可以先閱讀第三講。本講,主要講解余下的部分:

  1. 創建pid文件
  2. 加載rdb、aof,獲取數據
  3. 運行事件處理器,准備處理事件,EventLoop每次處理事件前的前置工作

創建pid文件

pid,也就是進程id,以后台模式運行時,redis會把自己的pid,寫入到一個文件中,默認的文件路徑和名稱為:/var/run/redis.pid

配置文件可配:

# When running daemonized, Redis writes a pid file in /var/run/redis.pid by
# default. You can specify a custom pid file location here.
pidfile /var/run/redis.pid

這部分代碼非常簡潔:

void createPidFile(void) {
    // 1
    FILE *fp = fopen(server.pidfile, "w");
    if (fp) {
        // 2
        fprintf(fp, "%d\n", (int) getpid());
        // 3
        fclose(fp);
    }
}
  • 1,打開文件,這里的pidfile就是前面的文件名,/var/run/redis.pid,配置文件可以對其修改。模式為w,表示將對其寫入。
  • 2,調用pid,獲取當前進程的pid,寫入該文件描述符
  • 3,關閉文件。

加載rdb、aof

在啟動時,會檢查aof和rdb選項是否打開,如果打開,則會去加載數據,這里要注意的是,redis總是先查看是否有 aof 開關是否打開;打開的話,則直接使用 aof;

如果 aof 沒打開,則去加載 rdb 文件。

void loadDataFromDisk(void) {
    // 記錄開始時間
    long long start = ustime();

    // AOF 持久化已打開
    if (server.aof_state == REDIS_AOF_ON) {
        // 嘗試載入 AOF 文件
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            // 打印載入信息,並計算載入耗時長度
            redisLog(REDIS_NOTICE, "DB loaded from append only file: %.3f seconds",
                     (float) (ustime() - start) / 1000000);
        // AOF 持久化未打開
    } else {
        // 嘗試載入 RDB 文件
        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
            // 打印載入信息,並計算載入耗時長度
            redisLog(REDIS_NOTICE, "DB loaded from disk: %.3f seconds",
                     (float) (ustime() - start) / 1000000);
        }
    }
}

加載的過程,現在來講,不太合適,比如以aof為例,aof文件中存儲了一條條的命令,加載 aof 文件的過程,其實就會在進程內部創建一個 fake client(源碼中就是這樣命名,也就是一個假的客戶端),來一條條地發送 aof 文件中的命令進行執行。

這個命令執行的過程,現在講會有點早,所以 aof 也放后面吧,講了命令執行再回頭看這塊。

事件循環結構體講解

核心流程如下:

    // 1
    aeSetBeforeSleepProc(server.el, beforeSleep);
    // 2
    aeMain(server.el);
  • 先看2處,這里傳入server這個全局變量中的el屬性,該屬性就代表了當前事件處理器的狀態,其定義如下:

        // 事件狀態
        aeEventLoop *el;
    

    el,實際就是EventLoop的簡寫;結構體 aeEventLoop,里面維護了:當前使用的多路復用庫的函數、當前注冊到多路復用庫,在發生讀寫事件時,需要被通知的socket 文件描述符、以及其他一些東西。

    typedef struct aeEventLoop {
    
        // 目前已注冊的最大描述符
        int maxfd;   /* highest file descriptor currently registered */
    
        // 目前已追蹤的最大描述符
        int setsize; /* max number of file descriptors tracked */
    
        // 用於生成時間事件 id
        long long timeEventNextId;
    
        // 最后一次執行時間事件的時間
        time_t lastTime;     /* Used to detect system clock skew */
    
        // 1 已注冊的文件事件
        aeFileEvent *events; /* Registered events */
    
        // 2 已就緒的文件事件
        aeFiredEvent *fired; /* Fired events */
    
        // 3 時間事件
        aeTimeEvent *timeEventHead;
    
        // 事件處理器的開關
        int stop;
    
        // 4 多路復用庫的私有數據
        void *apidata; /* This is used for polling API specific data */
    
        // 5 在處理事件前要執行的函數
        aeBeforeSleepProc *beforesleep;
    
    } aeEventLoop;
    
    • 1處,注冊到多路復用庫,需要監聽的socket 文件描述符事件,比如,某socket的可讀事件;

    • 2處,以select或者epoll這類多路復用庫為例,在一次 select 中,如果發現某些socket事件已經滿足,則,這些ready的事件,會被存放到本屬性中。

      因為我的描述比較抽象,這里拿一段 man select中的說明給大家看下:

      select() allow  a  program to monitor multiple file descriptors, waiting until one or more of the file descriptors become "ready" for some class of I/O operation (e.g., input possible).  A file descriptor is considered ready if it is possible to perform the corresponding I/O  operation  (e.g., read(2)) without blocking.
      

      直譯一下:select() 允許一個程序去監聽多個文件描述符,等待直到1個或多個文件描述符變成 ready狀態,該狀態下,可以不阻塞地讀寫該文件描述符。

    • 3處,事件事件,主要用來周期執行,執行一些redis的后台任務,如刪除過期key,后面細講。

    • 4處,指向當前正在使用的多路復用庫的相關數據,目前redis支持:select、epoll、kqueue、evport

    • 5處,在處理事件前,要執行的一個函數

再回頭來看前面的代碼:

// 1    
aeSetBeforeSleepProc(server.el, beforeSleep);
aeMain(server.el);

這里的1處,就是設置前面第5點提到的,設置處理事件前,先要執行的一個函數。

事件循環處理器的主循環

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件處理前執行的函數,那么運行它
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);

        // 開始處理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

可以看到,一共2個部分,首先執行eventLoop的事件處理前要執行的函數;接着再開始處理事件。

事件處理前的前置執行函數

這里講解下面這一句:

    eventLoop->beforesleep(eventLoop);

這個函數,在前面已經看到了,被賦值為:

    aeSetBeforeSleepProc(server.el, beforeSleep);

這個 beforeSleep如下:

void beforeSleep(struct aeEventLoop *eventLoop) {

    /* Run a fast expire cycle (the called function will return
     * ASAP if a fast cycle is not needed). */
    // 1 執行一次快速的主動過期檢查
    if (server.active_expire_enabled && server.masterhost == NULL)
        activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);

	// 2
    ...

    /* Write the AOF buffer on disk */
    // 3 將 AOF 緩沖區的內容寫入到 AOF 文件
    flushAppendOnlyFile(0);

    /* Call the Redis Cluster before sleep function. */
    // 在進入下個事件循環前,執行一些集群收尾工作
    if (server.cluster_enabled) clusterBeforeSleep();
}
  • 1,這里會去執行主動的過期檢查,大致流程代碼如下:

    void activeExpireCycle(int type) {
        /* This function has some global state in order to continue the work
         * incrementally across calls. */
        // 靜態變量,用來累積函數連續執行時的數據
        static unsigned int current_db = 0; /* Last DB tested. */
        ...
    
        unsigned int j, iteration = 0;
        // 默認每次處理的數據庫數量
        unsigned int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
        // 函數開始的時間
        long long start = ustime(), timelimit;
    
        dbs_per_call = server.dbnum;
    
        timelimit = 1000000 * ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC / server.hz / 100;
        timelimit_exit = 0;
        if (timelimit <= 0) timelimit = 1;
    
        // 1 遍歷數據庫
        for (j = 0; j < dbs_per_call; j++) {
            int expired;
            // 指向要處理的數據庫
            redisDb *db = server.db + (current_db % server.dbnum);
            current_db++;
    
            do {
                unsigned long num, slots;
                long long now, ttl_sum;
                int ttl_samples;
    
                /* If there is nothing to expire try next DB ASAP. */
                // 2 獲取數據庫中帶過期時間的鍵的數量 如果該數量為 0 ,直接跳過這個數據庫
                if ((num = dictSize(db->expires)) == 0) {
                    db->avg_ttl = 0;
                    break;
                }
                // 3 獲取數據庫中鍵值對的數量
                slots = dictSlots(db->expires);
                // 當前時間
                now = mstime();
    
                // 每次最多只能檢查 LOOKUPS_PER_LOOP 個鍵
                if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
                    num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
    
                // 4 開始遍歷數據庫
                while (num--) {
                    dictEntry *de;
                    long long ttl;
    
                    // 從 expires 中隨機取出一個帶過期時間的鍵
                    if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                    // 計算 TTL
                    ttl = dictGetSignedIntegerVal(de) - now;
                    // 5 如果鍵已經過期,那么刪除它,並將 expired 計數器增一
                    if (activeExpireCycleTryExpire(db, de, now)) expired++;
                }
    
                // 6 為這個數據庫更新平均 TTL 統計數據
                ...
                    
                // 更新遍歷次數
                iteration++;
    
                // 7 每遍歷 16 次執行一次
                if ((iteration & 0xf) == 0 && /* check once every 16 iterations. */
                    (ustime() - start) > timelimit) {
                    // 如果遍歷次數正好是 16 的倍數
                    // 並且遍歷的時間超過了 timelimit
                    // 那么斷開 timelimit_exit
                    timelimit_exit = 1;
                }
    
                // 8 已經超時了,返回
                if (timelimit_exit) return;
    
                /* We don't repeat the cycle if there are less than 25% of keys
                 * found expired in the current DB. */
                // 如果已刪除的過期鍵占當前總數據庫帶過期時間的鍵數量的 25 %
                // 那么不再遍歷
            } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP / 4);
        }
    }
    

    這個函數,刪減了一部分,留下了主流程:

    • 1處,遍歷數據庫,一般就是遍歷16個庫
    • 2處,獲取當前庫中,過期鍵的數量,過期鍵都存儲在db->expires中,只需要算這個map的size即可;如果沒有要過期的,處理下一個庫
    • 3處,獲取過期鍵的數量
    • 4處,開始遍歷當前數據庫的過期鍵,最多遍歷20次,這里的num,被ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP賦值,這個值定義為20,也就是說,每次掃描一個庫中,20個過期鍵
    • 5處,如果鍵已過期,則將這個key過期掉,比如從當前數據庫刪除,發布事件等等
    • 6處,計算一些統計數據
    • 7處,遍歷16次,檢查下是否已經執行了足夠長的時間;因為redis是單線程的,不能一直執行過期鍵清理任務,還要處理客戶端請求呢,所以,這里每執行16次循環,就檢查下時間,看看是否已經超時,超時直接返回。
    • 8處,超時返回
  • 講完了主動過期,接着講前面的流程,2處,涉及一些主從復制相關的東西,這塊放到后面吧

  • 3處,將 aof 從緩存中,刷到磁盤

    這個方法比較長,在后面分段講解

刷新aof緩存到磁盤的執行過程

  • 判斷是否有正在進行中的任務
void flushAppendOnlyFile(int force) {
    ssize_t nwritten;
    int sync_in_progress = 0;

    // 緩沖區中沒有任何內容,直接返回
    if (sdslen(server.aof_buf) == 0) return;

    // 策略為每秒 FSYNC 
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        //1  是否有 SYNC 正在后台進行?
        sync_in_progress = bioPendingJobsOfType(REDIS_BIO_AOF_FSYNC) != 0;

1處,會去判斷一個全局變量,該變量是一個隊列,用於存儲后台任務。另外一個后台線程(沒錯,redis不是單純的單線程,還是有其他線程的),會去該隊列取任務,取不到就阻塞;取到了則執行。而刷新 aof 到磁盤這種重io的工作,就是被封裝為一個任務,丟到這個隊列中的。所以,這里去判斷隊列的大小是否為0.

/* Return the number of pending jobs of the specified type. 
 *
 * 返回等待中的 type 類型的工作的數量
 */
unsigned long long bioPendingJobsOfType(int type) {
    unsigned long long val;

    pthread_mutex_lock(&bio_mutex[type]);
  	// 1
    val = bio_pending[type];
    pthread_mutex_unlock(&bio_mutex[type]);

    return val;
}

1處這里的val,就是存儲指定類型的任務的數量。我們這里傳入的type為 REDIS_BIO_AOF_FSYNC,所以就是看看:aof 刷盤的任務數量。

  • 調用write函數執行寫入

        // 1
    	nwritten = write(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
        if (nwritten != (signed)sdslen(server.aof_buf)) {
          // 2
          ...
        }else{
            // 3
            /* Successful write(2). If AOF was in error state, restore the
             * OK state and log the event. */
            // 寫入成功,更新最后寫入狀態
            if (server.aof_last_write_status == REDIS_ERR) {
                redisLog(REDIS_WARNING,
                    "AOF write error looks solved, Redis can write again.");
                server.aof_last_write_status = REDIS_OK;
            }
        }
    
    • 1處,執行寫入,將server.aof_buf這個緩沖區的內容,寫入aof文件,寫入的字節長度為sdslen(server.aof_buf)。也就是,將整個緩沖區寫入。

    • 2處,如果寫入的長度,不等於緩沖區的長度,表示只寫了一部分,進入異常分支

      為什么寫入的會比預期的少,我們看看官方說明:

      write() writes up to count bytes from the buffer pointed buf to the file referred to by the file descriptor fd.
      
      The  number of bytes written may be less than count if, for example, there is insufficient space on the underlying physical medium, or the RLIMIT_FSIZE resource limit is encountered (see setrlimit(2)), or the call was interrupted by a signal handler after having written less than count bytes.  (See also pipe(7).)
      

      這里的第二段就說了,可能是因為底層物理介質的空間不夠;進程的資源限制;或者被中斷。

    • 3處,寫入成功;更新狀態,如果上一次aof寫入狀態為error,這次改為ok

  • flush到磁盤

    前面write是寫入到操作系統的os cache中,但是還沒有落盤。必須執行flush之后,才會刷盤。

    	// 總是執行 fsnyc
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
            /* aof_fsync is defined as fdatasync() for Linux in order to avoid
             * flushing metadata. */
            // 1
            aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
    
            // 更新最后一次執行 fsnyc 的時間
            server.aof_last_fsync = server.unixtime;
    
        // 策略為每秒 fsnyc ,並且距離上次 fsync 已經超過 1 秒
        } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
                    server.unixtime > server.aof_last_fsync)) {
            // 2 放到后台執行
            if (!sync_in_progress) aof_background_fsync(server.aof_fd);
            // 更新最后一次執行 fsync 的時間
            server.aof_last_fsync = server.unixtime;
        }
    
    • 1處,如果aof策略為:AOF_FSYNC_ALWAYS,則調用fsync,刷盤

    • 2處,如果策略為每秒刷盤:AOF_FSYNC_EVERYSEC,放到后台去刷盤。這里的放到后台,就是放到前面提到的任務隊列中,由其他線程去刷。

      void aof_background_fsync(int fd) {
          bioCreateBackgroundJob(REDIS_BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
      }
      void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
          struct bio_job *job = zmalloc(sizeof(*job));
      
          job->time = time(NULL);
          job->arg1 = arg1;
          job->arg2 = arg2;
          job->arg3 = arg3;
      
          pthread_mutex_lock(&bio_mutex[type]);
      
          // 1 將新工作推入隊列
          listAddNodeTail(bio_jobs[type],job);
          bio_pending[type]++;
      
          pthread_cond_signal(&bio_condvar[type]);
      
          pthread_mutex_unlock(&bio_mutex[type]);
      }
      

      這里的1處,可以看到,將任務丟到了隊列中,且前后進行了加鎖。因為這個隊列,是會被其他線程訪問的,所以為了線程安全,進行了加鎖。

todo

void aeMain(aeEventLoop *eventLoop) {

    eventLoop->stop = 0;

    while (!eventLoop->stop) {

        // 如果有需要在事件處理前執行的函數,那么運行它
        if (eventLoop->beforesleep != NULL)
            // 1
            eventLoop->beforesleep(eventLoop);

        // 2開始處理事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

啟動做的事實在太多了,本篇把1這里的這個函數講了,下篇才能講2.

總結

本篇主要講了,redis啟動過程中,主循環的大流程,以及在主循環去處理一個事件之前,要執行的任務。這個主循環如何處理事件,放到下篇繼續。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM