perl多線程使用


原文來自:博客園(華夏35度)http://www.cnblogs.com/zhangchaoyang 作者:Orisun

<<=========================threads===========================>>

#!/usr/bin/perl
use  threads ( 'yield' ,
             'stack_size'  => 64*4096,
             'exit'  => 'threads_only' ,
             'stringify' );
 
sub  start_thread {
    my  @args  = @_ ;
    print ( 'Thread started: ' , join ( ' ' , @args ), "\n" );
}
 
##創建線程的方法
# my $thr = threads->create('func_name', ...);
# my $thr = threads->create(sub { ... }, ...);
# my $thr = threads->create(\&func, ...);
# The "->new()" method is an alias for "->create()".
my  $thr  = threads->create( 'start_thread' , 'argument1' , 'argument2' );     #通過create創建線程。返回線程實例
$thr -> join ();                #等待線程結束
threads->create( sub  { print ( "I am a thread\n" ); })-> join ();                   #創建一個線程,沒有返回值。那這個線程實例如何訪問呢?
 
my  $thr2  = async { foreach  ( @ARGS ) { print "$_\n" ; } };                              #通過async使用匿名子例程創建線程
$thr2 -> join ();
if  ( my  $err  = $thr2 ->error()) {
    warn ( "Thread error: $err\n" );
}
 
# 在隱式的列表環境中調用thread
my  $thr3  = threads->create( sub  { return  ( qw/a b c/ ); });
# 在顯式的列表環境中調用thread
my  $thr4  = threads->create({ 'context'  => 'list' },
                          sub  { return  ( qw/a b c/ ); });
# 由於創建線程時使用的子例程返回的是列表,所以這里的join函數返回的也是列表
my  @results  = $thr3 -> join ();
print  "@results\n" ;
# 把線程從主線程中分離出來
# $thr->detach();        ##報錯:Cannot detach a joined thread,因為$thr已經調用過join()
$thr4 ->detach();     ##
$tid  = $thr4 ->tid();
print  "線程4ID:$tid\n" ;
 
# Get a thread's object
$thr6  = threads->self();
$thr7  = threads->object( $tid );
 
# Get a thread's ID
$tid  = threads->tid();
$tid  = "$thr7" ;     #根據線程實例獲得線程ID
 
# 給其他線程一個運行的機會
threads->yield();
yield();
 
# 返回未分離的線程列表
my  @threads  = threads->list();
my  $thread_count  = threads->list();
 
my  @running  = threads->list(threads::running);
my  @joinable  = threads->list(threads::joinable);
 
# 判斷兩個線程是相同
if  ( $thr4  == $thr2 ) {
    print  "thread4 equals to thread2.\n" ;
}
 
# 管理線程棧大小
$stack_size  = threads->get_stack_size();
$old_size  = threads->set_stack_size(32*4096);
 
# Create a thread with a specific context and stack size
my  $thr5  = threads->create({ 'context'     => 'list' ,
                            'stack_size'  => 32*4096,
                            'exit'        => 'thread_only'  },
                          \ &start_thread );
 
# Get thread's context
my  $wantarray  = $thr -> wantarray ();
print  $wantarray , "\n" ;
 
# Check thread's state
if  ( $thr5 ->is_running()) {
    sleep (1);
}
if  ( $thr5 ->is_joinable()) {
    $thr5 -> join ();
}
 
# Send a signal to a thread
$thr5 -> kill ( 'SIGUSR1' );
 
# Exit a thread
threads-> exit ();                                    

 

<<=========================Thread========================>>

$thread = Thread->new(\&start_sub)

$thread = Thread->new(\&start_sub,@args)

start_sub指定線程要執行的子例程,args是傳給子例程的參數。

lock  VARIABLE

給變量加鎖,直到鎖超出范圍。給變量加鎖只影響到lock函數的調用--即一個線程lock var1后,另一個線程再調用lovk var1時線程就會阻塞,但lock  VARIABLE並不影響正常的對變量的訪問。

如果鎖往的是一個容器(如哈希或數組),那么其中的每一個元素並沒有全部被鎖住。比如一個線程中調用lock  @arr,在另一個線程中調用lock $arr[3]時並不會阻塞。

async  BLOCK;

async函數創建並返回一個線程實例,該線程要執行的代碼全在BLOCK中,這里BLOCK是個匿名子例程,所以其后一定加分號。

Thread->self

返回調用Thread->self函數的線程實例。

Thread->list

返回non-joined和non-detached線程實例。

cond_waitLOCKED_VARIALBLE

cond_signal  LOCKED_VARIALBLE

cond_broadcast  LOCKED_VARIALBLE

上面3個函數主要用於線程問同步,都以一個已加鎖的變量作為輸入參數。當一個線程調用cond_wait后阻塞自己;當一個線程發出cond_broadcast后所有阻塞的線程得救;當一個線程發出cond_signal后只有一個阻塞的線程得救,至於是哪一個由系統內部決定。當然只有LOCKED_VARIALBLE參數相同時才為一組,大家才可以在一起玩同步。

yield

把CPU控制權交給另外一個線程,至於是哪個線程依賴於當時的運行環境。

join

等待一個線程結束並返回該線程結束時的返回值。

detach

分離的線程不允許被join。

equal

判斷兩個線程是否相同。

tid

返回線程的tid。tid是遞增的,main線程的tid為0。

done

判斷線程是否已經結束。

下面這3個函數在5005threads中還可以用,但是在ithreads中已經不可用了。

lock(\&sub)    eval    flags

<<============================threads::shared============================>>

默認下數據都是線程私有的,新創建的線程獲得已有變量的一份私有拷貝。threads::shared用於在線程之間共享數據結構,可共享的數據類型只有6種,標量數據、數組、散列、以及它們的引用。

聲明共享變量:

my ($scalar, @array, %hash);             

share($scalar);             

share(@array);           

 share(%hash);

share函數返回共享的值,這通常是一個引用。

也可以在編譯時標記變量為共享變量:

my ($var, %hash, @array) :shared;

my  ( $var , %hash , @array ) :shared;
              my  $bork ;
 
              # Storing scalars
              $var  = 1;
              $hash { 'foo' } = 'bar' ;
              $array [0] = 1.5;
 
              # Storing shared refs
              $var  = \ %hash ;
              $hash { 'ary' } = \ @array ;
              $array [1] = \ $var ;
 
              # 不能把非共享變量的引賦給一個共享變量,下面這3句是錯誤的
              #   $var = \$bork;                    # ref of non-shared variable
              #   $hash{'bork'} = [];               # non-shared array ref
              #   push(@array, { 'x' => 1 });       # non-shared hash ref

shared_clone REF

 my $obj = {'foo' => [qw/foo bar baz/]};           

 bless($obj, 'Foo');             

my cpy=sharedclone(obj); 

# Object status (i.e., the class an object is blessed into) is also  cloned.           

print(ref($cpy), "\n");         # Outputs 'Foo'

對於克隆空的數組或散列,下面用法是等價的:

var = &share([]);   # Same asvar = shared_clone([]);             

var = &share({});   # Same asvar = shared_clone({});

is_shared VARIABLE

判斷變量是否為共享變量,如果是則返回變量的內部ID(類似於refaddr函數),如果不是返回undef。

如果is_shared參數是數組或散列,它並不檢查容器中的元素是否為共享變量。如下

my  %hash  :shared;
              if  (is_shared( %hash )) {
                  print ( "\%hash is shared\n" );
              }
 
              $hash { 'elem' } = 1;
              if  (is_shared( $hash { 'elem' })) {                          ##返回undef
                  print ( "\$hash{'elem'} is in a shared hash\n" );
              }

 lock VARIABLE

不能對容器內部的變量進行加鎖:

 my %hash :shared;             

$hash{'foo'} = 'bar';           

 #lock($hash{'foo'});          # Error           

 lock(%hash);                  # Works

cond_wait VARIABLE

cond_signal VARIABLE

cond_broadcast VARIABLE

這3個函數就不說了,跟threads里的一樣。

cond_wait CONDVAR, LOCKVAR

當有其他線程signal第一個參數變量CONDVAR時,第二個參數變量LOCKVAR被解鎖。

cond_timedwait VARIABLE, ABS_TIMEOUT       

cond_timedwait CONDVAR, ABS_TIMEOUT, LOCKVAR

如果signal未到達,而timeout了,同樣會把變量解鎖。

#  創建一個共享的'Foo' object
       my  $foo  :shared = shared_clone({});
       bless ( $foo , 'Foo' );
 
       # 創建一個共享的 'Bar' object
       my  $bar  :shared = shared_clone({});
       bless ( $bar , 'Bar' );
 
       # 把'bar' 放到 'foo'里面
       $foo ->{ 'bar' } = $bar ;
 
       # 通過線程重新bless the objects
       threads->create( sub  {
           # Rebless the outer object
           bless ( $foo , 'Yin' );
 
           # 不能直接 rebless the inner object
           #bless($foo->{'bar'}, 'Yang');
 
           # 重新取回然后 rebless the inner object
           my  $obj  = $foo ->{ 'bar' };
           bless ( $obj , 'Yang' );
           $foo ->{ 'bar' } = $obj ;
 
       })-> join ();
 
       print ( ref ( $foo ),          "\n" );    # Prints 'Yin'
       print ( ref ( $foo ->{ 'bar' }), "\n" );    # Prints 'Yang'
       print ( ref ( $bar ),          "\n" );    # Also prints 'Yang'

注意:如果你還想使用threads,那么你必須在"use threads::shared"之前就"use threads",否則會報告異常。

如果你把一個數組、散列或它們的引用share以后,那么容器中的元素都會丟失。

my  @arr  = qw(foo bar baz) ;
         share( @arr );
         # @arr is now empty (i.e., == ());
 
         # Create a 'foo' object
         my  $foo  = { 'data'  => 99 };
         bless ( $foo , 'foo' );
 
         # Share the object
         share( $foo );        # Contents are now wiped out
         print ( "ERROR: \$foo is empty\n" )
             if  (! exists ( $foo ->{ 'data' }));

所以正確的做法是你應該先把一個空的容器share,然后再往里面添加元素。

<<========================Thread::Semaphore=============================>>

use  Thread::Semaphore;
            my  $s  = Thread::Semaphore->new();
            $s ->down();   # P操作
            # The guarded section is here
            $s ->up();     # V操作
 
            # Decrement the semaphore only if it would immediately succeed.
            if  ( $s ->down_nb()) {
                # 鄰界區在此
                $s ->up();
            }
 
            # 強制降低信號量即使他成為負數
            $s ->down_force();
 
            # 創建信號量時指定·初始值
            my  $s  = Thread::Semaphore->new( $initial_value );
            $s ->down( $down_value );
            $s ->up( $up_value );
            if  ( $s ->down_nb( $down_value )) {
                ...
                $s ->up( $up_value );
            }
            $s ->down_force( $down_value );

<<===========================Thread::Queue===================================>>

直接看程序是學習語言的快速方法,注釋得很清楚:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
use  strict;
            use  warnings;
 
            use  threads;
            use  Thread::Queue;
 
            my  $q  = Thread::Queue->new();     # 創建一個空的線程隊列
 
            # Worker線程
            my  $thr  = threads->create( sub  {
                                        while  ( my  $item  $q ->dequeue()) {
                                            #處理$item
                                        }
                                     })->detach();
 
            # 向線程發送 work
            $q ->enqueue( $item1 , ...);
 
 
            # 計算隊列中有多少項
            my  $left  $q ->pending();
 
            # 非阻塞地出隊
            if  ( defined ( my  $item  $q ->dequeue_nb())) {
                # 處理 $item
            }
 
            # 獲取隊列中的第2項,注意並沒有進行出幾隊操作
            my  $item  $q ->peek(1);
 
            # 在隊頭后面插入兩個元素
            $q ->insert(1,  $item1 $item2 );
 
            # 提取隊列中的最后兩個元素
            my  ( $item1 $item2 ) =  $q ->extract(-2, 2);

上面代碼中出現過的函數我就不介紹了。

下面的數據類型可以放入隊列:
普通標題數據;

標量引用;

數組引用;

哈希引用;

以上對象的組合。

my @ary = qw/foo bar baz/;         

 q->enqueue(\@ary);     ##copy the elements 'foo', 'bar' and 'baz' from @ary intoq。

而對於共享變量,是它的引用進入隊列,而沒有發生元素的深復制。

my  @ary  :shared = qw/foo bar baz/ ;
            $q ->enqueue(\ @ary );
 
            my  $obj  = &shared ({});
            $$obj { 'foo' } = 'bar' ;
            $$obj { 'qux' } = 99;
            bless ( $obj , 'My::Class' );
            $q ->enqueue( $obj );

->new()    ##創建新隊列

->new(LIST)  ##創建隊列時壓入元素

->enqueue(LIST)    #入隊

->dequeue()    #從隊中取出一個元素

->dequeue(COUNT)    #從隊中取出COUNT個元素,如果COUNT大於隊列長度,則阻塞,下面的方法不會阻塞。

->dequeue_nb()       

->dequeue_nb(COUNT)

->pending()

返回隊列中元素的個數。

{
       lock ( $q );   # 銷往隊列,以防止其他線程中修改和刪除隊列元素
       my  $item  = $q ->peek();
       if  ( $item  ...) {
           ...
       }
   }
   # 離開塊之后,隊列變量自動解鎖

->peek()      #取出隊首元素,並沒有出險

->peek(INDEX)    #取出指定下標的隊列元素,INDEX為負數時是從隊尾開始數起

->insert(INDEX,LIST)    #在指定的位置上插入一組元素,隊首元素的INDEX為0

->extract()

->extract(INDEX)       

->extract(INDEX, COUNT)

刪除並返回指定的元素。

原文來自:博客園(華夏35度)http://www.cnblogs.com/zhangchaoyang 作者:Orisun


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM