rabbitmq保持連接


背景:最近線上mq消費者進程ok,但rabbitmq控制台顯示無消費進程,導致mq隊列消息堆積,以前是直接重啟mq,這次決定深究下原因

操作耗時的守護進程

因業務原因,每次導入30w條記錄,代碼中將每500條一批塞入mq隊列,在消費的時候,需要查表插庫,處理耗時較長,我們使用的是php-amqp庫,代碼非常簡單

$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, ...);
$channel = $connection->channel();
$channel->queue_declare($queue, false, true, false, false);
$channel->basic_consume($queue, $consumerTag, false, false, false, false, 'consumeLogic');
function consumeLogic(AMQPMessage $message): void
{
    // 消費邏輯
}
while (count($channel->callbacks)) {
    $channel->wait();
}

mq心跳

1.rabbitmq使用心跳機制來保持連接,在正常場景下,客戶端期望通過發送心跳包來告知服務端自己存活。如果服務端連續兩次發送心跳客戶端均無回應,服務端會斷開與客戶端的連接。心跳間隔可在每次連接時設置。

2.因php是同步語言,它無法在后台運行耗時任務時持續發送心跳包。這時候服務端就會斷開連接,而客戶端只有繼續使用這個隊列的時候才會發現已斷開

rabbitmq如何處理心跳

通過閱讀該庫源碼,發現是通過方法 AbstractIO::check_heartbeat(), 該方法會在你每次使用連接時調用,如 AMQPChannel::basic_consume(),AMQPChannel::basic_consume(),AMQPChannel::basic_consume()

如果設置了心跳間隔,check_heartbeat()方法會監測離上次使用連接過去的時間。如果客戶端忽略了兩次心跳,會自動重連,或者過去了心跳間隔的一半客戶端會主動發送心跳。

手動發送心跳

當在處理耗時任務時,我們需要確保連接,且在任務處理過程中主動發送心跳,那如何實現呢,我們來看下check_heartbeat()的源碼

public function check_heartbeat()
{
    // ignore unless heartbeat interval is set
    if ($this->heartbeat !== 0 && $this->last_read && $this->last_write) {
        $t = microtime(true);
        $t_read = round($t - $this->last_read);
        $t_write = round($t - $this->last_write);

        // server has gone away
        if (($this->heartbeat * 2) < $t_read) {
            $this->close();
            throw new AMQPHeartbeatMissedException("Missed server heartbeat");
        }

        // time for client to send a heartbeat
        if (($this->heartbeat / 2) < $t_write) {
            $this->write_heartbeat();
        }
    }
}

看了源碼之后,發送心跳是有前置條件的

  1. 設置了心跳間隔
  2. 從socket中取值了
  3. 向socket中寫過數據

第一條我們手動設置,第三條只要我們連接了就會有 last_write,現在我們需要滿足第二條,那何時會觸發read呢,當然是接收消息的時候,可是我們還在處理消息,因同步的問題,需要在處理完才會接收下一條消息。

那我們可不可以主動read呢,可以,需要加一行代碼,就能實現,可在消費代碼中調用

function send_heartbeat($connection)
{
  $connection->getIO()->read(0);
}

這時候我們並沒有拿消息,只是用了一個hack,來觸發發送心跳,看下它是如何生效的

public function read($len)
{
    if (is_null($this->sock)) {
        throw new AMQPSocketException(sprintf(
            'Socket was null! Last SocketError was: %s',
            socket_strerror(socket_last_error())
        ));
    }

    $this->check_heartbeat();

    list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
    $read_start = microtime(true);
    $read = 0;
    $data = '';
    while ($read < $len) {
        $buffer = null;
        $result = socket_recv($this->sock, $buffer, $len - $read, 0);
        if ($result === 0) {
            // From linux recv() manual:
            // When a stream socket peer has performed an orderly shutdown,
            // the return value will be 0 (the traditional "end-of-file" return).
            // http://php.net/manual/en/function.socket-recv.php#47182
            $this->close();
            throw new AMQPConnectionClosedException('Broken pipe or closed connection');
        }

        if (empty($buffer)) {
            $read_now = microtime(true);
            $t_read = $read_now - $read_start;
            if ($t_read > $this->read_timeout) {
                throw new AMQPTimeoutException('Too many read attempts detected in SocketIO');
            }
            $this->select($timeout_sec, $timeout_uSec);
            continue;
        }

        $read += mb_strlen($buffer, 'ASCII');
        $data .= $buffer;
    }

    if (mb_strlen($data, 'ASCII') != $len) {
        throw new AMQPIOException(sprintf(
            'Error reading data. Received %s instead of expected %s bytes',
            mb_strlen($data, 'ASCII'),
            $len
        ));
    }

    $this->last_read = microtime(true);

    return $data;
}

我們調用read后,會主動觸發檢測心跳包,之后會設置last_read,在第二次手動調用的時候就會發送心跳了。

這里給大家的建議是處理消息盡量快速,最好不要用hack

參考鏈接

  1. Keeping RabbitMQ connections alive in PHP


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM