-
如僅作為消費者或生產者,直接使用下面消費者或生產者的代碼,並安裝擴展即可。
-
PHP要安裝rdkafka擴展,而rdkafka又依賴librdkafka,因此你需要安裝rdkafka和librdkafka,之后就可以與kafka服務器交互了。
-
如搭建kafka服務,需要jdk環境和zookeeper,以及kafka遠程訪問的配置,請參考
消費者
<?php
/**
* 代碼中的輸出注釋都可以打開供調試使用
* 對 中台生產的 用戶信息 進行消費
* Date: 2019/7/31
*/
// 設置將要消費消息的主題
$topic = 'alikafka-jl-yz-zt-updata-test';
$host = '172.168.50.233';
$group_id = 'CID_alikafka_jl_lz';
$conf = new \RdKafka\Conf();
// 當有新的消費進程加入或者退出消費組時,kafka 會自動重新分配分區給消費者進程,這里注冊了一個回調函數,當分區被重新分配時觸發
$conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
// echo "Assign: ";
// var_dump($partitions);
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
// echo "Revoke: ";
// var_dump($partitions);
$kafka->assign(NULL);
break;
default:
throw new \Exception($err);
}
});
// 配置groud.id 具有相同 group.id 的consumer 將會處理不同分區的消息,
// 所以同一個組內的消費者數量如果訂閱了一個topic,
// 那么消費者進程的數量多於 多於這個topic 分區的數量是沒有意義的。
$conf->set('group.id', $group_id);
// 添加 kafka集群服務器地址
$conf->set('metadata.broker.list', $host); //'localhost:9092,localhost:9093,localhost:9094,localhost:9095'
// 針對低延遲進行了優化的配置。這允許PHP進程/請求盡快發送消息並快速終止
$conf->set('socket.timeout.ms', 50);
//多進程和信號
if (function_exists('pcntl_sigprocmask')) {
pcntl_sigprocmask(SIG_BLOCK, array(SIGIO));
$conf->set('internal.termination.signal', SIGIO);
} else {
$conf->set('queue.buffering.max.ms', 1);
}
$topicConf = new \RdKafka\TopicConf();
// 在interval.ms的時間內自動提交確認、建議不要啟動, 1是啟動,0是未啟動
$topicConf->set('auto.commit.enable', 1);
$topicConf->set('auto.commit.interval.ms', 100);
//smallest:簡單理解為從頭開始消費,largest:簡單理解為從最新的開始消費
$topicConf->set('auto.offset.reset', 'smallest');
// 設置offset的存儲為broker
//$topicConf->set('offset.store.method', 'broker');
// 設置offset的存儲為file
//$topicConf->set('offset.store.method', 'file');
// 設置offset的存儲路徑
$topicConf->set('offset.store.path', 'kafka_offset.log');
//$topicConf->set('offset.store.path', __DIR__);
$conf->setDefaultTopicConf($topicConf);
$consumer = new \RdKafka\KafkaConsumer($conf);
// 更新訂閱集(自動分配partitions )
$consumer->subscribe([$topic]);
// 指定topic分配partitions使用那個分區
// $consumer->assign([
// new \RdKafka\TopicPartition("zzy8", 0),
// new \RdKafka\TopicPartition("zzy8", 1),
// ]);
while (true) {
// 設置120s為超時
$message = $consumer->consume(3 * 1000);
if (!empty($message)) {
switch ($message->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
var_dump('New message received :', $message); // 打印消息
// 拆解對象為數組,並根據業務需求處理數據
$payload = json_decode($message->payload,true);
$key = $message->key;
// 根據kafka中不同key,調用對應方法傳遞處理數據*(如果有必要的話)
//對該條message進行處理,比如用戶數據同步, 記錄日志。
// var_dump("asasasasasasasasasasasas");
break;
case RD_KAFKA_RESP_ERR__PARTITION_EOF:
echo "No more messages; will wait for more\n";
break;
case RD_KAFKA_RESP_ERR__TIMED_OUT:
echo "Timed out\n";
var_dump("##################");
break;
default:
var_dump("nothing");
throw new \Exception($message->errstr(), $message->err);
break;
}
} else {
var_dump('this is empty obj!!!');
}
}
生產者
<?php
/**
* Date: 2019/8/1
*/
$conf = new RdKafka\Conf();
$conf->setDrmSgCb(function ($kafka, $message){
file_put_contents("d:/dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);
});
$conf->setErrorCb(function ($kafka, $err, $reason){
file_put_contents("d:/err_cb.log",sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);
});
$rk = new RdKafka\Producer($conf);
$rk->setLogLevel(LOG_DEBUG);
$rk->addBrokers("127.0.0.1");
$cf = new RdKafka\TopicConf();
// -1必須等所有brokers同步完成的確認 1當前服務器確認 0不確認,這里如果是0回調里的offset無返回,如果是1和-1會返回offset
// 我們可以利用該機制做消息生產的確認,不過還不是100%,因為有可能會中途kafka服務器掛掉
$cf->set('request.required.acks', 0);
$topic = $rk->newTopic("test", $cf);
$option = 'qkl';
for ($i = 0; $i < 10; $i++) {
//RD_KAFKA_PARTITION_UA自動選擇分區
//$option可選
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message . $i", $option);
}
$len = $rk->getOutQLen();
while ($len > 0) {
$len = $rk->getOutQLen();
// var_dump($len);
$rk->poll(10);
}
var_dump("finish");exit;
- 列如:將消費者保存為consumer.php文件后,使用php命令行運行