[ Perl ] 多線程 並發編程


https://www.cnblogs.com/yeungchie/

記錄一些常用的 模塊 / 方法 。

多線程

使用模塊 threads

use 5.010;
use threads;

# 定義一個需要並發的子函數
sub func {
  my $id = shift;
  sleep 1;
  print "This is thread - $id\n";
}

創建線程

  • new
sub start {
  my $id = shift;
  my $t = new threads \&func, $id;
  return $t;
}
  • create
sub start {
  my $id = shift;
  my $t = create threads \&func, $id;
  return $t;
}
  • async

可以不通過子函數來編寫需要並發的過程,類似一個 "lambda" 。

sub start {
  my $id = shift;
  my $t = async { &func( $id ) };
  return $t;
}

線程收屍

  • 阻塞 join
&start( 'join' )->join;
say 'Done';

This is thread - join
Done
# 父線程被子線程阻塞,成功收屍。

  • 非阻塞 detach
&start( 'detach' )->detach;
say 'Done';

Done
# 由於非阻塞,父線程已經退出,子線程變成孤兒線程,無法收屍。

數據共享

使用模塊 threads::shared

use threads::shared;

標記共享變量

有幾種不同的寫法

  • 依次標記 :shared
my $scalar :shared;
my @array  :shared;
my %hash   :shared;
  • 批量標記 :shared
my ( $scalar, @array, %hash ) :shared;
  • 用函數標記 share()
my ( $scalar, @array, %hash );
share $scalar;
share @array;
share %hash;

克隆 shared_clone

向共享的變量中加入新的元素時,需要注意的地方。

my @newArray = qw( YEUNG CHIE 1 2 3 );
my $clone = shared_clone [@newArray];
push @array, $clone;
$hash{ keyName } = $clone;

lock

多個線程同時編輯一個共享變量時,需要注意的地方。

經典的取錢問題:
1 - 輸出額度 $amount = 500
2 - withdraw() 函數模擬取錢,每次取 300
3 - 當 $amount < 300 時,則無法取錢

  • 沒加鎖的情況
my $amount :shared = 500;

sub withdraw {
    unless ( $amount < 300 ) {
        sleep 1;  # 睡眠一秒模擬延遲
        $amount -= 300;
    }
}

# 這里兩個線程模擬,兩次取錢同時進行
my $t1 = new threads \&withdraw;
my $t2 = new threads \&withdraw;
$t1->join;
$t2->join;

say $amount;

-100
# 結果被取了兩次,剩余額度為 -100

  • 加了鎖的情況

調整一下子函數 withdraw(), 加個鎖。

...
sub withdraw {
    lock $amount;
    unless ( $amount < 300 ) {
        sleep 1;
        $amount -= 300;
    }
}
...

200
# 結果正確

線程隊列

使用模塊 Thread::Queue

use Thread::Queue;

創建隊列

my $queue = new Thread::Queue;

入隊 enqueue

my $var = 'YEUNG';
$queue->enqueue( $var );
$queue->enqueue( qw( CHIE 1 2 3 ) );

出隊 dequeue

  • 默認出隊一個項目
say $queue->dequeue;

YEUNG

  • 指定多個項目出隊
say for $queue->dequeue( 3 );

CHIE
1
2

非阻塞出隊 dequeue_nb

  • 如果是阻塞出隊
my $queue = new Thread::Queue qw( YEUNG CHIE );
say while $_ = $queue->dequeue;

YEUNG
CHIE
# 程序會卡在這里,等待隊列中新的項目加入

  • 使用非阻塞出隊
my $queue = new Thread::Queue qw( YEUNG CHIE );
say while $_ = $queue->dequeue_nb;

YEUNG
CHIE

剩余 pending

pending 方法可以返回未出隊的項目數量。

my $queue = new Thread::Queue qw( YEUNG CHIE );
say $queue->dequeue;
say $queue->pending;
say $queue->dequeue;
say $queue->pending;

YEUNG
1
CHIE
0

查看 peek

只是看看但是不出隊。

my $queue = new Thread::Queue qw( YEUNG CHIE );
say $queue->peek;
say $queue->pending;
say $queue->peek( 2 );
say $queue->pending;

YEUNG
2
CHIE
2

入隊結束 end

除了上面用 dequeue_nb 非阻塞出隊,之外還可以用 end 方法來

my $queue = new Thread::Queue qw( YEUNG CHIE );
$queue->end;
say while $_ = $queue->dequeue;

# 這樣雖然沒有用 dequeue_nb 方法,程序也不會卡住了。

不過這個方法需要模塊版本 >= 3.01,一般系統自帶 Perl 是不支持的,但是我們也可以自己來實現這個效果:

  • 共享變量

    共享一個全局變量標記入隊結束。

    my $endFlag :shared;
    
  • 生產者線程

    當入隊結束時,$endFlag 賦值為真。

    $endFlag = 1;
    
  • 消費者線程

    循環操作非阻塞出隊。

    while ( 1 ) {
        my $item = $queue->dequeue_nb;
        if ( defined $item ) {
            say $item;
        }
        else {
            # 當出隊失敗且入隊結束時,退出循環
            last if $endFlag;
        }
    }
    

線程信號量

使用模塊 Thread::Semaphore

use Thread::Semaphore;

線程池

使用模塊 Thread::Pool

use Thread::Pool;

參考資料/拓展


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM