1.安裝(linux安裝beanstalkd, windows暫不支持)
# wget https://github.com/kr/beanstalkd/archive/v1.10.tar.gz
# tar xzvf v1.10
# cd beanstalkd-1.10/
# make && make install
# beanstalkd -v beanstalkd 1.10 2
beanstalkd -l 127.0.0.1 -p 11300 &
composer require pda/pheanstalk
(官網:https://github.com/pda/pheanstalk)
下面是自己寫的封裝的一個類里面的方法(類的話自己創建就可以了):
// 添加任務 public static function addJob() { //創建隊列消息 $pheanstalk = self::const(); print_r($pheanstalk->stats());//查看狀態 exit; $tubeName = 'email_list'; $jobData = [ 'email' => '123456@163.com', 'message' => 'Hello World !!', 'dtime' => date('Y-m-d H:i:s'), ]; $pda = $pheanstalk->useTube($tubeName)->put(json_encode($jobData)); return $pda; } // 引用公共信息(自己寫的方法連接Pheanstalk,首先在服務器開啟beanstalkd) public static function const() { $pheanstalk = Pheanstalk::create('127.0.0.1', 11300); return $pheanstalk; }
消費 job
// 消費job
public static function Consumption()
{
$pheanstalk = self::const();
$tubeName = 'email_list';
$job = $pheanstalk->watch($tubeName)->ignore('default')->reserve();
if ($job !== false) {
$res = $pheanstalk->statsJob($job);
if ($res['reserves'] > 5) { //判斷一個任務請求失敗5次后直接刪除
$pheanstalk->delete($job);
} else {
$job_data = $job->getData();
print_r($job_data);
exit;
self::subscribe($job_data);
$pheanstalk->delete($job);
}
/* 繼續 Watch 下一個 job */
self::Consumption();
} else {
$pheanstalk->log->error('reserve false', 'reserve false');
}
}
// 添加到數據庫 public static function subscribe($job_data) { print_r($job_data); // 數據格式 {"time":"1212312412","email":"123456@163.com","message":"Hello World !!","dtime":"2020-09-28 15:12:14"} }

default_socket_timeout 這個參數是一定要加的,php 默認一般是 60s,假如您沒有在代碼里面設置,采用默認的話(60s),60s 之內如果沒有 job 產生,腳本就會報 socket 錯誤,我寫的是 7 天超時,您可以根據業務去調整,記住一定要配置,網上很多搜的 consumer 腳本都沒有配置這個,根本不能投入生產環境使用,這是我親自實踐的結果。
關於 while true 是否死循環,很明確告訴你是死循環,但是不會一直耗性能的那樣執行下去,它會在 reserve 這里阻塞不動,直到有消息產生才會往下走,所以大可放心使用,我的項目代碼里面是使用了方法調用方法自身去實現循環的。
4.Pheanstalk使用方法
維護方法
stats() 查看狀態方法
listTubes() 目前存在的管道
listTubesWatched() 目前監聽的管道
statsTube() 管道的狀態
useTube() 指定使用的管道
statsJob() 查看任務的詳細信息
peek() 通過任務ID獲取任務
生產者方法
putInTube() 往管道中寫入數據
put() 配合useTube()使用
消費者方法
watch() 監聽管道,可以同時監聽多個管道
ignore() 不監聽管道
reserve() 以阻塞方式監聽管道,獲取任務
reserveFromTube()
release() 把任務重新放回管道
bury() 把任務預留
peekBuried() 把預留任務讀取出來
kickJob() 把buried狀態的任務設置成ready
kick() 批量把buried狀態的任務設置成ready
peekReady() 把准備好的任務讀取出來
peekDelayed() 把延遲的任務讀取出來
pauseTube() 給管道設置延遲
resumeTube() 取消管道延遲
touch() 讓任務重新計算ttr時間,給任務續命
