簡介:
Apache Kafka 是一個 Scala 語言編寫的可擴展、分布式、高性能的容錯消息發布、訂閱系統。
官網地址:http://kafka.apache.org
中文教程:http://www.orchome.com/kafka/index
下載地址:http://mirror.bit.edu.cn/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz ( Scala 2.11 is recommended )
Java版本:jdk-8u111-linux-x64.rpm
一、單機部署
shell > vim /etc/hosts 192.168.10.23 zk-node01 kb-node01 # 必須配置,否則無法啟動 shell > rpm -ivh jdk-8u111-linux-x64.rpm shell > java -version java version "1.8.0_111" Java(TM) SE Runtime Environment (build 1.8.0_111-b14) Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode) shell > cd /usr/local/src; tar zxf kafka_2.11-1.0.0.tgz -C /usr/local shell > cd /usr/local/kafka_2.11-1.0.0
1、zookeeper
shell > vim config/zookeeper.properties # 數據目錄 dataDir=/data/zookeeper_data # 監聽端口 clientPort=2181 # 最大連接數 不限制 maxClientCnxns=0 shell > sh bin/zookeeper-server-start.sh -daemon config/zookeeper.properties # 啟動 zookeeper
2、kafka
shell > vim config/server.properties # 唯一ID broker.id=1 # 監聽地址 listeners=PLAINTEXT://0.0.0.0:9092 # 向 Zookeeper 注冊的地址,這里如果需要同時內外網訪問需要注冊 hostname,否則只能注冊外網IP地址,會導致所有流量都走外網 advertised.listeners=PLAINTEXT://kb-node01:9092 # 數據目錄 log.dirs=/data/kafka_data # 允許刪除topic delete.topic.enable=true # 不允許自動創建topic auto.create.topics.enable=false # 磁盤IO不足的時候,可以適當調大該值 ( 當內存足夠時 ) #log.flush.interval.messages=10000 #log.flush.interval.ms=1000 # kafka 數據保留時間 默認 168 -> 7 天 log.retention.hours=24 # zookeeper zookeeper.connect=zk-node01:2181 # 其余都使用默認配置 # 這樣配置,外網訪問 kafka 時,需要配置 hosts ( IP kb-node01 ) 否則提示主機名未知! shell > sh bin/kafka-server-start.sh -daemon config/server.properties # 啟動 kafka
二、kafka 指令
1、topic
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --create --topic kafka-test --partitions 1 --replication-factor 1 Created topic "kafka-test". shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --list kafka-test
# 創建一個 topic kafka-test,它有一個分區、一個副本
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --describe --topic kafka-test Topic:kafka-test PartitionCount:1 ReplicationFactor:1 Configs: Topic: kafka-test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
# 查看這個 topic 的屬性,總共一個分區,一個副本;當前分區為 0,leader 為 broker.id=1 的 broker,
# 副本所在 broker,活躍的 broker ( 需要同步副本的broker )
2、consumer
shell > sh bin/kafka-console-consumer.sh --bootstrap-server kb-node01:9092 --topic kafka-test --from-beginning
# 啟動一個消費者,消費 kafka-test 這個 topic,從頭讀取
3、producer
shell > sh bin/kafka-console-producer.sh --broker-list kb-node01:9092 --topic kafka-test > hello world
# 重啟一個終端,啟動一個生產者,輸入 hello world,這是消費者終端會看到消息 hello world
4、alter topic
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --alter --topic kafka-test --partitions 2 shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --describe --topic kafka-test Topic:kafka-test PartitionCount:2 ReplicationFactor:1 Configs: Topic: kafka-test Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Topic: kafka-test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
# 這樣就將 topic kafka-test 的分區修改成了 2 個
5、delete topic
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --delete --topic kafka-test shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --list __consumer_offsets
# 這樣就刪除了 topic kafka-test了,__consumer_offsets 這個 topic 為系統自動生成,用來存放消費者 offset 信息的。
三、偽分布式部署
shell > sh bin/kafka-server-stop.sh
# 停止原來的 kafka 不停也行...
shell > cp config/server.properties config/server-1.properties shell > cp config/server.properties config/server-2.properties shell > cp config/server.properties config/server-3.properties
# 沒辦法,強迫症
shell > vim config/server-1.properties broker.id=1 listeners=PLAINTEXT://0.0.0.0:9091 advertised.listeners=PLAINTEXT://kb-node01:9091 log.dirs=/data/kafka_data-1 shell > vim config/server-2.properties broker.id=2 listeners=PLAINTEXT://0.0.0.0:9092 advertised.listeners=PLAINTEXT://kb-node01:9092 log.dirs=/data/kafka_data-2 shell > vim config/server-3.properties broker.id=3 listeners=PLAINTEXT://0.0.0.0:9093 advertised.listeners=PLAINTEXT://kb-node01:9093 log.dirs=/data/kafka_data-3
# 這幾項不能重復 注意:防火牆要開啟相應的端口
shell > sh bin/kafka-server-start.sh -daemon config/server-1.properties shell > sh bin/kafka-server-start.sh -daemon config/server-2.properties shell > sh bin/kafka-server-start.sh -daemon config/server-3.properties
# 啟動這些 broker
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --create --topic kafka-all --partitions 6 --replication-factor 3 Created topic "kafka-all". shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --list __consumer_offsets kafka-all
# 新創建的 topic
shell > sh bin/kafka-topics.sh --zookeeper zk-node01:2181 --describe --topic kafka-all Topic:kafka-all PartitionCount:6 ReplicationFactor:3 Configs: Topic: kafka-all Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2 Topic: kafka-all Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3 Topic: kafka-all Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1 Topic: kafka-all Partition: 3 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1 Topic: kafka-all Partition: 4 Leader: 1 Replicas: 1,3,2 Isr: 1,3,2 Topic: kafka-all Partition: 5 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
# 一個 6 分區、3 副本的 topic 誕生了,0 分區的 leader 是 3,
# 副本分布在 3、1、2 上,活躍的 broker 為 3、1、2 ( 需要同步副本的broker )
shell > sh bin/kafka-console-producer.sh --broker-list kb-node01:9091,kb-node01:9092 --topic kafka-all > hello kafka-all
# 啟動一個生產者,注意:只寫了兩個 broker 向 kafka-all topic 中發送了一條消息
shell > sh bin/kafka-console-consumer.sh --bootstrap-server kb-node01:9093 --topic kafka-all --from-beginning hello kafka-all
# 啟動一個消費者,注意:只寫了生產者沒填寫的 broker,還是消費到了消息
# 說明:broker 不需要全部填寫,會自動發現,即使有機器宕機數據也不會丟失!
# 注意:集群部署時,zookeeper 要么為 1,要么為 3,不要是兩台,否則其中一台宕機,集群則無法提供服務!
# 部署 zookeeper 集群請參照其余博文。