說明:有很多同學在服務器上搭建好,kafka,在應用端使用kafka時候出現很多問題,這里提供下我的kafka生產和消費的php函數
環境說明:
1:首先php要有kafka擴展,在命令行中輸入 php -m 看是否有rdkafka
沒有的話需要安裝配置下:
--------------- kafka php客戶端安裝(php-rdkafka) --------------
1.安裝 librdkafka
git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make
sudo make install
2.安裝php-rdkafka
git clone https://github.com/arnaud-lb/php-rdkafka.git
cd php-rdkafka
phpize
./configure
make all -j 5
sudo make install
vi /usr/local/lib/php.ini
加入 extension=rdkafka.so
2:在kafka控制器中我直接貼出來我的生產和消費函數:
/** * 生產單個消息 * @param string $topic * @param null $post */ function kafka_produce($key=null,$post=null) { $rk = new \RdKafka\producer(); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("localhost:9092,localhost:9093,localhost:9094,localhost:9095"); $topics = $rk->newTopic('engine.com'); $topics->produce(1, 0,$post,$key); echo 'kafka_produce success!!!'; }
/** * 高級消費模式 * @param $topic * @return int * @throws Exception */ function kafka_high_consume($topic='engine.com'){ $conf = new \RdKafka\Conf(); $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; var_dump($partitions); $kafka->assign($partitions); break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new \Exception($err); } }); $conf->set('group.id', '0'); $conf->set('metadata.broker.list', 'localhost:9092,localhost:9093,localhost:9094,localhost:9095'); // 針對低延遲進行了優化的配置。這允許PHP進程/請求盡快發送消息並快速終止 $conf->set('socket.timeout.ms', 50); if (function_exists('pcntl_sigprocmask')) { pcntl_sigprocmask(SIG_BLOCK, array(SIGIO)); $conf->set('internal.termination.signal', SIGIO); } else { $conf->set('queue.buffering.max.ms', 1); } $topicConf = new \RdKafka\TopicConf(); $topicConf->set('auto.commit.interval.ms', 100); $topicConf->set('auto.offset.reset', 'smallest'); $topicConf->set('offset.store.path', 'kafka_offset.log'); $conf->setDefaultTopicConf($topicConf); $consumer = new \RdKafka\KafkaConsumer($conf); // $topics->consumeStart(1, RD_KAFKA_OFFSET_STORED); // 更新訂閱集(自動分配partitions ) $consumer->subscribe([$topic]); // 指定topic分配partitions使用那個分區 // $consumer->assign([ // new \RdKafka\TopicPartition("zzy8", 0), // new \RdKafka\TopicPartition("zzy8", 1), // ]); while (true) { // 設置120s為超時 $message = $consumer->consume(120 * 1000); if (!empty($message)) { switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: info('New message received :', $message); // 拆解對象為數組 $payload = json_decode($message->payload,true); $Orders = new OrdersController(); $key = $message->key; // 根據kafka中不同key,調用對應方法傳遞處理數據 ...
...
...
break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; var_dump("##################"); break; default: var_dump("nothing"); throw new \Exception($message->errstr(), $message->err); break; } } else { var_dump('this is empty obj!!!'); } } return 0; }
在這里說明下,我放的是我項目中的使用函數,一些參數配置,大家可以根據我的提示自行注釋和使用。