serverCron是redis里主要的定時處理函數,在initServer中通過調用aeCreateTimeEvent,將serverCron做為callback注冊到全局的eventLoop結構當中。它在主循環中的位置:
aeMain { while (!stop) { beforeSleep aeApiPoll process file events /* process time events */ for each timeEntry(te) in eventLoop { retval = te->timeProc() /* 這里面timeProc就是serverCron */ if (NOMORE == retval) Delete time entry from eventLoop else aeAddMillisecondsToNow(te, retval) /* te下一次觸發的時隔更新為retval */ } } }
看serverCron的實現之前先看這個run_with_period的定義:
#define run_with_period(_ms_) \
if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
由它的定義,run_with_period(_ms_)會在兩種情況下返回1:
1. _ms_ <= 1000/server.hz,就是說_ms_比serverCron的執行間隔要小。
2. 或者_ms_比serverCron的執行間隔要大並且serverCron執行的次數剛好是_ms_/(1000/server.hz)的整數倍。
server.hz的意義是serverCron在一秒內執行的次數(從redis的實現來看,這個值是以ms為最小單位來計算的),那么1000/server.hz就是serverCron的執行間隔(ms),再結合run_with_period的定義可以看出,run_with_period表示每_ms_毫秒執行一段任務。
舉個例子來說,server.hz是100,也就是servreCron的執行間隔是10ms(可能不完全精確,畢竟是單線程順序執行)。
假如有一些任務需要每500ms執行一次,就可以在serverCron中用run_with_period(500)把每500ms需要執行一次的工作控制起來。所以,serverCron每執行到第500/10次,run_with_period(500)就會返回1
serverCron的實現如下:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { int j; REDIS_NOTUSED(eventLoop); REDIS_NOTUSED(id); REDIS_NOTUSED(clientData); /* Software watchdog: deliver the SIGALRM that will reach the signal * handler if we don't return here fast enough. */ /* 用SIGALRM信號觸發watchdog的處理過程,具體的函數為watchdogSignalHandler */ if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period); /* Update the time cache. */ /* 更新server.unixtime和server.mstime */ updateCachedTime(); /* 每100ms更新一次統計量,包括這段時間內的commands, net_input_bytes, net_output_bytes */ run_with_period(100) { trackInstantaneousMetric(REDIS_METRIC_COMMAND,server.stat_numcommands); trackInstantaneousMetric(REDIS_METRIC_NET_INPUT, server.stat_net_input_bytes); trackInstantaneousMetric(REDIS_METRIC_NET_OUTPUT, server.stat_net_output_bytes); } /* We have just REDIS_LRU_BITS bits per object for LRU information. * So we use an (eventually wrapping) LRU clock. * * Note that even if the counter wraps it's not a big problem, * everything will still work but some object will appear younger * to Redis. However for this to happen a given object should never be * touched for all the time needed to the counter to wrap, which is * not likely. * * Note that you can change the resolution altering the * REDIS_LRU_CLOCK_RESOLUTION define. */ /* 根據server.lruclock的定義,getLRUClock返回的是當前時間換算成秒數的低23位 */ server.lruclock = getLRUClock(); /* Record the max memory used since the server was started. */ /* 記錄最大內存使用情況 */ if (zmalloc_used_memory() > server.stat_peak_memory) server.stat_peak_memory = zmalloc_used_memory(); /* Sample the RSS here since this is a relatively slow call. */ /* 記錄當前的RSS值 */ server.resident_set_size = zmalloc_get_rss(); /* We received a SIGTERM, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ /* 如果收到了SIGTERM信號,嘗試退出 */ if (server.shutdown_asap) { if (prepareForShutdown(0) == REDIS_OK) exit(0); redisLog(REDIS_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); server.shutdown_asap = 0; } /* Show some info about non-empty databases */ /* 每5秒輸出一次非空databases的信息到log當中 */ run_with_period(5000) { for (j = 0; j < server.dbnum; j++) { long long size, used, vkeys; size = dictSlots(server.db[j].dict); used = dictSize(server.db[j].dict); vkeys = dictSize(server.db[j].expires); if (used || vkeys) { redisLog(REDIS_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); /* dictPrintStats(server.dict); */ } } } /* Show information about connected clients */ /* 如果不是sentinel模式,則每5秒輸出一個connected的client的信息到log */ if (!server.sentinel_mode) { run_with_period(5000) { redisLog(REDIS_VERBOSE, "%lu clients connected (%lu slaves), %zu bytes in use", listLength(server.clients)-listLength(server.slaves), listLength(server.slaves), zmalloc_used_memory()); } } /* We need to do a few operations on clients asynchronously. */ /* 清理空閑的客戶端或者釋放query buffer中未被使用的空間 */ clientsCron(); /* Handle background operations on Redis databases. */ /* databases的處理,rehash就在這里 */ databasesCron(); /* Start a scheduled AOF rewrite if this was requested by the user while * a BGSAVE was in progress. */ /* 如果開啟了aof_rewrite的調度並且當前沒有在background執行rdb/aof的操作,則進行background的aof操作 */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_scheduled) { rewriteAppendOnlyFileBackground(); } /* Check if a background saving or AOF rewrite in progress terminated. */ if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) { /* 如果有aof或者rdb在后台進行,則等待對應的退出。注意,這里用了WNOHANG,所以不會阻塞在wait3 */ int statloc; pid_t pid; /* wait3返回非0值,要么是子進程退出,要么是出錯 */ if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) { int exitcode = WEXITSTATUS(statloc); int bysignal = 0; if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc); /* 如果是出錯,在log中記錄這次錯誤 * 如果是rdb任務退出,調用backgroundSaveDoneHandler進行收尾工作 * 如果是aof任務退出,調用backgroundRewriteDoneHandler進行收尾工作 */ if (pid == -1) { redisLog(LOG_WARNING,"wait3() returned an error: %s. " "rdb_child_pid = %d, aof_child_pid = %d", strerror(errno), (int) server.rdb_child_pid, (int) server.aof_child_pid); } else if (pid == server.rdb_child_pid) { backgroundSaveDoneHandler(exitcode,bysignal); } else if (pid == server.aof_child_pid) { backgroundRewriteDoneHandler(exitcode,bysignal); } else { redisLog(REDIS_WARNING, "Warning, detected child with unmatched pid: %ld", (long)pid); } /* 如果當前有rdb/aof任務在處理,則將dict_can_resize設置為0(表示不允許進行resize),否則,設置為1 */ updateDictResizePolicy(); } } else { /* 當前沒有rdb/aof任務在執行,這里來判斷是否要開啟新的rdb/aof任務 */ /* If there is not a background saving/rewrite in progress check if * we have to save/rewrite now */ for (j = 0; j < server.saveparamslen; j++) { struct saveparam *sp = server.saveparams+j; /* Save if we reached the given amount of changes, * the given amount of seconds, and if the latest bgsave was * successful or if, in case of an error, at least * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */ if (server.dirty >= sp->changes && server.unixtime-server.lastsave > sp->seconds && (server.unixtime-server.lastbgsave_try > REDIS_BGSAVE_RETRY_DELAY || server.lastbgsave_status == REDIS_OK)) { redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...", sp->changes, (int)sp->seconds); rdbSaveBackground(server.rdb_filename); break; } } /* Trigger an AOF rewrite if needed */ if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_perc && server.aof_current_size > server.aof_rewrite_min_size) { long long base = server.aof_rewrite_base_size ? server.aof_rewrite_base_size : 1; long long growth = (server.aof_current_size*100/base) - 100; if (growth >= server.aof_rewrite_perc) { redisLog(REDIS_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); rewriteAppendOnlyFileBackground(); } } } /* AOF postponed flush: Try at every cron cycle if the slow fsync * completed. */ /* 如果開啟了aof_flush_postponed_start,則在每次serverCron流程里都將server.aof_buf寫入磁盤文件。 * PS, server.aof_buf是從上一次寫aof文件到目前為止所執行過的命令集合,所以是append only file */ if (server.aof_flush_postponed_start) flushAppendOnlyFile(0); /* AOF write errors: in this case we have a buffer to flush as well and * clear the AOF error in case of success to make the DB writable again, * however to try every second is enough in case of 'hz' is set to * an higher frequency. */ /* 每一秒檢查一次上一輪aof的寫入是否發生了錯誤,如果有錯誤則嘗試重新寫一次 */ run_with_period(1000) { if (server.aof_last_write_status == REDIS_ERR) flushAppendOnlyFile(0); } /* Close clients that need to be closed asynchronous */ /* server.clients_to_close鏈表上的元素都是待關閉的連接 */ freeClientsInAsyncFreeQueue(); /* Clear the paused clients flag if needed. */ /* clients被paused時,會相應地記錄一個超時的時間,如果那個時間已經到來,則給client打上REDIS_UNBLOCKED標記(slave的client不處理),並加到server.unblocked_clients上 */ clientsArePaused(); /* Don't check return value, just use the side effect. */ /* Replication cron function -- used to reconnect to master and * to detect transfer failures. */ /* 每1秒執行一次replication */ run_with_period(1000) replicationCron(); /* Run the Redis Cluster cron. */ /* 每100ms執行一次clusterCron */ run_with_period(100) { if (server.cluster_enabled) clusterCron(); } /* Run the Sentinel timer if we are in sentinel mode. */ /* 每100ms執行一次sentine的定時器 */ run_with_period(100) { if (server.sentinel_mode) sentinelTimer(); } /* Cleanup expired MIGRATE cached sockets. */ /* 每1秒清理一次server.migrate_cached_sockets鏈表上的超時sockets */ run_with_period(1000) { migrateCloseTimedoutSockets(); } /* serverCron執行次數 */ server.cronloops++; /* 返回下一次執行serverCron的間隔 */ return 1000/server.hz; }
