redis學習筆記(五): serverCron


serverCron是redis里主要的定時處理函數,在initServer中通過調用aeCreateTimeEvent,將serverCron做為callback注冊到全局的eventLoop結構當中。它在主循環中的位置:

aeMain
{
    while (!stop)
    {
        beforeSleep
        aeApiPoll
        process file events
        /* process time events */
        for each timeEntry(te) in eventLoop
        {
            retval = te->timeProc()    /* 這里面timeProc就是serverCron */
            if (NOMORE == retval)
                Delete time entry from eventLoop
            else
                aeAddMillisecondsToNow(te, retval) /* te下一次觸發的時隔更新為retval */
        }
    }
}

看serverCron的實現之前先看這個run_with_period的定義:

#define run_with_period(_ms_) \
  if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))

由它的定義,run_with_period(_ms_)會在兩種情況下返回1:

1. _ms_ <= 1000/server.hz,就是說_ms_比serverCron的執行間隔要小。

2. 或者_ms_比serverCron的執行間隔要大並且serverCron執行的次數剛好是_ms_/(1000/server.hz)的整數倍。

server.hz的意義是serverCron在一秒內執行的次數(從redis的實現來看,這個值是以ms為最小單位來計算的),那么1000/server.hz就是serverCron的執行間隔(ms),再結合run_with_period的定義可以看出,run_with_period表示每_ms_毫秒執行一段任務。

舉個例子來說,server.hz是100,也就是servreCron的執行間隔是10ms(可能不完全精確,畢竟是單線程順序執行)。

假如有一些任務需要每500ms執行一次,就可以在serverCron中用run_with_period(500)把每500ms需要執行一次的工作控制起來。所以,serverCron每執行到第500/10次,run_with_period(500)就會返回1

serverCron的實現如下:

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    /* Software watchdog: deliver the SIGALRM that will reach the signal
     * handler if we don't return here fast enough. */
    /* 用SIGALRM信號觸發watchdog的處理過程,具體的函數為watchdogSignalHandler */
    if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);

    /* Update the time cache. */
    /* 更新server.unixtime和server.mstime */
    updateCachedTime();

    /* 每100ms更新一次統計量,包括這段時間內的commands, net_input_bytes, net_output_bytes */
    run_with_period(100) {
        trackInstantaneousMetric(REDIS_METRIC_COMMAND,server.stat_numcommands);
        trackInstantaneousMetric(REDIS_METRIC_NET_INPUT,
                server.stat_net_input_bytes);
        trackInstantaneousMetric(REDIS_METRIC_NET_OUTPUT,
                server.stat_net_output_bytes);
    }

    /* We have just REDIS_LRU_BITS bits per object for LRU information.
     * So we use an (eventually wrapping) LRU clock.
     *
     * Note that even if the counter wraps it's not a big problem,
     * everything will still work but some object will appear younger
     * to Redis. However for this to happen a given object should never be
     * touched for all the time needed to the counter to wrap, which is
     * not likely.
     *
     * Note that you can change the resolution altering the
     * REDIS_LRU_CLOCK_RESOLUTION define. */
     /* 根據server.lruclock的定義,getLRUClock返回的是當前時間換算成秒數的低23位 */
    server.lruclock = getLRUClock();

    /* Record the max memory used since the server was started. */
    /* 記錄最大內存使用情況 */
    if (zmalloc_used_memory() > server.stat_peak_memory)
        server.stat_peak_memory = zmalloc_used_memory();

    /* Sample the RSS here since this is a relatively slow call. */
    /* 記錄當前的RSS值 */
    server.resident_set_size = zmalloc_get_rss();

    /* We received a SIGTERM, shutting down here in a safe way, as it is
     * not ok doing so inside the signal handler. */
    /* 如果收到了SIGTERM信號,嘗試退出 */
    if (server.shutdown_asap) {
        if (prepareForShutdown(0) == REDIS_OK) exit(0);
        redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information");
        server.shutdown_asap = 0;
    }

    /* Show some info about non-empty databases */
    /* 每5秒輸出一次非空databases的信息到log當中 */
    run_with_period(5000) {
        for (j = 0; j < server.dbnum; j++) {
            long long size, used, vkeys;

            size = dictSlots(server.db[j].dict);
            used = dictSize(server.db[j].dict);
            vkeys = dictSize(server.db[j].expires);
            if (used || vkeys) {
                redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
                /* dictPrintStats(server.dict); */
            }
        }
    }

    /* Show information about connected clients */
    /* 如果不是sentinel模式,則每5秒輸出一個connected的client的信息到log */
    if (!server.sentinel_mode) {
        run_with_period(5000) {
            redisLog(REDIS_VERBOSE,
                "%lu clients connected (%lu slaves), %zu bytes in use",
                listLength(server.clients)-listLength(server.slaves),
                listLength(server.slaves),
                zmalloc_used_memory());
        }
    }

    /* We need to do a few operations on clients asynchronously. */
    /* 清理空閑的客戶端或者釋放query buffer中未被使用的空間 */
    clientsCron();

    /* Handle background operations on Redis databases. */
    /* databases的處理,rehash就在這里 */
    databasesCron();

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    /* 如果開啟了aof_rewrite的調度並且當前沒有在background執行rdb/aof的操作,則進行background的aof操作 */
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

    /* Check if a background saving or AOF rewrite in progress terminated. */    
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        /* 如果有aof或者rdb在后台進行,則等待對應的退出。注意,這里用了WNOHANG,所以不會阻塞在wait3 */
        int statloc;
        pid_t pid;

        /* wait3返回非0值,要么是子進程退出,要么是出錯 */
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;

            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            /* 如果是出錯,在log中記錄這次錯誤
             * 如果是rdb任務退出,調用backgroundSaveDoneHandler進行收尾工作
             * 如果是aof任務退出,調用backgroundRewriteDoneHandler進行收尾工作
             */
            if (pid == -1) {
                redisLog(LOG_WARNING,"wait3() returned an error: %s. "
                    "rdb_child_pid = %d, aof_child_pid = %d",
                    strerror(errno),
                    (int) server.rdb_child_pid,
                    (int) server.aof_child_pid);
            } else if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            /* 如果當前有rdb/aof任務在處理,則將dict_can_resize設置為0(表示不允許進行resize),否則,設置為1 */
            updateDictResizePolicy();
        }
    } else {
        /* 當前沒有rdb/aof任務在執行,這里來判斷是否要開啟新的rdb/aof任務 */
        /* If there is not a background saving/rewrite in progress check if
         * we have to save/rewrite now */
         for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* Save if we reached the given amount of changes,
             * the given amount of seconds, and if the latest bgsave was
             * successful or if, in case of an error, at least
             * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
            if (server.dirty >= sp->changes &&
                server.unixtime-server.lastsave > sp->seconds &&
                (server.unixtime-server.lastbgsave_try >
                 REDIS_BGSAVE_RETRY_DELAY ||
                 server.lastbgsave_status == REDIS_OK))
            {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, (int)sp->seconds);
                rdbSaveBackground(server.rdb_filename);
                break;
            }
         }

         /* Trigger an AOF rewrite if needed */
         if (server.rdb_child_pid == -1 &&
             server.aof_child_pid == -1 &&
             server.aof_rewrite_perc &&
             server.aof_current_size > server.aof_rewrite_min_size)
         {
            long long base = server.aof_rewrite_base_size ?
                            server.aof_rewrite_base_size : 1;
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) {
                redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
                rewriteAppendOnlyFileBackground();
            }
         }
    }


    /* AOF postponed flush: Try at every cron cycle if the slow fsync
     * completed. */
    /* 如果開啟了aof_flush_postponed_start,則在每次serverCron流程里都將server.aof_buf寫入磁盤文件。
     * PS, server.aof_buf是從上一次寫aof文件到目前為止所執行過的命令集合,所以是append only file
     */
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);

    /* AOF write errors: in this case we have a buffer to flush as well and
     * clear the AOF error in case of success to make the DB writable again,
     * however to try every second is enough in case of 'hz' is set to
     * an higher frequency. */
    /* 每一秒檢查一次上一輪aof的寫入是否發生了錯誤,如果有錯誤則嘗試重新寫一次 */
    run_with_period(1000) {
        if (server.aof_last_write_status == REDIS_ERR)
            flushAppendOnlyFile(0);
    }

    /* Close clients that need to be closed asynchronous */
    /* server.clients_to_close鏈表上的元素都是待關閉的連接 */
    freeClientsInAsyncFreeQueue();

    /* Clear the paused clients flag if needed. */
    /* clients被paused時,會相應地記錄一個超時的時間,如果那個時間已經到來,則給client打上REDIS_UNBLOCKED標記(slave的client不處理),並加到server.unblocked_clients上 */
    clientsArePaused(); /* Don't check return value, just use the side effect. */

    /* Replication cron function -- used to reconnect to master and
     * to detect transfer failures. */
    /* 每1秒執行一次replication */
    run_with_period(1000) replicationCron();

    /* Run the Redis Cluster cron. */
    /* 每100ms執行一次clusterCron */
    run_with_period(100) {
        if (server.cluster_enabled) clusterCron();
    }

    /* Run the Sentinel timer if we are in sentinel mode. */
    /* 每100ms執行一次sentine的定時器 */
    run_with_period(100) {
        if (server.sentinel_mode) sentinelTimer();
    }

    /* Cleanup expired MIGRATE cached sockets. */
    /* 每1秒清理一次server.migrate_cached_sockets鏈表上的超時sockets */
    run_with_period(1000) {
        migrateCloseTimedoutSockets();
    }

    /* serverCron執行次數 */
    server.cronloops++;
    /* 返回下一次執行serverCron的間隔 */
    return 1000/server.hz;
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM