memcached Source Code Analysis: The Thread Pool Mechanism (Part 1)


It has been about a month since my last long post. I recently found some time to put together the thread-mechanism installment of this memcached source-code analysis series, drawing on some online resources along the way.

This article focuses on two questions: (1) how is the memcached thread pool created, and (2) how are the threads in the pool scheduled? All answers come from the source code.

memcached's thread pool follows the classic Master-Worker model:

(1) The main thread listens for incoming connection requests from clients, accepts them, and places the connected sockets on a connection queue;

(2) Idle worker threads are dispatched to handle the read/write events on the established connections.

1 Key data structures

(1) The per-thread wrapper structure

//wrapper structure for a memcached worker thread
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;

This is memcached's per-thread wrapper structure. As you can see, each thread owns a connection queue (CQ), a notification pipe, a libevent instance (event_base), and so on.

(2) The connection queue

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
    pthread_cond_t  cond;
};

Each thread structure points to one CQ, which manages a singly linked list of CQ_ITEMs.

(3) The connection-item structure

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int               sfd;
    enum conn_states  init_state;
    int               event_flags;
    int               read_buffer_size;
    enum network_transport     transport;
    CQ_ITEM          *next;
};

A CQ_ITEM is essentially a wrapper around the connected fd returned by the main thread's accept. It is created and initialized by the main thread and placed on the CQ list for the worker threads to consume.

(4) The connection wrapper structure

/**
 * The structure representing a connection into memcached.
 */
//memcached's abstraction of a single conn
typedef struct conn conn;
struct conn {
..................
};

This structure is too large to show in full, so its members are elided here. One member is crucial to the thread pool: state, the key to memcached's state-machine-driven processing (implemented by the drive_machine function).

2 Thread pool initialization

The thread pool is initialized from main() via:

/* start up worker threads if MT mode */

thread_init(settings.num_threads, main_base);

The function is defined in thread.c:

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int         i;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    //allocate the array of thread structures
    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    //create a notify pipe for each thread in the pool
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        //fill in the thread structure
        setup_thread(&threads[i]);
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        //spawn nthreads worker threads; worker_libevent is each thread's entry function
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    while (init_count < nthreads) {
        pthread_cond_wait(&init_cond, &init_lock);
    }
    pthread_mutex_unlock(&init_lock);
}

thread_init is called by the main thread. It first initializes the various mutexes, then uses calloc to allocate nthreads * sizeof(LIBEVENT_THREAD) bytes for managing the pool, kept in the global static variable threads (of type LIBEVENT_THREAD *). It then creates an anonymous pipe for each thread (this pipe plays a central role in thread scheduling). Next, setup_thread registers the event listener for each thread and binds its CQ list and other per-thread state:

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Listen for notifications from other threads */
    //watch the pipe's read end; thread_libevent_process is the callback
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    event_base_set(me->base, &me->notify_event);

    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }

    //allocate the connection queue (CQ list) for this thread
    me->new_conn_queue = malloc(sizeof(struct conn_queue));
    if (me->new_conn_queue == NULL) {
        perror("Failed to allocate memory for connection queue");
        exit(EXIT_FAILURE);
    }
    //initialize the thread's CQ list
    cq_init(me->new_conn_queue);

    if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {
        perror("Failed to initialize mutex");
        exit(EXIT_FAILURE);
    }
    //create the suffix cache
    me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),
                                    NULL, NULL);
    if (me->suffix_cache == NULL) {
        fprintf(stderr, "Failed to create suffix cache\n");
        exit(EXIT_FAILURE);
    }
}

memcached uses libevent for its event loop. Readers unfamiliar with libevent can consult its documentation; it is not covered here. Note this line:

 event_set(&me->notify_event, me->notify_receive_fd, EV_READ | EV_PERSIST, thread_libevent_process, me);

It registers a read event on me->notify_receive_fd (the read end of the anonymous pipe) with thread_libevent_process as the callback, defined as follows:

static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];

    //the pipe is readable: consume the 1 byte the main thread wrote
    //(see dispatch_conn_new())
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    //take one connection off the queue
    item = cq_pop(me->new_conn_queue);

    if (NULL != item) {
        //build a new conn for this connection
        conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                           item->read_buffer_size, item->transport, me->base);
        if (c == NULL) {
            if (IS_UDP(item->transport)) {
                fprintf(stderr, "Can't listen for events on UDP socket\n");
                exit(1);
            } else {
                if (settings.verbose > 0) {
                    fprintf(stderr, "Can't listen for events on fd %d\n",
                        item->sfd);
                }
                close(item->sfd);
            }
        } else {
            c->thread = me;
        }
        cqi_free(item);
    }
}

After setup_thread has initialized each thread structure, we return to thread_init, which loops nthreads times calling create_worker(worker_libevent, &threads[i]) to create the threads that actually run. create_worker is a thin wrapper around pthread_create(), with worker_libevent as each thread's body and &threads[i] as its argument.

worker_libevent, the thread body, looks like this:

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */
    pthread_mutex_lock(&init_lock);
    init_count++;     //each new thread increments the global init_count
    pthread_cond_signal(&init_cond);  //signal init_cond
    pthread_mutex_unlock(&init_lock);

    //the new thread blocks here in libevent's main event loop, waiting for events
    event_base_loop(me->base, 0);
    return NULL;
}

The purpose of incrementing init_count in worker_libevent becomes clear from this part of thread_init:

/* Wait for all the threads to set themselves up before returning. */
pthread_mutex_lock(&init_lock);
while (init_count < nthreads) {
    pthread_cond_wait(&init_cond, &init_lock);
}
pthread_mutex_unlock(&init_lock);

That is, the main thread blocks here waiting for the init_cond signal sent by worker_libevent. Each time it wakes, it re-checks whether init_count < nthreads still holds (i.e., whether the requested number of threads has been created), and keeps waiting if so.

That completes the analysis of thread pool creation. Since this post is already long, the scheduling of threads within the pool will be covered in a separate article.