一、多線程基本概念
多線程(multithreading),是指從軟件或者硬件上實現多個線程並發執行的技術。具有多線程能力的計算機因有硬件支持而能夠在同一時間執行多於一個線程,進而提升整體處理性能。具有這種能力的系統包括對稱多處理機、多核心處理器以及芯片級多處理或同時多線程處理器。在一個程序中,這些獨立運行的程序片段叫作“線程”(Thread),利用它編程的概念就叫作“多線程處理” .(百度) 在單核CPU單線程的處理器上,對於多線程的處理方式,只能分時切換線程,每一個線程運行一個時間片然后被換出,在這種情況下,無須擔心公共臨界區的變量的競爭問題,相反在對核心CPU中就需要非常嚴格的關注臨界區的數據競爭情況。如下圖所示分別為單核心多核心的線程調度情況:
二、多線程基本API介紹
1. 在Linux環境下多線程編程頭文件
1 #include <errno.h> // Error code head file(EBUSY,ETIMEDOUT) 2 #include <pthread.h> // Pthread head file
2. 基本線程相關函數
1. pthread_mutex_t g_mutex; // 臨界區鎖定義 2. pthread_mutex_init(g_mutex,NULL); // 鎖初始化函數 3. pthread_cond_t g_cond; // 觸發條件定義 4. pthread_cond_init(g_cond,NULL); // 初始化條件變量 5. int ret = pthread_mutex_lock(&g_mutex); // 獲取鎖,獲取失敗則阻塞在此函數直至獲取到鎖,失敗返回錯誤代碼EBUSY 6. pthread_mutex_unlock(&g_mutex); // 進行解鎖,釋放鎖 7. ret = pthread_mutex_trylock(&g_mutex); // 嘗試獲取鎖,若獲取失敗,則不阻塞直接跳過,返回值為鎖繁忙EBUSY 8. ret = pthread_mutex_timedlock(&g_mutex ,&outtime); // 嘗試在outtime時間段之內獲取鎖,若超時未獲取則不阻塞,並返回ETIMEDOUT 10. pthread_cond_wait(g_cond,g_mutex); // 先對鎖進行解鎖,並且阻塞等待cond信號觸發直至觸發完成 11. ret = pthread_cond_timedwait(g_cond,g_mutex,&outtime); // 先對鎖進行解鎖,並且等待cond信號觸發,若在截止時間之前未觸發,則跳出函數,並返回ETIMEDOUT 12. pthread_cond_signal(g_cond); // 單一發出觸發信號,觸發等待隊列中的第一個線程,假設您只對隊列添加了一個工作作業。那么只需要喚醒一個工作程序線程(再喚醒其它線程是不禮貌的!) 13. pthread_cond_broadcast(g_cond); // 發出廣播觸發信號,通知喚醒等待隊列中的所有線程 14. pthread_cond_destroy(g_cond); // 銷毀條件變量,歸還條件變量資源 15. pthread_t th1; // 定義線程對象,類似於進程的PID號 16. pthread_create(&th1,NULL,thread_func,NULL); // Create the Thread1 & Start the thread func. 17. pthread_join(th1,NULL); // Wait the Thread1 end. 18. 待補充!
如上所示為基本的多線程編程API,上述鎖的API的實現包括了多種方式,基本都能夠滿足常見的鎖需求,對應的每一個函數調用基本上都有int類型的返回值,一般返回的內容都包括在了error.h頭文件中。
其中函數pthread_cond_wait、pthread_mutex_lock會阻塞線程直至獲取到對應條件,而其他鎖相關函數不會一直阻塞線程,從而能夠滿足其他的場景。
3. 阻塞時間outtime參數設置:
\* 如下是timespec結構體的具體類型,其中都是long型 *\ struct timespec { __time_t tv_sec; /* Seconds. */ __syscall_slong_t tv_nsec; /* Nanoseconds. */ };
timespec結構體可以精確到納秒級別,包括了兩個成員變量,分別為秒tv_sec以及納秒tv_nsec;
具體在延遲阻塞中應用是,可以先獲取系統當前時間,然后在當前時間的基礎上增加線程阻塞延遲增量,然后通過指針形參傳遞給對應的線程阻塞函數即可,詳參見下:
1 #include <time.h> // 需要包含時間頭文件 2 long wt_ms; // 需要延遲阻塞的時間 3 struct timespec outtime; // Defination定義 4 clock_gettime(CLOCK_REALTIME, &outtime); // Get the current time.(獲取當前系統時間,注意一定要用CLOCK_REALTIME來獲取系統時間) 5 6 /**************************************** 7 *** 開始設置阻塞延遲時間點,在下個時間點觸發 *** 8 ****************************************/ 9 10 outtime.tv_sec += wt_ms / 1000; // 毫秒換算秒 11 time_ns = outtime.tv_nsec + (wt_ms % 1000) * 1000000; // 多出的換算成納秒 12 if(time_ns >= 1000000000){ // 溢出當前時間則需要判斷是否進位 13 outtime.tv_sec++; // 進位 14 outtime.tv_nsec = time_ns - 1000000000; // 計算進位后余 15 }else{ 16 outtime.tv_nsec = time_ns; // 無需進位直接賦值即可 17 } 18 ret = pthread_mutex_timedlock(&g_mutex ,&outtime); // 阻塞延遲一段時間,然后返回相關代碼給ret.
上述代碼將outtime參數阻塞延遲時間傳遞給了pthread_mutex_timedlock函數:
(a) 若在outtime時間節點之前獲取到鎖,則停止阻塞,返回ret=0;
(b) 若時間節點之后還未獲取鎖,則停止阻塞,返回ret=ETIMEDOUT超時。
4. 自旋鎖
自旋鎖與互斥鎖功能一樣,唯一一點不同的就是互斥量阻塞后休眠讓出cpu,而自旋鎖阻塞后不會讓出cpu,會一直忙等待,直到得到鎖!
自旋鎖在用戶態使用的比較少,在內核使用的比較多!自旋鎖的使用場景:鎖的持有時間比較短,或者說小於2次上下文切換的時間。
自旋鎖在用戶態的函數接口和互斥量一樣,把pthread_mutex_xxx()中mutex換成spin,如:pthread_spin_init()。
5. 讀寫鎖(非常適合於讀寫線程的變量共享情況)
假如現在一個線程a只是想讀一個共享變量i,因為不確定是否會有線程去寫他,所以我們還是要對它進行加鎖。
但是這時候又一個線程b試圖讀共享變量i ,於是發現被鎖住,那么b不得不等到a釋放了鎖后才能獲得鎖並讀取 i 的值,但是兩個讀取操作即使是幾乎同時發生也並不會像寫操作那樣造成競爭,因為他們不修改變量的值。
所以我們期望如果是多個線程試圖讀取共享變量值的話,那么他們應該可以立刻獲取而不需要等待前一個線程釋放因為讀而加的鎖。
讀寫鎖可以很好的解決上面的問題。他提供了比互斥量跟好的並行性。因為以讀模式加鎖后當又有多個線程僅僅是試圖再以讀模式加鎖時,並不會造成這些線程阻塞在等待鎖的釋放上。
相關讀寫鎖的API具體如下:
1 int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr); // 初始化讀寫鎖 2 int pthread_rwlockattr_destory(pthread_rwlockattr_t *attr); // 銷毀回收讀寫鎖 3 int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); // 阻塞方式獲取讀鎖 4 int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); // 阻塞方式獲取寫鎖 5 int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); // 非阻塞方式獲取讀鎖 6 int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); // 非阻塞方式獲取寫鎖 7 int pthread_rwlock_unlock(pthread_rwlock_t *rwlock); // 釋放讀寫鎖
具體使用實例代碼如下:

1 #include <stdio.h> 2 #include <errno.h> // Error code head file(EBUSY) 3 #include <stdlib.h> 4 #include <unistd.h> 5 #include <pthread.h> // Pthread head file 6 7 pthread_rwlock_t rw_lock = PTHREAD_RWLOCK_INITIALIZER; 8 int global = 0; 9 10 void *thread_read_func(void *arg); 11 void *thread_write_func(void *arg); 12 13 int main(void) 14 { 15 pthread_t th1,th2; 16 printf("Mutil_Thread_Sys Starting...\n"); // Show the Starting point. 17 pthread_mutex_unlock(GMemory->g_mutex); 18 pthread_create(&th2,NULL,thread_read_func,"Read-TH"); // Create the Thread2 & Start the thread func2. 19 pthread_create(&th1,NULL,thread_write_func,"Write_TH"); // Create the Thread1 & Start the thread func1. 20 21 while(1){ // Main Processing thread. 22 ; 23 } 24 25 pthread_join(th2,NULL); // wait the Thread2 end. 26 pthread_join(th1,NULL); // Wait the Thread1 end. 27 28 printf("System Exit.\n"); // Show the Ending point. 29 return 0; 30 } 31 32 void *thread_read_func(void *arg) 33 { 34 bool ret; 35 char *pthr_name = (char *)arg; 36 while(1){ 37 ret = pthread_rwlock_rdlock(&rw_lock); 38 printf("The %s Read value:%d\n",pthr_name,global); 39 usleep(5000); 40 pthread_rwlock_unlock(&rw_lock); 41 } 42 } 43 44 void *thread_write_func(void *arg) 45 { 46 bool ret; 47 char *pthr_name = (char *)arg; 48 while(1){ 49 ret = pthread_rwlock_wrlock(&rw_lock); 50 global++; 51 usleep(5000); 52 printf("The %s Write thread value:%d\n",pthr_name,global); 53 pthread_rwlock_unlock(&rw_lock); 54 } 55 }
運行結果如下:
三、線程間通訊方式
1. 全局變量通信
由於線程使用的棧空間和堆空間都是進程的,而多線程都屬於進程,故而全局變量能夠被多個線程同時訪問(為了防止使用混亂,采用鎖機制來對全局變量進行訪問即可);
1 typedef struct Global_Memory{ // #define new struct type var with mutex lock and data also using-time. 2 pthread_mutex_t *g_mutex; // The mutex lock variable define 3 unsigned int Memory[10]; // The truly Data you will deal with. 4 }GMem;
設計如上所示的結構體變量,並在全局定義,此時即可將Memory空間和g_mutex鎖變量進行了綁定,只有獲取鎖的狀態下才可以修改Memory當中的內容,從而能夠有序的完成線程間同信。
具體實現代碼如下:

1 #include <stdio.h> 2 #include <errno.h> // Error code head file(EBUSY) 3 #include <stdlib.h> 4 #include <unistd.h> 5 #include <pthread.h> // Pthread head file 6 7 typedef struct Global_Memory{ // #define new struct type var with mutex lock and data also using-time. 8 pthread_mutex_t *g_mutex; // The mutex lock variable define 9 unsigned int Memory[10]; // The truly Data you will deal with. 10 int using_time; // Using time setup. 11 }GMem; 12 13 GMem *GMemory = NULL; // #define the mutex-lock & Memory variable 14 15 void *thread_func1(void *arg); // #define the thread functions 16 void *thread_func2(void *arg); 17 18 int main(void) 19 { 20 pthread_t th1,th2; 21 GMemory = (GMem *)malloc(sizeof(GMem)); // Allocal the storage spcae for Memory Data and time_using. 22 GMemory->g_mutex = (pthread_mutex_t *)malloc(sizeof(pthread_mutex_t)); // Allocal the memory space for the mutex variable. 23 24 printf("Mutil_Thread_Sys Starting...\n"); // Show the Starting point. 25 pthread_mutex_init(GMemory->g_mutex,NULL); // Initialize the mutex. 26 pthread_create(&th1,NULL,thread_func1,NULL); // Create the Thread1 & Start the thread func1. 27 pthread_create(&th2,NULL,thread_func2,NULL); // Create the Thread2 & Start the thread func2. 28 29 while(1){ // Main Processing thread. 30 printf("This is the Main Processing...\n"); 31 sleep(1); // Delay time cost 1second. 32 } 33 34 pthread_join(th1,NULL); // Wait the Thread1 end. 35 pthread_join(th2,NULL); // wait the Thread2 end. 36 printf("System Exit.\n"); // Show the Ending point. 37 return 0; 38 } 39 40 void *thread_func1(void *arg) // Func1 Point address Define.(void *) 41 { 42 time_t t_s; // Define a time_t var for storage the thread ending timestamp. 43 while(1){ 44 pthread_mutex_lock(GMemory->g_mutex); // try to get the mutex lock var, if success, the thread is going to run and deal with the Memory Data. 45 printf("********************************************************************************Func1 Locked.\n"); // Print the mutex lock var's state. 46 t_s = time(NULL) + 1; // setup the ending time of the thread. 47 while(time(NULL) <= t_s){ // Control the ending time. 48 GMemory->Memory[1]++; // Deal with the Memory Data. 49 printf("This is thread_func1 Running:Memory[1]=%d Memory[2]=%d\n",GMemory->Memory[1],GMemory->Memory[2]); // show the result the thread just did. 50 usleep(1000); // us delay for testing the thread time control. 51 } 52 pthread_mutex_unlock(GMemory->g_mutex); // Unlock the mutex variable so that other thread could use the mutex lock variable. 53 printf("********************************************************************************Func1 Unlocked.\n"); // Print the mutex lock var's state. 54 } 55 } 56 57 void *thread_func2(void *arg) 58 { 59 time_t t_s; 60 while(1){ 61 // int ret = pthread_mutex_trylock(GMemory->g_mutex); // Use the Trylock function will make the func2 to be sleeped, Use lock to make sure the Func2 thread could be called. 62 // if(EBUSY == ret) continue; 63 pthread_mutex_lock(GMemory->g_mutex); 64 printf("********************************************************************************Func2 Locked.\n"); 65 t_s = time(NULL) + 1; 66 while(time(NULL) <= t_s){ 67 GMemory->Memory[2]++; 68 printf("This is thread_func2 Running:Memory[1]=%d Memory[2]=%d\n",GMemory->Memory[1],GMemory->Memory[2]); 69 usleep(1000); 70 } 71 pthread_mutex_unlock(GMemory->g_mutex); 72 printf("********************************************************************************Func2 Unlocked.\n"); 73 } 74 }
2. 消息隊列
3. 信號量喚醒
信號量作為基本的通信方式,在線程阻塞睡眠喚醒的過程當中,充當着非常重要的角色,如下所示為條件信號量的定義及使用:
1 pthread_cond_t *g_cond; // condition semaphore define 2 ret = pthread_cond_timedwait(g_cond,GMemory->g_mutex,&outtime); // Waitting for the trigger signal and active the threads in queue while the outtime. 3 ret = pthread_cond_wait(g_cond, GMemory->g_mutex); // unlock the g_mutex lock and wait for the condition trigger. 4 pthread_cond_broadcast(g_cond); // broadcast trigger all threads in the queue. 5 pthread_cond_signal(g_cond); // Only wake up the first thread in the front of wait-queue.
2,3行在線程中用來等待其他線程使用4,5行來進行cond信號量喚醒。
四、生產者消費者模型
1. 如下圖所示為基本的線程狀態切換流程:
2. 基本相關多線程框架案例
(a) 延遲阻塞多線程框架:
框架介紹:多個線程在不同時間節點啟動,獲取同一公共資源,每個線程都能設置自身獨立的阻塞等待的時間,若獲取資源等待超時,則放棄等待。
1 bool th_queue_run(long wt_ms, long choke_ms,int th) 2 { 3 int ret; 4 long time_ns=0; 5 struct timespec outtime; 6 clock_gettime(CLOCK_REALTIME, &outtime); // Get the current time. 7 outtime.tv_sec += wt_ms / 1000; 8 time_ns = outtime.tv_nsec + (wt_ms % 1000) * 1000000; 9 if(time_ns >= 1000000000){ 10 outtime.tv_sec++; 11 outtime.tv_nsec = time_ns - 1000000000; 12 }else{ 13 outtime.tv_nsec = time_ns; 14 } 15 ret = pthread_mutex_timedlock(GMemory->g_mutex ,&outtime); 16 if(ret != 0){ 17 if(ret == ETIMEDOUT){ 18 printf("%d Timeout | Compete, Running Failed!\n",th); 19 return false; 20 } 21 } 22 printf("%d Running successfully.\n",th); 23 GMemory->Memory[th]++; 24 printf("Memory[3] Memory[4] **** %d %d\n",GMemory->Memory[3],GMemory->Memory[4]); 25 sleep(choke_ms); 26 pthread_mutex_unlock(GMemory->g_mutex); 27 return true; 28 }
(b) 未完待續
五、關於線程調度
1. Linux下的sleep()和sched_yield()
sched_yield()的man手冊描述如下:
DESCRIPTIONsched_yield() causes the calling thread to relinquish the CPU. The thread is moved to the end of the queue for its static priority and a new thread gets to run.RETURN VALUEOn success, sched_yield() returns 0. On error, -1 is returned, and errno is set appropriately.ERRORSIn the Linux implementation, sched_yield() always succeeds.
翻譯一下,sched_yield()會讓出當前線程的CPU占有權,然后把線程放到靜態優先隊列的尾端,然后一個新的線程會占用CPU,那這個和sleep()有啥區別呢?
- sched_yield()這個函數可以使用另一個級別等於或高於當前線程的線程先運行。如果沒有符合條件的線程,那么這個函數將會立刻返回然后繼續執行當前線程的程序。
- 而sleep則是等待一定時間后等待CPU的調度,然后去獲得CPU資源(這也是usleep()為什么不准的原因)。
那么什么時候使用sched_yield()呢?
Strategic calls to sched_yield() can improve performance by giving other threads or processes a chance to run when (heavily) contended resources (e.g., mutexes) have been released by the caller.
翻譯一下,有策略的調用sched_yield()能在資源競爭情況很嚴重時,通過給其他的線程或進程運行機會的方式來提升程序的性能。也就是說,調用sched_yield()能讓你的當前線程讓出資源,通過一定的策略調用sched_yield()滿足你的業務要求可以保證各個線程或進程都有機會運行。
Reference:
樹莓派多線程點燈:https://github.com/embedded-learning-group/Linux_Learning/tree/master/20190818_Lesson7
相關thread API函數接口介紹:https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_timedlock.html
POSIX線程詳解(第三部分):https://www.ibm.com/developerworks/cn/linux/thread/posix_thread3/index.html
線程同步及消息隊列:https://www.cnblogs.com/noticeable/p/8549788.html
消息隊列通信詳解:https://blog.csdn.net/anonymalias/article/details/9799645