php 使用beanstalk 消息隊列


Beanstalkd 消息隊列

一.基本信息
Beanstalkd,一個高性能、輕量級的分布式內存隊列系統,最初設計的目的是想通過后台異步執行耗時的任務來降低高容量Web應用系統的頁面訪問延遲,支持過有9.5 million用戶的Facebook Causes應用。后來開源,現在有PostRank大規模部署和使用,每天處理百萬級任務。Beanstalkd是典型的類Memcached設計,協議和使用方式都是同樣的風格,所以使用過memcached的用戶會覺得Beanstalkd似曾相識。

 

二 特性

Beanstalkd 是一個輕量級消息中間件,它最大特點是將自己定位為基於管道  (tube) 和任務 (job) 的工作隊列 (work-queue):

Beanstalkd 支持任務優先級 (priority), 延時 (delay), 超時重發 (time-to-run) 和預留 (buried), 能夠很好的支持分布式的后台任務和定時任務處理。

它的內部實現采用 libevent, 服務器-客戶端之間用類似 memcached 的輕量級通訊協議,具有有很高的性能。

盡管是內存隊列, beanstalkd 提供了 binlog 機制, 當重啟 beanstalkd 時,當前任務狀態能夠從紀錄的本地 binlog 中恢復。

1)管道(tube)

管道類似於消息主題 (topic), 在一個 Beanstalkd 中可以支持多個管道, 每個管道都有自己的發布者 (producer) 和消費者 (consumer). 管道之間互相不影響。

2)任務(job)

Beanstalkd 用任務 (job) 代替消息 (message) 的概念。與消息不同,任務有一系列狀態:

READY- 需要立即處理的任務,當延時 (DELAYED) 任務到期后會自動成為當前任務;
DELAYED- 延遲執行的任務, 當消費者處理任務后, 可以用將消息再次放回 DELAYED 隊列延遲執行;
RESERVED- 已經被消費者獲取, 正在執行的任務。Beanstalkd 負責檢查任務是否在 TTR(time-to-run) 內完成;
BURIED- 保留的任務: 任務不會被執行,也不會消失,除非有人把它 "踢" 回隊列;
DELETED- 消息被徹底刪除。Beanstalkd 不再維持這些消息。

3)任務優先級 (priority)

任務 (job) 可以有 0~2^32 個優先級, 0 代表最高優先級。 beanstalkd 采用最大最小堆 (Min-max heap) 處理任務優先級排序, 任何時刻調用 reserve 命令的消費者總是能拿到當前優先級最高的任務, 時間復雜度為 O(logn).

4)延時任務 (delay)

有兩種方式可以延時執行任務 (job): 生產者發布任務時指定延時;或者當任務處理完畢后, 消費者再次將任務放入隊列延時執行 (RELEASE with <delay>)。

5)任務超時重發 (time-to-run)

Beanstalkd 把任務返回給消費者以后:消費者必須在預設的 TTR (time-to-run) 時間內發送 delete / release/ bury 改變任務狀態;否則 Beanstalkd 會認為消息處理失敗,然后把任務交給另外的消費者節點執行。如果消費者預計在 TTR (time-to-run) 時間內無法完成任務, 也可以發送 touch 命令, 它的作用是讓 Beanstalkd 從系統時間重新計算 TTR (time-to-run).

6)任務預留 (buried)

如果任務因為某些原因無法執行, 消費者可以把任務置為 buried 狀態讓 Beanstalkd 保留這些任務。管理員可以通過 peek buried 命令查詢被保留的任務,並且進行人工干預。簡單的, kick <n> 能夠一次性把 n 條被保留的任務踢回隊列。

 

二 安裝

請看安裝教程:http://www.cnblogs.com/sien6/p/8177023.html

 

三 Beanstalkd 協議

Beanstalkd 采用類 memcached 協議, 客戶端通過文本命令與服務器交互。

這些命令可以簡單的分成三組:

1)生產類 - use <tube> / put <priority> <delay> <ttr> [bytes]:  

生產者用 use 選擇一個管道 (tube), 然后用 put 命令向管道發布任務 (job).    

2)消費類  - watch <tubes> / reserve / delete <id> / release <id> <priority> <delay> / bury <id> / touch <id>

消費者用 watch 選擇多個管道 (tube), 然后用 reserve 命令獲取待執行的任務,這個命令是阻塞的。客戶端直到有任務可執行才返回。當任務處理完畢后, 消費者可以徹底刪除任務 (DELETE), 釋放任務讓別人處理 (RELEASE), 或者保留 (BURY) 任務。

3) 維護類 - peek job / peek delayed / peek ready / peek buried / kick <n>

用於維護管道內的任務狀態, 在不改變任務狀態的條件下獲取任務。可以用消費類命令改變這些任務的狀態。

被保留 (buried) 的任務可以用 kick 命令 "踢" 回隊列。

 

應用:

生成者:

require_once 'beanstalk/Client.php';
$config = include 'beanstalk/config.php';
$beanstalk = new Beanstalk\Client($config);
$beanstalk->connect();
$beanstalk->useTube('u_tube'); //使用u_tabe 管道
$param = $argv;
$beanstalk->put(0,0,10,json_encode($param));

 

消費者:

require_once 'beanstalk/Client.php';
$config = include 'beanstalk/config.php';
$Beanstalk= new Beanstalk\Client($config);
$Beanstalk->connect();
$Beanstalk->useTube('u_tube');
$Beanstalk->watch('u_tube');    // 監控u_tube 通道
$Beanstalk->ignore('default');  // 取消默認通道監控

while(true){    // 持續監控
    $job = $Beanstalk->reserve();   // 取出待執行任務,並執行
//    print_r($Beanstalk->statsTube('foo'));
    if($job){
        $Beanstalk->bury($job['id'],'u_tube');  // 任務加入保留隊列
    }
//    $Beanstalk->kick(3);  // 開啟保留隊列
    $Beanstalk->delete($job['id']);     //刪除吧任務
}

 

參考:http://blog.csdn.net/black_ox/article/details/24792489

https://www.cnblogs.com/fuland/p/4245386.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM