php kafka生產者,消費者操作


php7.2

kafka7.8.1

生產者代碼

<?php
$conf = new RdKafka\Conf();
$conf->setDrMsgCb(function ($kafka, $message) {
    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason) {
    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});

$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$cf = new RdKafka\TopicConf();
// -1必須等所有brokers同步完成的確認 1當前服務器確認 0不確認,這里如果是0回調里的offset無返回,如果是1和-1會返回offset
// 我們可以利用該機制做消息生產的確認,不過還不是100%,因為有可能會中途kafka服務器掛掉
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);


//RD_KAFKA_PARTITION_UA自動選擇分區
//$option可選
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "kafka123456789", 'kafka');
//kafka隊列中的長度 

$len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); var_dump($len); $rk->poll(50); }

 

 這里我並沒有開啟集群,只是單機操作。發送 成功

 

 

消費者代碼

<?php

/**
 * 消費者消費消息
 *
 * 實現的例子來源於:
 *
 * https://github.com/arnaud-lb/php-rdkafka#examples
 */

$objRdKafka = new RdKafka\Consumer();
$objRdKafka->setLogLevel(LOG_DEBUG);
$objRdKafka->addBrokers("127.0.0.1:9092");

$oObjTopic = $objRdKafka->newTopic('test');

/**
 * consumeStart
 *   第一個參數標識分區,生產者是往分區0發送的消息,這里也從分區0拉取消息
 *   第二個參數標識從什么位置開始拉取消息,可選值為
 *     RD_KAFKA_OFFSET_BEGINNING : 從開始拉取消息
 *     RD_KAFKA_OFFSET_END : 從當前位置開始拉取消息
 *     RD_KAFKA_OFFSET_STORED : 猜測跟RD_KAFKA_OFFSET_END一樣
 */
$oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END);

while (true) {
    // 第一個參數是分區,第二個參數是超時時間
    $oMsg = $oObjTopic->consume(0, 1000);

    // 沒拉取到消息時,返回NULL
    if (!$oMsg) {
        usleep(10000);
        continue;
    }

    if ($oMsg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $oMsg->payload, "\n";
    }
}

消費者獲取到消息

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM