It has been about a month since my last long post. I recently found some time to put together the threading article in my memcached source-code analysis series; while working through the source I also consulted a number of resources online.
This article focuses on two questions: (1) how the memcached thread pool is created, and (2) how the threads in that pool are scheduled. All answers come straight from the source code.
memcached's thread pool follows the fairly typical Master-Worker model:
(1) the main thread listens for incoming client connection requests, accepts them, and puts each connected socket onto a connection queue;
(2) idle worker threads are then dispatched to handle the read/write and other events on those established connections (a toy sketch of this hand-off pattern follows below).
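Before looking at the real code, here is a minimal, self-contained sketch of that hand-off pattern, assuming nothing beyond plain pthreads and pipes (no libevent, no real sockets; every name in it, such as fake_worker and N_WORKERS, is invented for illustration): the master pushes a work item onto a per-worker queue and writes one byte into that worker's pipe, and the worker wakes up on the pipe and pops the item.

/* Toy version of the Master-Worker hand-off: plain pthreads + pipes.
 * All names here are made up for illustration. Compile with: cc demo.c -pthread */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define N_WORKERS 2

typedef struct fake_item {
    int sfd;                    /* pretend this is an accepted socket fd */
    struct fake_item *next;
} fake_item;

typedef struct {
    pthread_t tid;
    int notify_receive_fd;      /* worker reads the wake-up byte here */
    int notify_send_fd;         /* master writes the wake-up byte here */
    fake_item *head;            /* per-worker connection queue */
    pthread_mutex_t lock;
} fake_worker;

static fake_worker workers[N_WORKERS];

static void *worker_loop(void *arg) {
    fake_worker *me = arg;
    char buf;
    /* In memcached this blocking read is replaced by a libevent EV_READ
     * callback on notify_receive_fd. */
    while (read(me->notify_receive_fd, &buf, 1) == 1) {
        pthread_mutex_lock(&me->lock);
        fake_item *it = me->head;
        if (it) me->head = it->next;
        pthread_mutex_unlock(&me->lock);
        if (it) {
            printf("worker %ld handles fd %d\n", (long)(me - workers), it->sfd);
            free(it);
        }
    }
    return NULL;
}

int main(void) {
    for (int i = 0; i < N_WORKERS; i++) {
        int fds[2];
        if (pipe(fds) != 0) { perror("pipe"); exit(1); }
        workers[i].notify_receive_fd = fds[0];
        workers[i].notify_send_fd = fds[1];
        workers[i].head = NULL;
        pthread_mutex_init(&workers[i].lock, NULL);
        pthread_create(&workers[i].tid, NULL, worker_loop, &workers[i]);
    }

    /* The "master": pretend we accepted fds 100..105 and dispatch them
     * round-robin, much like dispatch_conn_new() does in memcached. */
    for (int fd = 100; fd < 106; fd++) {
        fake_worker *w = &workers[fd % N_WORKERS];
        fake_item *it = malloc(sizeof(*it));
        it->sfd = fd;
        pthread_mutex_lock(&w->lock);
        it->next = w->head;     /* LIFO is fine for this toy example */
        w->head = it;
        pthread_mutex_unlock(&w->lock);
        if (write(w->notify_send_fd, "", 1) != 1)   /* wake the worker up */
            perror("notify");
    }
    sleep(1);                   /* crude: let the workers drain their queues */
    return 0;
}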
1 Key data abstractions
(1) The wrapper structure for a single memcached thread
//wrapper structure for a memcached worker thread
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;
This is memcached's per-thread wrapper structure. Each thread owns a CQ (connection queue), a notification pipe, a libevent instance (event_base), and so on.
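For comparison, thread.c also defines a much smaller companion structure for the main (dispatcher) thread, which only needs its thread id and its event base; the dispatcher_thread variable assigned in thread_init() further below is of this type (shown roughly as it appears in the 1.4.x series):

typedef struct {
    pthread_t thread_id;        /* unique ID of the dispatcher thread */
    struct event_base *base;    /* libevent handle this thread uses */
} LIBEVENT_DISPATCHER_THREAD;

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;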
(2) The per-thread connection queue
/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t cond;
};
Each thread structure points to one CQ, and the CQ manages a singly linked list of CQ_ITEMs.
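The helpers that operate on this queue are short; a sketch of cq_init/cq_push/cq_pop, paraphrased from the 1.4.x thread.c, follows. Note that cq_pop() does not wait on cond: it is the notify pipe, not the condition variable, that actually wakes a worker.

/* sketch of the queue helpers in thread.c (paraphrased from the 1.4.x source) */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    pthread_cond_init(&cq->cond, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* pop one item off the head, or return NULL if the queue is empty */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;

    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (NULL != item) {
        cq->head = item->next;
        if (NULL == cq->head)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);

    return item;
}

/* append one item at the tail */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;

    pthread_mutex_lock(&cq->lock);
    if (NULL == cq->tail)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}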
(3) The connection-queue item
/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;
    enum conn_states init_state;
    int event_flags;
    int read_buffer_size;
    enum network_transport transport;
    CQ_ITEM *next;
};
A CQ_ITEM is essentially a wrapper around the already-connected fd returned by the main thread's accept(); the main thread creates and initializes it and pushes it onto a CQ for the worker threads to consume.
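CQ_ITEMs are not malloc'ed one at a time: thread.c keeps a global free list (cqi_freelist, guarded by cqi_freelist_lock, both initialized in thread_init below) and allocates items in batches. A sketch of the allocator, roughly following the 1.4.x source (where ITEMS_PER_ALLOC is 64):

#define ITEMS_PER_ALLOC 64   /* value used by the 1.4.x source */

/* take a CQ_ITEM from the free list, refilling it in a batch when empty */
static CQ_ITEM *cqi_new(void) {
    CQ_ITEM *item = NULL;

    pthread_mutex_lock(&cqi_freelist_lock);
    if (cqi_freelist) {
        item = cqi_freelist;
        cqi_freelist = item->next;
    }
    pthread_mutex_unlock(&cqi_freelist_lock);

    if (NULL == item) {
        int i;

        /* Allocate a bunch of items at once to reduce fragmentation */
        item = malloc(sizeof(CQ_ITEM) * ITEMS_PER_ALLOC);
        if (NULL == item)
            return NULL;

        /* chain items 1..N-1 together and hang them on the free list;
         * item[0] is handed back to the caller */
        for (i = 2; i < ITEMS_PER_ALLOC; i++)
            item[i - 1].next = &item[i];

        pthread_mutex_lock(&cqi_freelist_lock);
        item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
        cqi_freelist = &item[1];
        pthread_mutex_unlock(&cqi_freelist_lock);
    }

    return item;
}

/* return a CQ_ITEM to the free list */
static void cqi_free(CQ_ITEM *item) {
    pthread_mutex_lock(&cqi_freelist_lock);
    item->next = cqi_freelist;
    cqi_freelist = item;
    pthread_mutex_unlock(&cqi_freelist_lock);
}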
(4) The connection wrapper structure
/**
 * The structure representing a connection into memcached.
 */
//memcached's abstraction of a single connection
typedef struct conn conn;
struct conn {
    ..................
};
This structure is too large to show in full, so its members are omitted here. The one member that matters most for the thread pool is state: it drives memcached's state machine (implemented by the drive_machine() function).
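For orientation, state takes its values from the conn_states enum, and drive_machine() is essentially a while loop around a switch on that field. The sketch below abridges heavily: the state names follow the 1.4.x source, and the case bodies are reduced to comments.

enum conn_states {
    conn_listening,  /* the socket which listens for connections */
    conn_new_cmd,    /* prepare connection for next command */
    conn_waiting,    /* waiting for a readable socket */
    conn_read,       /* reading in a command line */
    conn_parse_cmd,  /* try to parse a command from the input buffer */
    conn_write,      /* writing out a simple response */
    conn_nread,      /* reading in a fixed number of bytes */
    conn_swallow,    /* swallowing unnecessary bytes w/o storing */
    conn_closing,    /* closing this connection */
    conn_mwrite,     /* writing out many items sequentially */
    conn_max_state   /* max state value (used for assertion) */
};

/* abridged shape of drive_machine(): keep switching on c->state
 * until some handler decides to stop */
static void drive_machine(conn *c) {
    bool stop = false;
    while (!stop) {
        switch (c->state) {
        case conn_listening:
            /* accept() the new socket, hand it to a worker via
             * dispatch_conn_new(), then stop */
            stop = true;
            break;
        case conn_waiting:
            /* re-arm the read event, move to conn_read, then stop */
            stop = true;
            break;
        /* ... the remaining states drive reading, parsing and writing ... */
        default:
            stop = true;
            break;
        }
    }
}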
2 Thread-pool initialization:
The entry point for thread-pool initialization in main() is:
/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);
The function is defined in thread.c; its source is shown below:
/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int i;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    //allocate the array of per-thread structures for the pool
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    //create a notification pipe for each thread in the pool
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        //fill in the per-thread structure
        setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        //spawn nthreads worker threads; worker_libevent is each thread's entry function
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}
The pool initialization function is called by the main thread. It first initializes the various mutexes, then uses calloc to allocate a block of nthreads * sizeof(LIBEVENT_THREAD) bytes to manage the pool, stored in the global static variable threads (of type LIBEVENT_THREAD *). It then creates an anonymous pipe for each thread (this pipe plays a key role in thread scheduling). Next, setup_thread registers each thread's event listener, binds its CQ, and fills in the rest of its initialization; its source is shown below:
/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    //register a read event on the notify pipe; thread_libevent_process is the callback
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    //allocate the new thread's connection queue (CQ)
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    //initialize the CQ held by this thread structure
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }
    //create the suffix cache
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}
memcached uses libevent for its event loop. Readers unfamiliar with libevent can consult its documentation; it is not introduced here. The key line in the code above is:
event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me);
It registers a read event on me->notify_receive_fd (the read end of the anonymous pipe), with thread_libevent_process as the callback. That function is defined as follows (the writer side of the pipe, dispatch_conn_new, is sketched right after it):
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    //the pipe became readable: consume the 1 byte the main thread wrote
    //(see dispatch_conn_new())
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    //pop one item off the connection queue
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        //build a new conn (task) from the item
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                            item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}
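For context, the single byte consumed above is written by the main thread in dispatch_conn_new(), which will be covered in detail in the follow-up post on scheduling. Roughly, in the 1.4.x source, it picks a worker round-robin, pushes a CQ_ITEM onto that worker's queue, and then pokes the worker's notify pipe:

/* sketch of dispatch_conn_new(), paraphrased from the 1.4.x source */
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    int tid = (last_thread + 1) % settings.num_threads;  //round-robin worker choice

    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    //queue the item on the chosen worker's CQ ...
    cq_push(thread->new_conn_queue, item);

    //... and wake that worker up by writing one byte into its notify pipe
    if (write(thread->notify_send_fd, "", 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}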
After setup_thread has filled in each thread structure, we return to thread_init, which then loops nthreads times over create_worker(worker_libevent, &threads[i]); to create the actual running threads. create_worker is a thin wrapper around pthread_create(); worker_libevent is each thread's start routine and &threads[i] its argument.
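For completeness, create_worker looks roughly like this in the 1.4.x thread.c:

/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    //thin wrapper around pthread_create(): func is the start routine, arg its argument
    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n", strerror(ret));
        exit(1);
    }
}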
worker_libevent is the thread body; its source is:
/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */
    pthread_mutex_lock(&init_lock);
    init_count++;                     //each newly created thread bumps the global init_count
    pthread_cond_signal(&init_cond);  //and signals init_cond
    pthread_mutex_unlock(&init_lock);

    //the newly created thread blocks here, waiting for events
    event_base_loop(me->base, 0);     //libevent's main event loop
    return NULL;
}
The reason worker_libevent increments init_count becomes clear from this fragment of thread_init:
/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
while (init_count < nthreads) {
    pthread_cond_wait(&init_cond, &init_lock);
}
pthread_mutex_unlock(&init_lock);
That is, the main thread blocks here waiting for the init_cond signal sent by worker_libevent; each time it wakes up it checks whether init_count < nthreads has become false (i.e., whether the required number of threads have been created), and keeps waiting otherwise.
This completes the analysis of the thread-pool creation code. Since this post is already quite long, the scheduling of the threads in the pool will be analyzed in a separate post.
