一:介紹
Beanstalkd 是一個輕量級的內存型隊列。它是典型的類Memcached設計,協議和使用方式都是同樣風格。
github:https://github.com/beanstalkd
官網:https://beanstalkd.github.io/
二:功能特性
2.1 優先級
任務job可以有0~2^32 個優先級, 0 代表最高優先級,默認優先級為1024。
2.2 延遲 delay
比如說多長時間后執行某個任務
2.3 持久化
可以通過binlog將job及其狀態記錄到文件里面,在Beanstalkd下次啟動時可以通過讀取binlog來恢復之前的job及狀態。
2.4 超時控制
為了防止某個consumer長時間占用任務但不能處理的情況,Beanstalkd為reserve操作設置了timeout時間,如果該consumer不能在指定時間內完成job,job將被遷移回READY狀態,供其他consumer執行。
2.5 分布式容錯
因為它是類Memcached設計,beanstalkd各個server之間並不知道彼此的存在,都是通過client來實現分布式以及根據tube名稱去特定server獲取job。
三:使用場景
- 用作延時隊列:比如可以用於如果用戶30分鍾內不操作,任務關閉。
- 用作定時任務:比如可以用於專門的后台任務。
- 用作異步操作:這是所有消息隊列都最常用的,先將任務仍進去,順序執行。
- 用作循環隊列:用release命令可以循環執行任務,比如可以做負載均衡任務分發。
- 用作兜底機制:比如一個請求有失敗的概率,可以用Beanstalk不斷重試,設定超時時間,時間內嘗試到成功為止。
其實我們最重要的使用場景是把它作為延遲隊列類使用,比如:
- 下訂單后多長時間沒有付款,要取消訂單,並退庫存
- 用戶注冊成功后,發送一封郵件
- 定期檢查退款狀態的訂單是否退款成功
四:Beanstalkd設計基本概念
4.1 核心概念
- job:一個需要異步處理的任務,是Beanstalkd中的基本單元,需要放在一個tube中
- tube:一個有名的隊列,用來存儲統一類型的job,是producer和consumer操作的對象。tube可以稱為管道
- producer:Job 的生產者,通過 put 命令來將一個 job 放到一個 tube 中
- consumer:Job的消費者,通過 reserve/release/bury/delete 命令來獲取 job 或改變 job 的狀態
4.2 job的生命周期
生產者生成任務,並根據業務需求將任務放到不同的管道中。比如與注冊有關的任務放到注冊管道中,和訂單有關的任務放到訂單管道中。
任務進入管道到離開管道一共有5個狀態 :
(ready,delayed,reserved,buried,delete)
- **READY **- 需要立即處理的任務,當延時 (DELAYED) 任務到期后會自動成為當前任務;
- **DELAYED **- 延遲執行的任務, 當消費者處理任務后, 可以用將消息再次放回 DELAYED 隊列延遲執行;
- **RESERVED **- 已經被消費者獲取, 正在執行的任務。Beanstalkd 負責檢查任務是否在 TTR(time-to-run) 內完成;
- **BURIED **- 保留的任務: 任務不會被執行,也不會消失,除非有人把它 "踢" 回隊列;
- **DELETED **- 消息被徹底刪除。Beanstalkd 不再維持這些消息。
狀態流程
1.生產任務:
當producer生產者put一個job到tube時,這個job就處於 ready
狀態,等待consumer來消費處理;
producer生產者也可以選擇延遲put一個job,這時job就先達到 delayed
狀態(比如設置一個5秒延遲的job,那么5秒之后,這個job才會變成 ready 狀態,才可以被consumer消費)
2. 消費任務:
consumer獲取了當前 ready
的job后,該job就會遷移到 reserved
狀態,這樣其他的consumer就不能在操作該job
3. 消費完任務后:
當consumer消費完該job后,可以選擇delete, release 或者 bury 3種操作。
- delete操作:job從系統消亡,之后不能在獲取;
- release操作:可以重新把該job狀態遷移回
ready
(也可以延遲狀態delayed
操作),使其他的consumer可以繼 續獲取和執行該job; - bury操作: 把job置為
buried
狀態,及是把該job休眠,等到需要的時候,還可以將休眠的 job 重新置為ready
狀態, 也可以delete掉 buried 狀態的job。
也就是說:當消費者處理完任務后,任務的狀態可能是delete(刪除,處理成功),可能是buried(預留,意味着先把任務放一邊,等待條件成熟還要用),可能是ready,也可能是delayed,需要根據具體業務場景自己進行判斷定義
示意圖:
五:例子
我們使用php來作為例子的練習
使用的PHP庫:https://github.com/pheanstalk/pheanstalk
直接用composer進行安裝
composer.json
{
"require": {
"pda/pheanstalk": "^4.0"
}
}
**5.1 生產者和消費者例子**
**生產者: producer.php**
<?php
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = Pheanstalk::create('192.168.1.109', 11301);
$tubeName = 'use_email_list';
$jobData = array(
'uid' => time(),
'email' => 'test@11.com',
'message' => 'hello beanstalkd',
'dtime' => date('Y-m-d H:i:s'),
);
$pheanstalk->useTube($tubeName)->put(json_encode($jobData));
echo json_encode($jobData);
運行程序: php ./producer.php
**消費者:consumer.php**
<?php
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = Pheanstalk::create('192.168.1.109', 11301);
$tubeName = 'use_email_list';
while(true) {
//獲取隊列信息,reserve 阻塞獲取
$job = $pheanstalk->watch($tubeName)->ignore('default')->reserve();
$data = $job->getData();
//執行相關邏輯
$result = file_put_contents('./send_email.log', $data, FILE_APPEND | LOCK_EX);
if ($result) {
echo 'success :'.$data.PHP_EOF;
$pheanstalk->delete($job);
}
//暫停(不可能是百分百的准確,跟系統的調度、CPU時鍾周期等有一定關系)
usleep(500000);
}
echo 'Success !';
運行程序:php ./consumer.php
**5.2 監控例子**
monitor.php
<?php
//監控服務狀態
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = Pheanstalk::create('192.168.1.109', 11301);
//可以開發監控面板,監控數據的,有多少tube,多少隊列,多少延遲等等
//查看beanstalkd狀態
var_export($pheanstalk->stats());
//查看有多少個tube
var_export($pheanstalk->listTubes());
//設置要監聽的tube
$pheanstalk->watch('use_email_list');
//取消對默認tube的監聽,可以省略
$pheanstalk->ignore('default');
//查看監聽的tube列表
var_export($pheanstalk->listTubesWatched());
//查看use_email_list的tube當前的狀態
var_export($pheanstalk->statsTube('use_email_list'));
//單個job信息
// var_export($pheanstalk->statsJob($job));
六: pheanstalk一些方法說明
6.1 整個 beanstalkd 當前狀態信息
var_export($pheanstalk->stats()) //beanstalkd 當前狀態信息
//output:
Pheanstalk\Response\ArrayResponse::__set_state(array(
'current-jobs-urgent' => '0', // 優先級小於1024狀態為ready的job數量
'current-jobs-ready' => '0', // 狀態為ready的job數量
'current-jobs-reserved' => '0', // 狀態為reserved的job數量
'current-jobs-delayed' => '0', // 狀態為delayed的job數量
'current-jobs-buried' => '0', // 狀態為buried的job數量
'cmd-put' => '0', // 總共執行put指令的次數
'cmd-peek' => '0', // 總共執行peek指令的次數
'cmd-peek-ready' => '0', // 總共執行peek-ready指令的次數
'cmd-peek-delayed' => '0', // 總共執行peek-delayed指令的次數
'cmd-peek-buried' => '0', // 總共執行peek-buried指令的次數
'cmd-reserve' => '0', // 總共執行reserve指令的次數
'cmd-reserve-with-timeout' => '0',
'cmd-delete' => '0',
'cmd-release' => '0',
'cmd-use' => '0', // 總共執行use指令的次數
'cmd-watch' => '0', // 總共執行watch指令的次數
'cmd-ignore' => '0',
'cmd-bury' => '0',
'cmd-kick' => '0',
'cmd-touch' => '0',
'cmd-stats' => '2',
'cmd-stats-job' => '0',
'cmd-stats-tube' => '0',
'cmd-list-tubes' => '0',
'cmd-list-tube-used' => '0',
'cmd-list-tubes-watched' => '0',
'cmd-pause-tube' => '0',
'job-timeouts' => '0', // 所有超時的job的總共數量
'total-jobs' => '0', // 創建的所有job數量
'max-job-size' => '65535', // job的數據部分最大長度
'current-tubes' => '1', // 當前存在的tube數量
'current-connections' => '1', // 當前打開的連接數
'current-producers' => '0', // 當前所有的打開的連接中至少執行一次put指令的連接數量
'current-workers' => '0', // 當前所有的打開的連接中至少執行一次reserve指令的連接數量
'current-waiting' => '0', // 當前所有的打開的連接中執行reserve指令但是未響應的連接數量
'total-connections' => '11', // 總共處理的連接數
'pid' => '4839', // 服務器進程的id
'version' => '1.10', // 服務器版本號
'rusage-utime' => '0.000000', // 進程總共占用的用戶CPU時間
'rusage-stime' => '0.001478', // 進程總共占用的系統CPU時間
'uptime' => '12031', // 服務器進程運行的秒數
'binlog-oldest-index' => '2', // 開始儲存jobs的binlog索引號
'binlog-current-index' => '2', // 當前儲存jobs的binlog索引號
'binlog-records-migrated' => '0',
'binlog-records-written' => '0', // 累積寫入的記錄數
'binlog-max-size' => '10485760', // binlog的最大容量
'id' => '4b005307e8af5b37', // 一個隨機字符串,在beanstalkd進程啟動時產生
'hostname' => 'xing',
))
6.2 單個job任務的信息:
//單個job信息
var_export($pheanstalk->statsJob($job1));
'id' => '1', // job id
'tube' => 'use_email_list', // job 所在的管道
'state' => 'reserved', // job 當前的狀態
'pri' => '1024', // job 的優先級
'age' => '5222', // 自 job 創建時間為止 單位:秒
'delay' => '0',
'ttr' => '60', // time to run
'time-left' => '58', // 僅在job狀態為reserved或者delayed時有意義,當job狀態為reserved時表示剩余的超時時間
'file' => '2', // 表示包含此job的binlog序號,如果沒有開啟它將為0
'reserves' => '10', // 表示job被reserved的次數
'timeouts' => '0', // 表示job處理的超時時間
'releases' => '1', // 表示job被released的次數
'buries' => '0', // 表示job被buried的次數
'kicks' => '0', // 表示job被kiced的次數
6.3 tube 管道的信息:
//查看有多少個tube
var_export($pheanstalk->listTubes());
//設置要監聽的tube
$pheanstalk->watch('use_email_list');
//取消對默認tube的監聽,可以省略
$pheanstalk->ignore('default');
//查看監聽的tube列表
var_export($pheanstalk->listTubesWatched());
//查看use_email_list的tube當前的狀態
var_export($pheanstalk->statsTube('use_email_list'));
6.4 生產者調用方法
// put 任務 方式一; 返回新 job 的任務標識,整型值;
$pheanstalk->useTube('use_email_list')->put(
'hello, world', // 任務內容
23, // 任務的優先級, 默認為 1024
0, // 不等待直接放到ready隊列中.
60 // 處理任務的時間(單位為秒)
);
// put 任務 方式二; 返回新 job 的任務標識,整型值;
$pheanstalk->putInTube(
'use_email_list', // 管道名稱
'hello, world', // 任務內容
23, // 任務的優先級, 默認為 1024
0, // 不等待直接放到ready隊列中. 如值為 60 表示 60秒;
60 // 處理任務的時間(單位為秒)
);
// 給管道里所有新任務設置延遲
$pheanstalk->pauseTube('use_email_list', 30);
// 取消管道延遲
$pheanstalk->resumeTube('use_email_list');
概念說明:
- 任務優先級
任務 (job
) 可以有 0~2^32 個優先級, 0 代表最高優先級。beanstalkd
采用最大最小堆 (Min-max heap
) 處理任務優先級排序, 任何時刻調用reserve
命令的消費者總是能拿到當前優先級最高的任務, 時間復雜度為O(logn)
. ttr
(time to run
, 預設的執行時間)
消費者必須在預設的TTR
(time-to-run
) 時間內發送delete
/release
/bury
改變任務狀態;否則Beanstalkd
會認為消息處理失敗,狀態改為ready
,然后把任務交給另外的消費者節點執行。如果消費者預計在TTR (time-to-run)
時間內無法完成任務, 也可以發送touch
命令, 它的作用是讓Beanstalkd
重置該任務的time-left
剩余執行時間.
## 6.5 消費者調用方法
1. 正常獲取和執行job流程
// 獲取 use_email_list 管道的 job
$job = $pheanstalk->watch('use_email_list')->ignore('default')->reserve();
$job_2 = $pheanstalk->reserveFromTube('use_email_list');
$job_3 = $pheanstalk->peekReady('use_email_list');
// 如果知道 job 的 id, 也可以
$job_4 = $pheanstalk->peek($id);
// var_export($pheanstalk->statsJob($job_4));
// 獲取下一個延遲時間最短 的 job
$job_5 = $pheanstalk->peekDelayed('use_email_list');
// do job .... 這里省略異常的考慮
// 釋放任務 讓別人執行
$pheanstalk->release($job);
// 或成功執行完,則刪除任務
//$pheanstalk->delete($job);
// 將任務埋起來,預留
//$pheanstalk->bury($job);
2. 處理 buried
狀態的 Job
// 獲取下一個被埋藏的 job
$job = $pheanstalk->peekBuried('use_email_list');
// 將任務狀態從 buried 改為 ready
//$pheanstalk->kickJob($job);
// 批量將指定數目的任務從 buried 改為 ready
$pheanstalk->kick(10);
七:參考
https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.zh-CN.md
Beanstalkd中文協議
https://github.com/pheanstalk/pheanstalk php的beanstalkd庫
https://segmentfault.com/a/1190000014803344 php操作beanstalkd
https://segmentfault.com/a/1190000016067218 消息隊列beanstalkd源碼詳解