php kafka 生產和消費



<?php #生產 $rk = new RdKafka\Producer(); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("127.0.0.1"); $topic = $rk->newTopic("test3"); for ($i = 0; $i < 10; $i++) { $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i"); }

<?php
#消費
 
         
<?php

$rk = new RdKafka\Consumer();
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");

$topic = $rk->newTopic("test");

$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
    $msg = $topic->consume(0, 1000);
    if (null === $msg) {
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
        echo $msg->payload, "\n";
    }
}
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM