消息中間件Kafaka - PHP操作使用Kafka


Centos版本:Centos6.4,PHP版本:PHP7。

在上一篇文章中使用IP為192.168.9.154的機器安裝並開啟了Kafka進行了簡單測試,充當了Kafka服務器。

本篇文章新開啟一台IP為192.16.9.157的機器給PHP開啟擴展。

找到github的擴展下載地址,這里是php-rdkafka,雖然php有一個擴展是php-kafka,但是php-rdkafka要比php-kafka強大。

https://github.com/arnaud-lb/php-rdkafka  //php-rdkafka下載地址

  在安裝php-rdkafka之前需要給系統安裝一個庫,librdkafka。

https://github.com/edenhill/librdkafka  //librdkafka地址

  兩個包都下載完之后,首先進行librdkafka的解壓安裝

# unzip librdkafka-master.zip 
# cd librdkafka-master
# ./configure
# make && make install

  接下來編譯安裝php-rdkafka

unzip php-rdkafka-master.zip
# cd php-rdkafka-master
# phpize
# ./configure --with-php-config=/usr/local/php/bin/php-config 
# make && make install

  

Installing shared extensions:     /usr/local/php/lib/php/extensions/no-debug-non-zts-20170718/  //so地址
# vim /usr/local/php/etc/php.ini   //添加下面代碼
extension=/usr/local/php/lib/php/extensions/no-debug-non-zts-20170718/rdkafka.so
# service php-fpm restart  //重啟PHP

  

#查看擴展是否生效 php -m | grep kafka

OK

 

hp操作kafka

運行前先開啟我們的zookeeper和kafka 上篇文章有如何開啟

    1. 運行producer
      kafka默認端口9092

      vim producer.php

 <?php
    $rk = new RdKafka\Producer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("ip:9092");       
    $topic = $rk->newTopic("test");
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "要發送的消息");

  運行consumer
vim consumer.php

<?php
    $rk = new RdKafka\Consumer();
    $rk->setLogLevel(LOG_DEBUG);
    $rk->addBrokers("ip");
    $topic = $rk->newTopic("test");
    $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
    while(true){
        sleep(1);
        $msg = $topic->consume(0, 1000);
        if ($msg) {
            echo $msg->payload, "\n";
        }          
    }    

 開啟兩個窗口一個運行consumer 一個運行producer

php consumer.php
php producer.php

會發現我們已經簡單的會使用kafka了。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM