在前面的文章中,其實很多代碼就涉及到加鎖釋放鎖的動作了,但是自己一直避免去深究他們,好了這篇文章就講Nginx是如何實現鎖的吧,然后還要講Nginx是如何使用鎖來避免驚群的發生。
在Nginx的鎖的實現中,要分為兩種情況,分別為支持原子操作以與不支持原子操作。其定義在Ngx_shmtx.h當中:
//鎖的定義 typedef struct { #if (NGX_HAVE_ATOMIC_OPS) ngx_atomic_t *lock; //如果支持原子鎖的話,那么使用它 #if (NGX_HAVE_POSIX_SEM) ngx_atomic_t *wait; ngx_uint_t semaphore; sem_t sem; #endif #else ngx_fd_t fd; //不支持原子操作的話就使用文件鎖來實現 u_char *name; #endif ngx_uint_t spin; //這是自旋鎖么? } ngx_shmtx_t;
嗯,其實定義還是很簡單的,一看就明白了。好接下來看支持原子操作的方式是如何實現的吧,在ngx_event_core_module模塊的ngx_event_module_init函數中會有如下代碼:
/*后面將會創建size大小的共享內存,這塊共享內存將被均分成三段, 分別供ngx_accept_mutex、ngx_connection_counter、ngx_temp_number 使用。 */ /* cl should be equal to or greater than cache line size */ cl = 128; size = cl /* ngx_accept_mutex */ + cl /* ngx_connection_counter */ + cl; /* ngx_temp_number */ //共享內存的初始化 shm.size = size; shm.name.len = sizeof("nginx_shared_zone"); shm.name.data = (u_char *) "nginx_shared_zone"; shm.log = cycle->log; if (ngx_shm_alloc(&shm) != NGX_OK) { //為共享內存分配內存空間 return NGX_ERROR; } shared = shm.addr; //獲取共享內存的地址 ngx_accept_mutex_ptr = (ngx_atomic_t *) shared; //存放互斥量內存地址的指針 ngx_accept_mutex.spin = (ngx_uint_t) -1; //初始化自旋鎖的初值為-1 if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared, //如果支持原子操作的話,這個就很簡單了,就直接將內存地址分配過去就行了 cycle->lock_file.data) != NGX_OK) { return NGX_ERROR; } ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl); //ngx_connection_counter為其分配共享內存的內存空間 (void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1); ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "counter: %p, %d", ngx_connection_counter, *ngx_connection_counter); ngx_temp_number = (ngx_atomic_t *) (shared + 2 * cl); //ngx_temp_number的內存空間
這段代碼的意思是首先調用ngx_shm_alloc函數創建共享內存,然后再為ngx_accept_mutex變量在其中分配其lock域內存,嗯,這個變量的用處大概大家也知道吧。(其實lock說白了也就是一個64位的int而已),當然共享內存中還有其他一些變量的定義,ngx_connection_counter變量用於保存當前服務器總共持有的connection。
嗯,注意ngx_shmtx_create函數,它用於創建鎖,這里有兩種方式的實現,分別為支持原子操作,和不支持原子操作的兩種,這里我們只看支持原子操作的方式吧:
//為鎖mtx的lock域分配內存 ngx_int_t ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name) { mtx->lock = &addr->lock; //其實就是直接將內存地址賦個mtx的lock域就完事了 if (mtx->spin == (ngx_uint_t) -1) { return NGX_OK; } mtx->spin = 2048; return NGX_OK; }
上面的代碼夠簡單吧。
嗯,接下來看Nginx如何獲取以及釋放鎖。嗯,實現有兩種,分別為lock與trylock,如果是lock的話,那么會組設,也就是自旋,直到獲取了鎖位置,如果使用trylock的話,那么就是非阻塞的方式,如果沒有獲取到,那么直接返回錯誤就好了。我們先看trylock吧,定義在Ngx_shmtx.c 當中(還是只看支持原子操作的實現方式吧):
//嘗試獲取鎖,原子的方式 ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) { return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)); }
嗯,其實很簡單,首先是判斷mtx的lock域是否等於0,如果不等於,那么就直接返回false好了,如果等於的話,那么就要調用原子操作ngx_atomic_cmp_set了,它用於比較mtx的lock域,如果等於零,那么設置為當前進程的進程id號,否則返回false。嗯,這個ngx_atomic_cmp_set函數是跟體系結構相關的,這里就不細講了。
然后就可以將lock的實現了,
//嘗試獲取鎖,原子的方式 ngx_uint_t ngx_shmtx_trylock(ngx_shmtx_t *mtx) { return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)); } //阻塞的方式獲取鎖 void ngx_shmtx_lock(ngx_shmtx_t *mtx) { ngx_uint_t i, n; ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock"); //一個死循環,不斷的去看是否獲取了鎖,直到獲取了之后才退出 for ( ;; ) { //如果獲取了鎖,那么就可以直接返回了 if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } //如果cpu的數量大於一 if (ngx_ncpu > 1) { for (n = 1; n < mtx->spin; n <<= 1) { for (i = 0; i < n; i++) { ngx_cpu_pause(); } if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) { return; } } } ngx_sched_yield(); } }
一個for循環就暴露了其自旋的本質。里面還涉及到 一些優化的,嗯,我也不太懂,以后再說吧。接下來就可以將unlock了:
//釋放鎖 void ngx_shmtx_unlock(ngx_shmtx_t *mtx) { if (mtx->spin != (ngx_uint_t) -1) { ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock"); } if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) { ngx_shmtx_wakeup(mtx); } }
嗯,還是很簡單,判斷鎖的lock域與當前進程的進程id是否相等,如果相等的話,那么就將lock設置為0,然后就相當於釋放了鎖。
好接下來可以看如何用鎖來避免驚群了。在ngx_event_core_module模塊的ngx_event_module_init函數中我們已經看到了ngx_accept_mutex的lock域的內存是在共享內存中,因而,所有worker進程都共享它,在ngx_process_events_and_timers函數中我們可以看到如下的代碼:
/*嘗試鎖accept mutex,只有成功獲取鎖的進程,才會將listen 套接字放入epoll中。因此,這就保證了只有一個進程擁有 監聽套接口,故所有進程阻塞在epoll_wait時,不會出現驚群現象。 */ //這里的ngx_trylock_accept_mutex函數中,如果順利的獲取了鎖,那么它會將監聽端口注冊到當前worker進程的epoll當中 if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { return; }
函數ngx_trylock_accept_mutex用於嘗試獲取ngx_accept_mutex鎖,如果獲取了的話,那么就將listening加入到epoll當中,我們可以來看這個函數:
//嘗試獲取鎖,如果獲取了鎖,那么還要將當前監聽端口全部注冊到當前worker進程的epoll當中去 ngx_int_t ngx_trylock_accept_mutex(ngx_cycle_t *cycle) { if (ngx_shmtx_trylock(&ngx_accept_mutex)) { //嘗試獲取互斥鎖 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex locked"); //如果本來已經獲得鎖,則直接返回Ok if (ngx_accept_mutex_held && ngx_accept_events == 0 && !(ngx_event_flags & NGX_USE_RTSIG_EVENT)) { return NGX_OK; } //到達這里,說明重新獲得鎖成功,因此需要打開被關閉的listening句柄,調用ngx_enable_accept_events函數,將監聽端口注冊到當前worker進程的epoll當中去 if (ngx_enable_accept_events(cycle) == NGX_ERROR) { ngx_shmtx_unlock(&ngx_accept_mutex); return NGX_ERROR; } ngx_accept_events = 0; ngx_accept_mutex_held = 1; //表示當前獲取了鎖 return NGX_OK; } ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "accept mutex lock failed: %ui", ngx_accept_mutex_held); //這里表示的是以前曾經獲取過,但是這次卻獲取失敗了,那么需要將監聽端口從當前的worker進程的epoll當中移除,調用的是ngx_disable_accept_events函數 if (ngx_accept_mutex_held) { if (ngx_disable_accept_events(cycle) == NGX_ERROR) { return NGX_ERROR; } ngx_accept_mutex_held = 0; //表示當前並沒有獲取鎖 } return NGX_OK; }
調用ngx_shmtx_trylock來嘗試獲取ngx_accept_mutex鎖,如果獲取了的話,在判斷在上次循環中是否已經獲取了鎖,如果獲取了,那么listening就已經在當前worker進程的epoll當中了,否則的話就調用ngx_enable_accept_events函數來講listening加入到epoll當中,並要對變量ngx_accept_mutex_held賦值,表示已經獲取了鎖。如果沒有獲取到鎖的話,還要判斷上次是否已經獲取了鎖,如果上次獲取了的話,那么還要調用ngx_disable_accept_events函數,將listening從epoll當中移除。
嗯,就這樣就可以保證所有的worker進程中就只有一個worker將listening放入到了epoll當中,也就避免了驚群的發生。好了,就講完了(當然我只講了有原子操作的情況下的實現方案,並沒有講文件鎖的實現方案,但是其實也都大同小異)。
轉自:http://www.xuebuyuan.com/2041519.html