多線程編程可以說每個程序員的基本功,同時也是開發中的難點之一,本文以Linux C為例,講述了線程的創建及常用的幾種線程同步的方式,最后對多線程編程進行了總結與思考並給出代碼示例。
一、創建線程
多線程編程的第一步,創建線程。創建線程其實是增加了一個控制流程,使得同一進程中存在多個控制流程並發或者並行執行。
線程創建函數,其他函數這里不再列出,可以參考pthread.h
。
#include<pthread.h>
int pthread_create(
pthread_t *restrict thread, /*線程id*/
const pthread_attr_t *restrict attr, /*線程屬性,默認可置為NULL,表示線程屬性取缺省值*/
void *(*start_routine)(void*), /*線程入口函數*/
void *restrict arg /*線程入口函數的參數*/
);
代碼示例:
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
char* thread_func1(void* arg) {
pid_t pid = getpid();
pthread_t tid = pthread_self();
printf("%s pid: %u, tid: %u (0x%x)\n", (char*)arg, (unsigned int)pid, (unsigned int)tid, (unsigned int)tid);
char* msg = "thread_func1";
return msg;
}
void* thread_func2(void* arg) {
pid_t pid = getpid();
pthread_t tid = pthread_self();
printf("%s pid: %u, tid: %u (0x%x)\n", (char*)arg, (unsigned int)pid, (unsigned int)tid, (unsigned int)tid);
char* msg = "thread_func2 ";
while(1) {
printf("%s running\n", msg);
sleep(1);
}
return NULL;
}
int main() {
pthread_t tid1, tid2;
if (pthread_create(&tid1, NULL, (void*)thread_func1, "new thread:") != 0) {
printf("pthread_create error.");
exit(EXIT_FAILURE);
}
if (pthread_create(&tid2, NULL, (void*)thread_func2, "new thread:") != 0) {
printf("pthread_create error.");
exit(EXIT_FAILURE);
}
pthread_detach(tid2);
char* rev = NULL;
pthread_join(tid1, (void *)&rev);
printf("%s return.\n", rev);
pthread_cancel(tid2);
printf("main thread end.\n");
return 0;
}
二、線程同步
有時候我們需要多個線程相互協作來執行,這時需要線程間同步。線程間同步的常用方法有:
- 互斥
- 信號量
- 條件變量
我們先看一個未進行線程同步的示例:
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#define LEN 100000
int num = 0;
void* thread_func(void* arg) {
for (int i = 0; i< LEN; ++i) {
num += 1;
}
return NULL;
}
int main() {
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, (void*)thread_func, NULL);
pthread_create(&tid2, NULL, (void*)thread_func, NULL);
char* rev = NULL;
pthread_join(tid1, (void *)&rev);
pthread_join(tid2, (void *)&rev);
printf("correct result=%d, wrong result=%d.\n", 2*LEN, num);
return 0;
}
運行結果:correct result=200000, wrong result=106860.
。
【1】互斥
這個是最容易理解的,在訪問臨界資源時,通過互斥,限制同一時刻最多只能有一個線程可以獲取臨界資源。
其實互斥的邏輯就是:如果訪問臨街資源發現沒有其他線程上鎖,就上鎖,獲取臨界資源,期間如果其他線程執行到互斥鎖發現已鎖住,則線程掛起等待解鎖,當前線程訪問完臨界資源后,解鎖並喚醒其他被該互斥鎖掛起的線程,等待再次被調度執行。
“掛起等待”和“喚醒等待線程”的操作如何實現?每個Mutex有一個等待隊列,一個線程要在Mutex上掛起等待,首先在把自己加入等待隊列中,然后置線程狀態為睡眠,然后調用調度器函數切換到別的線程。一個線程要喚醒等待隊列中的其它線程,只需從等待隊列中取出一項,把它的狀態從睡眠改為就緒,加入就緒隊列,那么下次調度器函數執行時就有可能切換到被喚醒的線程。
主要函數如下:
#include <pthread.h>
int pthread_mutex_init(pthread_mutex_t *restrict mutex,
const pthread_mutexattr_t *restrict attr); /*初始化互斥量*/
int pthread_mutex_destroy(pthread_mutex_t *mutex); /*銷毀互斥量*/
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
用互斥解決上面計算結果錯誤的問題,示例如下:
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
#define LEN 100000
int num = 0;
void* thread_func(void* arg) {
pthread_mutex_t* p_mutex = (pthread_mutex_t*)arg;
for (int i = 0; i< LEN; ++i) {
pthread_mutex_lock(p_mutex);
num += 1;
pthread_mutex_unlock(p_mutex);
}
return NULL;
}
int main() {
pthread_mutex_t m_mutex;
pthread_mutex_init(&m_mutex, NULL);
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, (void*)thread_func, (void*)&m_mutex);
pthread_create(&tid2, NULL, (void*)thread_func, (void*)&m_mutex);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
pthread_mutex_destroy(&m_mutex);
printf("correct result=%d, result=%d.\n", 2*LEN, num);
return 0;
}
運行結果:correct result=200000, result=200000.
如果在互斥中還嵌套有其他互斥代碼,需要注意死鎖問題。
產生死鎖的兩種情況:
- 一種情況是:如果同一個線程先后兩次調用lock,在第二次調用時,由於鎖已經被占用,該線程會掛起等待別的線程釋放鎖,然而鎖正是被自己占用着的,該線程又被掛起而沒有機會釋放鎖,因此就永遠處於掛起等待狀態了,產生死鎖。
- 另一種典型的死鎖情形是:線程A獲得了鎖1,線程B獲得了鎖2,這時線程A調用lock試圖獲得鎖2,結果是需要掛起等待線程B釋放鎖2,而這時線程B也調用lock試圖獲得鎖1,結果是需要掛起等待線程A釋放鎖1,於是線程A和B都永遠處於掛起狀態了。
如何避免死鎖:
- 不用互斥鎖(這個很多時候很難辦到)
- 寫程序時應該盡量避免同時獲得多個鎖。
- 如果一定有必要這么做,則有一個原則:如果所有線程在需要多個鎖時都按相同的先后順序(常見的是按Mutex變量的地址順序)獲得鎖,則不會出現死鎖。 (比如一個程序中用到鎖1、鎖2、鎖3,它們所對應的Mutex變量的地址是鎖1<鎖2<鎖3,那么所有線程在需要同時獲得2個或3個鎖時都應該按鎖1、鎖2、鎖3的順序獲得。如果要為所有的鎖確定一個先后順序比較困難,則應該盡量使用
pthread_mutex_trylock
調用代替pthread_mutex_lock
調用,以避免死鎖。)
【2】條件變量
條件變量概括起來就是:一個線程需要等某個條件成立(而這個條件是由其他線程決定的)才能繼續往下執行,現在這個條件不成立,線程就阻塞等待,等到其他線程在執行過程中使這個條件成立了,就喚醒線程繼續執行。
相關函數如下:
#include <pthread.h>
int pthread_cond_destroy(pthread_cond_t *cond);
int pthread_cond_init(pthread_cond_t *restrict cond,
const pthread_condattr_t *restrict attr);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
int pthread_cond_wait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex);
int pthread_cond_broadcast(pthread_cond_t *cond);
int pthread_cond_signal(pthread_cond_t *cond);
舉個最容易理解條件變量的例子,“生產者-消費者”模式中,生產者線程向隊列中發送數據,消費者線程從隊列中取數據,當消費者線程的處理速度大於生產者線程時,會產生隊列中沒有數據了,一種處理辦法是等待一段時間再次“輪詢”,但這種處理方式不太好,你不知道應該等多久,這時候條件變量可以很好的解決這個問題。下面是代碼:
#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include<errno.h>
#include<string.h>
#define LIMIT 1000
struct data {
int n;
struct data* next;
};
pthread_cond_t condv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER;
struct data* phead = NULL;
void producer(void* arg) {
printf("producer thread running.\n");
int count = 0;
for (;;) {
int n = rand() % 100;
struct data* nd = (struct data*)malloc(sizeof(struct data));
nd->n = n;
pthread_mutex_lock(&mlock);
struct data* tmp = phead;
phead = nd;
nd->next = tmp;
pthread_mutex_unlock(&mlock);
pthread_cond_signal(&condv);
count += n;
if(count > LIMIT) {
break;
}
sleep(rand()%5);
}
printf("producer count=%d\n", count);
}
void consumer(void* arg) {
printf("consumer thread running.\n");
int count = 0;
for(;;) {
pthread_mutex_lock(&mlock);
if (NULL == phead) {
pthread_cond_wait(&condv, &mlock);
} else {
while(phead != NULL) {
count += phead->n;
struct data* tmp = phead;
phead = phead->next;
free(tmp);
}
}
pthread_mutex_unlock(&mlock);
if (count > LIMIT)
break;
}
printf("consumer count=%d\n", count);
}
int main() {
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, (void*)producer, NULL);
pthread_create(&tid2, NULL, (void*)consumer, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
return 0;
}
條件變量中的執行邏輯:
關鍵是理解執行到int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex)
這里時發生了什么,其他的都比較容易理解。執行這條函數前需要先獲取互斥鎖,判斷條件是否滿足,如果滿足執行條件,則繼續向下執行后釋放鎖;如果判斷不滿足執行條件,則釋放鎖,線程阻塞在這里,一直等到其他線程通知執行條件滿足,喚醒線程,再次加鎖,向下執行后釋放鎖。(簡而言之就是:釋放鎖-->阻塞等待-->喚醒后加鎖返回)
上面的例子可能有些繁瑣,下面的這個代碼示例則更為簡潔:
#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include<errno.h>
#include<string.h>
#define NUM 3
pthread_cond_t condv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mlock = PTHREAD_MUTEX_INITIALIZER;
void producer(void* arg) {
int n = NUM;
while(n--) {
sleep(1);
pthread_cond_signal(&condv);
printf("producer thread send notify signal. %d\t", NUM-n);
}
}
void consumer(void* arg) {
int n = 0;
while (1) {
pthread_cond_wait(&condv, &mlock);
printf("recv producer thread notify signal. %d\n", ++n);
if (NUM == n) {
break;
}
}
}
int main() {
pthread_t tid1, tid2;
pthread_create(&tid1, NULL, (void*)producer, NULL);
pthread_create(&tid2, NULL, (void*)consumer, NULL);
pthread_join(tid1, NULL);
pthread_join(tid2, NULL);
return 0;
}
運行結果:
producer thread send notify signal. 1 recv producer thread notify signal. 1
producer thread send notify signal. 2 recv producer thread notify signal. 2
producer thread send notify signal. 3 recv producer thread notify signal. 3
【3】信號量
信號量適用於控制一個僅支持有限個用戶的共享資源。用於保持在0至指定最大值之間的一個計數值。當線程完成一次對該semaphore
對象的等待時,該計數值減一;當線程完成一次對semaphore
對象的釋放時,計數值加一。當計數值為0時,線程掛起等待,直到計數值超過0.
主要函數如下:
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
int sem_wait(sem_t *sem);
int sem_trywait(sem_t *sem);
int sem_post(sem_t * sem);
int sem_destroy(sem_t * sem);
代碼示例如下:
#include<sys/types.h>
#include<unistd.h>
#include<stdlib.h>
#include<stdio.h>
#include<pthread.h>
#include<errno.h>
#include<string.h>
#include<semaphore.h>
#define NUM 5
int queue[NUM];
sem_t psem, csem;
void producer(void* arg) {
int pos = 0;
int num, count = 0;
for (int i=0; i<12; ++i) {
num = rand() % 100;
count += num;
sem_wait(&psem);
queue[pos] = num;
sem_post(&csem);
printf("producer: %d\n", num);
pos = (pos+1) % NUM;
sleep(rand()%2);
}
printf("producer count=%d\n", count);
}
void consumer(void* arg){
int pos = 0;
int num, count = 0;
for (int i=0; i<12; ++i) {
sem_wait(&csem);
num = queue[pos];
sem_post(&psem);
printf("consumer: %d\n", num);
count += num;
pos = (pos+1) % NUM;
sleep(rand()%3);
}
printf("consumer count=%d\n", count);
}
int main() {
sem_init(&psem, 0, NUM);
sem_init(&csem, 0, 0);
pthread_t tid[2];
pthread_create(&tid[0], NULL, (void*)producer, NULL);
pthread_create(&tid[1], NULL, (void*)consumer, NULL);
pthread_join(tid[0], NULL);
pthread_join(tid[1], NULL);
sem_destroy(&psem);
sem_destroy(&csem);
return 0;
}
信號量的執行邏輯:
當需要獲取共享資源時,先檢查信號量,如果值大於0,則值減1,訪問共享資源,訪問結束后,值加1,如果發現有被該信號量掛起的線程,則喚醒其中一個線程;如果檢查到信號量為0,則掛起等待。
可參考源碼sem_post.c
三、多線程編程總結與思考
最后,我們對多線程編程進行總結與思考。
- 第一點就是在進行多線程編程時一定注意考慮同步的問題,因為多數情況下我們創建多線程的目的是讓他們協同工作,如果不進行同步,可能會出現問題。
- 第二點,死鎖的問題。在多個線程訪問多個臨界資源時,處理不當會發生死鎖。如果遇到編譯通過,運行時卡住了,有可能是發生死鎖了,可以先思考一下是那些線程會訪問多個臨界資源,這樣查找問題會快一些。
- 第三點,臨界資源的處理,多線程出現問題,很大原因是多個線程訪問臨界資源時的問題,一種處理方式是將對臨界資源的訪問與處理全部放到一個線程中,用這個線程服務其他線程的請求,這樣只有一個線程訪問臨界資源就會解決很多問題。
- 第四點,線程池,在處理大量短任務時,我們可以先創建好一個線程池,線程池中的線程不斷從任務隊列中取任務執行,這樣就不用大量創建線程與銷毀線程,這里不再細述。
參考文檔:pthread.h - threads