Kafka常用命令收錄


目錄

目錄 1

1. 前言 2

2. Broker默認端口號 2

3. 安裝Kafka 2

4. 啟動Kafka 2

5. 創建Topic 2

6. 列出所有Topic 3

7. 刪除Topic 3

8. 查看Topic 3

9. 增加topic的partition數 4

10. 生產消息 4

11. 消費消息 4

12. 查看有哪些消費者Group 4

13. 查看新消費者詳情 5

14. 查看Group詳情 5

15. 刪除Group 5

16. 設置consumer group的offset 5

17. RdKafka自帶示例 6

18. 平衡leader 6

19. 自帶壓測工具 6

20. 查看topic指定分區offset的最大值或最小值 6

21. 查看__consumer_offsets 6

22. 獲取指定consumer group的位移信息 7

23. 20) 查看kafka的zookeeper 7

24. 如何增加__consumer_offsets的副本數? 9

25. 問題 11

附1:進程監控工具process_monitor.sh 12

附2:批量操作工具 12

附2.1:批量執行命令工具:mooon_ssh 12

附2.2:批量上傳文件工具:mooon_upload 13

附2.3:使用示例 13

附3:批量設置broker.id和listeners工具 15

附4:批量設置hostname工具 16

附5:Kafka監控工具kafka-manager 16

附6:kafka的安裝 16

附7:__consumer_offsets 17

 

1. 前言

本文內容主要來自兩個方面:一是網上的分享,二是自研的隨手記。日記月累,收錄kafka各種命令,會持續更新。

在0.9.0.0之后的Kafka,出現了幾個新變動,一個是在Server端增加了GroupCoordinator這個角色,另一個較大的變動是將topic的offset 信息由之前存儲在zookeeper上改為存儲到一個特殊的topic(__consumer_offsets)中。

Kafka的瓶頸容易發生在網卡,而不是CPU、內存和磁盤,所以應當考慮log的壓縮。

 

相關網址:

1) Kafka官網:http://kafka.apache.org/

2) 下載地址:http://kafka.apache.org/downloads

3) 客戶端庫:https://cwiki.apache.org/confluence/display/KAFKA/Clients

4) librdkafka庫:https://github.com/edenhill/librdkafka

5) confluent-kafka-go:https://github.com/confluentinc/confluent-kafka-go

2. Broker默認端口號

9092,建議安裝時,在zookeeper中指定kafka的根目錄,比如“/kafka”,而不是直接使用“/”,這樣多套kafka也可共享同一個zookeeper集群。

3. 安裝Kafka

相比Hadoop、HBase、Spark,甚至Redis等,Kafka的安裝到跑起來比較簡單,參考官方的介紹即可:http://kafka.apache.org/quickstart

Kafka依賴Zookeeper,本身自帶了Zookeeper,不過建議另外安裝Zookeeper。

4. 啟動Kafka

kafka-server-start.sh config/server.properties

后台常駐方式,請帶上參數“-daemon”,如:

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

5. 創建Topic

參數--topic指定Topic名,--partitions指定分區數,--replication-factor指定備份數:

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

 

注意,如果配置文件server.properties指定了kafka在zookeeper上的目錄,則參數也要指定,否則會報無可用的brokers,如:

kafka-topics.sh --create --zookeeper localhost:2181/kafka --replication-factor 1 --partitions 1 --topic test

6. 列出所有Topic

kafka-topics.sh --list --zookeeper localhost:2181

注意,如果配置文件server.properties指定了kafka在zookeeper上的目錄,則參數也要指定,否則會報無可用的brokers,如:

kafka-topics.sh --list --zookeeper localhost:2181/kafka

 

輸出示例:

__consumer_offsets

my-replicated-topic

test

7. 刪除Topic

1) kafka-topics.sh --zookeeper localhost:2181 --topic test --delete

2) kafka-topics.sh --zookeeper localhost:2181/kafka --topic test --delete

3) kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper localhost:2181 --topic test

8. 查看Topic

kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

 

注意,如果配置文件server.properties指定了kafka在zookeeper上的目錄,則參數也要指定,否則會報無可用的brokers,如:

kafka-topics.sh --describe --zookeeper localhost:2181/kafka --topic test

 

輸出示例:

Topic:test PartitionCount:3 ReplicationFactor:2 Configs:

Topic: test Partition: 0 Leader: 140 Replicas: 140,214 Isr: 140,214

Topic: test Partition: 1 Leader: 214 Replicas: 214,215 Isr: 214,215

Topic: test Partition: 2 Leader: 215 Replicas: 215,138 Isr: 215,138

9. 增加topic的partition數

kafka-topics.sh --zookeeper localhost:2181 --alter --topic test --partitions 5

10. 生產消息

kafka-console-producer.sh --broker-list localhost:9092 --topic test

11. 消費消息

1) 從頭開始

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2) 從尾部開始

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest

3) 指定分區

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 1

4) 取指定個數

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --offset latest --partition 1 --max-messages 1

 

5) 新消費者(ver>=0.9)

kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --new-consumer --from-beginning --consumer.config config/consumer.properties

12. 查看有哪些消費者Group

1) 分ZooKeeper方式(老)

kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181/kafka --list

2) API方式(新)

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --list

 

輸出示例:

test

console-consumer-37602

console-consumer-75637

console-consumer-59893

13. 查看新消費者詳情

僅支持offset存儲在zookeeper上的:

 

 

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

14. 查看Group詳情

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --describe

 

輸出示例:

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID

test  1         87             87             0   -           -    -

15. 刪除Group

老版本的ZooKeeper方式可以刪除Group,新版本則自動刪除,當執行:

kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --delete

 

輸出如下提示:

Option '[delete]' is only valid with '[zookeeper]'.

Note that there's no need to delete group metadata for the new consumer

as the group is deleted when the last committed offset for that group expires.

16. 設置consumer group的offset

執行zkCli.sh進入zookeeper命令行界面,假設需將group為testgroup的topic的offset設置為2018,則:set /consumers/testgroup/offsets/test/0 2018

如果kakfa在zookeeper中的根目錄不是“/”,而是“/kafka”,則:

set /kafka/consumers/testgroup/offsets/test/0 2018

 

另外,還可以使用kafka自帶工具kafka-run-class.sh kafka.tools.UpdateOffsetsInZK修改,命令用法:

kafka.tools.UpdateOffsetsInZK$ [earliest | latest] consumer.properties topic

從用法提示可以看出,只能修改為earliest或latest,沒有直接修改zookeeper靈活。

17. RdKafka自帶示例

rdkafka_consumer_example -b 127.0.0.1:9092 -g test test

rdkafka_consumer_example -e -b 127.0.0.1:9092 -g test test

18. 平衡leader

kafka-preferred-replica-election.sh --zookeeper localhost:2181/chroot

19. 自帶壓測工具

kafka-producer-perf-test.sh --topic test --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092

20. 查看topic指定分區offset的最大值或最小值

time為-1時表示最大值,為-2時表示最小值:

kafka-run-class.sh kafka.tools.GetOffsetShell --topic hive-mdatabase-hostsltable --time -1 --broker-list 127.0.0.1:9092 --partitions 0

21. 查看__consumer_offsets

需consumer.properties中設置exclude.internal.topics=false:

1) 0.11.0.0之前版本

kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

2) 0.11.0.0之后版本(含)

kafka-console-consumer.sh --topic __consumer_offsets --zookeeper localhost:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

22. 獲取指定consumer group的位移信息

需consumer.properties中設置exclude.internal.topics=false:

1) 0.11.0.0版本之前:

kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9091,localhost:9092,localhost:9093 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

2) 0.11.0.0版本以后(含):

kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9091,localhost:9092,localhost:9093 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

23. 20) 查看kafka的zookeeper

1) 查看Kakfa在zookeeper的根目錄

[zk: localhost:2181(CONNECTED) 0] ls /kafka

[cluster, controller_epoch, controller, brokers, admin, isr_change_notification, consumers, config]

 

2) 查看brokers

[zk: localhost:2181(CONNECTED) 1] ls /kafka/brokers

[ids, topics, seqid]

 

3) 查看有哪些brokers(214和215等為server.properties中配置的broker.id值):

[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids

[214, 215, 138, 139]

 

4) 查看broker 214,下列數據顯示該broker沒有設置JMX_PORT:

[zk: localhost:2181(CONNECTED) 4] get /kafka/brokers/ids/214

{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://test-204:9092"],"jmx_port":-1,"host":"test-204","timestamp":"1498467464861","port":9092,"version":4}

cZxid = 0x200002400

ctime = Mon Jun 26 16:57:44 CST 2017

mZxid = 0x200002400

mtime = Mon Jun 26 16:57:44 CST 2017

pZxid = 0x200002400

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x45b9d9e841f0136

dataLength = 190

numChildren = 0

 

5) 查看controller,下列數據顯示broker 214為controller:

[zk: localhost:2181(CONNECTED) 9] get /kafka/controller

{"version":1,"brokerid":214,"timestamp":"1498467946988"}

cZxid = 0x200002438

ctime = Mon Jun 26 17:05:46 CST 2017

mZxid = 0x200002438

mtime = Mon Jun 26 17:05:46 CST 2017

pZxid = 0x200002438

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x45b9d9e841f0136

dataLength = 56

numChildren = 0

 

6) 查看kafka集群的id:

[zk: localhost:2181(CONNECTED) 13] get /kafka/cluster/id

{"version":"1","id":"OCAEJy4qSf29bhwOfO7kNQ"}

cZxid = 0x2000023e7

ctime = Mon Jun 26 16:57:28 CST 2017

mZxid = 0x2000023e7

mtime = Mon Jun 26 16:57:28 CST 2017

pZxid = 0x2000023e7

cversion = 0

dataVersion = 0

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 45

numChildren = 0

 

7) 查看有哪些topics:

[zk: localhost:2181(CONNECTED) 16] ls /kafka/brokers/topics

[test, my-replicated-topic, test1, test2, test3, test123, __consumer_offsets, info]

 

8) 查看topic下有哪些partitions:

[zk: localhost:2181(CONNECTED) 19] ls /kafka/brokers/topics/__consumer_offsets/partitions

[44, 45, 46, 47, 48, 49, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43]

 

9) 查看“partition 0”的狀態:

[zk: localhost:2181(CONNECTED) 22] get /kafka/brokers/topics/__consumer_offsets/partitions/0/state

{"controller_epoch":2,"leader":215,"version":1,"leader_epoch":1,"isr":[215,214]}

cZxid = 0x2000024c6

ctime = Mon Jun 26 18:02:07 CST 2017

mZxid = 0x200bc4fc3

mtime = Mon Aug 27 18:58:10 CST 2018

pZxid = 0x2000024c6

cversion = 0

dataVersion = 1

aclVersion = 0

ephemeralOwner = 0x0

dataLength = 80

numChildren = 0

24. 如何增加__consumer_offsets的副本數?

可使用kafka-reassign-partitions.sh來增加__consumer_offsets的副本數,方法如下,構造一JSON文件reassign.json:

{

    "version":1,

    "partitions":[

        {"topic":"__consumer_offsets","partition":0,"replicas":[1,2,3]}, 

        {"topic":"__consumer_offsets","partition":1,"replicas":[2,3,1]},

        {"topic":"__consumer_offsets","partition":2,"replicas":[3,1,2]},

        {"topic":"__consumer_offsets","partition":3,"replicas":[1,2,3]},

        ...

        {"topic":"__consumer_offsets","partition":100,"replicas":[2,3,1]}

    ]

}

 

然后執行:

kafka-reassign-partitions.sh --zookeeper localhost:2181/kafka --reassignment-json-file reassign.json --execute

 

“[1,2,3]”中的數字為broker.id值。

 

如果執行報錯“Partitions reassignment failed due to Partition reassignment data file is empty”,可能是因為reasign.json文件格式不對,比如成下列格式了(中間的沒有以逗號結尾):

{"topic":"__consumer_offsets","partition":29,"replicas":[5,3,2]},

{"topic":"__consumer_offsets","partition":30,"replicas":[1,4,3]}

{"topic":"__consumer_offsets","partition":31,"replicas":[2,5,4]}

{"topic":"__consumer_offsets","partition":32,"replicas":[3,2,5]}

{"topic":"__consumer_offsets","partition":33,"replicas":[4,3,1]},

 

如果執行遇到下列錯誤:

Partitions reassignment failed due to Partition replica lists may not contain duplicate entries: __consumer_offsets-16 contains multiple entries for 2. __consumer_offsets-39 contains multiple entries for 2. __consumer_offsets-40 contains multiple entries for 3. __consumer_offsets-44 contains multiple entries for 3

 

原因是一個分區的兩個副本被指定在同一個broker上,以16號分區為列,有兩個副本落在了broker 2上:

{"topic":"__consumer_offsets","partition":16,"replicas":[2,5,2]},

 

執行成功后的輸出:

$ ../bin/kafka-reassign-partitions.sh --zookeeper 192.168.1.35.31:2181/kafka --reassignment-json-file __consumer_offsets.reassign --execute

Current partition replica assignment

 

{"version":1,"partitions":[{"topic":"__consumer_offsets","partition":22,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":30,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":8,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":21,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":4,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":27,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":7,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":9,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":46,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":25,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":35,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":41,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":33,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":23,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":49,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":47,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":16,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":28,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":31,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":36,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":42,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":3,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":18,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":37,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":15,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":24,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":38,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":17,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":48,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":19,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":11,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":13,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":43,"replicas":[3],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":6,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":14,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":20,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":0,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":44,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":39,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":12,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":45,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":5,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":26,"replicas":[1],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":29,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":34,"replicas":[4],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":10,"replicas":[5],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":32,"replicas":[2],"log_dirs":["any"]},{"topic":"__consumer_offsets","partition":40,"replicas":[5],"log_dirs":["any"]}]}

 

Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions.

25. 問題

1) -190,Local: Unknown partition

比如單機版只有一個分區,但prodcue參數的分區值為1等。

 

2) Rdkafka程序日志“delivery failed. errMsg:[Local: Message timed out]”

同一個程序,在有些機器上會這個錯誤,有些機器則工作正常,相關的issues:

https://github.com/edenhill/librdkafka/issues/474

實測是因為在運行Kafka應用程序的機器上沒有配置Kafka Brokers機器的hosts。

另外的解決辦法是在server.properties配置listeners和advertised.listeners,並且使用IP而不是hostname作為值。

 

3) Name or service not known (after 9020595078ms in state INIT)

event_callback: type(0), severity(3), (-193)kafka-204:9092/214: Failed to resolve 'kafka-204:9092': Name or service not known (after 9020595078ms in state INIT)

原因是運行kafka應用程序(非kafka本身)的機器不能識別主機名kafka-204(Kafka Brokers機器可以識別),

解決辦法是在server.properties配置listeners和advertised.listeners,並且使用IP而不是hostname作為值。

附1:進程監控工具process_monitor.sh

process_monitor.sh為shell腳本,本身含詳細的使用說明和幫助提示。適合放在crontab中,檢測到進程不在時,3秒左右時間重拉起。支持不同用戶運行相同程序,也支持同一用戶帶不同參數運行相同程序。

下載網址:

https://github.com/eyjian/libmooon/blob/master/shell/process_monitor.sh

 

使用示例:

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"

 

由於所有的java程序均運行在JVM中,所以程序名均為java,“kafkaServer”用於限定只監控kafka。如果同一用戶運行多個kafka實例,則需加端口號區分,並且要求端口號為命令行參數,和“kafkaServer”共同組成匹配模式。

當檢測到進程不存在時,則執行第三列的重啟指令“/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties”。

 

使用示例2,監控zooekeeper

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java -Dzookeeper" "/data/zookeeper/bin/zkServer.sh start"

附2:批量操作工具

適用用來批量安裝kafka和日常運維。

下載網址:

https://github.com/eyjian/libmooon/releases

 

監控工具有兩個版本:一是C++版本,另一是GO版本。當前C++版本比較成熟,GO版本相當簡略,但C++版本依賴C++運行時庫,不同環境需要特定編譯,而GO版本可不依賴C和C++運行時庫,所以不需編譯即可應用到廣泛的Linux環境。

 

使用簡單,直接執行命令,即會提示用法。

附2.1:批量執行命令工具:mooon_ssh

參數名

默認值

說明

-u

用戶名參數,可用環境變量U替代

-p

密碼參數,可用環境變量P替代

-h

IP列表參數,可用環境變量H替代

-P

22,可修改源碼,編譯為常用端口號

SSH端口參數,可用環境變量PORT替代

-c

在遠程機器上執行的命令,建議單引號方式指定值,除非要執行的命令本身已經包含了單引號有沖突。使用雙引號時,要注意轉義,否則會被本地shell解釋

-v

1

工具輸出的詳細度

附2.2:批量上傳文件工具:mooon_upload

參數名

默認值

說明

-u

用戶名參數,可用環境變量U替代

-p

密碼參數,可用環境變量P替代

-h

IP列表參數,可用環境變量H替代

-P

22,可修改源碼,編譯為常用端口號

SSH端口參數,可用環境變量PORT替代

-s

以逗號分隔的,需要上傳的本地文件列表,可以帶相對或絕對目錄

-d

文件上傳到遠程機器的目錄,只能為單個目錄

附2.3:使用示例

1) 使用示例1:上傳/etc/hosts

mooon_upload -s=/etc/hosts -d=/etc

 

2) 使用示例2:檢查/etc/profile文件是否一致

mooon_ssh -c='md5sum /etc/hosts'

 

3) 使用示例3:批量查看crontab

mooon_ssh -c='crontab -l'

 

4) 使用示例4:批量清空crontab

mooon_ssh -c='rm -f /tmp/crontab.empty;touch /tmp/crontab.empty'

mooon_ssh -c='crontab /tmp/crontab.emtpy'

 

5) 使用示例5:批量更新crontab

mooon_ssh -c='crontab /tmp/crontab.online'

 

6) 使用示例6:取遠端機器IP

因為awk用單引號,所以參數“-c”的值不能使用單引號,所以內容需要轉義,相對其它來說要復雜點:

mooon_ssh -c="netstat -ie | awk -F[\\ :]+ 'BEGIN{ok=0;}{if (match(\$0, \"eth1\")) ok=1; if ((1==ok) && match(\$0,\"inet\")) { ok=0; if (7==NF) printf(\"%s\\n\",\$3); else printf(\"%s\\n\",\$4);} }'"

 

不同的環境,IP在“netstat -ie”輸出中的位置稍有不同,所以awk中加了“7==NF”判斷,但仍不一定適用於所有的環境。需要轉義的字符包含:雙引號、美元符和斜杠。

 

7) 使用示例7:批量查看kafka進程(環境變量方式)

$ export H=192.168.31.9,192.168.31.10,192.168.31.11,192.168.31.12,192.168.31.13

$ export U=kafka

$ export P='123456'

 

$ mooon_ssh -c='/usr/local/jdk/bin/jps -m'

[192.168.31.15]

50928 Kafka /data/kafka/config/server.properties

125735 Jps -m

[192.168.31.15] SUCCESS

 

[192.168.31.16]

147842 Jps -m

174902 Kafka /data/kafka/config/server.properties

[192.168.31.16] SUCCESS

 

[192.168.31.17]

51409 Kafka /data/kafka/config/server.properties

178771 Jps -m

[192.168.31.17] SUCCESS

 

[192.168.31.18]

73568 Jps -m

62314 Kafka /data/kafka/config/server.properties

[192.168.31.18] SUCCESS

 

[192.168.31.19]

123908 Jps -m

182845 Kafka /data/kafka/config/server.properties

[192.168.31.19] SUCCESS

 

 

================================

[192.168.31.15 SUCCESS] 0 seconds

[192.168.31.16 SUCCESS] 0 seconds

[192.168.31.17 SUCCESS] 0 seconds

[192.168.31.18 SUCCESS] 0 seconds

[192.168.31.19 SUCCESS] 0 seconds

SUCCESS: 5, FAILURE: 0

 

8) 使用示例8:批量停止kafka進程(參數方式)

$ mooon_ssh -c='/data/kafka/bin/kafka-server-stop.sh' -u=kafka -p='123456' -h=192.168.31.15,192.168.31.16,192.168.31.17,192.168.31.18,192.168.31.19

[192.168.31.15]

No kafka server to stop

command return 1

 

[192.168.31.16]

No kafka server to stop

command return 1

 

[192.168.31.17]

No kafka server to stop

command return 1

 

[192.168.31.18]

No kafka server to stop

command return 1

 

[192.168.31.19]

No kafka server to stop

command return 1

 

================================

[192.168.31.15 FAILURE] 0 seconds

[192.168.31.16 FAILURE] 0 seconds

[192.168.31.17 FAILURE] 0 seconds

[192.168.31.18 FAILURE] 0 seconds

[192.168.31.19 FAILURE] 0 seconds

SUCCESS: 0, FAILURE: 5

附3:批量設置broker.id和listeners工具

為shell腳本,有詳細的使用說明和幫助提示,依賴mooon_ssh和mooon_upload:

https://github.com/eyjian/libmooon/blob/master/shell/set_kafka_id_and_ip.sh

附4:批量設置hostname工具

為shell腳本,有詳細的使用說明和幫助提示,依賴mooon_ssh和mooon_upload:

https://github.com/eyjian/libmooon/blob/master/shell/set_hostname.sh

附5:Kafka監控工具kafka-manager

官網:https://github.com/yahoo/kafka-manager

kafka-manager的數據主要來源兩個方便:一是kafka的zookeeper數據,二是kafka的JMX數據。

 

kafka-manager要求JDK版本不低於1.8,從源碼編譯kafka-manager相對復雜,但編譯拿到二進制包后,只需修改application.conf中的“kafka-manager.zkhosts”值,即可開始啟動kafka-manager。“kafka-manager.zkhosts”值,不是kafka的zookeeper配置值,而是kafka-manager自己用的zookeeper配置,所以兩者可以為不同的zookeeper,注意值用雙引號引起來。

 

crontab啟動示例:

JMX_PORT=9999

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"

 

指定JMX_PORT不是必須的,但建議設置,這樣kafka-manager可以更詳細的查看brokers。

crontab中啟動kafka-manager示例(指定服務端口為8080,不指定的默認值為9000):

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafka-manager" "/data/kafka/kafka-manager/bin/kafka-manager -Dconfig.file=/data/kafka/kafka-manager/conf/application.conf -Dhttp.port=8080 > /dev/null 2>&1"

 

process_monitor.sh下載:

https://github.com/eyjian/libmooon/blob/master/shell/process_monitor.sh

注意crontab的用戶密碼有效,crontab才能正常執行。

附6:kafka的安裝

最基本的兩個配置項為server.properties文件中的:

1) Broker.id

2) zookeeper.connect

 

其中broker.id每個節點要求不同,zookeeper.connect值建議指定目錄,不要直接放在zookeeper根目錄下。另外也建議設置listeners值,不然需要客戶端配置hostname和IP的映射關系。

因broker.id和listeners的原因,每個節點的server.properties不一致,可利用工具set_kafka_id_and_ip.sh實現批量的替換,以簡化kafka集群的部署。set_kafka_id_and_ip.sh下載地址:https://github.com/eyjian/libmooon/blob/master/shell/set_kafka_id_and_ip.sh。

 

crontab中啟動kafka示例:

JMX_PORT=9999

* * * * * /usr/local/bin/process_monitor.sh "/usr/local/jdk/bin/java kafkaServer" "/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties"

 

設置JMX_PORT是為方便kafka-manager管理kafka。

附7:__consumer_offsets

__consumer_offsets是kafka內置的Topic,在0.9.0.0之后的Kafka,將topic的offset 信息由之前存儲在zookeeper上改為存儲到內置的__consumer_offsets中。

server.properties中的配置項num.partitions和default.replication.factor對__consumer_offsets無效,而是受offsets.topic.num.partitions和offsets.topic.replication.factor兩個控制。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM