主要看下redis是怎么使用多線程的
先說明下redis也是多線程的.但是redis的主線程處理業務.而其他三個線程跟主要功能是關系不到的
redis的三個線程主要是做什么
初始化入口
void initServer(void) {
...
bioInit();
...
}
初始化后redis其他后台線程.
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* Initialization of state vars and objects
*
* 初始化 job 隊列,以及線程狀態
*/
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_condvar[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system
*
* 設置棧大小
*/
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of.
*
* 創建線程
*/
for (j = 0; j < REDIS_BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
redisLog(REDIS_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
初始化三類線程. 這三類線程被認為是后台執行.不影響主線程
- BIO_CLOSE_FILE . 關閉重寫之前的aof文件.
- BIO_AOF_FSYNC . 定時刷新數據到磁盤上.
- BIO_LAZY_FREE . 惰性刪除過期時間數據
redis為了保證其高效.一些比較耗時的動作會起線程或者進程來完成.不會阻塞在業務主線程上.
使用多線程的特點
- 創建3個線程.這個三個線程的功能互不影響
- 每個線程都有一個工作隊列.主線程生產任務放到任務隊里.這三個線程消費這些任務.
- 任務隊列和取出消費的時候都得加鎖.防止競爭
- 使用條件變量來等待任務.以及通知
// 存放工作的隊列
static list *bio_jobs[REDIS_BIO_NUM_OPS];
bio_jobs是一個雙端鏈表結構
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
struct bio_job *job = zmalloc(sizeof(*job));
job->time = time(NULL);
job->arg1 = arg1;
job->arg2 = arg2;
job->arg3 = arg3;
pthread_mutex_lock(&bio_mutex[type]);
// 將新工作推入隊列
listAddNodeTail(bio_jobs[type],job);
bio_pending[type]++;
pthread_cond_signal(&bio_condvar[type]);
pthread_mutex_unlock(&bio_mutex[type]);
}
當有任務的時候.先把任務丟到redis工作隊列里.這里記得加鎖
void *bioProcessBackgroundJobs(void *arg) {
struct bio_job *job;
unsigned long type = (unsigned long) arg;
sigset_t sigset;
/* Make the thread killable at any time, so that bioKillThreads()
* can work reliably. */
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
pthread_mutex_lock(&bio_mutex[type]);
/* Block SIGALRM so we are sure that only the main thread will
* receive the watchdog signal. */
sigemptyset(&sigset);
sigaddset(&sigset, SIGALRM);
if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
redisLog(REDIS_WARNING,
"Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));
while(1) {
listNode *ln;
/* The loop always starts with the lock hold. */
if (listLength(bio_jobs[type]) == 0) {
pthread_cond_wait(&bio_condvar[type],&bio_mutex[type]);
continue;
}
/* Pop the job from the queue.
*
* 取出(但不刪除)隊列中的首個任務
*/
ln = listFirst(bio_jobs[type]);
job = ln->value;
/* It is now possible to unlock the background system as we know have
* a stand alone job structure to process.*/
pthread_mutex_unlock(&bio_mutex[type]);
/* Process the job accordingly to its type. */
// 執行任務
if (type == REDIS_BIO_CLOSE_FILE) {
close((long)job->arg1);
} else if (type == REDIS_BIO_AOF_FSYNC) {
aof_fsync((long)job->arg1);
} else {
redisPanic("Wrong job type in bioProcessBackgroundJobs().");
}
zfree(job);
/* Lock again before reiterating the loop, if there are no longer
* jobs to process we'll block again in pthread_cond_wait(). */
pthread_mutex_lock(&bio_mutex[type]);
// 將執行完成的任務從隊列中刪除,並減少任務計數器
listDelNode(bio_jobs[type],ln);
bio_pending[type]--;
}
}
- 操作前先上鎖
- 從工作任務里取任務
- 解鎖
- 執行業務邏輯
- 執行完上鎖.重新pthread_cond_wait
條件變量
條件變量是利用線程間共享的全局變量進行同步的一種機制,主要包括兩個動作:
- 一個線程等待"條件變量的條件成立"而掛起;
- 另一個線程使"條件成立"(給出條件成立信號)。
為了防止競爭,條件變量的使用總是和一個互斥鎖結合在一起
pthread_cond_wait原理
就是說pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)函數傳入的參數mutex用於保護條件,因為我們在調用pthread_cond_wait時,如果條件不成立我們就進入阻塞,但是進入阻塞這個期間,如果條件變量改變了的話,那我們就漏掉了這個條件。因為這個線程還沒有放到等待隊列上,所以調用pthread_cond_wait前要先鎖互斥量,即調用pthread_mutex_lock()。
pthread_cond_wait在把線程放進阻塞隊列后,自動對mutex進行解鎖,使得其它線程可以獲得加鎖的權利。這樣其它線程才能對臨界資源進行訪問並在適當的時候喚醒這個阻塞的進程。當pthread_cond_wait返回的時候又自動給mutex加鎖