線程之線程同步


前言

當多個控制線程共享相同的內存時,需要確保每個線程看到一致的數據視圖。如果每個線程使用的變量都是其他線程不會讀取或修改的,那么就不會存在一致性問題。同樣地,如果變量是只讀的,多個線程同時讀取該變量也不會有一致性問題。但是,當某個線程可以修改變量,而其他線程也可以讀取或修改這個變量的時候,就需要對這些線程進行同步,以確保它們在訪問變量的存儲內容時不會訪問到無效的數值。

當一個線程修改變量時,其他線程在讀取這個變量的值時就可能會看到不一致的數據。在變量修改時間多於一個存儲器訪問周期的處理器結構中,當存儲器讀與存儲器寫這兩個周期交叉時,這種潛在的不一致性就會出現。當然,這種行為是與處理器結構相關的,但是可移植性程序並不能對使用何種處理器結果作出假設。

圖11-2描述了兩個線程讀寫相同變量的假設例子。在這個例子中,線程A讀取變量然后給這個變量賦予一個新的值,但寫操作需要兩個存儲器周期。當線程B在這兩個存儲器寫周期中間讀取這個相同的變量時,它就會得到不一致的值。

09092847-9fa6dfc1d3fc41f1a6e2f477c09bb5dd

為了解決這個問題,線程不得不使用鎖,在同一時間只允許一個線程訪問該變量。圖11-3描述了這種同步。如果線程B希望讀取變量,它首先要獲取鎖;同樣地,當線程A更新變量時,也需要獲取這把同樣的鎖。因而線程B在線程A釋放鎖以前不能讀取變量。

當兩個或多個線程試圖在同一時間修改同一變量時,也需要進行同步。考慮變量遞增操作的情況(圖11-4),增量操作通常可分為三步:

(1)從內存單元讀入寄存器。

(2)在寄存器中進行變量值的增加。

(3)把新的值寫回內存單元。

28852942_1370263391En91

                        圖11-4 兩個非同步的線程對同一個變量做增量操作

如果兩個線程試圖在幾乎同一時間對同一變量做增量操作而不進行同步的話,結果就可能出現不一致。變量可能比原來增加了1,也有可能比原來增加了2,具體是1還是2取決於第二個線程開始操作時獲取的數值。如果第二個線程執行第一步要比第一個線程執行第三步早,第二個線程讀到的初始值就與第一個線程一樣,它為變量加1,然后再寫回去,事實上沒有實際的效果,總的來說變量只增加了1。

如果修改操作是原子操作,那么就不存在競爭。在前面的例子中,如果增加1只需要一個存儲器周期,那么就沒有競爭存在。如果數據總是以順序一致的方式出現,就不需要額外的同步。當多個線程並不能觀察到數據的不一致時,那么操作就是順序一致的。在現代計算機系統中,存儲訪問需要多個總線周期,多處理器的總線周期通常在多個處理器上是交叉的,所以無法保證數據是順序一致的。

在順序一致的環境中,可以把數據修改操作解釋為運行線程的順序操作步驟。可以把這樣的情形描述為“線程A對變量增加了1,然后線程B對變量增加了1,所以變量的值比原來的大2”,或者描述為“線程B對變量增加了1,然后線程A對變量增加了1,所以變量的值比原來的大2”。這兩個線程的任何操作順序都不可能讓變量出現除了上述值以外的其他數值。

除了計算機體系結構的因素以外,程序使用變量的方式也會引起競爭,也會導致不一致的情況發生。例如,可能會對某個變量加1,然后基於這個數值作出某種決定。增量操作這一步和作出決定這一步兩者的組合並非原子操作,因而給不一致情況的出現提供了可能。

1、互斥量

可以通過使用pthread的互斥接口保護數據,確保同一時間只有一個線程訪問數據。互斥量(mutex從本質上說是一把鎖,在訪問共享資源前對互斥量進行加鎖,在訪問完成后釋放互斥量上的鎖。對互斥量進行加鎖以后,任何其他試圖再次對互斥量加鎖的線程將會被阻塞直到當前線程釋放該互斥鎖。如果釋放互斥鎖時有多個線程阻塞,所有在該互斥鎖上的阻塞線程都會變成可運行狀態,第一個變為運行狀態的線程可以對互斥量加鎖,其他線程將會看到互斥鎖依然被鎖住,只能回去再次等待它重新變為可用。在這種方式下,每次只有一個線程可以向前執行。

在設計時需要規定所有的線程必須遵守相同的數據訪問規則,只有這樣,互斥機制才能正常工作。操作系統並不會做數據訪問的串行化。如果允許其中的的某個線程在沒有得到鎖的情況下也可以訪問共享資源,那么即使其他的線程在使用共享資源前都獲取了鎖,也還是會出現數據不一致的問題。

互斥變量用pthread_mutex_t數據類型表示在使用互斥變量以前,必須首先對它進行初始化,可以把它置為常量PTHREAD_MUTEX_INITIALIZER(只對靜態分配的互斥量),也可以通過調用pthread_mutex_init函數進行初始化。如果動態地分配互斥量(例如通過調用malloc函數),那么在釋放內存前需要調用pthread_mutex_destroy。

#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex, 
                       const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);
返回值:若成功則返回0,否則返回錯誤編號

要用默認的屬性初始化互斥量,只需把attr設置為NULL。

對互斥量進行加鎖,需要調用pthread_mutex_lock,如果互斥量已經上鎖,調用線程將阻塞直到互斥量被解鎖。對互斥量解鎖,需要調用pthread_mutex_unlock。

#include <pthread.h>
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:若成功則返回0,否則返回錯誤編號

如果線程不希望被阻塞,它可以使用pthread_mutex_trylock嘗試對互斥量進行加鎖。如果調用pthread_mutex_trylock時互斥量處於未鎖住狀態,那么pthread_mutex_trylock將鎖住互斥量,不會出現阻塞並返回0,否則pthread_mutex_trylock就會失敗,不能鎖住互斥量,而返回EBUSY。

實例

程序清單11-5描述了用於保護某個數據結構的互斥量。當多個線程需要訪問動態分配的對象時,可以在對象中嵌入引用計數,確保在所有使用該對象的線程完成數據訪問之前,該對象內存空間不會被釋放。

程序清單11-5 使用互斥量保護數據結構

#include <stdlib.h>
#include <pthread.h>

struct foo {
    int            f_count;
    pthread_mutex_t    f_lock;
    /* more stuff here */
};

struct foo *
foo_alloc(void)    /* allocate the object */
{
    struct foo *fp;

    if((fp = malloc(sizeof(struct foo))) != NULL)
    {
        fp->f_count = 1;
        if(pthread_mutex_init(&fp->f_clock, NULL) != 0)
        {
            free(fp);
            return(NULL);
        }    
        /* continue initialization */
    }
    return(fp);
}

void 
foo_hold(struct foo *fp) /* add a reference to the object */
{
    pthread_mutex_lock(&fp->f_clock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}

void
foo_rele(struct foo *fp) /* release a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    if(--fp->f_count == 0)    /* last reference */
    {
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }
    else
    {
        pthread_mutex_unlock(&fp->f_lock);
    }
}

在使用該對象前,線程需要對這個對象的引用計數加1,當對象使用完畢時,需要對引用計數減1。當最后一個引用被釋放時,對象所占的內存空間就被釋放。在對引用計數加1、減1以及檢查引用計數是否為0這些操作之前需要鎖住互斥量。

2、避免死鎖

如果線程試圖對同一個互斥量加鎖兩次,那么它自身就會陷入死鎖狀態,使用互斥量時,還有其他更不明顯的方式也能產生死鎖。例如,程序中使用多個互斥量時,如果允許一個線程一直占有第一個互斥量,並且在試圖鎖住第二個互斥量時處於阻塞狀態,但是擁有第二個互斥量的線程也在試圖鎖住第一個互斥量,這時就會發生死鎖。因為兩個線程都在相互請求另一個線程擁有的資源,所以這兩個線程都無法向前運行,於是就產生死鎖。

可以通過小心地控制互斥量加鎖的順序來避免死鎖的發生。例如,假設需要對兩個互斥量A和B同時加鎖,如果所有線程總是在對互斥量B加鎖之前鎖住互斥量A,那么使用這兩個互斥量不會產生死鎖(當然在其他資源上仍可能出現死鎖);類似地,如果所有的線程總是在鎖住互斥量A之前鎖住互斥量B,那么也不會發生死鎖。只有在一個線程試圖以與另一個線程相反的順序鎖住互斥量時,才可能出現死鎖。

有時候應用程序的結果使得對互斥量加鎖進行排序是很困難的,如果涉及了太多的鎖和數據結構,可用的函數並不能把它轉換成簡單的層次,那么就需要采用另外的方法。可以先釋放占有的鎖,然后過一段時間再試。這種情況可以使用pthread_mutex_trylock接口避免死鎖。如果已經占有某些鎖而且pthread_mutex_trylock接口返回成功,那么就可以前進;但是,如果不能獲取鎖,可以先釋放已經占有的鎖,做好清理工作,然后過一段時間重新嘗試。

實例

程序清單11-6(修改自程序清單11-5)用以描述兩個互斥量的使用方法。當同時使用兩個互斥量時,總是讓它們以相同的順序加鎖,以避免死鎖。第二個互斥量維護着一個用於跟蹤foo數據結構的散列列表。這樣hashlock互斥量保護foo數據結構中的fh散列表和f_next散列鏈字段。foo結構中的f_lock互斥量保護對foo結構中的其他字段的訪問。

程序清單11-6 使用兩個互斥量

#include <stdlib.h>
#include <pthread.h>

#define NHASH    29
#define    HASH(fp)    (((unsigned long)fp)%NHASH)
struct foo *fh[NHASH];

pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

struct foo {
    int        f_count;
    pthread_mutex_t    f_lock;
    struct foo    *f_next;    /* protected by hashlock */
    int        f_id;
    /* more stuff here */
};

struct foo *
foo_alloc(void)    /* allocate the object */
{
    struct foo *fp;
    int        idx;

    if((fp = malloc(sizeof(struct foo))) != NULL)
    {
        fp->f_count = 1;
        if(pthread_mutex_init(&fp->f_lock, NULL) != 0)
        {
            free(fp);
            return(NULL);
        }
        idx = HASH(fp);
        pthread_mutex_lock(&hashlock);
        fp->f_next = fh[idx];
        fh[idx] = fp;
        pthread_mutex_lock(&fp->f_lock);
        pthread_mutex_unlock(&hashlock);
        /* continue initialization */
        pthread_mutex_unlock(&fp->f_lock);
    }
    return(fp);
}

void
foo_hold(struct foo *fp)    /* add a reference to the object */
{
    pthread_mutex_lock(&fp->f_lock);
    fp->f_count++;
    pthread_mutex_unlock(&fp->f_lock);
}

struct foo *
foo_find(int id)    /* find an existing object */
{
    struct foo    *fp;
    int           idx;

    idx = HASH(fp);
    pthread_mutex_lock(&hashlock);
    for(fp = fh[idx]; fp != NULL; fp = fp->f_next)
    {
        if(fp->f_id == id)
        {
            foo_hold(fp);
            break;    
        }
    }
    pthread_mutex_unlock(&hashlock);
    return(fp);
}

void
foo_rele(struct foo *fp)    /* release a reference to the object */
{
    struct foo     *tfp;
    int            idx;

    pthread_mutex_lock(&fp->f_lock);
    if(fp->f_count == 1) /* last reference */
    {
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_lock(&hashlock);
        pthread_mutex_lock(&fp->f_lock);
        /* need to recheck the condition */
        if(fp->f_count != 1)
        {
            fp->f_count--;
            pthread_mutex_unlock(&fp->f_lock);
            pthread_mutex_unlock(&hashlock);
            return;
        }
        /* remove from list */
        idx = HASH(fp);
        tfp = fh[idx];
        if(tfp == fp)
        {
            fh[idx] = fp->f_next;
        }
        else
        {
            while(tfp->f_next != fp)
            tfp = tfp->f_next;
            tfp->f_next = fp->f_next;
        }
        pthread_mutex_unlock(&hashlock);
        pthread_mutex_unlock(&fp->f_lock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }
    else
    {
        fp->f_count--;
        pthread_mutex_unlock(&fp->f_lock);
    }
}

比較程序清單11-6和程序清單11-5,可以看出分配函數現在鎖住散列列表鎖,把新的結構添加到散列存儲桶中,在對散列列表的鎖解鎖之前,先鎖住新結構中的互斥量。因為新的結構是放在全局列表中的,其他線程可以找到它,所以在完成初始化之前,需要阻塞其他試圖訪問新結構的線程。

foo_find函數鎖住散列列表鎖,然后搜索被請求的結構。如果找到了,就增加其引用計數並返回指向該結構的指針。注意加鎖的順序是先在foo_find函數中鎖定散列列表鎖,然后再在foo_hold函數中鎖定foo結構中的f_clock互斥量。

現在有了兩個鎖以后,foo_rele函數變得更加復雜。如果這是最后一個引用,因為將需要從散列列表中刪除這個結構,就要先對這個結構互斥量進行解鎖,才可以獲取散列列表鎖。然后重新獲取結構互斥量。從上一次獲得結構互斥量以來可能處於被阻塞狀態,所以需要重新檢查條件,判斷是否還需要釋放這個結構。如果其他線程在我們為滿足鎖順序而阻塞時發現了這個結構並對其引用計數加1,那么只需要簡單地對引用計數減1,對所有的東西解鎖然后返回。

如此加、解鎖太復雜,所以需要重新審視原來的設計。也可以使用散列列表鎖來保護引用計數,使事情大大簡化,結構互斥量可以用於保護foo結構中的其他任何東西。程序清單11-7反應了這種變化。

程序清單11-7 簡化的加、解鎖

#include <stdlib.h>
#include <pthread.h>

#define NHASH 29
#define HASH(fp)    (((unsigned long)fp)%NHASH)

struct foo *fh[HASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;

struct foo {
    int            f_count;    /* protected by hashlock */
    pthread_mutext_t    f_lock;
    struct foo        *f_next;    /* protected by hashlock */
    int            f_id;
    /* more stuff here */
};

struct foo *
foo_alloc(void)    /* allocate the object */
{
    struct foo     *fp;
    int        idx;

    if((fp = malloc(sizeof(struct foo))) != NULL)
    {
        fp->f_count = 1;
        if(pthread_mutex_init(&fp->f_lock, NULL) != 0)
        {
            free(fp);
            return(NULL);
        }    
        idx = HASH(fp);
        pthread_mutex_lock(&hashlock);
        fp->f_next = fh[idx];
        fh[idx] = fp;
        pthread_mutex_lock(&fp->f_lock);
        pthread_mutex_unlock(&hashlock);
        /* continue initialization */
    }
    return(fp);    
}

void 
foo_hold(struct foo *fp)    /* add a reference to the object */
{
    pthread_mutex_lock(&hashlock);
    fp->f_count++;
    pthread_mutex_unlock(&hashlock);
}

struct foo *
foo_find(int id)    /* find a existing object */
{
    struct foo     *fp;    
    int        idx;
        
    idx = HASH(fp);
    pthread_mutex_lock(&hashlock);
    for(fp = fh[idx]; fp != NULL; fp = fp->f_next)
    {
        if(fp->f_id == id)
        {
            fp->f_count++;
            break;                                    }
    }

    pthread_mutex_unlock(&hashlock);
    return(fp);
}

void
foo_rele(struct foo *fp)    /* release a reference to the object */
{
    struct foo     *tfp;
    int        idx;
        
    pthread_mutex_lock(&hashlock);
    if(--fp->f_count == 0)    /* last reference, remove from list */
    {
        idx = HASH(fp);
        tfp = fh[idx];
        if(tfp == fp)
        {
            fh[idx] = fp->f_next;        
        }
        else
        {    
            while(tfp->f_next != fp)
                tfp = tfp->f_next;
            tfp->f_next = fp->f_next;
        }

        pthread_mutex_unlock(&hashlock);
        pthread_mutex_destroy(&fp->f_lock);
        free(fp);
    }
    else
    {
        pthread_mutex_unlock(&hashlock);
    }
}

注意,與程序清單11-6中的程序相比,程序清單11-7中的程序簡單得多。兩種用途使用相同的鎖時,圍繞散列列表和引用計數的鎖的排序問題就隨之不見了。多線程的軟件設計經常要考慮這類折中處理方案。如果鎖的粒度太粗,就會出現很多線程阻塞等待相同的鎖,源自並發性的改善微乎其微。如果鎖的粒度太細,那么過多的鎖開銷會使系統性能受到影響,而且代碼變得相當復雜。作為一個程序員,需要在滿足鎖需求的情況下,在代碼復雜性和優化性能之間找到好平衡點。

對fp->f_next = fh[idx]; fh[idx] = fp;的理解: http://bbs.csdn.net/topics/330092120

 

3、pthread_mutex_timedlock函數(第三版新增)

當請求一個已經加鎖的互斥量時,如果我們想要限定線程阻塞的時間(時間到了就不再阻塞等待),這時需要使用pthread_mutex_timedlock函數。pthread_mutex_timedlock函數類似於pthread_mutex_lock,只不過一旦設置的超時值到達,pthread_mutex_timedlock函數會返回錯誤代碼ETIMEDOUT,線程不再阻塞等待。

#include <pthread.h>
#include <time.h>

int pthread_mutex_timedlock( pthread_mutex_t *restrict mutex,
                             const struct timespec *restrict tsptr );

返回值:若成功則返回0,失敗則返回錯誤代碼

超時值指定了我們要等待的時間,它使用絕對時間(而不是相對時間:我們指定線程將一直阻塞等待直到時刻X,而不是說我們將要阻塞X秒鍾。)。該時間值用timespec結構表示:秒和納秒。

實例

下面的實例說明了如何使用pthread_mutex_timedlock來避免線程無限期地阻塞。

#include "apue.h"
#include <pthread.h>
#include <sys/time.h>
#include <time.h>

int 
main(void)
{
    int                err;
    struct    timespec tout;
    struct     tm     *tmp;
    char        buf[64];
    pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;

    pthread_mutex_lock(&lock);
    printf("mutex is locked\n");
    clock_gettime(CLOCK_REALTIME, &tout);
    tmp = localtime(&tout.tv_sec);
    strftime(buf, sizeof(buf), "%r", tmp);
    printf("current time is %s\n", buf);
    tout.tv_sec += 10;    /* 10 seconds from now */
    /* caution: this could lead to deadlock */
    err = pthread_mutex_timedlock(&lock, &tout);
    clock_gettime(CLOCK_REALTIME, &tout);
    tmp = localtime(&tout.tv_sec);
    strftime(buf, sizeof(buf), "%r", tmp);
    printf("the time is now %s\n", buf);
    if(err == 0)
        printf("mutex locked again!\n");
    else    
        printf("can't lock mutex again: %s\n", strerror(err));
    exit(0);
}

注意,編譯上面程序時需要加上-lrt,否則會出現“undefined reference to‘clock_gettime’”這樣的連接錯誤。(http://blog.csdn.net/langeldep/article/details/6427780)。

運行結果如下:

未命名

4、讀寫鎖

讀寫鎖與互斥量類似,不過讀寫鎖允許更高的並行性。互斥量要么是鎖住狀態要么是不加鎖狀態,而且一次只有一個線程可以對其加鎖。讀寫鎖可以有三種狀態:讀模式下加鎖狀態,寫模式下加鎖狀態,不加鎖狀態。一次只有一個線程可以占有寫模式的讀寫鎖,但是多個線程可以同時占有讀模式的讀寫鎖

當讀寫鎖是寫加鎖狀態時,在這個鎖被解鎖之前,所有試圖對這個鎖加鎖的線程都會被阻塞當讀寫鎖在讀加鎖狀態時,所有試圖以讀模式對它進行加鎖的線程都可以得到訪問權,但是如果線程希望以寫模式對此鎖進行加鎖,它必須阻塞直到所有的線程釋放讀鎖。雖然讀寫鎖的實現各不相同,但當讀寫鎖處於讀模式鎖住狀態時,如果有另外的線程試圖以寫模式加鎖,讀寫鎖通常會阻塞隨后的讀模式鎖請求。這樣可以避免讀模式鎖長期占用,而等待的寫模式鎖請求一直得不到滿足

讀寫鎖非常適合於對數據結構讀的次數遠大於寫的情況。當讀寫鎖在寫模式下時,它所保護的數據結構就可以被安全地修改,因為當前只有一個線程可以在寫模式下擁有這個鎖。當讀寫鎖在讀模式下時,只要線程獲取了讀模式下的讀寫鎖,該鎖所保護的數據結構可以被多個獲得讀模式鎖的線程讀取。

讀寫鎖也叫做共享-獨占鎖,當讀寫鎖以讀模式鎖住時,它是以共享模式鎖住的;當它以寫模式鎖住時,它是以獨占模式鎖住的。

與互斥量一樣,讀寫鎖在使用之前必須初始化,在釋放它們底層的內存前必須銷毀。

#include <pthread.h>
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
                        const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
兩者的返回值都是:若成功則返回0,否則返回錯誤編號

讀寫鎖通過調用pthread_rwlock_init進行初始化。如果希望讀寫鎖有默認的屬性,可以傳一個空指針給attr。

在釋放讀寫鎖占用的內存之前,需要調用pthread_rwlock_destroy做清理工作。如果pthread_rwlock_init為讀寫鎖分配了資源,pthread_rwlock_destroy將釋放這些資源。如果在調用pthread_rwlock_destroy之前就釋放了讀寫鎖占用的內存空間,那么分配給這個鎖的資源就丟失了。

要在讀模式下鎖定讀寫鎖,需要調用pthread_rwlock_rdlock要在寫模式下鎖定讀寫鎖,需要調用pthread_rwlock_wrlock。不管以何種方式鎖住讀寫鎖,都可以調用pthread_rwlock_unlock進行解鎖

#include <pthread.h>
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
所有的返回值都是:若成功則返回0,否則返回錯誤編號

在實現讀寫鎖的時候可能會對共享模式下可獲取的鎖的數量進行限制,所以需要檢查pthread_rwlock_rdlock的返回值。即使pthread_rwlock_wrlock和pthread_rwlock_unlock有錯誤的返回值,如果鎖設計合理的話,也不需要檢查其返回值。錯誤返回值的定義只是針對不正確地使用讀寫鎖的情況,例如未經初始化的鎖,或者試圖獲取已擁有的鎖從而可能產生死鎖這樣的錯誤返回等。

Single UNIX Specification同樣定義了有條件的讀寫鎖原語的版本。

#include <pthread.h>
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
兩者的返回值都是:若成功則返回0,否則返回錯誤編號

可以獲取鎖時,函數返回0,否則,返回錯誤EBUSY。這些函數可以用於遵循某種鎖層次但還不能完全避免死鎖的情況。

 實例

程序清單11-8中的程序解釋了讀寫鎖的使用。作業請求隊列由單個讀寫鎖保護。實現多個工作線程獲取由單個主線程分配給它們的作業。

程序清單11-8 使用讀寫鎖

#include <stdlib.>
#include <pthread.h>

struct job {
    struct job *j_next;
    struct job *j_prev;
    pthread_t  j_id;    /* tells which thread handles this job */
    /* more stuff here */
};

struct queue {
    struct job        *q_head;
    struct job             *q_tail;
    pthread_rwlock_t    q_lock;
};

/*
* Initialize a queue. 
*/
int
queue_init(struct queue *qp)
{
    int err;
    
    qp->q_head = NULL;
    qp->q_tail = NULL;
    err = pthread_rwlock_init(&qp->q_lock, NULL);
    if(err != 0)
        return(err);

    /* continue initializationn */

    return(0);
}

/*
* Insert a job at the head of the queue.
*/
void
job_insert(struct queue *qp, struct job *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock);
    jp->j_next = qp->q_head;
    jp->j_prev = NULL;
    if(qp->q_head != NULL)
        qp->q_head->j_prev = jp;
    else
        qp->q_tail = jp;    /* list was empty */
    qp->q_head = jp;
    pthread_rwlock_unlock(&qp->q_lock);
}

/*
* Append a job on the tail of the queue.
*/
void 
job_append(struct queue *qp, struct job *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock);
    jp->j_next = NULL;
    jp->j_prev = qp->q_tail;
    if(qp->q_tail != NULL)
        qp->q_tail->j_next = jp;
    else    
        qp->q_head = jp; /* list was empty */
    qp->q_tail = jp;
    pthread_rwlock_unlock(&qp->q_lock);
}
    
/*
* Remove the  given job from a queue.
*/
void
job_remove(struct queue *qp, struct job *jp)
{
    pthread_rwlock_wrlock(&qp->q_lock);
    if(jp == qp->q_head)
    {
        qp->q_head = jp->j_next;
        if(qp->q_tail == jp)
            qp->q_tail = NULL;
    }
    else if (jp == qp-q_tail)
    {
        qp->q_tail = jp->j_prev;
        if(qp->q_head == jp)
            qp->q_head = NULL;
    }
    else
    {    
        jp->j_prev->j_next = jp->j_next;
        jp->j_next->j_prev = jp->j_prev;    
    }
    pthread_rwlock_unlock(&qp->q_lock);
}

/*
* Find a job for the given thread ID.
*/
struct job *
job_find(struct queue *qp, pthread_t id)
{
    struct job *jp;
    
    if(pthread_rwlock_rdlock(&qp->q_lock) != 0)
        return(NULL);

    for(jp = qp->q_head; jp != NULL; jp = jp->j_next)
        if(pthread_equal(jp->j_id, id))
            break;

    pthread_rwlock_unlock(&qp->q_lock);
    return(jp);
}

在這個例子中,不管什么時候需要增加一個作業到隊列中或者從隊列中刪除作業,都用寫模式鎖住隊列的讀寫鎖。不管何時搜索隊列,首先需要獲取讀模式下的鎖,允許所有的工作線程並發地搜索隊列。在這種情況下,只有線程搜索隊列的頻率遠遠高於增加或刪除作業時,使用讀寫鎖才能改善性能。

工作線程只能從隊列中讀取與它們的線程ID匹配的作業。既然作業結構同一時間只能由一個線程使用,所以不需要額外加鎖。

5、帶超時限制的讀寫鎖(第三版新增)

#include <pthread.h>
#include <time.h>

int pthread_rwlock_timedrdlock( pthread_rwlock_t *restrict rwlock,
               const struct timespec *restrict tsptr );

int pthread_rwlock_timedwrlock( pthread_rwlock_t *restrict rwlock,
                const struct timespec *restrict tsptr );

返回值:若成功則返回0,失敗則返回錯誤代碼

同pthread_mutex_timedlock與thread_mutex_lock的區別完全一樣。

6、條件變量

條件變量是線程可用的另一種同步機制(http://www.cnblogs.com/feisky/archive/2010/03/08/1680950.html)。條件變量給多個線程提供了一個會合的場所。條件變量與互斥量一起使用時,允許線程以無競爭的方式等待特定的條件發生。(條件變量基本概念和原理:http://hipercomer.blog.51cto.com/4415661/914841

線程同步:條件變量的使用細節分析http://blog.chinaunix.net/uid-28852942-id-3757186.html

條件本身是由互斥量保護的線程在改變條件狀態前必須首先鎖住互斥量,其他線程在獲得互斥量之前不會察覺到這種改變,因為必須鎖定互斥量以后才能計算條件

條件變量使用之前必須首先進行初始化pthread_cond_t數據類型代表的條件變量可以用兩種方式進行初始化,可以把常量PTHREAD_COND_INITIALIZER賦給靜態分配的條件變量,但是如果條件變量是動態分配的,可以使用pthread_cond_init函數進行初始化。

在釋放底層的內存空間之前,可以使用pthread_cond_destroy函數對條件變量進行去除初始化(deinitialize)。

#include <pthread.h>

int pthread_cond_init( pthread_cond_t *restrict cond,
           pthread_condattr *restrict attr );

int pthread_cond_destroy( pthread_cond_t *cond );

兩者的返回值都是:若成功則返回0,否則返回錯誤代碼

除非需要創建一個非默認屬性的條件變量,否則pthread_cond_init函數的attr參數可以設置為NULL。

使用pthread_cond_wait等待條件變為真,如果在給定的時間內條件不能滿足,那么會生成一個代碼出錯碼的返回值。

#include <pthread.h>

int pthread_cond_wait( pthread_cond_t *restrict cond,
             pthread_mutex_t *restrict mutex );

int pthread_cond_timedwait( pthread_cond_t *restrict cond,
                     pthread_mutex_t *restrict mutex,
                     const struct timespec *restrict timeout );

兩者的返回值都是:若成功則返回0,否則返回錯誤編號

傳遞給pthread_cond_wait的互斥量對條件進行保護,調用者把鎖住的互斥量傳給函數函數把調用線程放到等待條件的線程列表上,然后對互斥量解鎖,這兩個操作是原子操作。這樣就關閉了條件檢查和線程進入休眠狀態等待條件改變這兩個操作之間的時間通道,這樣線程就不會錯過條件的任何變化。pthread_cond_wait返回時,互斥量再次被鎖住。

pthread_cond_timedwait函數的工作方式與pthread_cond_wait函數相似,只是多了一個timeout。timeout值指定了等待的時間,它是通過timespec結構指定。時間值用秒數或者分秒數來表示,分秒數的單位是納秒。

struct timespec {
    time_t    tv_sec;    /* seconds */
    long      tv_nsec;   /* nanoseconds */
};

使用這個結構時,需要指定願意等待多長時間,時間值是一個絕對數而不是相對數。例如,如果能等待3分鍾,就需要把當前時間加上3分鍾再轉換到timespec結構,而不是把3分鍾轉換成timespec結構。

可以使用gettimeofday(http://www.cnblogs.com/nufangrensheng/p/3507715.html)獲取用timeval結構表示的當前時間,然后把這個時間轉換成timespec結構。要得到timeout值的絕對時間,可以使用下面的函數:

void 
maketimeout( struct timespec *tsp, long minutes )
{
    struct timeval now;

    /* get the current time */
    gettimeofday( &now );
    tsp->tv_sec = now.tv_sec;
    tsp->tv_nsec = now.tv_usec * 1000;    /* usec to nsec */
    /* add the offset to get timeout value */
    tsp->tv_sec += minutes * 60;
}

如果時間值到了但是條件還是沒有出現,pthread_cond_timedwait將重新獲取互斥量,然后返回錯誤ETIMEDOUT從pthread_cond_wait或者pthread_cond_timedwait調用成功返回時,線程需要重新計算條件,因為其他的線程可能已經在運行並改變了條件。

有兩個函數可以用於通知線程條件已經滿足。pthread_cond_signal函數將喚醒等待該條件的某個線程,而pthread_cond_broadcast函數將喚醒等待該條件的所有線程。

POSIX規范為了簡化實現,允許pthread_cond_signal在實現的時候可以喚醒不止一個線程。

#include <pthread.h>

int pthread_cond_signal( pthread_cond_t *cond );

int pthread_cond_broadcast( pthread_cond_t *cond );

兩者的返回值都是:若成功則返回0,否則返回錯誤編號

調用pthread_cond_signal或者pthread_cond_broadcast,也稱為向線程或條件發送信號。必須注意一定要在改變條件狀態以后再給線程發送信號

實例

程序清單11-9 使用條件變量

#include <pthread.h>

struct msg {
    struct msg *m_next;
    /* ... more stuff here ... */
};

struct msg *workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;

void
process_msg(void)
{
    struct msg *mp;
    
    for( ; ; )
    {
        pthread_mutex_lock(&qlock);
        while(workq == NULL)      /* 從pthread_cond_wait或者pthread_cond_timedwait調用成功返回時,線程需要重新計算條件 */
        {
            pthread_cond_wait(&qready, &qlock);
        }        
        mp = workq;
        workq = mp->m_next;
        pthread_mutex_unlock(&qlock);
        /* now process the message mp */
    }
}

void
 enqueue_msg(struct msg *mp)
{
    pthread_mutex_lock(&qlock);
    mp->m_next = workq;
    workq = mp;
    pthread_mutex_unlock(&qlock);
    pthread_cond_signal(&qready);  /* 在改變條件狀態以后再給線程發送信號 */
}

7、Spin Locks(自旋鎖)(第三版新增)

自旋鎖與互斥量類似,區別是:若線程不能獲取鎖,互斥量是通過休眠的方式(線程被暫時取消調度,切換至其他可運行的線程)來阻塞線程;而自旋鎖則是通過忙等(busy-waiting,spinning)的方式(線程不會被取消調度,一直在處於運行狀態)來阻塞線程。自旋鎖適於用這樣的情況:鎖被其他線程短期持有(很快會被釋放),而且等待該鎖的線程不希望在阻塞期間被取消調度,因為這會帶來一些開銷。

自旋鎖通常被用作實現其他類型的鎖的低級原語。當一個線程在自旋等待一個鎖的時候,CPU不能做其他任何事了,這就浪費了CPU資源。這就是為什么要求自旋鎖只能被短期保持的原因。

自旋鎖在非搶占式內核中是非常有用的:除了提供互斥機制外,自旋鎖可以(以自旋的方式)阻塞中斷,這樣中斷處理程序就不會由於試圖獲取一個已經被鎖住的自旋鎖而使系統死鎖。在這些非搶占式的內核中,中斷處理程序是不能休眠的,所以它們唯一可以使用的同步原語只有自旋鎖。

然而,在用戶級,自旋鎖並不是十分有用,除非你在一個不允許搶占的實時調度類中運行。在分時調度類中運行的用戶級線程可以被取消調度:當它的時間片用完或者一個更高有效級的線程到來的時候。在這種情況下,如果被取消調度的線程原來持有一個自旋鎖,那么在該自旋鎖上阻塞的其他線程將會自旋比原來更長的時間。

自旋鎖接口與互斥量接口類似。我們可以通過pthread_spin_init函數初始化一個自旋鎖。使用pthread_spin_destroy函數對一個自旋鎖取消初始化。

#include <pthread.h>

int pthread_spin_init( pthread_spinlock_t *lock, int pshared );

int pthread_spin_destroy( pthread_spinlock_t *lock );

兩個函數的返回值:若成功則返回0,失敗則返回錯誤代碼

自旋鎖只有一個屬性,並且只有平台支持線程進程共享同步選項(Thread Process-Shared Synchronization option)該屬性才有效。參數pshared代表進程共享(process shared)屬性,它指示了自旋鎖如何取得。如果把pshared設置為PTHREAD_PROCESS_SHARED,那么自旋鎖可以被訪問該鎖的底層內存的線程獲得,即使這些線程來自於不同的進程。如果把pshared設置為PTHREAD_PROCESS_PRIVATE,那么自旋鎖只能本進程內初始化的線程獲取。

要給自旋鎖加鎖,我們可以調用pthread_spin_lock(在獲得鎖之前,線程會一直自旋),或者pthread_spin_trylock(如果自旋鎖不能立即獲得,而是返回錯誤EBUSY,注意,此時線程不自旋)。無論調用哪個函數進行加鎖,我們都可以調用pthread_spin_unlock進行解鎖。

#include <pthread.h>
int pthread_spin_lock( pthread_spinlock_t *lock );
int pthread_spin_trylock( pthread_spinlock_t *lock );
int pthread_spin_unlock( pthread_spinlock_t *lock );
返回值:若成功則返回0,失敗則返回出錯代碼

注意,如果自旋鎖當前還沒有加鎖,那么調用pthread_spin_lock可直接對該自旋鎖加鎖而不會自旋。如果線程已經對自旋鎖加了鎖,現在又調用pthread_spin_lock對其加鎖,那么結果是未定義的。如果我們對一個沒有加鎖的自旋鎖進行解鎖,其結果也是未定義的。

pthread_spin_lock或者pthread_spin_trylock函數返回0,則自旋鎖被加上了鎖。我們應該小心,不要調用會在持有自旋鎖期間休眠的函數,否則將可能會浪費大量的CPU資源。

8、Barriers(屏障、關卡)(第三版新增)

關卡是一種同步機制,它可以協調並行工作的多個線程。關卡使每一個線程等待直到所有合作的線程都到達了同一點,然后再從這一點開始繼續執行。其實我們之前已經見過一種形式的關卡——pthread_join函數,它使一個線程等待另一個線程的結束。

關卡比pthread_join函數更一般化。它允許任意數量的線程等待,直到所有的線程完成處理,但這些線程不一定退出。當所有的線程都到達這個關卡的時候它們可以繼續執行工作。

我們可以使用pthread_barrier_init函數初始化一個關卡,使用pthread_barrier_destroy函數對一個關卡進行去除初始化

#include <pthread.h>

int pthread_barrier_init( pthread_barrier_t *restrict barrier,
                     const pthread_barrierattr_t *restrict attr,
                     unsigned int count );

int pthread_barrier_destroy( pthread_barrier_t *barrier );

兩個函數的返回值:若成功則返回0,失敗則返回出錯代碼

當我們初始化一個關卡的時候,使用參數count指定在允許所有線程繼續運行之前必須達到該關卡的線程的個數。使用參數attr指定關卡的屬性,我們暫時先把attr設為NULL,初始化一個帶有默認屬性的關卡。如果調用pthread_barrier_init函數時給關卡分配了資源,這些資源會在調用pthread_barrier_destroy函數時釋放。

如果一個線程率先完成了它的工作(到達關卡),該線程可以調用pthread_barrier_wait函數來等待其他所有的線程來趕上它

#include <pthread.h>

int pthread_barrier_wait( pthread_barrier_t *barrier );

返回值:若成功則返回0或PTHREAD_BARRIER_SERIAL_THREAD,失敗則返回錯誤編號

如果沒有達到初始化函數中指定的count個數目的線程全部到達關卡,那么調用pthread_barrier_wait函數的線程會休眠。當count個線程中的最后一個線程調用pthread_barrier_wait函數時,所有的線程會被喚醒。

To one arbitrary thread, it will appear as if the pthread_barrier_wait function
returned a value of PTHREAD_BARRIER_SERIAL_THREAD. The remaining threads see
a return value of 0. This allows one thread to continue as the master to act on the results
of the work done by all of the other threads.(意思好像是說:有一個線程調用pthread_barrier_wait函數的返回值是PTHREAD_BARRIER_SERIAL_THREAD,而剩余的其他所有線程調用pthread_barrier_wait都返回0,返回PTHREAD_BARRIER_SERIAL_THREAD的那個線程就成為主線程,它負責處理所有線程完成的工作的結果。)

一旦count個線程都達到了關卡,並且所有線程都被喚醒。這個關卡就可以被再次使用,但是count不能被改變,除非我們調用pthread_barrier_destroy,然后再調用pthread_barrier_init重新初始化一個count。

實例:下面的程序展示了使用關卡來同步在單一作業上合作的多個線程

#include "apue.h"
#include <pthread.h>
#include <limits.h>
#include <sys/time.h>

#define NTHR      8                /* number of threads */
#define NUMNUM    8000000L         /* number of numbers to sort */
#define TNUM      (NUMNUM/NTHR)    /* number to sort per thread */

long nums[NUMNUM];
long snums[NUMNUM];

pthread_barrier_t b;

#ifdef SOLARIS
#define heapsort qsort
#else
extern int heapsort(void *, size_t, size_t, int (*)(const void *, const void *));
#endif
/*
* Compare two long integers (helper function for heapsort)
*/
int 
complong(const void *arg1, const void *arg2)
{
    long l1 = *(long *)arg1;
    long l2 = *(long *)arg2;

    if(l1 == l2)
        return 0;
    else if(l1 < l2)
        return -1;
    else
        return 1;
}
/*
* Worker thread to sort a portion of the set of numbers.
*/
void *
thr_fn(void *arg)
{
    long    idx = (long)arg;

    heapsort(&num[idx], TNUM, sizeof(long), complong);
    pthread_barrier_wait(&b);
    /*
    * Go off and perform more work ...
    */
    return((void *)0);
}
/*
* Merge the results of the individual sorted ranges.
*/
void
merge()
{
    long    idx[NTHR];
    long    i, minidx, sidx, num;
    
    for(i = 0; i < NTHR; i++ )
        idx[i] = i * TNUM;
    for(sidx = 0; sidx < NUMNUM; sidx++ )
    {
        num = LONG_MAX;
        for( i = 0; i < NTHR; i++ )
        {    
            if((idx[i] < (i+1)*TNUM) && (nums[idx[i]] < num ))
            {
                num = nums[idx[i]];
                minidx = i;
            }
        }        
        snums[sidx] = nums[idx[minidx]];
        idx[minidx]++;
    }
}

int main()
{
    unsigned long     i;
    struct timeval    start, end;
    long long         startusec, endusec;
    double            elapsed;
    int               err;
    pthread_T         tid;

    /*
    * Create the initial set of numbers to sort 
    */
    srandom(1);
    for(i = 0; i < NUMNUM; i++ )
        nums[i] = random();
    
    /*
    * Create 8 threads to sort the numbers.
    */
    gettimeofday(&start, NULL);
    pthread_barrier_init(&b, NULL, NTHR+1);
    for(i = 0; i < NTHR; i++)
    {
        err = pthread_create(&tid, NULL, thr_fn, (void *)(i * TNUM));
        if(err != 0)
            err_exit(err, "can't create thread");
    }
    pthread_barrier_wait(&b);
    merge();
    gettimeofday(&end, NULL);
    /*
    * Print the sorted list. 
    */
    startusec = start.tv_sec * 1000000 + start.tv_usec;
    endusec = end.tv_sec * 1000000 + end.tv_usec;
    elapsed = (double)(endusec - startusec) / 1000000.0;
    printf("sort took %.4f seconds\n", elapsed);
    for( i = 0; i < NUMNUM; i++ )
        printf("%ld\n", snums[i]);
    exit(0);
}

This example shows the use of a barrier in a simplified situation where the threads
perform only one task. In more realistic situations, the worker threads will continue
with other activities after the call to pthread_barrier_wait returns.
In the example, we use eight threads to divide the job of sorting 8 million numbers.
Each thread sorts 1 million numbers using the heapsort algorithm (see Knuth [1998] for
details). Then the main thread calls a function to merge the results.
We don’t need to use the PTHREAD_BARRIER_SERIAL_THREAD return value from
pthread_barrier_wait to decide which thread merges the results, because we use
the main thread for this task. That is why we specify the barrier count as one more than
the number of worker threads; the main thread counts as one waiter.
If we write a program to sort 8 million numbers with heapsort using 1 thread only,
we will see a performance improvement when comparing it to the program in
Figure 11.16. On a system with 8 cores, the single-threaded program sorted 8 million
numbers in 12.14 seconds. On the same system, using 8 threads in parallel and 1 thread
to merge the results, the same set of 8 million numbers was sorted in 1.91 seconds, 6
times faster.

 

本篇博文內容摘自《UNIX環境高級編程》(第二版),僅作個人學習記錄所用。關於本書可參考:http://www.apuebook.com/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM