Nginx的鎖的實現以及驚群的避免


在前面的文章中,其實很多代碼就涉及到加鎖釋放鎖的動作了,但是自己一直避免去深究他們,好了這篇文章就講Nginx是如何實現鎖的吧,然后還要講Nginx是如何使用鎖來避免驚群的發生。

在Nginx的鎖的實現中,要分為兩種情況,分別為支持原子操作以與不支持原子操作。其定義在Ngx_shmtx.h當中:

//鎖的定義
typedef struct {
#if (NGX_HAVE_ATOMIC_OPS)
    ngx_atomic_t  *lock;  //如果支持原子鎖的話,那么使用它
#if (NGX_HAVE_POSIX_SEM) 
    ngx_atomic_t  *wait;
    ngx_uint_t     semaphore;
    sem_t          sem;
#endif
#else
    ngx_fd_t       fd;   //不支持原子操作的話就使用文件鎖來實現
    u_char        *name;
#endif
    ngx_uint_t     spin;     //這是自旋鎖么?
} ngx_shmtx_t;

嗯,其實定義還是很簡單的,一看就明白了。好接下來看支持原子操作的方式是如何實現的吧,在ngx_event_core_module模塊的ngx_event_module_init函數中會有如下代碼:

    /*后面將會創建size大小的共享內存,這塊共享內存將被均分成三段, 
    分別供ngx_accept_mutex、ngx_connection_counter、ngx_temp_number 
    使用。 
    */  
    /* cl should be equal to or greater than cache line size */
    cl = 128;
    size = cl            /* ngx_accept_mutex */
           + cl          /* ngx_connection_counter */
           + cl;         /* ngx_temp_number */

//共享內存的初始化
    shm.size = size;
    shm.name.len = sizeof("nginx_shared_zone");
    shm.name.data = (u_char *) "nginx_shared_zone";
    shm.log = cycle->log;

    if (ngx_shm_alloc(&shm) != NGX_OK) {   //為共享內存分配內存空間
        return NGX_ERROR;
    }

    shared = shm.addr;   //獲取共享內存的地址

    ngx_accept_mutex_ptr = (ngx_atomic_t *) shared;   //存放互斥量內存地址的指針
    ngx_accept_mutex.spin = (ngx_uint_t) -1;   //初始化自旋鎖的初值為-1

    if (ngx_shmtx_create(&ngx_accept_mutex, (ngx_shmtx_sh_t *) shared,  //如果支持原子操作的話,這個就很簡單了,就直接將內存地址分配過去就行了
                         cycle->lock_file.data)
        != NGX_OK)
    {
        return NGX_ERROR;
    }

    ngx_connection_counter = (ngx_atomic_t *) (shared + 1 * cl);   //ngx_connection_counter為其分配共享內存的內存空間

    (void) ngx_atomic_cmp_set(ngx_connection_counter, 0, 1);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "counter: %p, %d",
                   ngx_connection_counter, *ngx_connection_counter);

    ngx_temp_number = (ngx_atomic_t *) (shared + 2 * cl);  //ngx_temp_number的內存空間

這段代碼的意思是首先調用ngx_shm_alloc函數創建共享內存,然后再為ngx_accept_mutex變量在其中分配其lock域內存,嗯,這個變量的用處大概大家也知道吧。(其實lock說白了也就是一個64位的int而已),當然共享內存中還有其他一些變量的定義,ngx_connection_counter變量用於保存當前服務器總共持有的connection。

嗯,注意ngx_shmtx_create函數,它用於創建鎖,這里有兩種方式的實現,分別為支持原子操作,和不支持原子操作的兩種,這里我們只看支持原子操作的方式吧:

//為鎖mtx的lock域分配內存
ngx_int_t
ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
{
    mtx->lock = &addr->lock;  //其實就是直接將內存地址賦個mtx的lock域就完事了

    if (mtx->spin == (ngx_uint_t) -1) {
        return NGX_OK;
    }

    mtx->spin = 2048;

    return NGX_OK;
}

上面的代碼夠簡單吧。

嗯,接下來看Nginx如何獲取以及釋放鎖。嗯,實現有兩種,分別為lock與trylock如果是lock的話,那么會組設,也就是自旋,直到獲取了鎖位置,如果使用trylock的話,那么就是非阻塞的方式,如果沒有獲取到,那么直接返回錯誤就好了。我們先看trylock吧,定義在Ngx_shmtx.c 當中(還是只看支持原子操作的實現方式吧):

//嘗試獲取鎖,原子的方式
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}

嗯,其實很簡單,首先是判斷mtx的lock域是否等於0,如果不等於,那么就直接返回false好了,如果等於的話,那么就要調用原子操作ngx_atomic_cmp_set了,它用於比較mtx的lock域,如果等於零,那么設置為當前進程的進程id號,否則返回false。嗯,這個ngx_atomic_cmp_set函數是跟體系結構相關的,這里就不細講了。

然后就可以將lock的實現了,

//嘗試獲取鎖,原子的方式
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
    return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}

//阻塞的方式獲取鎖
void
ngx_shmtx_lock(ngx_shmtx_t *mtx)
{
    ngx_uint_t         i, n;

    ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock");
//一個死循環,不斷的去看是否獲取了鎖,直到獲取了之后才退出
    for ( ;; ) {
//如果獲取了鎖,那么就可以直接返回了
        if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
            return;
        }
//如果cpu的數量大於一
        if (ngx_ncpu > 1) {
            for (n = 1; n < mtx->spin; n <<= 1) {
                for (i = 0; i < n; i++) {
                    ngx_cpu_pause();
                }

                if (*mtx->lock == 0
                    && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
                {
                    return;
                }
            }
        }

        ngx_sched_yield();
    }
}

一個for循環就暴露了其自旋的本質。里面還涉及到 一些優化的,嗯,我也不太懂,以后再說吧。接下來就可以將unlock了:

//釋放鎖
void
ngx_shmtx_unlock(ngx_shmtx_t *mtx)
{
    if (mtx->spin != (ngx_uint_t) -1) {
        ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
    }

    if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
        ngx_shmtx_wakeup(mtx);
    }
}

嗯,還是很簡單,判斷鎖的lock域與當前進程的進程id是否相等,如果相等的話,那么就將lock設置為0,然后就相當於釋放了鎖。

好接下來可以看如何用鎖來避免驚群了。在ngx_event_core_module模塊的ngx_event_module_init函數中我們已經看到了ngx_accept_mutex的lock域的內存是在共享內存中,因而,所有worker進程都共享它,在ngx_process_events_and_timers函數中我們可以看到如下的代碼:

            /*嘗試鎖accept mutex,只有成功獲取鎖的進程,才會將listen 
              套接字放入epoll中。因此,這就保證了只有一個進程擁有 
              監聽套接口,故所有進程阻塞在epoll_wait時,不會出現驚群現象。 
            */  
            //這里的ngx_trylock_accept_mutex函數中,如果順利的獲取了鎖,那么它會將監聽端口注冊到當前worker進程的epoll當中
            if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                return;
            }

函數ngx_trylock_accept_mutex用於嘗試獲取ngx_accept_mutex鎖,如果獲取了的話,那么就將listening加入到epoll當中,我們可以來看這個函數:

//嘗試獲取鎖,如果獲取了鎖,那么還要將當前監聽端口全部注冊到當前worker進程的epoll當中去
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {  //嘗試獲取互斥鎖
        
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");
        //如果本來已經獲得鎖,則直接返回Ok
        if (ngx_accept_mutex_held
            && ngx_accept_events == 0
            && !(ngx_event_flags & NGX_USE_RTSIG_EVENT))
        {
            return NGX_OK;
        }
        //到達這里,說明重新獲得鎖成功,因此需要打開被關閉的listening句柄,調用ngx_enable_accept_events函數,將監聽端口注冊到當前worker進程的epoll當中去
        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;  //表示當前獲取了鎖

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);
//這里表示的是以前曾經獲取過,但是這次卻獲取失敗了,那么需要將監聽端口從當前的worker進程的epoll當中移除,調用的是ngx_disable_accept_events函數
    if (ngx_accept_mutex_held) {
        if (ngx_disable_accept_events(cycle) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;   //表示當前並沒有獲取鎖
    }

    return NGX_OK;
}

調用ngx_shmtx_trylock來嘗試獲取ngx_accept_mutex鎖,如果獲取了的話,在判斷在上次循環中是否已經獲取了鎖,如果獲取了,那么listening就已經在當前worker進程的epoll當中了,否則的話就調用ngx_enable_accept_events函數來講listening加入到epoll當中,並要對變量ngx_accept_mutex_held賦值,表示已經獲取了鎖。如果沒有獲取到鎖的話,還要判斷上次是否已經獲取了鎖,如果上次獲取了的話,那么還要調用ngx_disable_accept_events函數,將listening從epoll當中移除。

嗯,就這樣就可以保證所有的worker進程中就只有一個worker將listening放入到了epoll當中,也就避免了驚群的發生。好了,就講完了(當然我只講了有原子操作的情況下的實現方案,並沒有講文件鎖的實現方案,但是其實也都大同小異)。

 

轉自:http://www.xuebuyuan.com/2041519.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM