kafka 小案例【一】---設置但個消息集群


啟動kafka服務

【 bin/kafka-server-start.sh config/server.properties 】
[root@zhangxs kafka_2.11]# bin/kafka-server-start.sh config/server.properties
[2017-03-30 00:00:33,952] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 0
broker.id.generation.enable = true
broker.rack = null
compression.type = producer
connections.max.idle.ms = 600000
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 1
delete.topic.enable = false
fetch.purgatory.purge.interval.requests = 1000
group.max.session.timeout.ms = 300000
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 0.10.2-IV0
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
(2)創建topic
bin/kafka-topics.sh --create --zookeeper 192.168.177.120:2181 --replication-factor 1 --partitions 1 --topic test1
(3)查看指定服務的topic
[root@zhangxs kafka_2.11]# bin/kafka-topics.sh --list --zookeeper 192.168.177.120:2181
[2017-03-30 00:09:15,317] WARN Connected to an old server; r-o mode will be unavailable (org.apache.zookeeper.ClientCnxnSocket)
__consumer_offsets
test
test1
(4)啟動生產者
./kafka-console-producer.sh --broker-list 192.168.177.120:9092 --topic test1
(5)啟動消費者
bin/kafka-console-consumer.sh --bootstrap-server 192.168.177.120:9092 --topic test1 --from-beginning
 
在生產者輸入
[root@zhangxs bin]# ./kafka-console-producer.sh --broker-list 192.168.177.120:9092 --topic test1
zhangxs
hello kafka
meide
張姓^H
消費者收到消息
[root@zhangxs kafka_2.11]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.177.120:9092 --topic test1 --from-beginning
zhangxs
hello kafka
meide
張姓
 
遇到問題
1:我在生產者輸入消息,卻拋出下面異常,
[2017-03-29 23:31:50,473] ERROR Error when sending message to topic test1 with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
這個是因為,因為kafka中config/server.properties 文件默認配置的zk服務是localhost,而我的zk服務ip配置的是我的指定的ip。所以我的producer鏈接不上我的zk服務,所以更新不了元數據。 要保證你的producer的zk服務跟你 自己配置的zk服務是一致的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM