Perl 中的線程
本文首先概述了線程的概念和意義,接着回顧了 Perl 語言中對線程支持的演化歷史,然后通過一系列示例重點介紹了 Perl 中線程的實現,數據的共享以及線程間的同步,最后歸納並總結了全文主要內容。
線程概述
線程是一個單一的執行流程,它是所有程序執行過程中最小的控制單位,即能被 CPU 所調度的最小任務單元。線程與進程之間既有聯系,又完全不同。簡單地說,一個線程必然屬於某一個進程,而一個進程包含至少一個或者多個線程。早期的計算機系統一次只能運行一個程序,因此,當有多個程序需要執行的時候,唯一的辦法就是讓它們排成隊,按順序串行執行。進程的出現打破了這種格局,CPU 資源按時間片被分割開來,分配給不同的進程使用。這樣一來,從微觀上看進程的執行雖然仍是串行的,但是從宏觀上看,不同的程序已經是在並行執行了。如果我們把同樣的思想運用到進程上,很自然地就會把進程再細分成更小的執行單位,即線程。由於一個進程又往往需要同時執行多個類似的任務,因此這些被細分的線程之間可以共享相同的代碼段,數據段和文件句柄等資源。有了進程,我們可以在一台單 CPU 計算機系統上同時運行 Firefox 和 Microsoft Office Word 等多個程序;有了線程,我們可以使 Firefox 在不同的標簽里同時加載多個不同的頁面,在 Office Word 里編輯文檔的同時進行語法錯誤檢查。因此,線程給我們帶來了更高的 CPU 利用率、更快速的程序響應、更經濟地資源使用方式和對多 CPU 的體系結構更良好的適應性。
Perl 線程的歷史
5005threads 線程模型
Perl 對線程的支持最早可以追溯到 1998 年 7 月發布的 Perl v5.005。其發布申明指出,Perl v5.005 中加入了對操作系統級線程的支持,這個新特性是一個實驗性的產品,這也就是我們現在所稱的 5005threads 線程模型。對於 5005threads 線程模型來說,默認情況下,所有數據結構都是共享的,所以用戶必須負責這些共享數據結構的同步訪問。如今 5005threads 已經不再被推薦實用,Perl v5.10 以后的版本里,也將不會再支持 5005threads 線程模型。
ithreads 線程模型
2000 年 5 月發布的 Perl v5.6.0 中開始引入了一個全新的線程模型,即 interpreter threads, 或稱為 ithreads,也正是在這個版本的發布申明中第一次提出了 5005threads 線程模型將來可能會被禁用的問題。盡管如此,ithreads 在那個時候還是一個新的實驗性的線程模型,用戶並不能直接使用它,唯一的辦法是通過 fork 函數模擬。經過兩年時間的發展,到 2002 年 7 月,Perl v5.8.0 正式發布,這時 ithreads 已經是一個相對成熟的線程模型,發布申明中也鼓勵用戶從老的 5005threads 線程模型轉換到新的 ithreads 線程模型,並明確指出 5005threads 線程模型最終將被淘汰。本文后面所討論的所有內容也都是基於新的 ithreads 線程模型。在 ithreads 線程模型中,最與眾不同的特點就在於默認情況一下一切數據結構都不是共享的,這一點我們會在后面內容中有更深刻的體會。
現有環境支持哪種線程模型
既然 Perl 中有可能存在兩種不同的線程模型,我們很自然地就需要判斷現有 Perl 環境到底支持的是哪一種線程實現方式。歸納起來,我們有兩種方法:
- 在 shell 里,我們可以通過執行 perl – V | grep usethreads 命令來獲取當前線程模型的相關信息,例如
清單 1. shell 中查詢 Perl 當前線程模型
> perl -V | grep use.*threads config_args='-des -Doptimize=-O2 -g -pipe -m32 -march=i386 \ -mtune=pentium4 -Dversion=5.8.5 -Dmyhostname=localhost -Dperladmin=root@localhost -Dcc=gcc -Dcf_by=Red Hat, Inc. -Dinstallprefix=/usr -Dprefix=/usr -Darchname=i386-linux -Dvendorprefix=/usr -Dsiteprefix=/usr -Duseshrplib -Dusethreads -Duseithreads -Duselargefiles -Dd_dosuid -Dd_semctl_semun -Di_db -Ui_ndbm -Di_gdbm -Di_shadow -Di_syslog -Dman3ext=3pm -Duseperlio -Dinstallusrbinperl -Ubincompat5005 -Uversiononly -Dpager=/usr/bin/less -isr -Dinc_version_list=5.8.4 5.8.3 5.8.2 5.8.1 5.8.0' usethreads=define use5005threads=undef useithreads=define usemultiplicity=define
從結果中不難看出,在當前的 Perl 環境中提供了對 ithreads 線程模型的支持。
- 在 Perl 程序中,我們也可以通過使用 Config 模塊來動態獲取 Perl 線程模型的相關信息,例如
清單 2. Perl 程序中動態獲取當前 Perl 線程模型
1 #!/usr/bin/perl 2 use Config; 3 4 if( $Config{useithreads} ) { 5 printf("Hello ithreads\n") 6 } 7 elsif( $Config{use5005threads} ) { 8 printf("Hello 5005threads\n"); 9 } 10 else { 11 printf("Can not support thread in your perl environment\n"); 12 exit( 1 ); 13 }
值得一提的是,對於 5005threads 和 ithreads 線程模型,Perl 同時只能支持其中的一種。你不可能在某一個 Perl 環境中同時使用這兩種線程模型。本文后面討論的所有內容都是基於 ithreads 線程模型的。
Perl 線程的生命周期
創建線程
線程作為 Perl 中的一種實體,其一生可以粗略的分為創建,運行與退出這三個階段。創建使得線程從無到有,運行則是線程完成其主要工作的階段,退出自然就是指線程的消亡。線程的運行和普通函數的執行非常類似,有其入口參數,一段特定的代碼流程以及執行完畢后返回的一個或一組結果,唯一與普通函數調用的不同之處就在於新建線程的執行與當前線程的執行是並行的。
Perl 里創建一個新的線程非常簡單,主要有兩種方法,他們分別是:
- 使用 threads 包的 create() 方法,例如
清單 3. 通過 create() 方法創建線程
1 use threads; 2 3 sub say_hello 4 { 5 printf("Hello thread! @_.\n"); 6 return( rand(10) ); 7 } 8 my $t1 = threads->create( \&say_hello, "param1", "param2" ); 9 my $t2 = threads->create( "say_hello", "param3", "param4" ); 10 my $t3 = threads->create( 11 sub { 12 printf("Hello thread! @_\n"); 13 return( rand(10) ); 14 }, "param5", "param6" );
- 使用 async{} 塊創建線程,例如
清單 4. 通過 async{} 塊創建線程
1 #!/usr/bin/perl 2 3 use threads; 4 my $t4 = async{ 5 printf("Hello thread!\n"); 6 };
join 方法和 detach 方法
線程一旦被成功創建,它就立刻開始運行了,這個時候你面臨兩種選擇,分別是 join 或者 detach 這個新建線程。當然你也可以什么都不做,不過這可不是一個好習慣,后面我們會解釋這是為什么。
我們先來看看 join 方法, 這也許是大多數情況下你想要的。從字面上來理解,join 就是把新創建的線程結合到當前的主線程中來,把它當成是主線程的一部分,使他們合二為一。join 會觸發兩個動作,首先,主線程會索取新建線程執行結束以后的返回值;其次,新建線程在執行完畢並返回結果以后會自動釋放它自己所占用的系統資源。例如
清單 5. 使用 join() 方法收割新建線程
1 #!/usr/bin/perl 2 3 use threads; 4 5 sub func { 6 sleep(1); 7 return(rand(10)); 8 } 9 my $t1 = threads->create( \&func ); 10 my $t2 = threads->create( \&func ); 11 printf("do something in the main thread\n"); 12 my $t1_res = $t1->join(); 13 my $t2_res = $t2->join(); 14 printf("t1_res = $t1_res\nt2_res = $t2_res\n");
由此我們不難發現,調用 join 的時機是一個十分有趣的問題。如果調用 join 方法太早,新建線程尚未執行完畢,自然就無法返回任何結果,那么這個時候,主線程就不得不被阻塞,直到新建線程執行完畢之后,才能獲得返回值,然后資源會被釋放,join 才能結束,這在很大程度上破話了線程之間的並行性。相反,如果調用 join 方法太晚,新建線程早已執行完畢,由於一直沒有機會返回結果,它所占用的資源就一直無法得到釋放,直到被 join 為止,這在很大程度上浪費了寶貴的系統資源。因此,join 新建線程的最好時機應該是在它剛剛執行完畢的時候,這樣既不會阻塞當前線程的執行,又可以及時釋放新建線程所占用的系統資源。
我們再來看看 detach 方法,這也許是最省心省力的處理方法了。從字面上來理解,detach 就是把新創建的線程與當前的主線程剝離開來,讓它從此和主線程無關。當你使用 detach 方法的時候,表明主線程並不關心新建線程執行以后返回的結果,新建線程執行完畢后 Perl 會自動釋放它所占用的資源。例如
清單 6. 使用 detach() 方法剝離線程
1 #!/usr/bin/perl 2 3 use threads; 4 5 use Config; 6 7 sub say_hello { 8 my ( $name ) = @_; 9 printf("Hello World! I am $name.\n"); 10 } 11 my $t1 = threads->create( \&say_hello, "Alex" ); 12 $t1->detach(); 13 printf("doing something in main thread\n"); 14 sleep(1);
一個新建線程一旦被 detach 以后,就無法再 join 了。當你使用 detach 方法剝離線程的時候,有一點需要特別注意,那就是你需要保證被創建的線程先於主線程結束,否則你創建的線程會被迫結束,除非這種結果正是你想要的,否則這也許會造成異常情況的出現,並增加程序調試的難度。
本節的開始我們提到,新線程被創建以后,如果既不 join,也不 detach 不是一個好習慣,這是因為除非明確地調用 detach 方法剝離線程,Perl 會認為你也許要在將來的某一個時間點調用 join,所以新建線程的返回值會一直被保存在內存中以備不時之需,它所占用的系統資源也一直不會得到釋放。然而實際上,你打算什么也不做,因此寶貴的系統資源直到整個 Perl 應用結束時才被釋放。同時,由於你即沒有調用 join 有沒有調用 detach,應用結束時 Perl 還會返回給你一個線程非正常結束的警告。
線程的消亡
大多數情況下,你希望你創建的線程正常退出,這就意味着線程所對應的函數體在執行完畢后返回並釋放資源。例如在清單 5 的示例中,新建線程被 join 以后的退出過程。可是,如果由於 detach 不當或者由於主線因某些意外的異常提前結束了,盡管它所創建的線程可能尚未執行完畢,但是他們還是會被強制中止,正所謂皮之不存,毛將焉附。這時你也許會得到一個類似於“Perl exited with active threads”的警告。
當然,你也可以顯示地調用 exit() 方法來結束一個線程,不過值得注意的是,默認情況下,如果你在一個線程中調用了 exit() 方法, 其他線程都會隨之一起結束,在很多情況下,這也許不是你想要的,如果你希望 exit() 方法只在調用它的線程內生效,那么你在創建該線程的時候就需要設置’ exit ’ => ’ thread_only ’。例如
清單 7. 為某個線程設置’ exit ’ => ’ thread_only ’屬性
1 #!/usr/bin/perl 2 3 4 use threads; 5 6 sub say_hello { 7 printf("Hello thread! @_.\n"); 8 sleep(10); 9 printf("Bye\n"); 10 } 11 sub quick_exit { 12 printf("I will be exit in no time\n"); 13 exit(1); 14 } 15 my $t1 = threads->create( \&say_hello, "param1", "param2" ); 16 my $t2 = threads->create( {'exit'=>'thread_only'}, \&quick_exit ); 17 $t1->join(); 18 $t2->join();
如果你希望每個線程的 exit 方法都只對自己有效,那么在每次創建一個新線程的時候都去要顯式設置’ exit ’ => ’ thread_only ’屬性顯然有些麻煩,你也可以在引入 threads 包的時候設置這個屬性在全局范圍內有效,例如
清單 8. 設置’ exit ’ => ’ thread_only ’為全局屬性
1 use threads ('exit' => 'threads_only'); 2 3 sub func { 4 ... 5 if( $condition ) { 6 exit(1); 7 } 8 } 9 my $t1 = threads->create( \&func ); 10 my $t2 = threads->create( \&func ); 11 $t1->join(); 12 $t2->join();
共享與同步
threads::shared
和現有大多數線程模型不同,在 Perl ithreads 線程模型中,默認情況下任何數據結構都不是共享的。當一個新線程被創建以后,它就已經包含了當前所有數據結構的一份私有拷貝,新建線程中對這份拷貝的數據結構的任何操作都不會在其他線程中有效。因此,如果需要使用任何共享的數據,都必須顯式地申明。threads::shared 包可以用來實現線程間共享數據的目的。
清單 9. 在線程中申明和使用共享數據
1 #!/usr/bin/perl 2 3 use threads; 4 use threads::shared; 5 use strict; 6 7 my $var :shared = 0; # use :share tag to define 8 my @array :shared = (); # use :share tag to define 9 my %hash = (); 10 share(%hash); # use share() funtion to define 11 12 sub start { 13 $var = 100; 14 @array[0] = 200; 15 @array[1] = 201; 16 $hash{'1'} = 301; 17 $hash{'2'} = 302; 18 } 19 20 sub verify { 21 sleep(1); # make sure thread t1 execute firstly 22 printf("var = $var\n"); # var=100 23 for(my $i = 0; $i < scalar(@array); $i++) { 24 printf("array[$i] = $array[$i]\n"); # array[0]=200; array[1]=201 25 } 26 27 foreach my $key ( sort( keys(%hash) ) ) { 28 printf("hash{$key} = $hash{$key}\n"); # hash{1}=301; hash{2}=302 29 } 30 } 31 my $t1 = threads->create( \&start ); 32 my $t2 = threads->create( \&verify ); 33 $t1->join(); 34 $t2->join();
鎖
多線程間既然有了共享的數據,那么就必須對共享數據進行小心地訪問,否則,沖突在所難免。Perl ithreads 線程模型中內置的 lock 方法實現了線程間共享數據的鎖機制。有趣的是,並不存在一個 unlock 方法用來顯式地解鎖,鎖的生命周期以代碼塊為單位,也就是說,當 lock 操作所在的代碼塊執行結束之后,也就是鎖被隱式釋放之時。例如
清單 10. 線程中的鎖機制
1 use threads::shared; 2 3 # in thread 1 4 { 5 lock( $share ); # lock for 3 seconds 6 sleep(3); # other threads can not lock again 7 } 8 # unlock implicitly now after the block 9 # in thread 2 10 { 11 lock($share); # will be blocked, as already locked by thread 1 12 $share++; # after thread 1 quit from the block 13 } 14 # unlock implicitly now after the block
上面的示例中,我們在 thread 1 中使用 lock 方法鎖住了一個普通的標量,這會導致 thread 2 在試圖獲取 $share 變量的鎖時被阻塞,當 thread 1 從調用 lock 的代碼塊中退出時,鎖被隱式地釋放,從而 thread 2 阻塞結束,lock 成功以后,thread 2 才可以執行 $share++ 的操作。對於數組和哈希表來說,lock 必須用在整個數據結構上,而不是用在數組或哈希表的某一個元素上。例如
清單 11. 在數組或哈希表上使用鎖機制
1 use threads; 2 use threads::shared; 3 { 4 lock(@share); # the array has been locked 5 lock(%hash); # the hash has been locked 6 sleep(3); # other threads can not lock again 7 } 8 { 9 lock($share[1]); # error will occur 10 lock($hash{key}); # error will occur 11 }
假如一個線程對某一個共享變量實施了鎖操作,在它沒有釋放鎖之前,如果另外一個線程也對這個共享變量實施鎖操作,那么這個線程就會被阻塞,阻塞不會被自動中止而是直到前一個線程將鎖釋放為止。這樣的模式就帶來了我們常見的死鎖問題。例如
清單 12. 線程中的死鎖
1 use threads; 2 use threads::shared; 3 # in thread 1 4 { 5 lock($a); # lock for 3 seconds 6 sleep(3); # other threads can not lock again 7 lock($b); # dead lock here 8 } 9 10 # in thread 2 11 { 12 lock($b); # will be blocked, as already locked by thread 1 13 sleep(3); # after thread 1 quit from the block 14 lock($a); # dead lock here 15 }
死鎖常常是多線程程序中最隱蔽的問題,往往難以發現與調試,也增加了排查問題的難度。為了避免在程序中死鎖的問題,在程序中我們應該盡量避免同時獲取多個共享變量的鎖,如果無法避免,那么一是要盡量使用相同的順序來獲取多個共享變量的鎖,另外也要盡可能地細化上鎖的粒度,減少上鎖的時間。
信號量
Thread::Semaphore 包為線程提供了信號量的支持。你可以創建一個自己的信號量,並通過 down 操作和 up 操作來實現對資源的同步訪問。實際上,down 操作和 up 操作對應的就是我們所熟知的 P 操作和 V 操作。從內部實現上看,Thread::Semaphore 本質上就是加了鎖的共享變量,無非是把這個加了鎖的共享變量封裝成了一個線程安全的包而已。由於信號量不必與任何變量綁定,因此,它非常靈活,可以用來控制你想同步的任何數據結構和程序行為。例如
清單 13. 線程中的信號量
1 use threads; 2 use threads::shared; 3 use Thread::Semaphore; 4 5 my $s = Thread::Semaphore->new(); 6 $s->down(); # P operation 7 ... 8 $s->up(); # V operation
從本質上說,信號量是一個共享的整型變量的引用。默認情況下,它的初始值為 1,down 操作使它的值減 1,up 操作使它的值加 1。當然,你也可以自定義信號量初始值和每次 up 或 down 操作時信號量的變化。例如
清單 14. 線程中的信號量
1 use threads; 2 use Thread::Semaphore; 3 4 my $s = Thread::Semaphore->new(5); 5 printf("s = " . ${$s} . "\n"); # s = 5 6 $s->down(3); 7 printf("s = " . ${$s} . "\n"); # s = 2 8 ... 9 $s->up(4); 10 printf("s = " . ${$s} . "\n"); # s = 6
線程隊列
Thread::Queue 包為線程提供了線程安全的隊列支持。與信號量類似,從內部實現上看,Thread::Queue 也是把一個通過鎖機制實現同步訪問的共享隊列封裝成了一個線程安全的包,並提供統一的使用接口。Thread::Queue 在某些情況下可以大大簡化線程間通信的難度和成本。例如在生產者 - 消費者模型中,生產者可以不斷地在線程隊列上做 enqueue 操作,而消費者只需要不斷地在線程隊列上做 dequeue 操作,這就很簡單地實現了生產者和消費者之間同步的問題。例如
清單 15. 生產者 - 消費者模型中對線程隊列的使用
1 #!/usr/bin/perl 2 3 use threads; 4 use Thread::Queue; 5 6 my $q = Thread::Queue->new(); 7 sub produce { 8 my $name = shift; 9 while(1) { 10 my $r = int(rand(100)); 11 $q->enqueue($r); 12 printf("$name produce $r\n"); 13 sleep(int(rand(3))); 14 } 15 } 16 17 sub consume { 18 my $name = shift; 19 while(my $r = $q->dequeue()) { 20 printf("consume $r\n"); 21 } 22 } 23 24 my $producer1 = threads->create(\&produce, "producer1"); 25 my $producer2 = threads->create(\&produce, "producer2"); 26 my $consumer1 = threads->create(\&consume, "consumer2"); 27 28 $producer1->join(); 29 $producer2->join(); 30 $consumer1->join();
其他有用的非核心包
本文前面討論的所有內容都在 Perl 線程核心包的范疇之內。其實 CPAN 上還有其他一些與線程相關的非核心包,它們往往也會給 Perl 線程的使用帶來很大的便利,這里我們選出兩個稍加介紹,拋磚引玉。
Thread::Pool 包允許你在程序中創建一批線程去完成多個類似的任務。例如當你希望創建一個多線程程序去完成檢驗 1000 個 ip 地址是否都能 ping 通的任務時,Thread::Pool 包可以給你帶來便利。
Thread::RWLock 包為線程中的讀寫操作提供了鎖機制的支持。例如當你有多個 reader 和 writer 線程共同訪問某一個或幾個文件時,Thread::RWLock 包可以給你帶來便利。
總結
本文主要介紹了 Perl 中線程的使用方法,包括線程的創建、執行與消亡,如何在線程中使用共享變量並通過鎖機制、信號量和線程隊列的方法來實現線程間的同步。Perl ithreads 線程模型與主流線程模型最大的不同之處在於默認情況下任何數據結構都是非共享的,或者說 Perl 中的 ithreads 是一個“非輕量級”的線程模型。雖然這樣的線程模型增加了程序的開銷,但它並不會在線程的功能性上打折扣,同時它也使得線程間的通訊和共享變得更加簡單。這也符合了 Perl 一貫的簡單而強大的理念和原則。
原文: https://www.ibm.com/developerworks/cn/linux/l-cn-perl-thread/