Blaise Barney, Lawrence Livermore National Laboratory
- 摘要
- 譯者序
- Pthreads 概述
- Pthreads API編譯多線程程序
- 線程管理
- 互斥量(Mutex Variables)
- 條件變量(Condition Variable)
- 沒有覆蓋的主題
- Pthread 庫API參考
- 參考資料
在多處理器共享內存的架構中(如:對稱多處理系統SMP),線程可以用於實現程序的並行性。歷史上硬件銷售商實現了各種私有版本的多線程庫,使得軟件開發者不得不關心它的移植性。對於UNIX系統,IEEE POSIX 1003.1標准定義了一個C語言多線程編程接口。依附於該標准的實現被稱為POSIX theads 或 Pthreads。
該教程介紹了Pthreads的概念、動機和設計思想。內容包含了Pthreads API主要的三大類函數:線程管理(Thread Managment)、互斥量(Mutex Variables)和條件變量(Condition Variables)。向剛開始學習Pthreads的程序員提供了演示例程。
適於:剛開始學習使用線程實現並行程序設計;對於C並行程序設計有基本了解。不熟悉並行程序設計的可以參考EC3500: Introduction To Parallel Computing。
概述 |
- 技術上,線程可以定義為:可以被操作系統調度的獨立的指令流。但是這是什么意思呢?
- 對於軟件開發者,在主程序中運行的“函數過程”可以很好的描述線程的概念。
- 進一步,想象下主程序(a.out)包含了許多函數,操作系統可以調度這些函數,使之同時或者(和)獨立的執行。這就描述了“多線程”程序。
- 怎樣完成的呢?
- 在理解線程之前,應先對UNIX進程(process)有所了解。進程被操作系統創建,需要相當多的“額外開銷”。進程包含了程序的資源和執行狀態信息。如下:
- 進程ID,進程group ID,用戶ID和group ID
- 環境
- 工作目錄
- 程序指令
- 寄存器
- 棧
- 堆
- 文件描述符
- 信號動作(Signal actions)
- 共享庫
- 進程間通信工具(如:消息隊列,管道,信號量或共享內存)
|
|
UNIX PROCESS |
THREADS WITHIN A UNIX PROCESS |
- 線程使用並存在於進程資源中,還可以被操作系統調用並獨立地運行,這主要是因為線程僅僅復制必要的資源以使自己得以存在並執行。
- 獨立的控制流得以實現是因為線程維持着自己的:
- 堆棧指針
- 寄存器
- 調度屬性(如:策略或優先級)
- 待定的和阻塞的信號集合(Set of pending and blocked signals)
- 線程專用數據(TSD:Thread Specific Data.)
- 因此,在UNIX環境下線程:
- 存在於進程,使用進程資源
- 擁有自己獨立的控制流,只要父進程存在並且操作系統支持
- 只復制必可以使得獨立調度的必要資源
- 可以和其他線程獨立(或非獨立的)地共享進程資源
- 當父進程結束時結束,或者相關類似的
- 是“輕型的”,因為大部分額外開銷已經在進程創建時完成了
- 因為在同一個進程中的線程共享資源:
- 一個線程對系統資源(如關閉一個文件)的改變對所有其它線程是可以見的
- 兩個同樣值的指針指向相同的數據
- 讀寫同一個內存位置是可能的,因此需要成員顯式地使用同步
Pthreads 概述 |
- 歷史上,硬件銷售商實現了私有版本的多線程庫。這些實現在本質上各自不同,使得程序員難於開發可移植的應用程序。
- 為了使用線程所提供的強大優點,需要一個標准的程序接口。對於UNIX系統,IEEE POSIX 1003.1c(1995)標准制訂了這一標准接口。依賴於該標准的實現就稱為POSIX threads 或者Pthreads。現在多數硬件銷售商也提供Pthreads,附加於私有的API。
- Pthreads 被定義為一些C語言類型和函數調用,用pthread.h頭(包含)文件和線程庫實現。這個庫可以是其它庫的一部分,如libc。
Pthreads 概述 |
- 使用Pthreads的主要動機是提高潛在程序的性能。
- 當與創建和管理進程的花費相比,線程可以使用操作系統較少的開銷,管理線程需要較少的系統資源。
例如,下表比較了fork()函數和pthread_create()函數所用的時間。計時反應了50,000個進程/線程的創建,使用時間工具實現,單位是秒,沒有優化標志。
備注:不要期待系統和用戶時間加起來就是真實時間,因為這些SMP系統有多個CPU同時工作。這些都是近似值。
平台 |
fork() |
pthread_create() |
||||
real |
user |
sys |
real |
user |
sys |
|
AMD 2.4 GHz Opteron (8cpus/node) |
41.07 |
60.08 |
9.01 |
0.66 |
0.19 |
0.43 |
IBM 1.9 GHz POWER5 p5-575 (8cpus/node) |
64.24 |
30.78 |
27.68 |
1.75 |
0.69 |
1.10 |
IBM 1.5 GHz POWER4 (8cpus/node) |
104.05 |
48.64 |
47.21 |
2.01 |
1.00 |
1.52 |
INTEL 2.4 GHz Xeon (2 cpus/node) |
54.95 |
1.54 |
20.78 |
1.64 |
0.67 |
0.90 |
INTEL 1.4 GHz Itanium2 (4 cpus/node) |
54.54 |
1.07 |
22.22 |
2.03 |
1.26 |
0.67 |
- 在同一個進程中的所有線程共享同樣的地址空間。較於進程間的通信,在許多情況下線程間的通信效率比較高,且易於使用。
- 較於沒有使用線程的程序,使用線程的應用程序有潛在的性能增益和實際的優點:
- CPU使用I/O交疊工作:例如,一個程序可能有一個需要較長時間的I/O操作,當一個線程等待I/O系統調用完成時,CPU可以被其它線程使用。
- 優先/實時調度:比較重要的任務可以被調度,替換或者中斷較低優先級的任務。
- 異步事件處理:頻率和持續時間不確定的任務可以交錯。例如,web服務器可以同時為前一個請求傳輸數據和管理新請求。
- 考慮在SMP架構上使用Pthreads的主要動機是獲的最優的性能。特別的,如果一個程序使用MPI在節點通信,使用Pthreads可以使得節點數據傳輸得到顯著提高。
- 例如:
- MPI庫經常用共享內存實現節點任務通信,這至少需要一次內存復制操作(進程到進程)。
- Pthreads沒有中間的內存復制,因為線程和一個進程共享同樣的地址空間。沒有數據傳輸。變成cache-to-CPU或memory-to-CPU的帶寬(最壞情況),速度是相當的快。
- 比較如下:
Platform |
MPI Shared Memory Bandwidth |
Pthreads Worst Case |
AMD 2.4 GHz Opteron |
1.2 |
5.3 |
IBM 1.9 GHz POWER5 p5-575 |
4.1 |
16 |
IBM 1.5 GHz POWER4 |
2.1 |
4 |
Intel 1.4 GHz Xeon |
0.3 |
4.3 |
Intel 1.4 GHz Itanium 2 |
1.8 |
6.4 |
Pthreads 概述 |
並行編程:
- 在現代多CPU機器上,pthread非常適於並行編程。可以用於並行程序設計的,也可以用於pthread程序設計。
- 並行程序要考慮許多,如下:
- 用什么並行程序設計模型?
- 問題划分
- 加載平衡(Load balancing)
- 通信
- 數據依賴
- 同步和競爭條件
- 內存問題
- I/O問題
- 程序復雜度
- 程序員的努力/花費/時間
- ...
- 包含這些主題超出本教程的范圍,有興趣的讀者可以快速瀏覽下“Introduction to Parallel Computing”教程。
- 大體上,為了使用Pthreads的優點,必須將任務組織程離散的,獨立的,可以並發執行的。例如,如果routine1和routine2可以互換,相互交叉和(或者)重疊,他們就可以線程化。
- 擁有下述特性的程序可以使用pthreads:
- 工作可以被多個任務同時執行,或者數據可以同時被多個任務操作。
- 阻塞與潛在的長時間I/O等待。
- 在某些地方使用很多CPU循環而其他地方沒有。
- 對異步事件必須響應。
- 一些工作比其他的重要(優先級中斷)。
- Pthreads 也可以用於串行程序,模擬並行執行。很好例子就是經典的web瀏覽器,對於多數人,運行於單CPU的桌面/膝上機器,許多東西可以同時“顯示”出來。
- 使用線程編程的幾種常見模型:
- 管理者/工作者(Manager/worker):一個單線程,作為管理器將工作分配給其它線程(工作者),典型的,管理器處理所有輸入和分配工作給其它任務。至少兩種形式的manager/worker模型比較常用:靜態worker池和動態worker池。
- 管道(Pipeline):任務可以被划分為一系列子操作,每一個被串行處理,但是不同的線程並發處理。汽車裝配線可以很好的描述這個模型。
- Peer: 和manager/worker模型相似,但是主線程在創建了其它線程后,自己也參與工作。
共享內存模型(Shared Memory Model):
- 所有線程可以訪問全局,共享內存
- 線程也有自己私有的數據
- 程序員負責對全局共享數據的同步存取(保護)
線程安全(Thread-safeness):
- 線程安全:簡短的說,指程序可以同時執行多個線程卻不會“破壞“共享數據或者產生“競爭”條件的能力。
- 例如:假設你的程序創建了幾個線程,每一個調用相同的庫函數:
- 這個庫函數存取/修改了一個全局結構或內存中的位置。
- 當每個線程調用這個函數時,可能同時去修改這個全局結構活內存位置。
- 如果函數沒有使用同步機制去阻止數據破壞,這時,就不是線程安全的了。
- 如果你不是100%確定外部庫函數是線程安全的,自己負責所可能引發的問題。
- 建議:小心使用庫或者對象,當不能明確確定是否是線程安全的。若有疑慮,假設其不是線程安全的直到得以證明。可以通過不斷地使用不確定的函數找出問題所在。
- Pthreads API在ANSI/IEEE POSIX 1003.1 – 1995標准中定義。不像MPI,該標准不是免費的,必須向IEEE購買。
- Pthreads API中的函數可以非正式的划分為三大類:
- 線程管理(Thread management): 第一類函數直接用於線程:創建(creating),分離(detaching),連接(joining)等等。包含了用於設置和查詢線程屬性(可連接,調度屬性等)的函數。
- 互斥量(Mutexes): 第二類函數是用於線程同步的,稱為互斥量(mutexes),是"mutual exclusion"的縮寫。Mutex函數提供了創建,銷毀,鎖定和解鎖互斥量的功能。同時還包括了一些用於設定或修改互斥量屬性的函數。
- 條件變量(Condition variables):第三類函數處理共享一個互斥量的線程間的通信,基於程序員指定的條件。這類函數包括指定的條件變量的創建,銷毀,等待和受信(signal)。設置查詢條件變量屬性的函數也包含其中。
- 命名約定:線程庫中的所有標識符都以pthread開頭
Routine Prefix |
Functional Group |
pthread_ |
線程本身和各種相關函數 |
pthread_attr_ |
線程屬性對象 |
pthread_mutex_ |
互斥量 |
pthread_mutexattr_ |
互斥量屬性對象 |
pthread_cond_ |
條件變量 |
pthread_condattr_ |
條件變量屬性對象 |
pthread_key_ |
線程數據鍵(Thread-specific data keys) |
- 在API的設計中充滿了不透明對象的概念,基本調用可以創建或修改不透明對象。不透明的對象可以被一些屬性函數調用修改。
- Pthread API包含了60多個函數。該教程僅限於一部分(對於剛開始學習Pthread的程序是非常有用的)。
- 為了可移植性,使用Pthread庫時,pthread.h頭文件必須在每個源文件中包含。
- 現行POSIX標准僅定義了C語言的使用。Fortran程序員可以嵌入C函數調用使用,有些Fortran編譯器(像IBM AIX Fortran)可能提供了Fortran pthreads API。
- 關於Pthreads有些比較優秀的書籍。其中一些在該教程的參考一節列出。
- 下表列出了一些編譯使用了pthreads庫程序的命令:
Compiler / Platform |
Compiler Command |
Description |
IBM |
xlc_r / cc_r |
C (ANSI / non-ANSI) |
xlC_r |
C++ |
|
xlf_r -qnosave |
Fortran - using IBM's Pthreads API (non-portable) |
|
INTEL |
icc -pthread |
C |
icpc -pthread |
C++ |
|
PathScale |
pathcc -pthread |
C |
pathCC -pthread |
C++ |
|
PGI |
pgcc -lpthread |
C |
pgCC -lpthread |
C++ |
|
GNU |
gcc -pthread |
GNU C |
g++ -pthread |
GNU C++ |
函數:
pthread_create (thread,attr,start_routine,arg) pthread_exit (status) pthread_attr_init (attr) pthread_attr_destroy (attr) |
創建線程:
- 最初,main函數包含了一個缺省的線程。其它線程則需要程序員顯式地創建。
- pthread_create 創建一個新線程並使之運行起來。該函數可以在程序的任何地方調用。
- pthread_create參數:
- thread:返回一個不透明的,唯一的新線程標識符。
- attr:不透明的線程屬性對象。可以指定一個線程屬性對象,或者NULL為缺省值。
- start_routine:線程將會執行一次的C函數。
- arg: 傳遞給start_routine單個參數,傳遞時必須轉換成指向void的指針類型。沒有參數傳遞時,可設置為NULL。
- 一個進程可以創建的線程最大數量取決於系統實現。
- 一旦創建,線程就稱為peers,可以創建其它線程。線程之間沒有指定的結構和依賴關系。
|
|
Q:一個線程被創建后,怎么知道操作系統何時調度該線程使之運行? A:除非使用了Pthreads的調度機制,否則線程何時何地被執行取決於操作系統的實現。強壯的程序應該不依賴於線程執行的順序。 |
線程屬性:
- 線程被創建時會帶有默認的屬性。其中的一些屬性可以被程序員用線程屬性對象來修改。
- pthread_attr_init 和 pthread_attr_destroy用於初始化/銷毀先成屬性對象。
- 其它的一些函數用於查詢和設置線程屬性對象的指定屬性。
- 一些屬性下面將會討論。
結束終止:
- 結束線程的方法有一下幾種:
- 線程從主線程(main函數的初始線程)返回。
- 線程調用了pthread_exit函數。
- 其它線程使用 pthread_cancel函數結束線程。
- 調用exec或者exit函數,整個進程結束。
- pthread_exit用於顯式退出線程。典型地,pthread_exit()函數在線程完成工作時,不在需要時候被調用,退出線程。
- 如果main()在其他線程創建前用pthread_exit()退出了,其他線程將會繼續執行。否則,他們會隨着main的結束而終止。
- 程序員可以可選擇的指定終止狀態,當任何線程連接(join)該線程時,該狀態就返回給連接(join)該線程的線程。
- 清理:pthread_exit()函數並不會關閉文件,任何在線程中打開的文件將會一直處於打開狀態,知道線程結束。
- 討論:對於正常退出,可以免於調用pthread_exit()。當然,除非你想返回一個返回值。然而,在main中,有一個問題,就是當main結束時,其它線程還沒有被創建。如果此時沒有顯式的調用pthread_exit(),當main結束時,進程(和所有線程)都會終止。可以在main中調用pthread_exit(),此時盡管在main中已經沒有可執行的代碼了,進程和所有線程將保持存活狀態,。
例子: Pthread 創建和終止
- 該例用pthread_create()創建了5個線程。每一個線程都會打印一條“Hello World”的消息,然后調用pthread_exit()終止線程。
Example Code - Pthread Creation and Termination #include <pthread.h> #include <stdio.h> #define NUM_THREADS 5
void *PrintHello(void *threadid) { int tid; tid = (int)threadid; printf("Hello World! It's me, thread #%d!/n", tid); pthread_exit(NULL); }
int main (int argc, char *argv[]) { pthread_t threads[NUM_THREADS]; int rc, t; for(t=0; t<NUM_THREADS; t++){ printf("In main: creating thread %d/n", t); rc = pthread_create(&threads[t], NULL, PrintHello, (void *)t); if (rc){ printf("ERROR; return code from pthread_create() is %d/n", rc); exit(-1); } } pthread_exit(NULL); }
|
線程管理 |
- pthread_create()函數允許程序員想線程的start routine傳遞一個參數。當多個參數需要被傳遞時,可以通過定義一個結構體包含所有要傳的參數,然后用pthread_create()傳遞一個指向改結構體的指針,來打破傳遞參數的個數的限制。
- 所有參數都應該傳引用傳遞並轉化成(void*)。
|
|
Q:怎樣安全地向一個新創建的線程傳遞數據? A:確保所傳遞的數據是線程安全的(不能被其他線程修改)。下面三個例子演示了那個應該和那個不應該。 |
Example 1 - Thread Argument Passing 下面的代碼片段演示了如何向一個線程傳遞一個簡單的整數。主線程為每一個線程使用一個唯一的數據結構,確保每個線程傳遞的參數是完整的。 int *taskids[NUM_THREADS];
for(t=0; t<NUM_THREADS; t++) { taskids[t] = (int *) malloc(sizeof(int)); *taskids[t] = t; printf("Creating thread %d/n", t); rc = pthread_create(&threads[t], NULL, PrintHello, (void *) taskids[t]); ... }
|
Example 2 - Thread Argument Passing 例子展示了用結構體向線程設置/傳遞參數。每個線程獲得一個唯一的結構體實例。 struct thread_data{ int thread_id; int sum; char *message; };
struct thread_data thread_data_array[NUM_THREADS];
void *PrintHello(void *threadarg) { struct thread_data *my_data; ... my_data = (struct thread_data *) threadarg; taskid = my_data->thread_id; sum = my_data->sum; hello_msg = my_data->message; ... }
int main (int argc, char *argv[]) { ... thread_data_array[t].thread_id = t; thread_data_array[t].sum = sum; thread_data_array[t].message = messages[t]; rc = pthread_create(&threads[t], NULL, PrintHello, (void *) &thread_data_array[t]); ... }
|
Example 3 - Thread Argument Passing (Incorrect) 例子演示了錯誤地傳遞參數。循環會在線程訪問傳遞的參數前改變傳遞給線程的地址的內容。 int rc, t;
for(t=0; t<NUM_THREADS; t++) { printf("Creating thread %d/n", t); rc = pthread_create(&threads[t], NULL, PrintHello, (void *) &t); ... }
|
線程管理 |
函數:
pthread_join (threadid,status) pthread_detach (threadid,status) pthread_attr_setdetachstate (attr,detachstate) pthread_attr_getdetachstate (attr,detachstate) |
連接:
- “連接”是一種在線程間完成同步的方法。例如:
- pthread_join()函數阻賽調用線程知道threadid所指定的線程終止。
- 如果在目標線程中調用pthread_exit(),程序員可以在主線程中獲得目標線程的終止狀態。
- 連接線程只能用pthread_join()連接一次。若多次調用就會發生邏輯錯誤。
- 兩種同步方法,互斥量(mutexes)和條件變量(condition variables),稍后討論。
可連接(Joinable or Not)?
- 當一個線程被創建,它有一個屬性定義了它是可連接的(joinable)還是分離的(detached)。只有是可連接的線程才能被連接(joined),若果創建的線程是分離的,則不能連接。
- POSIX標准的最終草案指定了線程必須創建成可連接的。然而,並非所有實現都遵循此約定。
- 使用pthread_create()的attr參數可以顯式的創建可連接或分離的線程,典型四步如下:
- 聲明一個pthread_attr_t數據類型的線程屬性變量
- 用 pthread_attr_init()初始化改屬性變量
- 用pthread_attr_setdetachstate()設置可分離狀態屬性
- 完了后,用pthread_attr_destroy()釋放屬性所占用的庫資源
分離(Detaching):
- pthread_detach()可以顯式用於分離線程,盡管創建時是可連接的。
- 沒有與pthread_detach()功能相反的函數
建議:
- 若線程需要連接,考慮創建時顯式設置為可連接的。因為並非所有創建線程的實現都是將線程創建為可連接的。
- 若事先知道線程從不需要連接,考慮創建線程時將其設置為可分離狀態。一些系統資源可能需要釋放。
例子: Pthread Joining
Example Code - Pthread Joining 這個例子演示了用Pthread join函數去等待線程終止。因為有些實現並不是默認創建線程是可連接狀態,例子中顯式地將其創建為可連接的。 #include <pthread.h> #include <stdio.h> #define NUM_THREADS 3
void *BusyWork(void *null) { int i; double result=0.0; for (i=0; i<1000000; i++) { result = result + (double)random(); } printf("result = %e/n",result); pthread_exit((void *) 0); }
int main (int argc, char *argv[]) { pthread_t thread[NUM_THREADS]; pthread_attr_t attr; int rc, t; void *status;
/* Initialize and set thread detached attribute */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for(t=0; t<NUM_THREADS; t++) { printf("Creating thread %d/n", t); rc = pthread_create(&thread[t], &attr, BusyWork, NULL); if (rc) { printf("ERROR; return code from pthread_create() is %d/n", rc); exit(-1); } }
/* Free attribute and wait for the other threads */ pthread_attr_destroy(&attr); for(t=0; t<NUM_THREADS; t++) { rc = pthread_join(thread[t], &status); if (rc) { printf("ERROR; return code from pthread_join() is %d/n", rc); exit(-1); } printf("Completed join with thread %d status= %ld/n",t, (long)status); }
pthread_exit(NULL); }
|
線程管理 |
函數:
pthread_attr_getstacksize (attr, stacksize) pthread_attr_setstacksize (attr, stacksize) pthread_attr_getstackaddr (attr, stackaddr) pthread_attr_setstackaddr (attr, stackaddr) |
防止棧問題:
- POSIX標准並沒有指定線程棧的大小,依賴於實現並隨實現變化。
- 很容易超出默認的棧大小,常見結果:程序終止或者數據損壞。
- 安全和可移植的程序應該不依賴於默認的棧限制,但是取而代之的是用pthread_attr_setstacksize分配足夠的棧大小。
- pthread_attr_getstackaddr和pthread_attr_setstackaddr函數可以被程序用於將棧設置在指定的內存區域。
在LC上的一些實際例子:
- 默認棧大小經常變化很大,最大值也變化很大,可能會依賴於每個節點的線程數目。
Node |
#CPUs |
Memory (GB) |
Default Size |
AMD Opteron |
8 |
16 |
2,097,152 |
Intel IA64 |
4 |
8 |
33,554,432 |
Intel IA32 |
2 |
4 |
2,097,152 |
IBM Power5 |
8 |
32 |
196,608 |
IBM Power4 |
8 |
16 |
196,608 |
IBM Power3 |
16 |
16 |
98,304 |
例子: 棧管理
Example Code - Stack Management 這個例子演示了如何去查詢和設定線程棧大小。 #include <pthread.h> #include <stdio.h> #define NTHREADS 4 #define N 1000 #define MEGEXTRA 1000000
pthread_attr_t attr;
void *dowork(void *threadid) { double A[N][N]; int i,j,tid; size_t mystacksize;
tid = (int)threadid; pthread_attr_getstacksize (&attr, &mystacksize); printf("Thread %d: stack size = %li bytes /n", tid, mystacksize); for (i=0; i<N; i++) for (j=0; j<N; j++) A[i][j] = ((i*j)/3.452) + (N-i); pthread_exit(NULL); }
int main(int argc, char *argv[]) { pthread_t threads[NTHREADS]; size_t stacksize; int rc, t;
pthread_attr_init(&attr); pthread_attr_getstacksize (&attr, &stacksize); printf("Default stack size = %li/n", stacksize); stacksize = sizeof(double)*N*N+MEGEXTRA; printf("Amount of stack needed per thread = %li/n",stacksize); pthread_attr_setstacksize (&attr, stacksize); printf("Creating threads with stack size = %li bytes/n",stacksize); for(t=0; t<NTHREADS; t++){ rc = pthread_create(&threads[t], &attr, dowork, (void *)t); if (rc){ printf("ERROR; return code from pthread_create() is %d/n", rc); exit(-1); } } printf("Created %d threads./n", t); pthread_exit(NULL); } |
線程管理 |
pthread_self () pthread_equal (thread1,thread2) |
- pthread_self返回調用該函數的線程的唯一,系統分配的線程ID。
- pthread_equal比較兩個線程ID,若不同返回0,否則返回非0值。
- 注意這兩個函數中的線程ID對象是不透明的,不是輕易能檢查的。因為線程ID是不透明的對象,所以C語言的==操作符不能用於比較兩個線程ID。
pthread_once (once_control, init_routine) |
- pthread_once 在一個進程中僅執行一次init_routine。任何線程第一次調用該函數會執行給定的init_routine,不帶參數,任何后續調用都沒有效果。
- init_routine函數一般是初始化的程序
- once_control參數是一個同步結構體,需要在調用pthread_once前初始化。例如:
pthread_once_t once_control = PTHREAD_ONCE_INIT;
- 互斥量(Mutex)是“mutual exclusion”的縮寫。互斥量是實現線程同步,和保護同時寫共享數據的主要方法
- 互斥量對共享數據的保護就像一把鎖。在Pthreads中,任何時候僅有一個線程可以鎖定互斥量,因此,當多個線程嘗試去鎖定該互斥量時僅有一個會成功。直到鎖定互斥量的線程解鎖互斥量后,其他線程才可以去鎖定互斥量。線程必須輪着訪問受保護數據。
- 互斥量可以防止“競爭”條件。下面的例子是一個銀行事務處理時發生了競爭條件:
Thread 1 |
Thread 2 |
Balance |
Read balance: $1000 |
|
$1000 |
|
Read balance: $1000 |
$1000 |
|
Deposit $200 |
$1000 |
Deposit $200 |
|
$1000 |
Update balance $1000+$200 |
|
$1200 |
|
Update balance $1000+$200 |
$1200 |
- 上面的例子,當一個線程使用共享數據資源時,應該用一個互斥量去鎖定“Balance”。
- 一個擁有互斥量的線程經常用於更新全局變量。確保了多個線程更新同樣的變量以安全的方式運行,最終的結果和一個線程處理的結果是相同的。這個更新的變量屬於一個“臨界區(critical section)”。
- 使用互斥量的典型順序如下:
- 創建和初始一個互斥量
- 多個線程嘗試去鎖定該互斥量
- 僅有一個線程可以成功鎖定改互斥量
- 鎖定成功的線程做一些處理
- 線程解鎖該互斥量
- 另外一個線程獲得互斥量,重復上述過程
- 最后銷毀互斥量
- 當多個線程競爭同一個互斥量時,失敗的線程會阻塞在lock調用處。可以用“trylock”替換“lock”,則失敗時不會阻塞。
- 當保護共享數據時,程序員有責任去確認是否需要使用互斥量。如,若四個線程會更新同樣的數據,但僅有一個線程用了互斥量,則數據可能會損壞。
互斥量(Mutex Variables) |
函數:
pthread_mutex_init (mutex,attr) pthread_mutex_destroy (mutex) pthread_mutexattr_init (attr) pthread_mutexattr_destroy (attr) |
用法:
- 互斥量必須用類型pthread_mutex_t類型聲明,在使用前必須初始化,這里有兩種方法可以初始化互斥量:
- 聲明時靜態地,如:
pthread_mutex_t mymutex = PTHREAD_MUTEX_INITIALIZER; - 動態地用pthread_mutex_init()函數,這種方法允許設定互斥量的屬性對象attr。
- 聲明時靜態地,如:
互斥量初始化后是解鎖的。
- attr對象用於設置互斥量對象的屬性,使用時必須聲明為pthread_mutextattr_t類型,默認值可以是NULL。Pthreads標准定義了三種可選的互斥量屬性:
-
- 協議(Protocol): 指定了協議用於阻止互斥量的優先級改變
- 優先級上限(Prioceiling):指定互斥量的優先級上限
- 進程共享(Process-shared):指定進程共享互斥量
注意所有實現都提供了這三個可先的互斥量屬性。
- pthread_mutexattr_init()和pthread_mutexattr_destroy()函數分別用於創建和銷毀互斥量屬性對象。
- pthread_mutex_destroy()應該用於釋放不需要再使用的互斥量對象。
互斥量(Mutex Variables) |
函數:
pthread_mutex_lock (mutex) pthread_mutex_trylock (mutex) pthread_mutex_unlock (mutex) |
用法:
- 線程用pthread_mutex_lock()函數去鎖定指定的mutex變量,若該mutex已經被另外一個線程鎖定了,該調用將會阻塞線程直到mutex被解鎖。
- pthread_mutex_trylock() will attempt to lock a mutex. However, if the mutex is already locked, the routine will return immediately with a "busy" error code. This routine may be useful in
- pthread_mutex_trylock()嘗試着去鎖定一個互斥量,然而,若互斥量已被鎖定,程序會立刻返回並返回一個忙錯誤值。該函數在優先級改變情況下阻止死鎖是非常有用的。
- 線程可以用pthread_mutex_unlock()解鎖自己占用的互斥量。在一個線程完成對保護數據的使用,而其它線程要獲得互斥量在保護數據上工作時,可以調用該函數。若有一下情形則會發生錯誤:
- 互斥量已經被解鎖
- 互斥量被另一個線程占用
- 互斥量並沒有多么“神奇”的,實際上,它們就是參與的線程的“君子約定”。寫代碼時要確信正確地鎖定,解鎖互斥量。下面演示了一種邏輯錯誤:
· Thread 1 Thread 2 Thread 3
· Lock Lock
· A = 2 A = A+1 A = A*B
· Unlock Unlock
|
|
Q:有多個線程等待同一個鎖定的互斥量,當互斥量被解鎖后,那個線程會第一個鎖定互斥量? A:除非線程使用了優先級調度機制,否則,線程會被系統調度器去分配,那個線程會第一個鎖定互斥量是隨機的。 |
例子:使用互斥量
Example Code - Using Mutexes 例程演示了線程使用互斥量處理一個點積(dot product)計算。主數據通過一個可全局訪問的數據結構被所有線程使用,每個線程處理數據的不同部分,主線程等待其他線程完成計算並輸出結果。 #include <pthread.h> #include <stdio.h> #include <malloc.h>
/* The following structure contains the necessary information to allow the function "dotprod" to access its input data and place its output into the structure. */
typedef struct { double *a; double *b; double sum; int veclen; } DOTDATA;
/* Define globally accessible variables and a mutex */
#define NUMTHRDS 4 #define VECLEN 100 DOTDATA dotstr; pthread_t callThd[NUMTHRDS]; pthread_mutex_t mutexsum;
/* The function dotprod is activated when the thread is created. All input to this routine is obtained from a structure of type DOTDATA and all output from this function is written into this structure. The benefit of this approach is apparent for the multi-threaded program: when a thread is created we pass a single argument to the activated function - typically this argument is a thread number. All the other information required by the function is accessed from the globally accessible structure. */
void *dotprod(void *arg) {
/* Define and use local variables for convenience */
int i, start, end, offset, len ; double mysum, *x, *y; offset = (int)arg;
len = dotstr.veclen; start = offset*len; end = start + len; x = dotstr.a; y = dotstr.b;
/* Perform the dot product and assign result to the appropriate variable in the structure. */
mysum = 0; for (i=start; i<end ; i++) { mysum += (x[i] * y[i]); }
/* Lock a mutex prior to updating the value in the shared structure, and unlock it upon updating. */ pthread_mutex_lock (&mutexsum); dotstr.sum += mysum; pthread_mutex_unlock (&mutexsum);
pthread_exit((void*) 0); }
/* The main program creates threads which do all the work and then print out result upon completion. Before creating the threads, the input data is created. Since all threads update a shared structure, we need a mutex for mutual exclusion. The main thread needs to wait for all threads to complete, it waits for each one of the threads. We specify a thread attribute value that allow the main thread to join with the threads it creates. Note also that we free up handles when they are no longer needed. */
int main (int argc, char *argv[]) { int i; double *a, *b; void *status; pthread_attr_t attr;
/* Assign storage and initialize values */ a = (double*) malloc (NUMTHRDS*VECLEN*sizeof(double)); b = (double*) malloc (NUMTHRDS*VECLEN*sizeof(double));
for (i=0; i<VECLEN*NUMTHRDS; i++) { a[i]=1.0; b[i]=a[i]; }
dotstr.veclen = VECLEN; dotstr.a = a; dotstr.b = b; dotstr.sum=0;
pthread_mutex_init(&mutexsum, NULL);
/* Create threads to perform the dotproduct */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for(i=0; i<NUMTHRDS; i++) { /* Each thread works on a different set of data. The offset is specified by 'i'. The size of the data for each thread is indicated by VECLEN. */ pthread_create( &callThd[i], &attr, dotprod, (void *)i); }
pthread_attr_destroy(&attr);
/* Wait on the other threads */ for(i=0; i<NUMTHRDS; i++) { pthread_join( callThd[i], &status); }
/* After joining, print out the results and cleanup */ printf ("Sum = %f /n", dotstr.sum); free (a); free (b); pthread_mutex_destroy(&mutexsum); pthread_exit(NULL); } Serial version |
- 條件變量提供了另一種同步的方式。互斥量通過控制對數據的訪問實現了同步,而條件變量允許根據實際的數據值來實現同步。
- 沒有條件變量,程序員就必須使用線程去輪詢(可能在臨界區),查看條件是否滿足。這樣比較消耗資源,因為線程連續繁忙工作。條件變量是一種可以實現這種輪詢的方式。
- 條件變量往往和互斥一起使用
- 使用條件變量的代表性順序如下:
主線程(Main Thread) o 聲明和初始化需要同步的全局數據/變量(如“count”) o 生命和初始化一個條件變量對象 o 聲明和初始化一個相關的互斥量 o 創建工作線程A和B |
|
Thread A o 工作,一直到一定的條件滿足(如“count”等於一個指定的值) o 鎖定相關互斥量並檢查全局變量的值 o 調用pthread_cond_wait()阻塞等待Thread-B的信號。注意pthread_cond_wait()能夠自動地並且原子地解鎖相關的互斥量,以至於它可以被Thread-B使用。 o 當收到信號,喚醒線程,互斥量被自動,原子地鎖定。 o 顯式解鎖互斥量 o 繼續 |
Thread B o 工作 o 鎖定相關互斥量 o 改變Thread-A所等待的全局變量 o 檢查全局變量的值,若達到需要的條件,像Thread-A發信號。 o 解鎖互斥量 o 繼續 |
Main Thread Join / Continue |
條件變量(Condition Variables) |
Routines:
pthread_cond_init (condition,attr) pthread_cond_destroy (condition) pthread_condattr_init (attr) pthread_condattr_destroy (attr) |
Usage:
- 條件變量必須聲明為pthread_cond_t類型,必須在使用前初始化。有兩種方式可以初始條件變量:
- 聲明時靜態地。如:
pthread_cond_t myconvar = PTHREAD_COND_INITIALIZER; - 用pthread_cond_init()函數動態地。創建的條件變量ID通過condition參數返回給調用線程。該方式允許設置條件變量對象的屬性,attr。
- 聲明時靜態地。如:
- 可選的attr對象用於設定條件變量的屬性。僅有一個屬性被定義:線程共享(process-shared),可以使條件變量被其它進程中的線程看到。若要使用屬性對象,必須定義為pthread_condattr_t類型(可以指定為NULL設為默認)。
注意所有實現都提供了線程共享屬性。
- pthread_condattr_init()和pthread_condattr_destroy()用於創建和銷毀條件變量屬性對象。
- 條件變量不需要再使用時,應用pthread_cond_destroy()釋放條件變量。
條件變量(Condition Variables) |
在條件變量上等待(Waiting)和發送信號(Signaling)
函數:
pthread_cond_wait (condition,mutex) pthread_cond_signal (condition) pthread_cond_broadcast (condition) |
用法:
- pthread_cond_wait()阻塞調用線程直到指定的條件受信(signaled)。該函數應該在互斥量鎖定時調用,當在等待時會自動解鎖互斥量。在信號被發送,線程被激活后,互斥量會自動被鎖定,當線程結束時,由程序員負責解鎖互斥量。
- pthread_cond_signal()函數用於向其他等待在條件變量上的線程發送信號(激活其它線程)。應該在互斥量被鎖定后調用。
- 若不止一個線程阻塞在條件變量上,則應用pthread_cond_broadcast()向其它所以線程發生信號。
- 在調用pthread_cond_wait()前調用pthread_cond_signal()會發生邏輯錯誤。
|
使用這些函數時適當的鎖定和解鎖相關的互斥量是非常重要的。如:
|
例子:使用條件變量
Example Code - Using Condition Variables 例子演示了使用Pthreads條件變量的幾個函數。主程序創建了三個線程,兩個線程工作,根系“count”變量。第三個線程等待count變量值達到指定的值。 #include <pthread.h> #include <stdio.h>
#define NUM_THREADS 3 #define TCOUNT 10 #define COUNT_LIMIT 12
int count = 0; int thread_ids[3] = {0,1,2}; pthread_mutex_t count_mutex; pthread_cond_t count_threshold_cv;
void *inc_count(void *idp) { int j,i; double result=0.0; int *my_id = idp;
for (i=0; i<TCOUNT; i++) { pthread_mutex_lock(&count_mutex); count++;
/* Check the value of count and signal waiting thread when condition is reached. Note that this occurs while mutex is locked. */ if (count == COUNT_LIMIT) { pthread_cond_signal(&count_threshold_cv); printf("inc_count(): thread %d, count = %d Threshold reached./n", *my_id, count); } printf("inc_count(): thread %d, count = %d, unlocking mutex/n", *my_id, count); pthread_mutex_unlock(&count_mutex);
/* Do some work so threads can alternate on mutex lock */ for (j=0; j<1000; j++) result = result + (double)random(); } pthread_exit(NULL); }
void *watch_count(void *idp) { int *my_id = idp;
printf("Starting watch_count(): thread %d/n", *my_id);
/* Lock mutex and wait for signal. Note that the pthread_cond_wait routine will automatically and atomically unlock mutex while it waits. Also, note that if COUNT_LIMIT is reached before this routine is run by the waiting thread, the loop will be skipped to prevent pthread_cond_wait from never returning. */ pthread_mutex_lock(&count_mutex); if (count<COUNT_LIMIT) { pthread_cond_wait(&count_threshold_cv, &count_mutex); printf("watch_count(): thread %d Condition signal received./n", *my_id); } pthread_mutex_unlock(&count_mutex); pthread_exit(NULL); }
int main (int argc, char *argv[]) { int i, rc; pthread_t threads[3]; pthread_attr_t attr;
/* Initialize mutex and condition variable objects */ pthread_mutex_init(&count_mutex, NULL); pthread_cond_init (&count_threshold_cv, NULL);
/* For portability, explicitly create threads in a joinable state */ pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); pthread_create(&threads[0], &attr, inc_count, (void *)&thread_ids[0]); pthread_create(&threads[1], &attr, inc_count, (void *)&thread_ids[1]); pthread_create(&threads[2], &attr, watch_count, (void *)&thread_ids[2]);
/* Wait for all threads to complete */ for (i=0; i<NUM_THREADS; i++) { pthread_join(threads[i], NULL); } printf ("Main(): Waited on %d threads. Done./n", NUM_THREADS);
/* Clean up and exit */ pthread_attr_destroy(&attr); pthread_mutex_destroy(&count_mutex); pthread_cond_destroy(&count_threshold_cv); pthread_exit(NULL);
}
|
Pthread API的幾個特性在該教程中並沒有包含。把它們列在下面:
- 線程調度
- 線程如何調度的實現往往是不同的,在大多數情況下,默認的機制是可以勝任的。
- Pthreads API提供了顯式設定線程調度策略和優先級的函數,它們可以重載默認機制。
- API不需要實現去支持這些特性
- Keys:線程數據(TSD)
- 互斥量的Protocol屬性和優先級管理
- 跨進程的條件變量共享
- Thread Cancellation
- 取消線程(Thread Cancellation )
- 多線程和信號(Threads and Signals)
Pthread Functions |
|
Thread Management |
|
Thread-Specific Data |
|
Thread Cancellation |
|
pthread_getcancelstate |
|
Thread Scheduling |
|
Signals |
|
Pthread Attribute Functions |
|
Basic Management |
|
Detachable or Joinable |
|
Specifying Stack Information |
|
Thread Scheduling Attributes |
|
Mutex Functions |
|
Mutex Management |
|
Priority Management |
|
Mutex Attribute Functions |
|
Basic Management |
|
Sharing |
|
Protocol Attributes |
|
Priority Management |
|
Condition Variable Functions |
|
Basic Management |
|
Condition Variable Attribute Functions |
|
Basic Management |
|
Sharing |
|
- Author: Blaise Barney, Livermore Computing.
- "Pthreads Programming". B. Nichols et al. O'Reilly and Associates.
- "Threads Primer". B. Lewis and D. Berg. Prentice Hall
- "Programming With POSIX Threads". D. Butenhof. Addison Wesley
www.awl.com/cseng/titles/0-201-63392-2 - "Programming With Threads". S. Kleiman et al. Prentice Hall
-
本篇及其英文原文: http://download.csdn.net/source/992256
-
Programing with POSIX thread(強烈推薦): http://download.csdn.net/source/992239
-
Pthread Primer(強烈推薦): http://download.csdn.net/source/992213
author: david(Heaven.Hell.Or@gmail.com)
code page:http://code.google.com/p/heavenhell/
* PthreaddCond.cpp
*
* Created on: Nov 11, 2013
* Author: zsf
*/
簡單的生產者消費者模型
#include <stdio.h>
#include <pthread.h>
#define MAXSIZE 1024
struct Data {
char buf;
pthread_cond_t full;
pthread_cond_t empty;
pthread_mutex_t lock;
int writeops, readops;
};
struct Data data;
void init(struct Data *data) {
pthread_cond_init(&data->full, NULL);
pthread_cond_init(&data->empty, NULL);
pthread_mutex_init(&data->lock, NULL);
data->readops = 0;
data->writeops = 0;
}
void put(struct Data *data, int num) {
pthread_mutex_lock(&data->lock);
while (data->writeops != 0) {
printf("wait empty\n");
pthread_cond_wait(&data->empty, &data->lock);
}
data->buf = num;
data->writeops++;
pthread_cond_signal(&data->full);
pthread_mutex_unlock(&data->lock);
}
int get(struct Data *data) {
int num;
pthread_mutex_lock(&data->lock);
while (data->writeops == 0) {
printf("wait full\n");
pthread_cond_wait(&data->full, &data->lock);
}
num = data->buf;
data->writeops--;
pthread_cond_signal(&data->empty);
pthread_mutex_unlock(&data->lock);
return num;
}
void *product(void *) {
for (int i = 0; i < 10; i++) {
put(&data, i);
printf("put -->%d", i);
}
put(&data,-1);
printf("product over\n");
return NULL;
}
void *consumer(void *) {
int ret;
while (1) {
ret = get(&data);
if(ret==-1)break;
printf("get -->%d\n", ret);
}
printf("consumer stop\n");
return NULL;
}
int main() {
pthread_t pthread[2];
void *interval;
int tmp;
tmp = pthread_create(&pthread[0], NULL, product, NULL);
tmp = pthread_create(&pthread[1], NULL, consumer, NULL);
pthread_join(pthread[0], &interval);
pthread_join(pthread[1], &interval);
return 0;
}
/*