kafka安裝第三方Kafka-PHP的使用


Kafka-php-使用 PHP 編寫的 Kafka 客戶端

 

一. 首先確認下jdk有沒有安裝

使用命令

[root@localhost ~]# java -version
java version "1.8.0_73"
Java(TM) SE Runtime Environment (build 1.8.0_73-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)

如果有以上信息的話,就往下安裝吧,有些可能是jdk對不上,那就裝到對的上的。如果沒有安裝,就看一下下面的jdk安裝方法:

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

到這個地址下載jdk8版本,我下載的是jdk-8u73-linux-x64.tar.gz,然后解壓到/usr/local/jdk/下。

然后打開/etc/profile文件

[root@localhost ~]# vim /etc/profile

把下面這段代碼寫到文件里

export JAVA_HOME=/usr/local/jdk/jdk1.8.0_73
export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar
export PATH=$JAVA_HOME/bin:$PATH

最后

[root@localhost ~]# source /etc/profile

這時jdk就生效了,可以使用 java -version驗證下。

二. 接下來安裝Kafka

1. 下載Kafka

到http://kafka.apache.org/downloads.html下載相應的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz

2. 下載完解壓到你喜歡的目錄

我是解壓到 /usr/local/kafka/kafka_2.9.1-0.8.2.2

3. 運行默認的Kafka

啟動Zookeeper server

[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &

啟動Kafka server

[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-server-start.sh config/server.properties &

運行生產者producer

[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

運行消費者consumer

[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning   //可以啟動在其他服務器上接受消息

對於消費者,kafka中有兩個設置的地方:對於老的消費者,由--zookeeper參數設置;對於新的消費者,由--bootstrap-server參數設置

如果使用了--zookeeper參數,那么consumer的信息將會存放在zk之中

查看的方法是使用./zookeeper-client,然后 ls /consumers/[group_id]/offsets/[topic]/[broker_id-part_id],這個是查看某個group_id的某個topic的offset

如果使用了--bootstrap-server參數,那么consumer的信息將會存放在kafka之中

. 新建topic

kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_a

查看已有主題list

kafka-topics --list --zookeeper localhost:2181

修改分區數

kafka-topics --zookeeper localhost:2181 --alter --topic topic_a  --partitions 2

 

模擬客戶端發送,接受消息
  • 發送消息

bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
  • 接收消息

bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning

需要注意,此時producer將topic發布到了3個broker中,現在就有點分布式的概念了。

 

這樣,在producer那邊輸入內容,consumer馬上就能接收到。

4. 當有跨機的producer或consumer連接時

需要配置config/server.properties的host.name,要不然跨機的連不上。

 

 

安裝環境要求

  • PHP 版本大於 5.5
  • Kafka Server 版本大於 0.8.0
  • 消費模塊 Kafka Server 版本需要大於 0.9.0

使用 Composer 安裝

添加 composer 依賴 nmred/kafka-php 到項目的 composer.json 文件中即可,如:

{
	"require": { "nmred/kafka-php": "0.2.*" } }
復制代碼
Produce

<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); // 設置生產相關配置,具體配置參數見 [Configuration](Configuration.md) $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('0.9.0.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ), ); }); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode, $context) { var_dump($errorCode); }); $producer->send();

Consumer

<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('0.9.0.1'); $config->setTopics(array('test')); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM