php調用kafka消息隊列


2020年10月27日16:49:33

環境php 7.3 laravel 8

kafka版本  kafka_2.13-2.6.0.tgz

擴展https://github.com/arnaud-lb/php-rdkafka

其他的php擴展不是很久沒更新就是擴展關系亂七八糟,建議使用rdkafka

http://pecl.php.net/package/rdkafka 有編譯好的dll擴展

官方文檔 https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/book.rdkafka.html  需要FQ

kafka中文文檔

https://kafka.apachecn.org/documentation.html

https://www.orchome.com/472

相關概念理解

https://www.cnblogs.com/rickiyang/category/1487719.html

 

 

注意: 將其中php_rdkafka.dll放入php目錄下的ext文件夾內,librdkafka.dll放入php根目錄下,然后修改php.ini,添加:

extension=php_rdkafka.dll

php -m和phpinfo()都需要驗證是否安裝成功

代碼:

KafkaProducerService

<?php

namespace App\Service;

use RdKafka\Conf;
use RdKafka\Producer;

class KafkaProducerService {

    public static function doTask() {

        $conf = new Conf();
//        $conf->set('log_level', (string) LOG_DEBUG);
//        $conf->set('debug', 'all');
        $rk = new Producer($conf);
        $rk->addBrokers('172.18.0.105');
        $topic = $rk->newTopic('zx');

        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message 222222222222");

        $rk->poll(1);
    }

}
View Code

KafkaConsumerService low消費者模式

<?php

namespace App\Service;

use RdKafka\Conf;
use RdKafka\Consumer;

class KafkaConsumerService {

    public static function doTask() {

        $conf = new Conf();
//        $conf->set('log_level', (string) LOG_DEBUG);
//        $conf->set('debug', 'all');
        $rk = new Consumer($conf);
        $rk->addBrokers('172.18.0.105');

        $topic = $rk->newTopic('zx');

        $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

        while (true) {
            $msg = $topic->consume(0, 5000);
            if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
                continue;
            } elseif ($msg->err) {
                echo $msg->errstr(), "\n";
                break;
            } else {
                echo $msg->payload, "\n";
            }
        }
    }

}
View Code

 

high消費者模式

 

<?php

namespace App\Service;

use RdKafka\Conf;
use RdKafka\Consumer;
use RdKafka\TopicConf;
use RdKafka\KafkaConsumer;
use App\Service\BaseKafkaService;

class KafkaConsumerService extends BaseKafkaService {

    //low消費者
//    public static function doTask() {
//
//        $conf = new Conf();
//        $rk = new Consumer($conf);
//        $rk->addBrokers(self::$server_ip);
//        $topic = $rk->newTopic(self::$topic);
//
//        $topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);
//
//        while (true) {
//            $msg = $topic->consume(0, 5000);
//            if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
//                // Constant check required by librdkafka 0.11.6. Newer librdkafka versions will return NULL instead.
//                continue;
//            } elseif ($msg->err) {
//                echo $msg->errstr(), "\n";
//                break;
//            } else {
//                echo $msg->payload, "\n";
//            }
//        }
//    }
    //high消費者
    public static function doTask() {
        $conf = new Conf();
        // Set a rebalance callback to log partition assignments (optional)
        $conf->setRebalanceCb(function (KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    echo "Assign: ";
                    var_dump($partitions);
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    echo "Revoke: ";
                    var_dump($partitions);
                    $kafka->assign(NULL);
                    break;

                default:
                    throw new \Exception($err);
            }
        });

        // Configure the group.id. All consumer with the same group.id will consume
        $conf->set('group.id', self::$group_id);
        $conf->set('metadata.broker.list', self::$server_ip);

        $conf->set('auto.offset.reset', 'earliest');

        $consumer = new KafkaConsumer($conf);
        
        $topicConf = new TopicConf();
//        $topicConf->set("auto.commit.interval.ms", 1e3);
        $topicConf->set("enable.auto.commit", 0);

        $topic = $consumer->newTopic(self::$topic, $topicConf);
        
//        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
        // Subscribe to topic 'test'
        $consumer->subscribe([self::$topic]);

        while (true) {
            $message = $consumer->consume(1 * 1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
//                    var_dump($message);
                    var_dump($message->payload);

//                    $consumer->commit($message);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    echo "Timed out\n";
                    break;
                default:
                    throw new \Exception($message->errstr(), $message->err);
                    break;
            }
        }
    }

}
View Code

 

注意:

1,你測試服務器上的kafka必須的先測試好,不然有些錯誤,在log里面沒法只管體現

2,$rk->poll(1);必須,不然消費端接受不到信息,因為是編譯擴展,所以不清楚底層是怎么實現的

3,注意消費者模式和參看文檔

一些測試命令

啟動查看啟動錯誤
bin/kafka-server-start.sh config/server.properties bin/zookeeper-server-start.sh config/zookeeper.properties
172.18.0.105 本地測試服務Ip
bin/kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name test --alter --add-config retention.ms=3000
//刪除
bin/kafka-topics.sh --delete --topic testTopic --zookeeper localhost:2181
//列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
//創建
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zx
//生產者客戶端
bin/kafka-console-producer.sh --broker-list 172.18.0.105:9092 --topic zx
//消費者客戶端
bin/kafka-console-consumer.sh --bootstrap-server 172.18.0.105:9092 --topic zx --from-beginning

jps查看是否啟動
9173 Kafka
9462 Jps
8589 QuorumPeerMain

現在服務器啟動生產者客戶端,消費者客戶端測試確保kafka跑起來完全沒問題,這點一定要確保

個人的一點感悟,kafka配置和測試有一定難度,不如rabbitmq上手快,而且kafka找錯誤真的不好找

雖然kafka號稱吞吐性能能夠達到T級別,都有集群模式,kafka借助與zookeeper的集群,各自都有優點,如果是java做粘合劑就使用kafka,如果是多語言建議RabbitMQ

 建議java全家桶還是使用java做粘合劑會好一些


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM