Kafka-php-使用 PHP 编写的 Kafka 客户端
一. 首先确认下jdk有没有安装
使用命令
[root@localhost ~]# java -version java version "1.8.0_73" Java(TM) SE Runtime Environment (build 1.8.0_73-b02) Java HotSpot(TM) 64-Bit Server VM (build 25.73-b02, mixed mode)
如果有以上信息的话,就往下安装吧,有些可能是jdk对不上,那就装到对的上的。如果没有安装,就看一下下面的jdk安装方法:
http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
到这个地址下载jdk8版本,我下载的是jdk-8u73-linux-x64.tar.gz,然后解压到/usr/local/jdk/下。
然后打开/etc/profile文件
[root@localhost ~]# vim /etc/profile
把下面这段代码写到文件里
export JAVA_HOME=/usr/local/jdk/jdk1.8.0_73 export CLASSPATH=.:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar export PATH=$JAVA_HOME/bin:$PATH
最后
[root@localhost ~]# source /etc/profile
这时jdk就生效了,可以使用 java -version验证下。
二. 接下来安装Kafka
1. 下载Kafka
到http://kafka.apache.org/downloads.html下载相应的版本,我使用的是kafka_2.9.1-0.8.2.2.tgz。
2. 下载完解压到你喜欢的目录
我是解压到 /usr/local/kafka/kafka_2.9.1-0.8.2.2
3. 运行默认的Kafka
启动Zookeeper server
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/zookeeper-server-start.sh config/zookeeper.properties &
启动Kafka server
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-server-start.sh config/server.properties &
运行生产者producer
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
运行消费者consumer
[root@localhost kafka_2.9.1-0.8.2.2]# sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning //可以启动在其他服务器上接受消息
对于消费者,kafka中有两个设置的地方:对于老的消费者,由--zookeeper参数设置;对于新的消费者,由--bootstrap-server参数设置
如果使用了--zookeeper参数,那么consumer的信息将会存放在zk之中
查看的方法是使用./zookeeper-client,然后 ls /consumers/[group_id]/offsets/[topic]/[broker_id-part_id],这个是查看某个group_id的某个topic的offset
如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中
. 新建topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic_a
查看已有主题list
kafka-topics --list --zookeeper localhost:2181
修改分区数
kafka-topics --zookeeper localhost:2181 --alter --topic topic_a --partitions 2
模拟客户端发送,接受消息
-
发送消息
bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
-
接收消息
bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning
需要注意,此时producer将topic发布到了3个broker中,现在就有点分布式的概念了。
这样,在producer那边输入内容,consumer马上就能接收到。
4. 当有跨机的producer或consumer连接时
需要配置config/server.properties的host.name,要不然跨机的连不上。
安装环境要求
- PHP 版本大于 5.5
- Kafka Server 版本大于 0.8.0
- 消费模块 Kafka Server 版本需要大于 0.9.0
使用 Composer 安装
添加 composer 依赖 nmred/kafka-php 到项目的 composer.json 文件中即可,如:
{
"require": { "nmred/kafka-php": "0.2.*" } }
Produce
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); // 设置生产相关配置,具体配置参数见 [Configuration](Configuration.md) $config = \Kafka\ProducerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setBrokerVersion('0.9.0.1'); $config->setRequiredAck(1); $config->setIsAsyn(false); $config->setProduceInterval(500); $producer = new \Kafka\Producer(function() { return array( array( 'topic' => 'test', 'value' => 'test....message.', 'key' => 'testkey', ), ); }); $producer->setLogger($logger); $producer->success(function($result) { var_dump($result); }); $producer->error(function($errorCode, $context) { var_dump($errorCode); }); $producer->send();
Consumer
<?php require '../vendor/autoload.php'; date_default_timezone_set('PRC'); use Monolog\Logger; use Monolog\Handler\StdoutHandler; // Create the logger $logger = new Logger('my_logger'); // Now add some handlers $logger->pushHandler(new StdoutHandler()); $config = \Kafka\ConsumerConfig::getInstance(); $config->setMetadataRefreshIntervalMs(10000); $config->setMetadataBrokerList('10.13.4.159:9192'); $config->setGroupId('test'); $config->setBrokerVersion('0.9.0.1'); $config->setTopics(array('test')); //$config->setOffsetReset('earliest'); $consumer = new \Kafka\Consumer(); $consumer->setLogger($logger); $consumer->start(function($topic, $part, $message) { var_dump($message); });