原文來自:博客園(華夏35度)http://www.cnblogs.com/zhangchaoyang 作者:Orisun
<<=========================threads===========================>>
#!/usr/bin/perl
use
threads (
'yield'
,
'stack_size'
=> 64*4096,
'exit'
=>
'threads_only'
,
'stringify'
);
sub
start_thread {
my
@args
=
@_
;
print
(
'Thread started: '
,
join
(
' '
,
@args
),
"\n"
);
}
##創建線程的方法
# my $thr = threads->create('func_name', ...);
# my $thr = threads->create(sub { ... }, ...);
# my $thr = threads->create(\&func, ...);
# The "->new()" method is an alias for "->create()".
my
$thr
= threads->create(
'start_thread'
,
'argument1'
,
'argument2'
);
#通過create創建線程。返回線程實例
$thr
->
join
();
#等待線程結束
threads->create(
sub
{
print
(
"I am a thread\n"
); })->
join
();
#創建一個線程,沒有返回值。那這個線程實例如何訪問呢?
my
$thr2
= async {
foreach
(
@ARGS
) {
print
"$_\n"
; } };
#通過async使用匿名子例程創建線程
$thr2
->
join
();
if
(
my
$err
=
$thr2
->error()) {
warn
(
"Thread error: $err\n"
);
}
# 在隱式的列表環境中調用thread
my
$thr3
= threads->create(
sub
{
return
(
qw/a b c/
); });
# 在顯式的列表環境中調用thread
my
$thr4
= threads->create({
'context'
=>
'list'
},
sub
{
return
(
qw/a b c/
); });
# 由於創建線程時使用的子例程返回的是列表,所以這里的join函數返回的也是列表
my
@results
=
$thr3
->
join
();
print
"@results\n"
;
# 把線程從主線程中分離出來
# $thr->detach(); ##報錯:Cannot detach a joined thread,因為$thr已經調用過join()
$thr4
->detach();
##
$tid
=
$thr4
->tid();
print
"線程4ID:$tid\n"
;
# Get a thread's object
$thr6
= threads->self();
$thr7
= threads->object(
$tid
);
# Get a thread's ID
$tid
= threads->tid();
$tid
=
"$thr7"
;
#根據線程實例獲得線程ID
# 給其他線程一個運行的機會
threads->yield();
yield();
# 返回未分離的線程列表
my
@threads
= threads->list();
my
$thread_count
= threads->list();
my
@running
= threads->list(threads::running);
my
@joinable
= threads->list(threads::joinable);
# 判斷兩個線程是相同
if
(
$thr4
==
$thr2
) {
print
"thread4 equals to thread2.\n"
;
}
# 管理線程棧大小
$stack_size
= threads->get_stack_size();
$old_size
= threads->set_stack_size(32*4096);
# Create a thread with a specific context and stack size
my
$thr5
= threads->create({
'context'
=>
'list'
,
'stack_size'
=> 32*4096,
'exit'
=>
'thread_only'
},
\
&start_thread
);
# Get thread's context
my
$wantarray
=
$thr
->
wantarray
();
print
$wantarray
,
"\n"
;
# Check thread's state
if
(
$thr5
->is_running()) {
sleep
(1);
}
if
(
$thr5
->is_joinable()) {
$thr5
->
join
();
}
# Send a signal to a thread
$thr5
->
kill
(
'SIGUSR1'
);
# Exit a thread
threads->
exit
();
|
<<=========================Thread========================>>
$thread = Thread->new(\&start_sub)
$thread = Thread->new(\&start_sub,@args)
start_sub指定線程要執行的子例程,args是傳給子例程的參數。
lock VARIABLE
給變量加鎖,直到鎖超出范圍。給變量加鎖只影響到lock函數的調用--即一個線程lock var1后,另一個線程再調用lovk var1時線程就會阻塞,但lock VARIABLE並不影響正常的對變量的訪問。
如果鎖往的是一個容器(如哈希或數組),那么其中的每一個元素並沒有全部被鎖住。比如一個線程中調用lock @arr,在另一個線程中調用lock $arr[3]時並不會阻塞。
async BLOCK;
async函數創建並返回一個線程實例,該線程要執行的代碼全在BLOCK中,這里BLOCK是個匿名子例程,所以其后一定加分號。
Thread->self
返回調用Thread->self函數的線程實例。
Thread->list
返回non-joined和non-detached線程實例。
cond_waitLOCKED_VARIALBLE
cond_signal LOCKED_VARIALBLE
cond_broadcast LOCKED_VARIALBLE
上面3個函數主要用於線程問同步,都以一個已加鎖的變量作為輸入參數。當一個線程調用cond_wait后阻塞自己;當一個線程發出cond_broadcast后所有阻塞的線程得救;當一個線程發出cond_signal后只有一個阻塞的線程得救,至於是哪一個由系統內部決定。當然只有LOCKED_VARIALBLE參數相同時才為一組,大家才可以在一起玩同步。
yield
把CPU控制權交給另外一個線程,至於是哪個線程依賴於當時的運行環境。
join
等待一個線程結束並返回該線程結束時的返回值。
detach
分離的線程不允許被join。
equal
判斷兩個線程是否相同。
tid
返回線程的tid。tid是遞增的,main線程的tid為0。
done
判斷線程是否已經結束。
下面這3個函數在5005threads中還可以用,但是在ithreads中已經不可用了。
lock(\&sub) eval flags
<<============================threads::shared============================>>
默認下數據都是線程私有的,新創建的線程獲得已有變量的一份私有拷貝。threads::shared用於在線程之間共享數據結構,可共享的數據類型只有6種,標量數據、數組、散列、以及它們的引用。
聲明共享變量:
my ($scalar, @array, %hash);
share($scalar);
share(@array);
share(%hash);
share函數返回共享的值,這通常是一個引用。
也可以在編譯時標記變量為共享變量:
my ($var, %hash, @array) :shared;
my
(
$var
,
%hash
,
@array
) :shared;
my
$bork
;
# Storing scalars
$var
= 1;
$hash
{
'foo'
} =
'bar'
;
$array
[0] = 1.5;
# Storing shared refs
$var
= \
%hash
;
$hash
{
'ary'
} = \
@array
;
$array
[1] = \
$var
;
# 不能把非共享變量的引賦給一個共享變量,下面這3句是錯誤的
# $var = \$bork; # ref of non-shared variable
# $hash{'bork'} = []; # non-shared array ref
# push(@array, { 'x' => 1 }); # non-shared hash ref
|
shared_clone REF
my $obj = {'foo' => [qw/foo bar baz/]};
bless($obj, 'Foo');
my cpy=sharedclone(obj);
# Object status (i.e., the class an object is blessed into) is also cloned.
print(ref($cpy), "\n"); # Outputs 'Foo'
對於克隆空的數組或散列,下面用法是等價的:
var = &share([]); # Same asvar = shared_clone([]);
var = &share({}); # Same asvar = shared_clone({});
is_shared VARIABLE
判斷變量是否為共享變量,如果是則返回變量的內部ID(類似於refaddr函數),如果不是返回undef。
如果is_shared參數是數組或散列,它並不檢查容器中的元素是否為共享變量。如下
my
%hash
:shared;
if
(is_shared(
%hash
)) {
print
(
"\%hash is shared\n"
);
}
$hash
{
'elem'
} = 1;
if
(is_shared(
$hash
{
'elem'
})) {
##返回undef
print
(
"\$hash{'elem'} is in a shared hash\n"
);
}
|
lock VARIABLE
不能對容器內部的變量進行加鎖:
my %hash :shared;
$hash{'foo'} = 'bar';
#lock($hash{'foo'}); # Error
lock(%hash); # Works
cond_wait VARIABLE
cond_signal VARIABLE
cond_broadcast VARIABLE
這3個函數就不說了,跟threads里的一樣。
cond_wait CONDVAR, LOCKVAR
當有其他線程signal第一個參數變量CONDVAR時,第二個參數變量LOCKVAR被解鎖。
cond_timedwait VARIABLE, ABS_TIMEOUT
cond_timedwait CONDVAR, ABS_TIMEOUT, LOCKVAR
如果signal未到達,而timeout了,同樣會把變量解鎖。
# 創建一個共享的'Foo' object
my
$foo
:shared = shared_clone({});
bless
(
$foo
,
'Foo'
);
# 創建一個共享的 'Bar' object
my
$bar
:shared = shared_clone({});
bless
(
$bar
,
'Bar'
);
# 把'bar' 放到 'foo'里面
$foo
->{
'bar'
} =
$bar
;
# 通過線程重新bless the objects
threads->create(
sub
{
# Rebless the outer object
bless
(
$foo
,
'Yin'
);
# 不能直接 rebless the inner object
#bless($foo->{'bar'}, 'Yang');
# 重新取回然后 rebless the inner object
my
$obj
=
$foo
->{
'bar'
};
bless
(
$obj
,
'Yang'
);
$foo
->{
'bar'
} =
$obj
;
})->
join
();
print
(
ref
(
$foo
),
"\n"
);
# Prints 'Yin'
print
(
ref
(
$foo
->{
'bar'
}),
"\n"
);
# Prints 'Yang'
print
(
ref
(
$bar
),
"\n"
);
# Also prints 'Yang'
|
注意:如果你還想使用threads,那么你必須在"use threads::shared"之前就"use threads",否則會報告異常。
如果你把一個數組、散列或它們的引用share以后,那么容器中的元素都會丟失。
my
@arr
=
qw(foo bar baz)
;
share(
@arr
);
# @arr is now empty (i.e., == ());
# Create a 'foo' object
my
$foo
= {
'data'
=> 99 };
bless
(
$foo
,
'foo'
);
# Share the object
share(
$foo
);
# Contents are now wiped out
print
(
"ERROR: \$foo is empty\n"
)
if
(!
exists
(
$foo
->{
'data'
}));
|
所以正確的做法是你應該先把一個空的容器share,然后再往里面添加元素。
<<========================Thread::Semaphore=============================>>
use
Thread::Semaphore;
my
$s
= Thread::Semaphore->new();
$s
->down();
# P操作
# The guarded section is here
$s
->up();
# V操作
# Decrement the semaphore only if it would immediately succeed.
if
(
$s
->down_nb()) {
# 鄰界區在此
$s
->up();
}
# 強制降低信號量即使他成為負數
$s
->down_force();
# 創建信號量時指定·初始值
my
$s
= Thread::Semaphore->new(
$initial_value
);
$s
->down(
$down_value
);
$s
->up(
$up_value
);
if
(
$s
->down_nb(
$down_value
)) {
...
$s
->up(
$up_value
);
}
$s
->down_force(
$down_value
);
|
<<===========================Thread::Queue===================================>>
直接看程序是學習語言的快速方法,注釋得很清楚:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
use
strict;
use
warnings;
use
threads;
use
Thread::Queue;
my
$q
= Thread::Queue->new();
# 創建一個空的線程隊列
# Worker線程
my
$thr
= threads->create(
sub
{
while
(
my
$item
=
$q
->dequeue()) {
#處理$item
}
})->detach();
# 向線程發送 work
$q
->enqueue(
$item1
, ...);
# 計算隊列中有多少項
my
$left
=
$q
->pending();
# 非阻塞地出隊
if
(
defined
(
my
$item
=
$q
->dequeue_nb())) {
# 處理 $item
}
# 獲取隊列中的第2項,注意並沒有進行出幾隊操作
my
$item
=
$q
->peek(1);
# 在隊頭后面插入兩個元素
$q
->insert(1,
$item1
,
$item2
);
# 提取隊列中的最后兩個元素
my
(
$item1
,
$item2
) =
$q
->extract(-2, 2);
|
上面代碼中出現過的函數我就不介紹了。
下面的數據類型可以放入隊列:
普通標題數據;
標量引用;
數組引用;
哈希引用;
以上對象的組合。
my @ary = qw/foo bar baz/;
q->enqueue(\@ary); ##copy the elements 'foo', 'bar' and 'baz' from @ary intoq。
而對於共享變量,是它的引用進入隊列,而沒有發生元素的深復制。
my
@ary
:shared =
qw/foo bar baz/
;
$q
->enqueue(\
@ary
);
my
$obj
=
&shared
({});
$$obj
{
'foo'
} =
'bar'
;
$$obj
{
'qux'
} = 99;
bless
(
$obj
,
'My::Class'
);
$q
->enqueue(
$obj
);
|
->new() ##創建新隊列
->new(LIST) ##創建隊列時壓入元素
->enqueue(LIST) #入隊
->dequeue() #從隊中取出一個元素
->dequeue(COUNT) #從隊中取出COUNT個元素,如果COUNT大於隊列長度,則阻塞,下面的方法不會阻塞。
->dequeue_nb()
->dequeue_nb(COUNT)
->pending()
返回隊列中元素的個數。
{
lock
(
$q
);
# 銷往隊列,以防止其他線程中修改和刪除隊列元素
my
$item
=
$q
->peek();
if
(
$item
...) {
...
}
}
# 離開塊之后,隊列變量自動解鎖
|
->peek() #取出隊首元素,並沒有出險
->peek(INDEX) #取出指定下標的隊列元素,INDEX為負數時是從隊尾開始數起
->insert(INDEX,LIST) #在指定的位置上插入一組元素,隊首元素的INDEX為0
->extract()
->extract(INDEX)
->extract(INDEX, COUNT)
刪除並返回指定的元素。