memcached源碼分析之線程池機制(二)


    在上一篇中已分析了memcached線程池的創建流程,由於上篇篇幅較長,因此將memcached線程池中線程的調度流程另立一篇。

先讓我們把目光轉到主函數中,主線程在調用thread_init函數創建好線程池后,就開始創建監聽套接字,memcached支持TCP,UDP,UNIX域套接字,因此相應的要創建三種監聽套接字

這里我們只分析TCP listening socket的創建(UDP與TCP的創建采用統一的接口),函數入口為:

1  errno = 0;
2         if (settings.port && server_sockets(settings.port, tcp_transport,
3                                            portnumber_file)) {
4             vperror("failed to listen on TCP port %d", settings.port);
5             exit(EX_OSERR);
6         }

server_sockets函數即為創建TCP listening socket的入口函數。在server_sockets主要調用server_socket函數來實現,

 1 /**
 2  * Create a socket and bind it to a specific port number
 3  * @param interface the interface to bind to
 4  * @param port the port number to bind to
 5  * @param transport the transport protocol (TCP / UDP)
 6  * @param portnumber_file A filepointer to write the port numbers to
 7  *        when they are successfully added to the list of ports we
 8  *        listen on.
 9  */
10 static int server_socket(const char *interface,
11                          int port,
12                          enum network_transport transport,
13                          FILE *portnumber_file);

server_socket函數實現源碼較長,以下只列出部分:

 1 static int server_socket(const char *interface,
 2                          int port,
 3                          enum network_transport transport,
 4                          FILE *portnumber_file) {
 5     int sfd;
 6     struct linger ling = {0, 0};
 7     struct addrinfo *ai;
 8     struct addrinfo *next;
 9     struct addrinfo hints = { .ai_flags = AI_PASSIVE,
10                               .ai_family = AF_UNSPEC };
11     //套接字的創建過程
12     ..................
13     ..................
14         if (IS_UDP(transport)) {
15             int c;
16 
17             for (c = 0; c < settings.num_threads_per_udp; c++) {
18                 /* this is guaranteed to hit all threads because we round-robin */
19                 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
20                                   UDP_READ_BUFFER_SIZE, transport);
21             }
22         } else {
23             if (!(listen_conn_add = conn_new(sfd, conn_listening,
24                                              EV_READ | EV_PERSIST, 1,
25                                              transport, main_base))) {
26                 fprintf(stderr, "failed to create listening connection\n");
27                 exit(EXIT_FAILURE);
28             }
29             listen_conn_add->next = listen_conn;
30             listen_conn = listen_conn_add;
31         }
32     }
33 
34     freeaddrinfo(ai);
35 
36     /* Return zero iff we detected no errors in starting up connections */
37     return success == 0;
38 }

在server_socket中,我們只關注兩個函數:

(1)dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,UDP_READ_BUFFER_SIZE, transport);

當創建的是UDP套接字時,使用這個函數,由於UDP是無連接的,因此直接啟動settings.num_threads_per_udp個線程來服務於UDP端口。

(2)listen_conn_add = conn_new(sfd, conn_listening,EV_READ | EV_PERSIST, 1,transport, main_base);

當創建的是TCP套接字時,調用conn_new函數,源碼如下:

 1 conn *conn_new(const int sfd, enum conn_states init_state,
 2                 const int event_flags,
 3                 const int read_buffer_size, enum network_transport transport,
 4                 struct event_base *base) {
 5     conn *c = conn_from_freelist();
 6 
 7     ........................//略去部分源碼
 8 
 9     event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
10     event_base_set(base, &c->event);
11     c->ev_flags = event_flags;
12 
13     if (event_add(&c->event, 0) == -1) {
14         if (conn_add_to_freelist(c)) {
15             conn_free(c);
16         }
17         perror("event_add");
18         return NULL;
19     }
20 
21     STATS_LOCK();
22     stats.curr_conns++;
23     stats.total_conns++;
24     STATS_UNLOCK();
25 
26     MEMCACHED_CONN_ALLOCATE(c->sfd);
27 
28     return c;
29 }

該函數對套接字設置conn_listening監聽事件,回調函數為event_handler,在事件響應函數中調用狀態機。

 1 void event_handler(const int fd, const short which, void *arg) {
 2     conn *c;
 3 
 4     c = (conn *)arg;
 5     assert(c != NULL);
 6 
 7     c->which = which;
 8 
 9     /* sanity */
10     if (fd != c->sfd) {
11         if (settings.verbose > 0)
12             fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
13         conn_close(c);
14         return;
15     }
16 
17     drive_machine(c);
18 
19     /* wait for next event */
20     return;
21 }

memcached中的狀態機是memcached運轉發動機,它根據鏈接的不同狀態而采取不同的行為,狀態枚舉如下:

 1 enum conn_states {
 2     conn_listening,  /**< the socket which listens for connections */
 3     conn_new_cmd,    /**< Prepare connection for next command */
 4     conn_waiting,    /**< waiting for a readable socket */
 5     conn_read,       /**< reading in a command line */
 6     conn_parse_cmd,  /**< try to parse a command from the input buffer */
 7     conn_write,      /**< writing out a simple response */
 8     conn_nread,      /**< reading in a fixed number of bytes */
 9     conn_swallow,    /**< swallowing unnecessary bytes w/o storing */
10     conn_closing,    /**< closing this connection */
11     conn_mwrite,     /**< writing out many items sequentially */
12     conn_max_state   /**< Max state value (used for assertion) */
13 };

 狀態機的實現函數為drive_machine,由於該函數的源碼實現過長,這里只分析對conn_listening狀態的響應。

 1 static void drive_machine(conn *c) {
 2     bool stop = false;
 3     int sfd, flags = 1;
 4     socklen_t addrlen;
 5     struct sockaddr_storage addr;
 6     int nreqs = settings.reqs_per_event;
 7     int res;
 8 
 9     assert(c != NULL);
10 
11     while (!stop) {
12 
13         switch(c->state) {
14             //監聽套接字發生事件
15         case conn_listening:
16             addrlen = sizeof(addr);
17             if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {
18                 if (errno == EAGAIN || errno == EWOULDBLOCK) {
19                     /* these are transient, so don't log anything */
20                     stop = true;
21                 } else if (errno == EMFILE) {
22                     if (settings.verbose > 0)
23                         fprintf(stderr, "Too many open connections\n");
24                     accept_new_conns(false);
25                     stop = true;
26                 } else {
27                     perror("accept()");
28                     stop = true;
29                 }
30                 break;
31             }
32             //新套接字設置非阻塞
33             if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
34                 fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
35                 perror("setting O_NONBLOCK");
36                 close(sfd);
37                 break;
38             }
39             //調度線程來處理連接
40             dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
41                                      DATA_BUFFER_SIZE, tcp_transport);
42             stop = true;
43             break;
44 ...........................//略去其他狀態的處理
45 }

從源碼中我們可以看到,當監聽套接字建立新連接時,通過事件響應函數event_handler來觸發狀態機,再調用dispatch_conn_new調度新線程來處理這個連接的讀寫事件。

 1 /*
 2  * Dispatches a new connection to another thread. This is only ever called
 3  * from the main thread, either during initialization (for UDP) or because
 4  * of an incoming connection.
 5  */
 6 void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
 7                        int read_buffer_size, enum network_transport transport) {
 8     CQ_ITEM *item = cqi_new();
 9     int tid = (last_thread + 1) % settings.num_threads;
10 
11     //以此種方式來取出線程
12     LIBEVENT_THREAD *thread = threads + tid;
13 
14     last_thread = tid;
15 
16     item->sfd = sfd;
17     item->init_state = init_state;
18     item->event_flags = event_flags;
19     item->read_buffer_size = read_buffer_size;
20     item->transport = transport;
21 
22     //將新item放至threads的new_conn_queue隊列中
23     cq_push(thread->new_conn_queue, item);
24 
25     MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
26     //寫一個字節啟動新的線程
27     if (write(thread->notify_send_fd, "", 1) != 1) {
28         perror("Writing to thread notify pipe");
29     }
30 }

至此,memcached的線程池調度機制已分析完畢了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM