在了解了《同步與互斥的區別 》之后,我們來看看幾個經典的線程同步的例子。相信通過具體場景可以讓我們學會分析和解決這類線程同步的問題,以便以后應用在實際的項目中。
一、生產者-消費者問題
問題描述:
一組生產者進程和一組消費者進程共享一個初始為空、大小為 n 的緩沖區,只有緩沖區沒滿時,生產者才能把消息放入到緩沖區,否則必須等待;只有緩沖區不空時,消費者才能從中取出消息,否則必須等待。由於緩沖區是臨界資源,它只允許一個生產者放入消息,或者一個消費者從中取出消息。
分析:
關系分析:生產者和消費者對緩沖區互斥訪問是互斥關系,同時生產者和消費者又是一個相互協作的關系,只有生產者生產之后,消費者才能消費,它們也是同步關系。
整理思路:這里比較簡單,只有生產者和消費者兩個進程,且這兩個進程存在着互斥關系和同步關系。那么需要解決的是互斥和同步的PV操作的位置。
信號量設置:信號量mutex作為互斥信號量,用於控制互斥訪問緩沖池,初值為1;信號量full用於記錄當前緩沖池中“滿”緩沖區數,初值為 0;信號量empty用於記錄當前緩沖池中“空”緩沖區數,初值為n。
代碼示例:(semaphore類的封裝見下文)
#include<iostream> #include<unistd.h> // sleep #include<pthread.h> #include"semaphore.h" using namespace std; #define N 5 semaphore mutex("/", 1); // 臨界區互斥信號量 semaphore empty("/home", N); // 記錄空緩沖區數,初值為N semaphore full("/home/songlee",0); // 記錄滿緩沖區數,初值為0 int buffer[N]; // 緩沖區,大小為N int i=0; int j=0; void* producer(void* arg) { empty.P(); // empty減1 mutex.P(); buffer[i] = 10 + rand() % 90; printf("Producer %d write Buffer[%d]: %d\n",arg,i+1,buffer[i]); i = (i+1) % N; mutex.V(); full.V(); // full加1 } void* consumer(void* arg) { full.P(); // full減1 mutex.P(); printf(" \033[1;31m"); printf("Consumer %d read Buffer[%d]: %d\n",arg,j+1,buffer[j]); printf("\033[0m"); j = (j+1) % N; mutex.V(); empty.V(); // empty加1 } int main() { pthread_t id[10]; // 開10個生產者線程,10個消費者線程 for(int k=0; k<10; ++k) pthread_create(&id[k], NULL, producer, (void*)(k+1)); for(int k=0; k<10; ++k) pthread_create(&id[k], NULL, consumer, (void*)(k+1)); sleep(1); return 0; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
編譯運行輸出結果:
Producer 1 write Buffer[1]: 83 Producer 2 write Buffer[2]: 26 Producer 3 write Buffer[3]: 37 Producer 5 write Buffer[4]: 35 Producer 4 write Buffer[5]: 33 Consumer 1 read Buffer[1]: 83 Producer 6 write Buffer[1]: 35 Consumer 2 read Buffer[2]: 26 Consumer 3 read Buffer[3]: 37 Consumer 4 read Buffer[4]: 35 Consumer 5 read Buffer[5]: 33 Consumer 6 read Buffer[1]: 35 Producer 7 write Buffer[2]: 56 Producer 8 write Buffer[3]: 22 Producer 10 write Buffer[4]: 79 Consumer 9 read Buffer[2]: 56 Consumer 10 read Buffer[3]: 22 Producer 9 write Buffer[5]: 11 Consumer 7 read Buffer[4]: 79 Consumer 8 read Buffer[5]: 11
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
二、讀者-寫者問題
問題描述:
有讀者和寫者兩組並發線程,共享一個文件,當兩個或以上的讀線程同時訪問共享數據時不會產生副作用,但若某個寫線程和其他線程(讀線程或寫線程)同時訪問共享數據時則可能導致數據不一致的錯誤。因此要求:
- 允許多個讀者可以同時對文件執行讀操作;
- 只允許一個寫者往文件中寫信息;
- 任一寫者在完成寫操作之前不允許其他讀者或寫者工作;
- 寫者執行寫操作前,應讓已有的讀者和寫者全部退出。
分析:
關系分析:由題目分析可知,讀者和寫者是互斥的,寫者和寫者也是互斥的,而讀者和讀者不存在互斥問題。
整理思路:寫者是比較簡單的,它與任何線程互斥,用互斥信號量的 PV 操作即可解決。讀者的問題比較復雜,它必須實現與寫者的互斥,多個讀者還可以同時讀。所以,在這里用到了一個計數器,用它來判斷當前是否有讀者讀文件。當有讀者的時候寫者是無法寫文件的,此時讀者會一直占用文件,當沒有讀者的時候寫者才可以寫文件。同時,不同的讀者對計數器的訪問也應該是互斥的。
信號量設置:首先設置一個計數器count,用來記錄當前的讀者數量,初值為0;設置互斥信號量mutex,用於保護更新 count 變量時的互斥;設置互斥信號量rw用於保證讀者和寫者的互斥訪問。
代碼示例:
#include<iostream> #include<unistd.h> // sleep #include<pthread.h> #include"semaphore.h" using namespace std; int count = 0; // 記錄當前的讀者數量 semaphore mutex("/",1); // 用於保護更新count變量時的互斥 semaphore rw("/home",1); // 用於保證讀者和寫者的互斥 void* writer(void* arg) { rw.P(); // 互斥訪問共享文件 printf(" Writer %d start writing...\n", arg); sleep(1); printf(" Writer %d finish writing...\n", arg); rw.V(); // 釋放共享文件 } void* reader(void* arg) { mutex.P(); // 互斥訪問count變量 if(count == 0) // 當第一個讀線程讀文件時 rw.P(); // 阻止寫線程寫 ++count; // 讀者計數器加1 mutex.V(); // 釋放count變量 printf("Reader %d start reading...\n", arg); sleep(1); printf("Reader %d finish reading...\n", arg); mutex.P(); // 互斥訪問count變量 --count; // 讀者計數器減1 if(count == 0) // 當最后一個讀線程讀完文件 rw.V(); // 允許寫線程寫 mutex.V(); // 釋放count變量 } int main() { pthread_t id[8]; // 開6個讀線程,2個寫線程 pthread_create(&id[0], NULL, reader, (void*)1); pthread_create(&id[1], NULL, reader, (void*)2); pthread_create(&id[2], NULL, writer, (void*)1); pthread_create(&id[3], NULL, writer, (void*)2); pthread_create(&id[4], NULL, reader, (void*)3); pthread_create(&id[5], NULL ,reader, (void*)4); sleep(2); pthread_create(&id[6], NULL, reader, (void*)5); pthread_create(&id[7], NULL ,reader, (void*)6); sleep(4); return 0; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
編譯運行的結果如下:
Reader 2 start reading... Reader 1 start reading... Reader 3 start reading... Reader 4 start reading... Reader 1 finish reading... Reader 2 finish reading... Reader 3 finish reading... Reader 4 finish reading... Writer 1 start writing... Writer 1 finish writing... Writer 2 start writing... Writer 2 finish writing... Reader 5 start reading... Reader 6 start reading... Reader 5 finish reading... Reader 6 finish reading...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
三、哲學家進餐問題
問題描述:
一張圓桌上坐着 5 名哲學家,桌子上每兩個哲學家之間擺了一根筷子,桌子的中間是一碗米飯,如圖所示:
哲學家們傾注畢生精力用於思考和進餐,哲學家在思考時,並不影響他人。只有當哲學家飢餓的時候,才試圖拿起左、右兩根筷子(一根一根拿起)。如果筷子已在他人手上,則需等待。飢餓的哲學家只有同時拿到了兩根筷子才可以開始進餐,當進餐完畢后,放下筷子繼續思考。
分析:
關系分析:5名哲學家與左右鄰居對其中間筷子的訪問是互斥關系。
整理思路:顯然這里有 5 個線程,那么要如何讓一個哲學家拿到左右兩個筷子而不造成死鎖或飢餓現象?解決方法有兩個,一個是讓他們同時拿兩個筷子;二是對每個哲學家的動作制定規則,避免飢餓或死鎖現象的發生。
信號量設置:定義互斥信號量數組chopstick[5] = {1,1,1,1,1}用於對 5 根筷子的互斥訪問。
示例代碼:
semaphore chopstick[5] = {1,1,1,1,1} // 信號量數組 Pi() // i號哲學家的線程 { do { P(chopstick[i]); // 取左邊筷子 P(chopstick[(i+1)%5]); // 取右邊筷子 eat; // 進餐 V(chopstick[i]); // 放回左邊筷子 V(chopstick[(i+1)%5]); // 放回右邊筷子 think; // 思考 }while(1); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
上面的偽代碼存在一個問題:當五個哲學家都想要進餐,分別拿起他們左邊筷子的時候(都恰好執行完P(chopstick[i])),筷子已經被拿光了,等到他們再想拿右邊的筷子的時候,就全被阻塞了,這就出現了死鎖。
為了防止死鎖的發生,可以對哲學家線程施加一些限制條件,比如:
- 至多允許四個哲學家同時進餐;
- 僅當一個哲學家左右兩邊的筷子都可用時才允許他抓起筷子;
- 對哲學家順序編號,要求奇數號哲學家先抓左邊的筷子,然后再抓他右邊的筷子,而偶數號哲學家剛好相反。
這里,我們采用第二種方法來改進上面的算法,即當一個哲學家左右兩邊的筷子都可用時,才允許他抓起筷子。
#include<iostream> #include<vector> #include<unistd.h> // sleep #include<pthread.h> #include"semaphore.h" using namespace std; vector<semaphore*> chopstick; // 信號量數組 semaphore mutex("/", 1); // 設置取左右筷子的信號量 <-- 關鍵 void* P1(void* arg) // 第1個哲學家線程 { mutex.P(); // 在取筷子前獲得互斥量 chopstick[0]->P(); // 取左邊筷子 chopstick[1]->P(); // 取右邊筷子 mutex.V(); // 釋放取筷子的信號量 printf("Philosopher 1 eat.\n"); chopstick[0]->V(); // 放回左邊筷子 chopstick[1]->V(); // 放回右邊筷子 } void* P2(void* arg) // 第2個哲學家線程 { mutex.P(); // 在取筷子前獲得互斥量 chopstick[1]->P(); // 取左邊筷子 chopstick[2]->P(); // 取右邊筷子 mutex.V(); // 釋放取筷子的信號量 printf("Philosopher 2 eat.\n"); chopstick[1]->V(); // 放回左邊筷子 chopstick[2]->V(); // 放回右邊筷子 } void* P3(void* arg) // 第3個哲學家線程 { mutex.P(); // 在取筷子前獲得互斥量 chopstick[2]->P(); // 取左邊筷子 chopstick[3]->P(); // 取右邊筷子 mutex.V(); // 釋放取筷子的信號量 printf("Philosopher 3 eat.\n"); chopstick[2]->V(); // 放回左邊筷子 chopstick[3]->V(); // 放回右邊筷子 } void* P4(void* arg) // 第4個哲學家線程 { mutex.P(); // 在取筷子前獲得互斥量 chopstick[3]->P(); // 取左邊筷子 chopstick[4]->P(); // 取右邊筷子 mutex.V(); // 釋放取筷子的信號量 printf("Philosopher 4 eat.\n"); chopstick[3]->V(); // 放回左邊筷子 chopstick[4]->V(); // 放回右邊筷子 } void* P5(void* arg) // 第5個哲學家線程 { mutex.P(); // 在取筷子前獲得互斥量 chopstick[4]->P(); // 取左邊筷子 chopstick[0]->P(); // 取右邊筷子 mutex.V(); // 釋放取筷子的信號量 printf("Philosopher 5 eat.\n"); chopstick[4]->V(); // 放回左邊筷子 chopstick[0]->V(); // 放回右邊筷子 } int main() { semaphore *sem1 = new semaphore("/home", 1); semaphore *sem2 = new semaphore("/home/songlee", 1); semaphore *sem3 = new semaphore("/home/songlee/java", 1); semaphore *sem4 = new semaphore("/home/songlee/ADT", 1); semaphore *sem5 = new semaphore("/home/songlee/Test", 1); chopstick.push_back(sem1); chopstick.push_back(sem2); chopstick.push_back(sem3); chopstick.push_back(sem4); chopstick.push_back(sem5); pthread_t id; pthread_create(&id, NULL, P1, NULL); pthread_create(&id, NULL, P2, NULL); pthread_create(&id, NULL, P3, NULL); pthread_create(&id, NULL, P4, NULL); pthread_create(&id, NULL, P5, NULL); sleep(1); delete sem1; delete sem2; delete sem3; delete sem4; delete sem5; return 0; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
編譯運行的結果如下:
Philosopher 2 eat. Philosopher 1 eat. Philosopher 3 eat. Philosopher 4 eat. Philosopher 5 eat.
注意:創建信號量時的 路徑參數 請改成你的系統中存在的路徑!!!
附:semaphore類的封裝
上面的代碼中都使用了這個semaphore類,實現如下:
#pragma once #include<iostream> #include<cstdio> #include<cstdlib> #include<sys/sem.h> using namespace std; // 聯合體,用於semctl初始化 union semun { int val; /*for SETVAL*/ struct semid_ds *buf; unsigned short *array; }; class semaphore { private: int sem_id; int init_sem(int); public: semaphore(const char*, int); /*構造函數*/ ~semaphore(); /*析構函數*/ void P(); /*P操作*/ void V(); /*V操作*/ };
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
#include"semaphore.h" semaphore::semaphore(const char* path, int value) { key_t key; /*獲取key值*/ if((key = ftok(path, 'z')) < 0) { perror("ftok error"); exit(1); } /*創建信號量集,其中只有一個信號量*/ if((sem_id = semget(key, 1, IPC_CREAT|0666)) == -1) { perror("semget error"); exit(1); } init_sem(value); } semaphore::~semaphore() { union semun tmp; if(semctl(sem_id, 0, IPC_RMID, tmp) == -1) { perror("Delete Semaphore Error"); exit(1); } } void semaphore::P() { struct sembuf sbuf; sbuf.sem_num = 0; /*序號*/ sbuf.sem_op = -1; /*P操作*/ sbuf.sem_flg = SEM_UNDO; if(semop(sem_id, &sbuf, 1) == -1) { perror("P operation Error"); } } void semaphore::V() { struct sembuf sbuf; sbuf.sem_num = 0; /*序號*/ sbuf.sem_op = 1; /*V操作*/ sbuf.sem_flg = SEM_UNDO; if(semop(sem_id, &sbuf, 1) == -1) { perror("V operation Error"); } } // 初始化信號量 int semaphore::init_sem(int value) { union semun tmp; tmp.val = value; if(semctl(sem_id, 0, SETVAL, tmp) == -1) { perror("Init Semaphore Error"); return -1; } return 0; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
在這里,要創建不同的信號量,必須傳遞不同的路徑參數(這樣獲取的 key 值才會不一樣)。
注意,本文的關注點並不在於 Linux 下如何創建信號量以及如何封裝起來才更方便,而是通過幾個經典的同步實例,了解在多線程環境下如何解決這類線程同步問題。