php7.2
kafka7.8.1
生產者代碼
<?php $conf = new RdKafka\Conf(); $conf->setDrMsgCb(function ($kafka, $message) { file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); }); $conf->setErrorCb(function ($kafka, $err, $reason) { file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }); $rk = new RdKafka\Producer($conf); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $cf = new RdKafka\TopicConf(); // -1必須等所有brokers同步完成的確認 1當前服務器確認 0不確認,這里如果是0回調里的offset無返回,如果是1和-1會返回offset // 我們可以利用該機制做消息生產的確認,不過還不是100%,因為有可能會中途kafka服務器掛掉 $cf->set('request.required.acks', 0); $topic = $rk->newTopic("test", $cf); //RD_KAFKA_PARTITION_UA自動選擇分區 //$option可選 $topic->produce(RD_KAFKA_PARTITION_UA, 0, "kafka123456789", 'kafka');
//kafka隊列中的長度
$len = $rk->getOutQLen(); while ($len > 0) { $len = $rk->getOutQLen(); var_dump($len); $rk->poll(50); }
這里我並沒有開啟集群,只是單機操作。發送 成功
消費者代碼
<?php /** * 消費者消費消息 * * 實現的例子來源於: * * https://github.com/arnaud-lb/php-rdkafka#examples */ $objRdKafka = new RdKafka\Consumer(); $objRdKafka->setLogLevel(LOG_DEBUG); $objRdKafka->addBrokers("127.0.0.1:9092"); $oObjTopic = $objRdKafka->newTopic('test'); /** * consumeStart * 第一個參數標識分區,生產者是往分區0發送的消息,這里也從分區0拉取消息 * 第二個參數標識從什么位置開始拉取消息,可選值為 * RD_KAFKA_OFFSET_BEGINNING : 從開始拉取消息 * RD_KAFKA_OFFSET_END : 從當前位置開始拉取消息 * RD_KAFKA_OFFSET_STORED : 猜測跟RD_KAFKA_OFFSET_END一樣 */ $oObjTopic->consumeStart(0, RD_KAFKA_OFFSET_END); while (true) { // 第一個參數是分區,第二個參數是超時時間 $oMsg = $oObjTopic->consume(0, 1000); // 沒拉取到消息時,返回NULL if (!$oMsg) { usleep(10000); continue; } if ($oMsg->err) { echo $msg->errstr(), "\n"; break; } else { echo $oMsg->payload, "\n"; } }
消費者獲取到消息