隊列:Beanstalkd介紹


一:介紹

Beanstalkd 是一個輕量級的內存型隊列。它是典型的類Memcached設計,協議和使用方式都是同樣風格。
github:https://github.com/beanstalkd
官網:https://beanstalkd.github.io/

二:功能特性

2.1 優先級
任務job可以有0~2^32 個優先級, 0 代表最高優先級,默認優先級為1024。

2.2 延遲 delay
比如說多長時間后執行某個任務

2.3 持久化
可以通過binlog將job及其狀態記錄到文件里面,在Beanstalkd下次啟動時可以通過讀取binlog來恢復之前的job及狀態。

2.4 超時控制
為了防止某個consumer長時間占用任務但不能處理的情況,Beanstalkd為reserve操作設置了timeout時間,如果該consumer不能在指定時間內完成job,job將被遷移回READY狀態,供其他consumer執行。

2.5 分布式容錯
因為它是類Memcached設計,beanstalkd各個server之間並不知道彼此的存在,都是通過client來實現分布式以及根據tube名稱去特定server獲取job。

三:使用場景

  • 用作延時隊列:比如可以用於如果用戶30分鍾內不操作,任務關閉。
  • 用作定時任務:比如可以用於專門的后台任務。
  • 用作異步操作:這是所有消息隊列都最常用的,先將任務仍進去,順序執行。
  • 用作循環隊列:用release命令可以循環執行任務,比如可以做負載均衡任務分發。
  • 用作兜底機制:比如一個請求有失敗的概率,可以用Beanstalk不斷重試,設定超時時間,時間內嘗試到成功為止。

其實我們最重要的使用場景是把它作為延遲隊列類使用,比如:

  1. 下訂單后多長時間沒有付款,要取消訂單,並退庫存
  2. 用戶注冊成功后,發送一封郵件
  3. 定期檢查退款狀態的訂單是否退款成功

四:Beanstalkd設計基本概念

4.1 核心概念

  • job:一個需要異步處理的任務,是Beanstalkd中的基本單元,需要放在一個tube中
  • tube:一個有名的隊列,用來存儲統一類型的job,是producer和consumer操作的對象。tube可以稱為管道
  • producer:Job 的生產者,通過 put 命令來將一個 job 放到一個 tube 中
  • consumer:Job的消費者,通過 reserve/release/bury/delete 命令來獲取 job 或改變 job 的狀態

4.2 job的生命周期

生產者生成任務,並根據業務需求將任務放到不同的管道中。比如與注冊有關的任務放到注冊管道中,和訂單有關的任務放到訂單管道中。

任務進入管道到離開管道一共有5個狀態 :
(ready,delayed,reserved,buried,delete)

  • **READY **- 需要立即處理的任務,當延時 (DELAYED) 任務到期后會自動成為當前任務;
  • **DELAYED **- 延遲執行的任務, 當消費者處理任務后, 可以用將消息再次放回 DELAYED 隊列延遲執行;
  • **RESERVED **- 已經被消費者獲取, 正在執行的任務。Beanstalkd 負責檢查任務是否在 TTR(time-to-run) 內完成;
  • **BURIED **- 保留的任務: 任務不會被執行,也不會消失,除非有人把它 "踢" 回隊列;
  • **DELETED **- 消息被徹底刪除。Beanstalkd 不再維持這些消息。

狀態流程

1.生產任務:

當producer生產者put一個job到tube時,這個job就處於 ready 狀態,等待consumer來消費處理;
      producer生產者也可以選擇延遲put一個job,這時job就先達到 delayed 狀態(比如設置一個5秒延遲的job,那么5秒之后,這個job才會變成 ready 狀態,才可以被consumer消費)

2. 消費任務:

consumer獲取了當前 ready 的job后,該job就會遷移到 reserved 狀態,這樣其他的consumer就不能在操作該job

3. 消費完任務后:

當consumer消費完該job后,可以選擇delete, release 或者 bury 3種操作。

  • delete操作:job從系統消亡,之后不能在獲取;
  • release操作:可以重新把該job狀態遷移回 ready (也可以延遲狀態 delayed 操作),使其他的consumer可以繼 續獲取和執行該job;
  • bury操作: 把job置為 buried 狀態,及是把該job休眠,等到需要的時候,還可以將休眠的 job 重新置為 ready  狀態, 也可以delete掉 buried 狀態的job。

也就是說:當消費者處理完任務后,任務的狀態可能是delete(刪除,處理成功),可能是buried(預留,意味着先把任務放一邊,等待條件成熟還要用),可能是ready,也可能是delayed,需要根據具體業務場景自己進行判斷定義


示意圖:
image.png

五:例子

我們使用php來作為例子的練習

使用的PHP庫:https://github.com/pheanstalk/pheanstalk
直接用composer進行安裝
composer.json

{
    "require": {
        "pda/pheanstalk": "^4.0"
    }
}

**5.1 生產者和消費者例子**
**生產者: producer.php**
<?php
require_once('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('192.168.1.109', 11301);

$tubeName = 'use_email_list';
$jobData = array(
    'uid' => time(),
    'email' => 'test@11.com',
    'message' => 'hello beanstalkd',
    'dtime' => date('Y-m-d H:i:s'),
);

$pheanstalk->useTube($tubeName)->put(json_encode($jobData));

echo json_encode($jobData);

運行程序: php  ./producer.php


**消費者:consumer.php**
<?php
require_once('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('192.168.1.109', 11301);
$tubeName = 'use_email_list';

while(true) {
    //獲取隊列信息,reserve 阻塞獲取
    $job = $pheanstalk->watch($tubeName)->ignore('default')->reserve();
    $data = $job->getData();

    //執行相關邏輯
    $result = file_put_contents('./send_email.log', $data, FILE_APPEND | LOCK_EX);
    if ($result) {
        echo 'success :'.$data.PHP_EOF;
        $pheanstalk->delete($job);
    }

    //暫停(不可能是百分百的准確,跟系統的調度、CPU時鍾周期等有一定關系)
    usleep(500000);
}

echo 'Success !';

運行程序:php ./consumer.php


**5.2 監控例子**
monitor.php
<?php

//監控服務狀態
require_once('./vendor/autoload.php');

use Pheanstalk\Pheanstalk;

$pheanstalk = Pheanstalk::create('192.168.1.109', 11301);

//可以開發監控面板,監控數據的,有多少tube,多少隊列,多少延遲等等


//查看beanstalkd狀態
var_export($pheanstalk->stats());

//查看有多少個tube
var_export($pheanstalk->listTubes());

//設置要監聽的tube
$pheanstalk->watch('use_email_list');

//取消對默認tube的監聽,可以省略
$pheanstalk->ignore('default');

//查看監聽的tube列表
var_export($pheanstalk->listTubesWatched());

//查看use_email_list的tube當前的狀態
var_export($pheanstalk->statsTube('use_email_list'));

//單個job信息
// var_export($pheanstalk->statsJob($job));

六: pheanstalk一些方法說明

6.1 整個 beanstalkd 當前狀態信息

var_export($pheanstalk->stats())  //beanstalkd 當前狀態信息

//output:
Pheanstalk\Response\ArrayResponse::__set_state(array(
  'current-jobs-urgent' => '0', // 優先級小於1024狀態為ready的job數量
  'current-jobs-ready' => '0', // 狀態為ready的job數量
  'current-jobs-reserved' => '0', // 狀態為reserved的job數量
  'current-jobs-delayed' => '0', // 狀態為delayed的job數量
  'current-jobs-buried' => '0', // 狀態為buried的job數量
  'cmd-put' => '0', // 總共執行put指令的次數
  'cmd-peek' => '0', // 總共執行peek指令的次數
  'cmd-peek-ready' => '0', // 總共執行peek-ready指令的次數
  'cmd-peek-delayed' => '0', // 總共執行peek-delayed指令的次數
  'cmd-peek-buried' => '0', // 總共執行peek-buried指令的次數
  'cmd-reserve' => '0', // 總共執行reserve指令的次數
  'cmd-reserve-with-timeout' => '0',
  'cmd-delete' => '0',
  'cmd-release' => '0',
  'cmd-use' => '0', // 總共執行use指令的次數
  'cmd-watch' => '0', // 總共執行watch指令的次數
  'cmd-ignore' => '0',
  'cmd-bury' => '0',
  'cmd-kick' => '0',
  'cmd-touch' => '0',
  'cmd-stats' => '2',
  'cmd-stats-job' => '0',
  'cmd-stats-tube' => '0',
  'cmd-list-tubes' => '0',
  'cmd-list-tube-used' => '0',
  'cmd-list-tubes-watched' => '0',
  'cmd-pause-tube' => '0',
  'job-timeouts' => '0', // 所有超時的job的總共數量
  'total-jobs' => '0', // 創建的所有job數量
  'max-job-size' => '65535', // job的數據部分最大長度
  'current-tubes' => '1', // 當前存在的tube數量
  'current-connections' => '1', // 當前打開的連接數
  'current-producers' => '0', // 當前所有的打開的連接中至少執行一次put指令的連接數量
  'current-workers' => '0', // 當前所有的打開的連接中至少執行一次reserve指令的連接數量
  'current-waiting' => '0', // 當前所有的打開的連接中執行reserve指令但是未響應的連接數量
  'total-connections' => '11', // 總共處理的連接數
  'pid' => '4839', // 服務器進程的id
  'version' => '1.10', // 服務器版本號
  'rusage-utime' => '0.000000', // 進程總共占用的用戶CPU時間
  'rusage-stime' => '0.001478', // 進程總共占用的系統CPU時間
  'uptime' => '12031', // 服務器進程運行的秒數
  'binlog-oldest-index' => '2', // 開始儲存jobs的binlog索引號
  'binlog-current-index' => '2', // 當前儲存jobs的binlog索引號
  'binlog-records-migrated' => '0',
  'binlog-records-written' => '0', // 累積寫入的記錄數
  'binlog-max-size' => '10485760', // binlog的最大容量
  'id' => '4b005307e8af5b37', // 一個隨機字符串,在beanstalkd進程啟動時產生
  'hostname' => 'xing',
))

6.2 單個job任務的信息:

//單個job信息
var_export($pheanstalk->statsJob($job1));

'id' => '1', // job id
'tube' => 'use_email_list', // job 所在的管道
'state' => 'reserved', // job 當前的狀態
'pri' => '1024', // job 的優先級
'age' => '5222', // 自 job 創建時間為止 單位:秒
'delay' => '0',
'ttr' => '60', // time to run
'time-left' => '58', // 僅在job狀態為reserved或者delayed時有意義,當job狀態為reserved時表示剩余的超時時間
'file' => '2', // 表示包含此job的binlog序號,如果沒有開啟它將為0
'reserves' => '10', // 表示job被reserved的次數
'timeouts' => '0', // 表示job處理的超時時間
'releases' => '1', // 表示job被released的次數
'buries' => '0', // 表示job被buried的次數
'kicks' => '0', // 表示job被kiced的次數

6.3 tube 管道的信息:

//查看有多少個tube
var_export($pheanstalk->listTubes());

//設置要監聽的tube
$pheanstalk->watch('use_email_list');

//取消對默認tube的監聽,可以省略
$pheanstalk->ignore('default');

//查看監聽的tube列表
var_export($pheanstalk->listTubesWatched());

//查看use_email_list的tube當前的狀態
var_export($pheanstalk->statsTube('use_email_list'));

6.4 生產者調用方法

// put 任務 方式一; 返回新 job 的任務標識,整型值;
$pheanstalk->useTube('use_email_list')->put(
    'hello, world', // 任務內容
    23, // 任務的優先級, 默認為 1024
    0, // 不等待直接放到ready隊列中.
    60 // 處理任務的時間(單位為秒)
);

// put 任務 方式二; 返回新 job 的任務標識,整型值;
$pheanstalk->putInTube(
    'use_email_list', // 管道名稱
    'hello, world', // 任務內容
    23, // 任務的優先級, 默認為 1024
    0, // 不等待直接放到ready隊列中. 如值為 60 表示 60秒;
    60 // 處理任務的時間(單位為秒)
);

// 給管道里所有新任務設置延遲
$pheanstalk->pauseTube('use_email_list', 30);

// 取消管道延遲
$pheanstalk->resumeTube('use_email_list');

概念說明:

  • 任務優先級
    任務 (job) 可以有 0~2^32 個優先級, 0 代表最高優先級。 beanstalkd 采用最大最小堆 (Min-max heap) 處理任務優先級排序, 任何時刻調用 reserve 命令的消費者總是能拿到當前優先級最高的任務, 時間復雜度為 O(logn).
  • ttr(time to run, 預設的執行時間)
    消費者必須在預設的 TTR (time-to-run) 時間內發送 delete / release / bury 改變任務狀態;否則 Beanstalkd 會認為消息處理失敗,狀態改為 ready,然后把任務交給另外的消費者節點執行。如果消費者預計在 TTR (time-to-run) 時間內無法完成任務, 也可以發送 touch 命令, 它的作用是讓 Beanstalkd 重置該任務的 time-left 剩余執行時間.

## 6.5 消費者調用方法

1. 正常獲取和執行job流程

// 獲取 use_email_list 管道的 job
$job = $pheanstalk->watch('use_email_list')->ignore('default')->reserve();
$job_2 = $pheanstalk->reserveFromTube('use_email_list');
$job_3 = $pheanstalk->peekReady('use_email_list');

// 如果知道 job 的 id, 也可以
$job_4 = $pheanstalk->peek($id);
// var_export($pheanstalk->statsJob($job_4));

// 獲取下一個延遲時間最短 的 job
$job_5 = $pheanstalk->peekDelayed('use_email_list');

// do job .... 這里省略異常的考慮

// 釋放任務 讓別人執行
$pheanstalk->release($job);
// 或成功執行完,則刪除任務
//$pheanstalk->delete($job);
// 將任務埋起來,預留
//$pheanstalk->bury($job);

2. 處理 buried 狀態的 Job

// 獲取下一個被埋藏的 job
$job = $pheanstalk->peekBuried('use_email_list');

// 將任務狀態從 buried 改為 ready
//$pheanstalk->kickJob($job);

// 批量將指定數目的任務從 buried 改為 ready
$pheanstalk->kick(10);

七:參考

https://github.com/beanstalkd/beanstalkd/blob/master/doc/protocol.zh-CN.md  
Beanstalkd中文協議
https://github.com/pheanstalk/pheanstalk  php的beanstalkd庫
https://segmentfault.com/a/1190000014803344 php操作beanstalkd
https://segmentfault.com/a/1190000016067218 消息隊列beanstalkd源碼詳解


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM