Straight to the diagram:
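memcached uses a multithreaded model with one master thread and several worker threads. The master communicates with each worker through a pipe.
Each worker thread has its own connection queue, whose elements are CQ_ITEM structures: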
```c
typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
    logger *l;                  /* logger buffer */
    void *lru_bump_buf;         /* async LRU bump buffer */
} LIBEVENT_THREAD;

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;                           /* accepted client socket (conn_fd) */
    enum conn_states init_state;       /* initial state of the new conn */
    int event_flags;                   /* libevent flags to register with */
    int read_buffer_size;
    enum network_transport transport;  /* TCP or UDP */
    conn *c;                           /* existing conn, used by the 'r' command */
    CQ_ITEM *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;   /* shared by the master (push) and worker (pop) */
};
```
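The excerpt defines the queue but not the functions that operate on it. Below is a minimal sketch of how a push by the master and a pop by a worker can share the queue under its mutex. CQ_ITEM is cut down to `sfd` and `next`, and the function bodies are an illustration of the technique, not memcached's own source.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Simplified queue item: only the fields this sketch needs. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;        /* accepted client socket */
    CQ_ITEM *next;  /* next item in the FIFO */
};

typedef struct conn_queue CQ;
struct conn_queue {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
};

/* Initialise an empty queue. */
static void cq_init(CQ *cq) {
    pthread_mutex_init(&cq->lock, NULL);
    cq->head = NULL;
    cq->tail = NULL;
}

/* Append an item at the tail (the master thread's side). */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    assert(item != NULL);
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;        /* queue was empty */
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Remove and return the head item, or NULL if the queue is empty
 * (the worker thread's side). */
static CQ_ITEM *cq_pop(CQ *cq) {
    pthread_mutex_lock(&cq->lock);
    CQ_ITEM *item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;    /* queue became empty */
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}
```

The lock is held only while the head and tail pointers change, so the master and a worker contend for it very briefly.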
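memcached uses libevent for event handling; the master thread and each worker thread have their own event_base.
Initially, the master thread listens for incoming connections, and each worker thread listens for read events on its pipe.
When a connection arrives, the master thread accepts it, wraps the resulting conn_fd in a CQ_ITEM, pushes the item onto one worker thread's queue, and writes a command byte (`c` for a new connection) to that worker's pipe, which fires the pipe's read event.
The worker thread then runs the pipe's read-event callback, thread_libevent_process: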
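The accept, enqueue, notify sequence can be sketched without libevent. In the sketch below a blocking read() on the pipe stands in for the worker's libevent pipe callback, and the queue is a small fixed-size ring rather than a CQ linked list. The names worker_t, dispatch_conn, worker_loop, MAX_ITEMS and the `q` stop byte are hypothetical, used only here. A single worker is shown, so how the master chooses among several workers is left out.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <unistd.h>

#define MAX_ITEMS 64  /* hypothetical capacity for this sketch */

/* One worker: its notify pipe plus a mutex-protected FIFO of fds. */
typedef struct {
    int notify_receive_fd;    /* read end, watched by the worker */
    int notify_send_fd;       /* write end, used by the master */
    pthread_mutex_t lock;
    int queue[MAX_ITEMS];
    int head, tail;           /* ring indices */
    int handled[MAX_ITEMS];   /* fds the worker has picked up */
    int nhandled;
} worker_t;

static int worker_init(worker_t *w) {
    int fds[2];
    if (pipe(fds) != 0)
        return -1;
    w->notify_receive_fd = fds[0];
    w->notify_send_fd = fds[1];
    pthread_mutex_init(&w->lock, NULL);
    w->head = w->tail = 0;
    w->nhandled = 0;
    return 0;
}

/* Master side: queue the fd first, then write one 'c' byte to wake the
 * worker, mirroring the 'c' case in thread_libevent_process. */
static int dispatch_conn(worker_t *w, int sfd) {
    pthread_mutex_lock(&w->lock);
    assert(w->tail - w->head < MAX_ITEMS);  /* sketch: no overflow handling */
    w->queue[w->tail++ % MAX_ITEMS] = sfd;
    pthread_mutex_unlock(&w->lock);
    char cmd = 'c';
    return write(w->notify_send_fd, &cmd, 1) == 1 ? 0 : -1;
}

/* Worker side: each 'c' byte means one queued connection to take.
 * The hypothetical 'q' byte ends the loop. */
static void *worker_loop(void *arg) {
    worker_t *w = arg;
    char cmd;
    while (read(w->notify_receive_fd, &cmd, 1) == 1 && cmd != 'q') {
        if (cmd != 'c')
            continue;
        pthread_mutex_lock(&w->lock);
        if (w->head != w->tail) {
            assert(w->nhandled < MAX_ITEMS);
            w->handled[w->nhandled++] = w->queue[w->head++ % MAX_ITEMS];
        }
        pthread_mutex_unlock(&w->lock);
    }
    return NULL;
}
```

Because the fd is queued before the byte is written, every `c` the worker reads has a matching item already in the queue; the pipe carries only the wake-up, not the connection itself.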
```c
/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    LIBEVENT_THREAD *me = arg;
    CQ_ITEM *item;
    char buf[1];
    unsigned int timeout_fd;

    /* Each notification starts with a one-byte command. */
    if (read(fd, buf, 1) != 1) {
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");
        return;
    }

    switch (buf[0]) {
    /* a new connection: pop it and register it on this thread's event_base */
    case 'c':
        item = cq_pop(me->new_conn_queue);

        if (NULL != item) {
            conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
                               item->read_buffer_size, item->transport,
                               me->base);
            if (c == NULL) {
                if (IS_UDP(item->transport)) {
                    fprintf(stderr, "Can't listen for events on UDP socket\n");
                    exit(1);
                } else {
                    if (settings.verbose > 0) {
                        fprintf(stderr, "Can't listen for events on fd %d\n",
                                item->sfd);
                    }
                    close(item->sfd);
                }
            } else {
                c->thread = me;
            }
            cqi_free(item);
        }
        break;
    /* an existing connection (item->c) is handed back to this thread */
    case 'r':
        item = cq_pop(me->new_conn_queue);

        if (NULL != item) {
            conn_worker_readd(item->c);
            cqi_free(item);
        }
        break;
    /* we were told to pause and report in */
    case 'p':
        register_thread_initialized();
        break;
    /* a client socket timed out */
    case 't':
        if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
            if (settings.verbose > 0)
                fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
            return;
        }
        conn_close_idle(conns[timeout_fd]);
        break;
    }
}
```
In conn_new, the worker registers a read event for conn_fd on its own event_base.
From this point on, the worker thread listens for I/O events on that connection.
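To illustrate what registering conn_fd's read event on the worker's own event loop amounts to, here is a sketch that uses poll() as a stand-in for libevent's event_base. The names event_set_t, event_set_add_read, event_set_wait and fake_client_conn are hypothetical, and the real work done in conn_new (allocating and initialising a conn) is not shown.

```c
#include <assert.h>
#include <poll.h>
#include <sys/socket.h>

#define MAX_FDS 16  /* hypothetical capacity for this sketch */

/* A per-worker set of watched fds; poll() plays the role of event_base. */
typedef struct {
    struct pollfd fds[MAX_FDS];
    nfds_t nfds;
} event_set_t;

/* Rough analogue of conn_new adding a read event for the new fd
 * to the worker's own event loop. */
static void event_set_add_read(event_set_t *set, int fd) {
    assert(set->nfds < MAX_FDS);
    set->fds[set->nfds].fd = fd;
    set->fds[set->nfds].events = POLLIN;
    set->fds[set->nfds].revents = 0;
    set->nfds++;
}

/* Wait up to timeout_ms; return the first readable fd, or -1. */
static int event_set_wait(event_set_t *set, int timeout_ms) {
    if (poll(set->fds, set->nfds, timeout_ms) <= 0)
        return -1;
    for (nfds_t i = 0; i < set->nfds; i++)
        if (set->fds[i].revents & POLLIN)
            return set->fds[i].fd;
    return -1;
}

/* Helper: a connected socket pair stands in for an accepted client
 * connection (sv[0]) and the remote client (sv[1]). */
static int fake_client_conn(int sv[2]) {
    return socketpair(AF_UNIX, SOCK_STREAM, 0, sv);
}
```

Once the worker has added sv[0] to its set, data sent on sv[1] makes event_set_wait report sv[0], much as libevent would invoke the connection's read callback.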