記一個多線程使用libevent的問題


前段時間使用libevent網絡庫實現了一個游戲服務器引擎,在此記錄下其中遇到的一個問題。

我在設計服務器上選擇把邏輯和網絡分線程,線程之間通信使用隊列。但是這樣做會有個問題:

當邏輯線程想要主動的發一個數據包的時候,網絡線程此時可能還阻塞在等待網絡IO的系統調用上(比如說epoll)。如果不做特殊處理的話,此時消息包就會一直積壓在緩沖區中,直到下一次網絡線程從掛起的系統調用返回(比如來了一個消息包)。因此,當邏輯線程發送消息包的時候(bufferevent_write)需要一種喚醒機制,讓網絡線程從epoll等系統調用中返回並處理發送消息包邏輯。

 

由於對libevent的api不熟悉,起初我是自己實現這個功能的。實現確實是不復雜,但是缺違背了我的初心:只寫簡單必要的代碼,保證盡可能少的bug。直到后來和同事探討(爭論:P)了一番,才發現原來libevent是有對此做支持的,但是具體怎么做,文檔里面沒有詳細的說,因此同事也說不出個所以然。鑒於此情況,我決定,把libevent中與此相關的源碼粗略的過一遍,以求能弄明白以下兩件事:

(1)與跨線程喚醒事件等待相關的api有哪些,以及如何使用?

(2)這些api背后到底做了哪些工作?

 

相關API,及用法

和我起初想得不一樣,libevent相關的api很簡單並且只有一個:

/*call event_use_pthreads() if use posix threads.*/ 
evthread_use_windows_threads();
struct event_base* ev_base = event_base_new();

  

需要注意的是函數evthread_use_windows_threads的調用必須在初始化event_base之前。在此之后,無需再做別的事情,邏輯線程在執行bufferevent_write的時候,libevent就會自動喚醒網絡線程的事件循環,並執行發送數據。

 

隱藏在API背后的邏輯

先看看evthread_use_windows_threads函數做了什么?

int evthread_use_windows_threads(void) {
  ...  
  evthread_set_lock_callbacks(&cbs);
  evthread_set_id_callback(evthread_win32_get_id);
  evthread_set_condition_callbacks(&cond_cbs);
  return 0;            
}

通過調用evthread_use_windows_threads,我們設置了一些回調函數,包括設置了libevent獲取線程id的回調函數evthread_id_fn_

 

看看初始化事件循環的函數event_base_new做了什么:

// event.c
struct event_base * 
event_base_new(void) {
    ...
    base = event_base_new_with_config(cfg);
}

struct event_base * 
event_base_new_with_config(const struct event_config *cfg) {
    ...
#ifndef EVENT__DISABLE_THREAD_SUPPORT
	if (EVTHREAD_LOCKING_ENABLED() &&
	    (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
		int r;
		EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0);
		EVTHREAD_ALLOC_COND(base->current_event_cond);
		r = evthread_make_base_notifiable(base);
		if (r<0) {
			event_warnx("%s: Unable to make base notifiable.", __func__);
			event_base_free(base);
			return NULL;
		}
	}
#endif
    ...
}

int
evthread_make_base_notifiable(struct event_base *base) {
	int r;
	if (!base)
		return -1;

	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	r = evthread_make_base_notifiable_nolock_(base);
	EVBASE_RELEASE_LOCK(base, th_base_lock);
	return r;
}

static int
evthread_make_base_notifiable_nolock_(struct event_base *base) {
    ...
    if (evutil_make_internal_pipe_(base->th_notify_fd) == 0) {
	notify = evthread_notify_base_default;
	cb = evthread_notify_drain_default;
    } else {
	return -1;
    }

    base->th_notify_fn = notify;
}

  

它通過如下調用:

event_base_new-->event_base_new_with_config-->evthread_make_base_notifiable-->evthread_make_base_notifiable_nolock_

最后通過evutil_make_internal_pipe_函數創建了兩個互相連接的socket(windows環境下,用此來模擬pipe):

/* Internal function: Set fd[0] and fd[1] to a pair of fds such that writes on
 * fd[1] get read from fd[0].  Make both fds nonblocking and close-on-exec.
 * Return 0 on success, -1 on failure.
 */
int
evutil_make_internal_pipe_(evutil_socket_t fd[2])
{
	/*
	  Making the second socket nonblocking is a bit subtle, given that we
	  ignore any EAGAIN returns when writing to it, and you don't usally
	  do that for a nonblocking socket. But if the kernel gives us EAGAIN,
	  then there's no need to add any more data to the buffer, since
	  the main thread is already either about to wake up and drain it,
	  or woken up and in the process of draining it.
	*/

#if defined(EVENT__HAVE_PIPE2)
	if (pipe2(fd, O_NONBLOCK|O_CLOEXEC) == 0)
		return 0;
#endif
#if defined(EVENT__HAVE_PIPE)
	if (pipe(fd) == 0) {
		if (evutil_fast_socket_nonblocking(fd[0]) < 0 ||
		    evutil_fast_socket_nonblocking(fd[1]) < 0 ||
		    evutil_fast_socket_closeonexec(fd[0]) < 0 ||
		    evutil_fast_socket_closeonexec(fd[1]) < 0) {
			close(fd[0]);
			close(fd[1]);
			fd[0] = fd[1] = -1;
			return -1;
		}
		return 0;
	} else {
		event_warn("%s: pipe", __func__);
	}
#endif

#ifdef _WIN32
#define LOCAL_SOCKETPAIR_AF AF_INET
#else
#define LOCAL_SOCKETPAIR_AF AF_UNIX
#endif
	if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, fd) == 0) {
		if (evutil_fast_socket_nonblocking(fd[0]) < 0 ||
		    evutil_fast_socket_nonblocking(fd[1]) < 0 ||
		    evutil_fast_socket_closeonexec(fd[0]) < 0 ||
		    evutil_fast_socket_closeonexec(fd[1]) < 0) {
			evutil_closesocket(fd[0]);
			evutil_closesocket(fd[1]);
			fd[0] = fd[1] = -1;
			return -1;
		}
		return 0;
	}
	fd[0] = fd[1] = -1;
	return -1;
}

 

之后,再設置用於喚醒操作的notify函數(evthread_notify_base_default):

 

static int
evthread_notify_base_default(struct event_base *base) { 
  char buf[1]; 
  int r; 
  buf[0] = (char) 0; 
#ifdef _WIN32 
  r = send(base->th_notify_fd[1], buf, 1, 0); 
#else 
  r = write(base->th_notify_fd[1], buf, 1); 
#endif 
  return (r < 0 && ! EVUTIL_ERR_IS_EAGAIN(errno)) ? -1 : 0; 
}

  

可以看出,在windows下libevent的喚醒機制實際也是self pipe trick,只不過它通過構造一對socket來模擬pipe,當需要喚醒的時候,它就往其中一個socket寫入1個字節

 

再去看看bufferevent_write:

// bufferevent.c
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) {
	if (evbuffer_add(bufev->output, data, size) == -1)
		return (-1);

	return 0;
}

// buffer.c
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) {
    ...
    evbuffer_invoke_callbacks_(buf);
}

它會觸發一系列列回調函數,而這些回調函數在創建bufferevent的時候被指定:

//bufferevent_sock.c
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options) {
    ...
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
}

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *cbinfo, void *arg) {
    ...
    if (cbinfo->n_added &&
	    (bufev->enabled & EV_WRITE) &&
	    !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
	    !bufev_p->write_suspended) {
		/* Somebody added data to the buffer, and we would like to
		 * write, and we were not writing.  So, start writing. */
		if (bufferevent_add_event_(&bufev->ev_write, &bufev->timeout_write) == -1) {
		    /* Should we log this? */
		}
	}
}

//bufferevent.c
int
bufferevent_add_event_(struct event *ev, const struct timeval *tv) {
	if (!evutil_timerisset(tv))
		return event_add(ev, NULL);
	else
		return event_add(ev, tv);
}

//event.c
int
event_add(struct event *ev, const struct timeval *tv) {
	EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
	res = event_add_nolock_(ev, tv, 0);
	EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}

int
event_add_nolock_(struct event *ev, const struct timeval *tv, int tv_is_absolute) {
    ...
    /* if we are not in the right thread, we need to wake up the loop */
    //如果在構造event_base之前調用了evthread_use_windows_threads,EVBASE_NEED_NOTIFY此時會返回true,否則為false。
    if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
        evthread_notify_base(base);
}

由代碼可知,在往bufferevent寫數據后執行的回調函數中,就有喚醒網絡線程邏輯(evthread_notify_base)。那為什么還需要手動調用evthread_use_windows_threads函數呢?

這里再說一下:

#define EVBASE_NEED_NOTIFY(base)			 \
	((base)->running_loop &&			 \
	    ((base)->th_owner_id != evthreadimpl_get_id_()))

unsigned long
evthreadimpl_get_id_() {
	return evthread_id_fn_ ? evthread_id_fn_() : 1;
}

之前說過,當調用evthread_use_windows_threads,設置了libevent獲取線程id的回調函數evthread_id_fn_。也正因為此,才會去跑下去執行evthread_notify_base函數:

static int
evthread_notify_base(struct event_base *base) {
	EVENT_BASE_ASSERT_LOCKED(base);
	if (!base->th_notify_fn)
		return -1;
	if (base->is_notify_pending)
		return 0;
	base->is_notify_pending = 1;
	return base->th_notify_fn(base);
}

所以,當我們在邏輯線程調用bufferevent_write嘗試發送一段數據的時候,它會依據以下的調用,通知網絡線程:

bufferevent_write-->evbuffer_add-->evbuffer_invoke_callbacks_-->bufferevent_socket_evtbuf_cb_-->bufferevent_add_event_-->event_add-->event_add_nolock_-->evthread_notify_base

以上便是libevent跨線程喚醒的邏輯。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM