PHP7 生產環境隊列 Beanstalkd 正確使用姿勢


應用場景

  為什么要用呢,有什么好處?這應該放在最開頭說,一件東西你只有了解它是干什么的,適合干什么,才能更好的與自己的項目相結合,用到哪里學到哪里,學了不用等於不會,我們平時就應該多考慮一些這樣的問題:自己做個什么項目功能能跟 xx 技術相結合呢?這個 xx 技術放在這種業務場景下行不行呢?而不是 “學了這個 xx 技術能干嘛呢,公司現在也沒有用這個的呀,學了也沒用啊”,帶着這樣心情去學習 xx 技術,肯定很痛苦。

  隊列大家都知道是將一些耗時的操作先不去做,先埋點,再異步去處理,這樣對一些發郵件發短信之類的耗時操作,用戶是感覺不到的,因為埋點結束,操作也就結束了,消費隊列都是在服務器上做的。主要應用在短信或郵件通知,訪問第三方接口訂閱消息,商城的一些秒殺活動,都可以結合隊列來完成。

Beanstalkd 介紹

  Beanstalkd 是一個高性能,輕量級的分布式內存隊列,C 代碼,典型的類 Memcached 設計,協議和使用方式都是同樣的風格,所以使用過 memcached 的用戶會覺得 Beanstalkd 似曾相識。

  beanstalkd 的最初設計意圖是在高並發的網絡請求下,通過異步執行耗時較多的請求,及時返回結果,減少請求的響應延遲。

Ubuntu 安裝

sudo apt-get install beanstalkd

  

配置文件

vim /etc/default/beanstalkd 

  

查看狀態

service beanstalkd status
# 命令回顯 #
root@:/www/server/php/72/etc# service beanstalkd status
● beanstalkd.service - Simple, fast work queue
 Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled)
 Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago
 Docs: man:beanstalkd(1)
 Main PID: 7033 (beanstalkd)
 Tasks: 1 (limit: 4634)
 CGroup: /system.slice/beanstalkd.service
 └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd
Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.

  

配置連通性 + 持久化

ip 用 0.0.0.0 允許所有連接,靠配置安全組或防火牆去約束連接,放開 -b 參數 (默認沒有持久化),內存的隊列消息可以落地到硬盤 binlog 實現持久化,斷電可重新讀取隊列消息。

vim /etc/default/beanstalkd
BEANSTALKD_LISTEN_ADDR=0.0.0.0
BEANSTALKD_LISTEN_PORT=11300
BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"

  

beanstalkd 任務狀態

管理工具

親測了很多網上能找到的 beanstalkd 工具,這兩款是我最中意的了,一個命令行,一個 web 的。

命令行:

web 界面:

編程語言客戶端

PHP 客戶端

composer require pda/pheanstalk

寫入 job

<?php
//創建隊列消息
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName = 'email_list';
$jobData = [
 'email' => '123456@163.com',
 'message' => 'Hello World !!',
 'dtime' => date('Y-m-d H:i:s'),
];
$pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );

  

消費 job

<?php
ini_set('default_socket_timeout', 86400*7);
ini_set( 'memory_limit', '256M' );
// 消費隊列消息
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName = 'email_list';
while ( true )
{
 // 獲取隊列信息, reserve 阻塞獲取
 $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve();
 if ( $job !== false )
 {
 $data = $job->getData();
 /* TODO 邏輯操作 */
 /* 處理完成,刪除 job */
 $pheanstalk->delete( $job );
 }
}

  

default_socket_timeout 這個參數是一定要加的,php 默認一般是 60s,假如您沒有在代碼里面設置,采用默認的話(60s),60s 之內如果沒有 job 產生,腳本就會報 socket 錯誤,我寫的是 7 天超時,您可以根據業務去調整,記住一定要配置,網上很多搜的 consumer 腳本都沒有配置這個,根本不能投入生產環境使用,這是我親自實踐的結果。

  關於 while true 是否死循環,很明確告訴你是死循環,但是不會一直耗性能的那樣執行下去,它會在 reserve 這里阻塞不動,直到有消息產生才會往下走,所以大可放心使用,我的項目代碼里面是使用了方法調用方法自身去實現循環的。

就是這樣的代碼,供參考:

public function watchJob()
{
 $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve();
 if ( $job !== false )
 {
 $job_data = $job->getData();
 $this->subscribe( $job_data );
 $this->pheanstalk->delete( $job );
 /* 繼續 Watch 下一個 job */
 $this->watchJob();
 }
 else
 {
 $this->log->error( 'reserve false', 'reserve false' );
 }
} 

  

監控 beanstalkd 狀態

<?php
//監控服務狀態
require_once('./vendor/autoload.php');
use Pheanstalk\Pheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$isAlive = $pheanstalk->getConnection()->isServiceListening();
var_dump( $isAlive );

  

可以配合 email 做一個報警郵件,腳本每分鍾去執行,判斷狀態是 false,就給管理員發送郵件報警。

一些相關命令

查看 beanstalkd 服務內存占用

top -u beanstalkd

  

后台運行 consumer 腳本

nohup php googlehome_subscribe.php &

  

查看 consumer 腳本運行時間

ps -A -opid,stime,etime,args | grep consumer.php

  

手工重啟 consumer 腳本

ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9 
nohup php googlehome_subscribe.php &

  

一些總結

  php 要把錯誤日志打開,方便收集 consumer 腳本 crash 的 log,腳本跑出一些致命的 error 一定要及時修復,因為一旦有錯就會掛掉,這會影響你腳本的可用性,后期穩定之后可以上 supervisor 這種進程管理程序來管控腳本生命周期。

  一些網絡請求操作,一定要 try catch 到所有錯誤,一旦沒有 catch 到,腳本就崩。我用的是 Guzzle 去做的網絡請求,下面是我 catch 的一些錯誤,代碼片段供參考。

try
{
 /* TODO: 邏輯操作 */
}
catch ( ClientException $e )
{
 $results['mid']    = $this->mid;
 $results['code']   = $e->getResponse()->getStatusCode();
 $results['reason'] = $e->getResponse()->getReasonPhrase();
 $this->log->error( 'properties-changed ClientException', $results );
}
catch ( ServerException $e )
{
 $results['mid']    = $this->mid;
 $results['code']   = $e->getResponse()->getStatusCode();
 $results['reason'] = $e->getResponse()->getReasonPhrase();
 $this->log->error( 'properties-changed ServerException', $results );
}
catch ( ConnectException $e )
{
 $results['mid'] = $this->mid;
 $this->log->error( 'properties-changed ConnectException', $results );
}

  

  job 消費之后一定要刪除掉,如果長時間不刪除,php 客戶端會有 false 返回,是因為有 DEADLINE_SOON 這個超時錯誤產生,所以處理完任務,一定要記得刪除,這一點跟 kafka 不一樣,beanstalkd 需要開發者自己去刪除 job。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM