Ubuntu下安裝和使用zookeeper和kafka


1.在清華鏡像站下載kafka_2.10-0.10.0.0.tgz 和 zookeeper-3.4.10.tar.gz

分別解壓到/usr/local目錄下

2.進入zookeeper目錄,在conf目錄下將zoo_sample.cfg文件拷貝,並更名為zoo.cfg

參考 https://my.oschina.net/phoebus789/blog/730787

zoo.cfg文件的內容

# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=/home/common/zookeeper/zookeeperdir/zookeeper-data
dataLogDir=/home/common/zookeeper/zookeeperdir/logs
# the port at which the clients will connect
clientPort=2181
server.1=10.10.100.10:2888:3888
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

新建下面這兩個目錄

/home/common/zookeeper/zookeeperdir/zookeeper-data
/home/common/zookeeper/zookeeperdir/logs

在zookeeper-data目錄下新建一個myid文件,內容為1,代表這個服務器的編號是1,具體參考上面網址中的內容

最后在/etc/profile中添加環境變量,並source

export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=${ZOOKEEPER_HOME}/bin:$PATH

現在zookeeper就安裝好了,現在啟動zookeeper

bin/zkServer.sh start

查看狀態

bin/zkServer.sh status

啟動客戶端腳本

bin/zkCli.sh -server localhost:2181

停止zookeeper

bin/zkServer.sh stop

 

1.現在安裝kafka,同樣是解壓之后就安裝好了

參考 http://www.jianshu.com/p/efc8b9dbd3bd

2.進入kafka目錄下

kafka需要使用Zookeeper,首先需要啟動Zookeeper服務,上面的操作就已經啟動了Zookeeper服務

如果沒有的話,可以使用kafka自帶的腳本啟動一個簡單的單一節點Zookeeper實例

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動 Kafka服務

bin/kafka-server-start.sh config/server.properties

停止 Kafka服務

bin/kafka-server-stop.sh config/server.properties

 

3.創建一個主題

首先創建一個名為test的topic,只使用單個分區和一個復本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 現在可以運行list topic命令看到我們的主題

bin/kafka-topics.sh --list --zookeeper localhost:2181

4.發送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

如果要批量導入文件數據到kafka,參考:2.1 本地環境下kafka批量導入數據

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic < file_pat

如果要模擬實時數據到打入kafka的情況,可以寫一個shell腳本

#!/usr/bin/env bash

cat XXXX.log | while read line
do
    sleep 0.1
    echo "${line}"
    echo "${line}" | /home/lintong/software/apache/kafka_2.11-0.10.0.0/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topicA
done

 

5.啟動一個消費者,消費者會接收到消息

舊版消費者

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 2>/dev/null

新版消費者

bin/kafka-console-consumer.sh --new-consumer --bootstrap-server localhost:9092 --topic input --from-beginning 2>/dev/null

消費帶權限kafka topic

/opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer --new-consumer --bootstrap-server xxxx:9092 --topic my_topic --consumer.config ./client.jaas > ./test.log

其他參數

--max-messages XXX   指定在退出前最多讀取的消息數
--partition XXX    指定消費的分區

 

6.查看指定的topic的offset信息

對於結尾是ZK的消費者,其消費者的信息是存儲在Zookeeper中的

對於結尾是KF的消費者,其消費者的信息是存在在Kafka的broker中的

都可以使用下面的命令進行查看

bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group xxx --topic xxx

結果

bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group test-consumer-group --topic xxx
[2018-09-03 20:34:57,595] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group xxx              0   509             0               -509            none

或者

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group xxxx --topic xxxx

結果

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test-consumer-group
[2018-09-03 20:45:02,967] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group           Topic                          Pid Offset          logSize         Lag             Owner
test-consumer-group xxx              0   509             509             0               none

lag是負數的原因是 topic中的消息數量過期(超過kafka默認的7天后被刪除了),變成了0,所以Lag=logSize減去Offset,所以就變成了負數

7.刪除一個topic

需要在 conf/server.properties 文件中設置

# For delete topic
delete.topic.enable=true

否則在執行了以下刪除命令后,再 list 查看所有的topic,還是會看到該topic

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topicB

再到 配置文件 中的kafka數據存儲地址去刪除物理數據了,我的地址為

/tmp/kafka-logs

最后需要到zk里刪除kafka的元數據

./bin/zkCli.sh #進入zk shell
ls /brokers/topics
rmr /brokers/topics/topicA

 參考:kafka 手動刪除topic的數據

 

8.查看某個group的信息

新版

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group xxx

結果

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group group_id
GROUP          TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET   LAG                 OWNER
group_id      xxx              0          509             509             0               consumer-1_/127.0.0.1

如果這時候消費者進程關閉了之后,使用上面的命令和下面的-list命令將不會查出這個group_id,但是當消費者進程重新開啟后,這個group_id又能重新查到,且消費的offset不會丟失

舊版

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group xxx --describe

9.查看consumer group的列表

ZK的消費者可以使用下面命令查看,比如上面的例子中的 test-consumer-group

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list

KF的消費者可以使用下面命令查看,比如上面的例子中的 console-consumer-xxx ,但是只會查看到類似於 KMOffsetCache-lintong-B250M-DS3H 的結果,這是由於這種消費者的信息是存放在 __consumer_offsets 中

對於如何查看存儲於 __consumer_offsets 中的新版消費者的信息,可以參考huxihx的博文: Kafka 如何讀取offset topic內容 (__consumer_offsets)

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

10.在zk中刪除一個consumer group

rmr /consumers/test-consumer-group

11.查看topic的offset的最小值

參考:重置kafka的offset

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -2
xxxx:0:0

12.查看topic的offset的最大值

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic xxxx --time -1

13.重置topic的某個消費者的offset為0,需要高版本的kafka才有該命令,在高版本的kafka client對低版本的kafka集群執行該命令是會生效的

而且需要該group是inactive的,,即該消費組沒有消費者,不然會報 Error: Assignments can only be reset if the group 'xxxxxx' is inactive, but the current state is Stable.

kafka-consumer-groups --bootstrap-server localhost:9092 --group xxx --topic xxx --reset-offsets --to-earliest --execute

14.指定offset和partition進行消費,指定offset的時候必須指定partition

/opt/cloudera/parcels/KAFKA/bin/kafka-console-consumer --topic xxxx --partition 2 --offset 820000  --bootstrap-server xxx:9092 > ./test2.log

15.查看kafka topic的consumer的某個時間的offset,注意這個--to-datetime是utc時間,需要減去8個小時

/opt/cloudera/parcels/KAFKA/bin/kafka-consumer-groups --bootstrap-server xxxx:9092 --group xxxx --topic xxxx --command-config ./client.jaas  --reset-offsets --to-datetime 2020-01-01T00:00:00.000

client.jaas

properties {
	security.protocol=SASL_PLAINTEXT
	sasl.mechanism=PLAIN
	sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxx" serviceName="kafka" password="xxxx";
}

16.遷移topic副本,比如topic1有3個partition,這個3個partition分布在broker150、broker151、broker152上,此時想要將broker152換成broker164,比如說在kafka集群添加了新節點或者遇到磁盤分布不均勻的情況的時候

在kafka manager上面點擊

 

修改

 

  再點擊

 

 如果數據量大的話會比較耗時

17.生成遷移方案

參考:kafka partiton遷移方法與原理

 

18.查看同步延遲(under replicated)的partition

/opt/cloudera/parcels/KAFKA/lib/kafka/bin/kafka-topics.sh --zookeeper xxxx:2181 --describe --under-replicated-partitions

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM