linux線程基礎篇----線程同步與互斥


linux線程基礎----線程同步與互斥

一、同步的概念

  1.同步概念

    所謂同步,即同時起步,協調一致。不同的對象,對“同步”的理解方式略有不同。如,設備同步,是指在兩個設備

    之間規定一個共同的時間參考;數據庫同步,是指讓兩個或多個數據庫內容保持一致,或者按需要部分保持一致;

    文件同步,是指讓兩個或多個文件夾里的文件保持一致等等。而編程中、通信中所說的同步與生活中大家印象中的

    同步概念略有差異。“同”字應是指協同、協助、互相配合。主旨在協同步調,按預定的先后次序運行。

  2.數據混亂的原因

   1. 資源共享(獨享資源則不會)       

    2. 調度隨機(意味着數據訪問會出現競爭)  

    3. 線程間缺乏必要的同步機制。

         以上3點中,前兩點不能改變,欲提高效率,傳遞數據,資源必須共享。只要共享資源,就一定會出現競爭。只要存在競爭關系,

    數據就很容易出現混亂。所以只能從第三點着手解決。使多個線程在訪問共享資源的時候,出現互斥。

   3.線程同步

  同步即協同步調,按預定的先后次序運行。

        線程同步,指一個線程發出某一功能調用時,在沒有得到結果之前,該調用不返回。同時其它線程為保證數據一致性,不能調用

   該功能。同步”的目的,是為了避免數據混亂,解決與時間有關的錯誤。實際上,不僅線程間需要同步,進程間、信號間等等都

   需要同步機制。因此,所有“多個控制流,共同操作一個共享資源”的情況,都需要同步。

 

二、線程同步

    線程同步主要有互斥鎖,條件變量,讀寫鎖和信號量(還有自旋鎖但在用戶層不常用,具體參考APUE11.6.7自旋鎖)

   1.互斥鎖

  Linux中提供一把互斥鎖mutex(也稱之為互斥量)。

   每個線程在對資源操作前都嘗試先加鎖,成功加鎖才能操作,操作結束解鎖。

        資源還是共享的,線程間也還是競爭的,                                                                

       但通過“鎖”就將資源的訪問變成互斥操作,而后與時間有關的錯誤也不會再產生了

  

  但,應注意:同一時刻,只能有一個線程持有該鎖。

       當A線程對某個全局變量加鎖訪問,B在訪問前嘗試加鎖,拿不到鎖,B阻塞。

  C線程不去加鎖,而直接訪問該全局變量,依然能夠訪問,但會出現數據混亂。

       所以,互斥鎖實質上是操作系統提供的一把“建議鎖”(又稱“協同鎖”)

  建議程序中有多線程訪問共享資源的時候使用該機制。但並沒有強制限定。

  因此,即使有了mutex,如果有線程不按規則來訪問數據,依然會造成數據混亂。

  主要應用函數:

  pthread_mutex_init函數

      pthread_mutex_destroy函數

      pthread_mutex_lock函數

      pthread_mutex_trylock函數

       pthread_mutex_unlock函數

   以上5個函數的返回值都是:成功返回0, 失敗返回錯誤號。   

  pthread_mutex_t 類型,其本質是一個結構體。為簡化理解,應用時可忽略其實現細節,簡單當成整數看待。

  pthread_mutex_t mutex; 變量mutex只有兩種取值1、0。

  pthread_mutex_init函數

  初始化一個互斥鎖(互斥量) ---> 初值可看作1

       int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

       參1:傳出參數,調用時應傳 &mutex      

       restrict關鍵字:只用於限制指針,告訴編譯器,所有修改該指針指向內存中內容的操作,只能通過本指針完成。

  不能通過除本指針以外的其他變量或指針修改

       參2:互斥量屬性。是一個傳入參數,通常傳NULL,選用默認屬性(線程間共享)。 參APUE.12.4同步屬性

  1. 靜態初始化:如果互斥鎖 mutex 是靜態分配的(定義在全局,或加了static關鍵字修飾),可以直接使用宏進行初始化。
  2. e.g.  pthead_mutex_t muetx = PTHREAD_MUTEX_INITIALIZER;
  3. 動態初始化:局部變量應采用動態初始化。e.g.  pthread_mutex_init(&mutex, NULL)

    pthread_mutex_destroy函數

  銷毀一個互斥鎖

       int pthread_mutex_destroy(pthread_mutex_t *mutex);

    pthread_mutex_lock函數

  加鎖。可理解為將mutex--(或-1)

       int pthread_mutex_lock(pthread_mutex_t *mutex);

  pthread_mutex_unlock函數

  解鎖。可理解為將mutex ++(或+1)

       int pthread_mutex_unlock(pthread_mutex_t *mutex);

  pthread_mutex_trylock函數

  嘗試加鎖

      int pthread_mutex_trylock(pthread_mutex_t *mutex);

  

    加鎖與解鎖

  lock與unlock:

        lock嘗試加鎖,如果加鎖不成功,線程阻塞,阻塞到持有該互斥量的其他線程解鎖為止。

        unlock主動解鎖函數,同時將阻塞在該鎖上的所有線程全部喚醒,至於哪個線程先被喚醒,取決於優先級、調度。默認:先阻塞、先喚醒。

        例如:T1 T2 T3 T4 使用一把mutex鎖。T1加鎖成功,其他線程均阻塞,直至T1解鎖。T1解鎖后,T2 T3 T4均被喚醒,並自動再次嘗試加鎖。

        可假想mutex鎖 init成功初值為1。 lock 功能是將mutex--, unlock將mutex++

     lock與trylock:

        lock加鎖失敗會阻塞,等待鎖釋放。

        trylock加鎖失敗直接返回錯誤號(如:EBUSY),不阻塞。

  示例代碼:生產者與消費者,頭文件參考UNPV22E

/* include main */
#include    "unpipc.h"

#define    MAXNITEMS         1000000
#define    MAXNTHREADS            100

int        nitems;            /* read-only by producer and consumer */
struct {
  pthread_mutex_t    mutex;
  int    buff[MAXNITEMS];
  int    nput;
  int    nval;
} shared = { PTHREAD_MUTEX_INITIALIZER };

void    *produce(void *), *consume(void *);

int
main(int argc, char **argv)
{
    int            i, nthreads, count[MAXNTHREADS];
    pthread_t    tid_produce[MAXNTHREADS], tid_consume;

    if (argc != 3)
        err_quit("usage: prodcons2 <#items> <#threads>");
    nitems = min(atoi(argv[1]), MAXNITEMS);
    nthreads = min(atoi(argv[2]), MAXNTHREADS);

    Set_concurrency(nthreads);
        /* 4start all the producer threads */
    for (i = 0; i < nthreads; i++) {
        count[i] = 0;
        Pthread_create(&tid_produce[i], NULL, produce, &count[i]);
    }

        /* 4wait for all the producer threads */
    for (i = 0; i < nthreads; i++) {
        Pthread_join(tid_produce[i], NULL);
        printf("count[%d] = %d\n", i, count[i]);    
    }

        /* 4start, then wait for the consumer thread */
    Pthread_create(&tid_consume, NULL, consume, NULL);
    Pthread_join(tid_consume, NULL);

    exit(0);
}
/* end main */

/* include producer */
void *
produce(void *arg)
{
    for ( ; ; ) {
        Pthread_mutex_lock(&shared.mutex);
        if (shared.nput >= nitems) {
            Pthread_mutex_unlock(&shared.mutex);
            return(NULL);        /* array is full, we're done */
        }
        shared.buff[shared.nput] = shared.nval;
        shared.nput++;
        shared.nval++;
        Pthread_mutex_unlock(&shared.mutex);
        *((int *) arg) += 1;
    }
}

void *
consume(void *arg)
{
    int        i;

    for (i = 0; i < nitems; i++) {
        if (shared.buff[i] != i)
            printf("buff[%d] = %d\n", i, shared.buff[i]);
    }
    return(NULL);
}
/* end producer */
mutex_prodcons2.c

 

  

  2.條件變量

   條件本身不是鎖!但它也可以造成線程阻塞。通常與互斥鎖配合使用。給多線程提供一個會合的場所。

   互斥鎖用於上鎖,條件變量用於等待。

  主要應用函數:

         pthread_cond_init函數

         pthread_cond_destroy函數

         pthread_cond_wait函數

         pthread_cond_timedwait函數

         pthread_cond_signal函數

         pthread_cond_broadcast函數

    以上6 個函數的返回值都是:成功返回0, 失敗直接返回錯誤號。

         pthread_cond_t類型      用於定義條件變量

         pthread_cond_t cond;

   pthread_cond_init函數

   初始化一個條件變量,定義在全局,因為要在子線程中使用。

   int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);                

   參2:attr表條件變量屬性,通常為默認值,傳NULL即可

   也可以使用靜態初始化的方法,初始化條件變量,定義在全局:

   pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

   pthread_cond_destroy函數

  銷毀一個條件變量

  int pthread_cond_destroy(pthread_cond_t *cond);

  pthread_cond_wait函數

  阻塞等待一個條件變量

       int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

  函數作用:

      1.阻塞等待條件變量cond(參1)滿足 

      2.釋放已掌握的互斥鎖(解鎖互斥量)相當於pthread_mutex_unlock(&mutex);

   1.2.兩步為一個原子操作,不可分割。

      3.當被喚醒,pthread_cond_wait函數返回時,解除阻塞並重新申請獲取互斥鎖pthread_mutex_lock(&mutex);

  pthread_cond_timedwait函數

  限時等待一個條件變量,使用相對時間,所以要先使用time()函數獲取當前時間。

  int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);

         參3:

                   struct timespec {

                            time_t tv_sec;          /* seconds */ 秒

                            long   tv_nsec;      /* nanosecondes*/ 納秒

                   }                                                                        

  形參abstime:絕對時間。                                                                                     

  如:time(NULL)返回的就是絕對時間。而alarm(1)是相對時間,相對當前時間定時1秒鍾。   

                            struct timespec t = {1, 0};

                            sem_timedwait(&sem, &t); 這樣只能定時到 1970年1月1日  00:00:01秒(早已經過去)

 

      正確用法:

                            time_t cur = time(NULL); 獲取當前時間。

        struct timespec t;    定義timespec 結構體變量t

                            t.tv_sec = cur+1; 定時1秒

        pthread_cond_timedwait (&cond, &t); 傳參                                              參APUE.11.6線程同步

     在講解setitimer函數時我們還提到另外一種時間類型:

              struct timeval {

                  time_t      tv_sec;  /* seconds */ 秒

                  suseconds_t tv_usec; /* microseconds */ 微秒

              };

  

  pthread_cond_signal函數

  喚醒至少一個阻塞在條件變量上的線程

  int pthread_cond_signal(pthread_cond_t *cond);

  pthread_cond_broadcast函數

  喚醒全部阻塞在條件變量上的線程

      int pthread_cond_broadcast(pthread_cond_t *cond);

  示例代碼:生產者消費者模型

/*借助條件變量模擬 生產者-消費者 問題*/ #include <stdlib.h> #include <unistd.h> #include <pthread.h> #include <stdio.h>

/*鏈表作為公享數據,需被互斥量保護*/
struct msg { struct msg *next; int num; }; struct msg *head; /* 靜態初始化 一個條件變量 和 一個互斥量*/ pthread_cond_t has_product = PTHREAD_COND_INITIALIZER; pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; void *consumer(void *p) { struct msg *mp; for (;;) { pthread_mutex_lock(&lock); while (head == NULL) {           //頭指針為空,說明沒有節點 可以為if嗎
            pthread_cond_wait(&has_product, &lock); } mp = head; head = mp->next;                //模擬消費掉一個產品
        pthread_mutex_unlock(&lock); printf("-Consume %lu---%d\n", pthread_self(), mp->num); free(mp); sleep(rand() % 4); } } void *producer(void *p) { struct msg *mp; for (;;) { mp = malloc(sizeof(struct msg)); mp->num = rand() % 1000 + 1;        //模擬生產一個產品
        printf("-Produce -------------%d\n", mp->num); pthread_mutex_lock(&lock); mp->next = head; head = mp; pthread_mutex_unlock(&lock); pthread_cond_signal(&has_product);  //將等待在該條件變量上的一個線程喚醒
 sleep(rand() % 4); } } int main(int argc, char *argv[]) { pthread_t pid, cid; srand(time(NULL)); pthread_create(&pid, NULL, producer, NULL); pthread_create(&cid, NULL, consumer, NULL); pthread_create(&cid, NULL, consumer, NULL); pthread_create(&cid, NULL, consumer, NULL); pthread_create(&cid, NULL, consumer, NULL); pthread_join(pid, NULL); pthread_join(cid, NULL); return 0; }

  條件變量的優點:

 

       相較於mutex而言,條件變量可以減少競爭。如直接使用mutex,除了生產者、消費者之間要競爭互斥量以外,

       消費者之間也需要競爭互斥量,但如果匯聚(鏈表)中沒有數據,消費者之間競爭互斥鎖是無意義的。

  有了條件變量機制以后,只有生產者完成生產,才會引起消費者之間的競爭。提高了程序效率。

 

  3.讀寫鎖

  與互斥量類似,但讀寫鎖允許更高的並行性。其特性為:寫獨占,讀共享。

  讀寫鎖狀態:

  一把讀寫鎖具備三種狀態:

         1. 讀模式下加鎖狀態 (讀鎖)

         2. 寫模式下加鎖狀態 (寫鎖)

         3. 不加鎖狀態

  讀寫鎖特性: 

  1.讀寫鎖是“寫模式加鎖”時, 解鎖前,所有對該鎖加鎖的線程都會被阻塞。

  2.讀寫鎖是“讀模式加鎖”時, 如果線程以讀模式對其加鎖會成功;如果線程以寫模式加鎖會阻塞。

  3.讀寫鎖是“讀模式加鎖”時, 既有試圖以寫模式加鎖的線程,也有試圖以讀模式加鎖的線程。

  那么讀寫鎖會阻塞隨后的讀模式鎖請求。優先滿足寫模式鎖。讀鎖、寫鎖並行阻塞,寫鎖優先級高

       讀寫鎖也叫共享-獨占鎖。當讀寫鎖以讀模式鎖住時,它是以共享模式鎖住的;當它以寫模式鎖住時,它是以獨占模式鎖住的。寫獨占、讀共享。

       讀寫鎖非常適合於對數據結構讀的次數遠大於寫的情況。

  主要應用函數:

       pthread_rwlock_init函數

       pthread_rwlock_destroy函數

       pthread_rwlock_rdlock函數 

       pthread_rwlock_wrlock函數

       pthread_rwlock_tryrdlock函數

       pthread_rwlock_trywrlock函數

       pthread_rwlock_unlock函數

  以上7 個函數的返回值都是:成功返回0, 失敗直接返回錯誤號。  

      pthread_rwlock_t類型   用於定義一個讀寫鎖變量。

      pthread_rwlock_t rwlock;

  pthread_rwlock_init函數

  初始化一把讀寫鎖

       int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

       參2:attr表讀寫鎖屬性,通常使用默認屬性,傳NULL即可。

  pthread_rwlock_destroy函數

  銷毀一把讀寫鎖

       int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

  pthread_rwlock_rdlock函數

  以讀方式請求讀寫鎖。(常簡稱為:請求讀鎖)

       int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

  pthread_rwlock_wrlock函數

  以寫方式請求讀寫鎖。(常簡稱為:請求寫鎖)

     int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

  pthread_rwlock_unlock函數

  解鎖

       int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

  pthread_rwlock_tryrdlock函數

  非阻塞以讀方式請求讀寫鎖(非阻塞請求讀鎖)

  int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

     pthread_rwlock_trywrlock函數

  非阻塞以寫方式請求讀寫鎖(非阻塞請求寫鎖)

       int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);

  示例代碼:同時有多個線程對同一全局數據讀、寫操作。 

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

int counter;
pthread_rwlock_t rwlock;

/* 3個線程不定時寫同一全局資源,5個線程不定時讀同一全局資源 */
void *th_write(void *arg)
{
    int t;
    int i = (int)arg;
    while (1) {
        pthread_rwlock_wrlock(&rwlock);
        t = counter;   
        usleep(1000);
        printf("=======write %d: %lu: counter=%d ++counter=%d\n", i, pthread_self(), t, ++counter);
        pthread_rwlock_unlock(&rwlock);
        usleep(10000);
    }
    return NULL;
}
void *th_read(void *arg)
{
    int i = (int)arg;

    while (1) {
        pthread_rwlock_rdlock(&rwlock);
        printf("----------------------------read %d: %lu: %d\n", i, pthread_self(), counter);
        pthread_rwlock_unlock(&rwlock);
        usleep(2000);
    }
    return NULL;
}

int main(void)
{
    int i;
    pthread_t tid[8];

    pthread_rwlock_init(&rwlock, NULL);

    for (i = 0; i < 3; i++)
        pthread_create(&tid[i], NULL, th_write, (void *)i);

    for (i = 0; i < 5; i++)
        pthread_create(&tid[i+3], NULL, th_read, (void *)i);

    for (i = 0; i < 8; i++)
        pthread_join(tid[i], NULL);

    pthread_rwlock_destroy(&rwlock);

    return 0;
}

 

  

  4.信號量

  信號量有posix有名信號量和無名信號量,還有system V信號量,在這里主要介紹posix無名信號量用於線程同步。

  進化版的互斥鎖(1 --> N)

        由於互斥鎖的粒度比較大,如果我們希望在多個線程間對某一對象的部分數據進行共享,使用互斥鎖是沒有辦法實現的,只能將整個數據對象鎖住。

   這樣雖然達到了多線程操作共享數據時保證數據正確性的目的,卻無形中導致線程的並發性下降。線程從並行執行,變成了串行執行。與直接使用單進程無異。

      信號量,是相對折中的一種處理方式,既能保證同步,數據不混亂,又能提高線程並發

  主要應用函數:

         sem_init函數

         sem_destroy函數

         sem_wait函數

         sem_trywait函數  

         sem_timedwait函數      

         sem_post函數

    以上6 個函數的返回值都是:成功返回0, 失敗返回-1,同時設置errno。(注意,它們沒有pthread前綴)

   可以使用perror函數打印出錯信息。

        sem_t類型,本質仍是結構體。但應用期間可簡單看作為整數,忽略實現細節(類似於使用文件描述符)。

        sem_t sem; 規定信號量sem不能 < 0。頭文件 <semaphore.h>

  信號量基本操作:

  sem_wait:        1. 信號量大於0,則信號量--                (類比pthread_mutex_lock)

           |                   2. 信號量等於0,造成線程阻塞

         對應

           |

       sem_post:     將信號量++,同時喚醒阻塞在信號量上的線程         (類比pthread_mutex_unlock)

  但,由於sem_t的實現對用戶隱藏,所以所謂的++、--操作只能通過函數來實現,而不能直接++、--符號。

  信號量的初值,決定了占用信號量的線程的個數。

  sem_init函數

  初始化一個信號量

       int sem_init(sem_t *sem, int pshared, unsigned int value);

       參1:sem信號量 

    參2:pshared取0用於線程間;取非0用於進程間        

  參3:value指定信號量初值

  sem_destroy函數

  銷毀一個信號量

        int sem_destroy(sem_t *sem);

   sem_wait函數

  給信號量加鎖 --

       int sem_wait(sem_t *sem);

  sem_post函數

  給信號量解鎖 ++

       int sem_post(sem_t *sem); 

  sem_trywait函數

  嘗試對信號量加鎖 --    (與sem_wait的區別類比lock和trylock)

       int sem_trywait(sem_t *sem);     

  sem_timedwait函數

  限時嘗試對信號量加鎖 --

       int sem_timedwait(sem_t *sem, const struct timespec *abs_timeout);

       參2:abs_timeout采用的是絕對時間。                      

      定時1秒:

                   time_t cur = time(NULL); 獲取當前時間。

       struct timespec t;    定義timespec 結構體變量t

                   t.tv_sec = cur+1; 定時1秒

      sem_timedwait(&sem, &t); 傳參

  示例代碼:生成者消費者模型,一個生產者多個消費者  

/*信號量實現 生產者 消費者問題*/
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <stdio.h>
#include <semaphore.h>

#define NUM 5       

int idex = 0;    
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;    //解決多個消費者之間的競爭   
int queue[NUM];                                     //全局數組實現環形隊列
sem_t blank_number, product_number;                 //空格子信號量, 產品信號量
void *producer(void *arg)
{
    int i = 0;
    while (1) {
        sem_wait(&blank_number);                    //生產者將空格子數--,為0則阻塞等待
        queue[i] = rand() % 1000 + 1;               //生產一個產品
        printf("----Produce---%d\n", queue[i]);        
        sem_post(&product_number);                  //將產品數++

        i = (i+1) % NUM;                            //借助下標實現環形
        sleep(rand()%1);
    }
}

void *consumer(void *arg)
{
    while (1) {
        sem_wait(&product_number);                  //消費者將產品數--,為0則阻塞等待
        printf("-Consume---%d      %lu\n", queue[idex], pthread_self());
        queue[idex] = 0;                               //消費一個產品 
        sem_post(&blank_number);                    //消費掉以后,將空格子數++

        pthread_mutex_lock(&lock);
        idex = (idex+1) % NUM;
        pthread_mutex_unlock(&lock);
        sleep(rand()%1);
    }
}

int main(int argc, char *argv[])
{
    pthread_t pid, cid;

    sem_init(&blank_number, 0, NUM);                //初始化空格子信號量為5
    sem_init(&product_number, 0, 0);                //產品數為0

    pthread_create(&pid, NULL, producer, NULL);
    
    pthread_create(&cid, NULL, consumer, NULL);
    pthread_create(&cid, NULL, consumer, NULL);
    pthread_create(&cid, NULL, consumer, NULL);
    pthread_create(&cid, NULL, consumer, NULL);
    pthread_create(&cid, NULL, consumer, NULL);
    
    pthread_join(pid, NULL);
    pthread_join(cid, NULL);

    sem_destroy(&blank_number);
    sem_destroy(&product_number);

    return 0;
}

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM