背景:最近線上mq消費者進程ok,但rabbitmq控制台顯示無消費進程,導致mq隊列消息堆積,以前是直接重啟mq,這次決定深究下原因
操作耗時的守護進程
因業務原因,每次導入30w條記錄,代碼中將每500條一批塞入mq隊列,在消費的時候,需要查表插庫,處理耗時較長,我們使用的是php-amqp庫,代碼非常簡單
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, ...);
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'consumeLogic');
function consumeLogic(AMQPMessage $message): void
{
// 消費邏輯
}
while (count($channel->callbacks)) {
$channel->wait();
}
mq心跳
1.rabbitmq使用心跳機制來保持連接,在正常場景下,客戶端期望通過發送心跳包來告知服務端自己存活。如果服務端連續兩次發送心跳客戶端均無回應,服務端會斷開與客戶端的連接。心跳間隔可在每次連接時設置。
2.因php是同步語言,它無法在后台運行耗時任務時持續發送心跳包。這時候服務端就會斷開連接,而客戶端只有繼續使用這個隊列的時候才會發現已斷開
rabbitmq如何處理心跳
通過閱讀該庫源碼,發現是通過方法
AbstractIO::check_heartbeat()
, 該方法會在你每次使用連接時調用,如AMQPChannel::basic_consume()
,AMQPChannel::basic_consume()
,AMQPChannel::basic_consume()
如果設置了心跳間隔,
check_heartbeat()
方法會監測離上次使用連接過去的時間。如果客戶端忽略了兩次心跳,會自動重連,或者過去了心跳間隔的一半客戶端會主動發送心跳。
手動發送心跳
當在處理耗時任務時,我們需要確保連接,且在任務處理過程中主動發送心跳,那如何實現呢,我們來看下
check_heartbeat()
的源碼
public function check_heartbeat()
{
// ignore unless heartbeat interval is set
if ($this->heartbeat !== 0 && $this->last_read && $this->last_write) {
$t = microtime(true);
$t_read = round($t - $this->last_read);
$t_write = round($t - $this->last_write);
// server has gone away
if (($this->heartbeat * 2) < $t_read) {
$this->close();
throw new AMQPHeartbeatMissedException("Missed server heartbeat");
}
// time for client to send a heartbeat
if (($this->heartbeat / 2) < $t_write) {
$this->write_heartbeat();
}
}
}
看了源碼之后,發送心跳是有前置條件的
- 設置了心跳間隔
- 從socket中取值了
- 向socket中寫過數據
第一條我們手動設置,第三條只要我們連接了就會有
last_write
,現在我們需要滿足第二條,那何時會觸發read呢,當然是接收消息的時候,可是我們還在處理消息,因同步的問題,需要在處理完才會接收下一條消息。
那我們可不可以主動read呢,可以,需要加一行代碼,就能實現,可在消費代碼中調用
function send_heartbeat($connection)
{
$connection->getIO()->read(0);
}
這時候我們並沒有拿消息,只是用了一個hack,來觸發發送心跳,看下它是如何生效的
public function read($len)
{
if (is_null($this->sock)) {
throw new AMQPSocketException(sprintf(
'Socket was null! Last SocketError was: %s',
socket_strerror(socket_last_error())
));
}
$this->check_heartbeat();
list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
$read_start = microtime(true);
$read = 0;
$data = '';
while ($read < $len) {
$buffer = null;
$result = socket_recv($this->sock, $buffer, $len - $read, 0);
if ($result === 0) {
// From linux recv() manual:
// When a stream socket peer has performed an orderly shutdown,
// the return value will be 0 (the traditional "end-of-file" return).
// http://php.net/manual/en/function.socket-recv.php#47182
$this->close();
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
}
if (empty($buffer)) {
$read_now = microtime(true);
$t_read = $read_now - $read_start;
if ($t_read > $this->read_timeout) {
throw new AMQPTimeoutException('Too many read attempts detected in SocketIO');
}
$this->select($timeout_sec, $timeout_uSec);
continue;
}
$read += mb_strlen($buffer, 'ASCII');
$data .= $buffer;
}
if (mb_strlen($data, 'ASCII') != $len) {
throw new AMQPIOException(sprintf(
'Error reading data. Received %s instead of expected %s bytes',
mb_strlen($data, 'ASCII'),
$len
));
}
$this->last_read = microtime(true);
return $data;
}
我們調用read后,會主動觸發檢測心跳包,之后會設置
last_read
,在第二次手動調用的時候就會發送心跳了。
這里給大家的建議是處理消息盡量快速,最好不要用hack