linux中kafka集群搭建及常用命令


一、安裝zookeeper集群

tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local/
cd /usr/local/
ln -s apache-zookeeper-3.5.5-bin zookeeper

mv zoo_sample.cfg zoo.cfg
vi zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
clientPort=2181
server.1=192.168.1.12:2888:3888
server.2=192.168.1.13:2888:3888
server.3=192.168.1.14:2888:3888

mkdir /tmp/zookeeper/data -p
mkdir /tmp/zookeeper/log -p

#在192.168.1.12上
echo 1 >/tmp/zookeeper/data/myid
#在192.168.1.13上
echo 2 >/tmp/zookeeper/data/myid
#在192.168.1.14上
echo 3 >/tmp/zookeeper/data/myid
vi /etc/profile
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

source /etc/profile
zkServer.sh start
zkServer.sh status

二、安裝Kafka集群

tar -zxvf kafka_2.11-2.2.1.tgz -C /usr/local/
cd /usr/local/
ln -s kafka_2.11-2.2.1 kafka
cd /usr/local/kafka/config
vi server.properties
broker.id=1
listeners=PLAINTEXT://192.168.1.12:9092
log.dirs=/tmp/kafka-logs
#設置可以刪除toptic,如果沒有設置 delete.topic.enable=true,則調用kafka 的delete命令無法真正將topic刪除,而是顯示(marked for deletion)
delete.topic.enable=true
#設置不能自動創建toptic,如果設置為true,則生產者在生產消息時如果toptic不存在則會自動創建
auto.create.topics.enable = false
zookeeper.connect=192.168.1.12:2181,192.168.1.12:2181,192.168.1.12:2181
cd /usr/local/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties

  

三、Kafka用戶權限管理(SASL)和數據傳輸加密(SSL)

Kafka 權限管理實戰(最全整理)
https://www.cnblogs.com/felixzh/p/13695298.html


Kafka SASL/SCRAM+ACL實現動態創建用戶及權限控制(實現用戶權限控制)
https://blog.csdn.net/ashic/article/details/86661599

Kafka實戰:集群SSL加密認證和配置(最新版kafka-2.7.0)
https://www.cnblogs.com/felixzh/p/14661156.html
kafka學習筆記十kafka-SSL安全認證機制(實現數據安全傳輸)
https://blog.csdn.net/hwhanwan/article/details/82216016


SSL
https://blog.csdn.net/qq_38265137/article/details/90112705
https://blog.csdn.net/Tianlujunr/article/details/82379545?utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control

https://www.cnblogs.com/hofmann/p/11174258.html

  

四、Kafka常用命令

主題topic
a.創建topic
./kafka-topics.sh --create --zookeeper 192.168.1.12:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
./kafka-topics.sh --create --bootstrap-server 192.168.1.12:9092 --replication-factor 1 --partitions 1 --topic my-kafka-toptic1   #--bootstrap-server指定broker地址,可以寫多個地址,用逗號分割,用法和--zookeeper相同


b.查看topic列表
./kafka-topics.sh --list --zookeeper 192.168.1.13:2181
./kafka-topics.sh --list --bootstrap-server 192.168.1.13:9092
 
c.如果需要查看topic的詳細信息,需要使用describe命令
kafka-topics.sh --describe --zookeeper 192.168.1.12:2181 --topic test-topic
 
d.#若不指定topic,則查看所有topic的信息
kafka-topics.sh --describe --zookeeper 192.168.1.12:2181
 
e.刪除topic
kafka-topics.sh --delete --zookeeper 192.168.1.12:2181 --topic my-kafka-topic
#https://blog.csdn.net/qq_39657597/article/details/84307541?utm_term=kafka%E5%88%A0%E9%99%A4%E6%B6%88%E6%81%AF&utm_medium=distribute.pc_aggpage_search_result.none-task-blog-2~all~sobaiduweb~default-0-84307541&spm=3001.4430
-------------------------------------------
生產者
kafka-console-producer.sh --broker-list 192.168.1.12:9092 --topic my-kafka-topic
-------------------------------------------
消費者
kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --topic my-kafka-topic
# 如果需要從頭開始接收數據,需要添加--from-beginning參數
kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --from-beginning --topic my-kafka-topic
#指定消費者組
./kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --topic my-kafka-toptic1 --group yingyong1 --from-beginning
查看消費者組
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.12:9092  --list
查看消費者組中消費者消費的分區個數
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.12:9092 --describe --members --group yingyong1
重置offset到指定位置(1、--to-offset 30,2、--to-offset --to-earliest,3、--to-offset --to-current,),前提是沒有消費者在連着
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.12:9092 --group yingyong2 --reset-offsets --topic my-kafka-topic5 --to-offset 30 --execute
讀取指定toptic中的指定分區中的指定offset后面的2條消息
./kafka-console-consumer.sh --bootstrap-server 192.168.1.12:9092 --topic my-kafka-topic5  --partition 1 --offset 10036 --max-messages 2
查看消費者組消息消費情況(查看消息的堆積情況)
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.12:9092 --describe --group yingyong1
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
my-kafka-topic5 4          25              25              0               -               -               -
my-kafka-topic5 2          26              26              0               -               -               -
my-kafka-topic5 0          25              25              0               -               -               -
my-kafka-topic5 3          26              27              1               -               -               -
my-kafka-topic5 1          25              25              0               -               -               -
注:LAG當前分區中消息堆積個數
	LOG-END-OFFSET當前分區中最后一條消息的位置
	CURRENT-OFFSET消費者組中當前分區已經消費到的位置
	同一個toptic,不同的消費者組中LOG-END-OFFSET是一樣的,而CURRENT-OFFSET是不一樣的。

刪除toptic中的消息
a.查看分區和offset
./kafka-consumer-groups.sh --bootstrap-server 192.168.1.12:9092 --describe --group yingyong1
b.編寫一個json文件
vi /tmp/offsetfile.json
{"partitions": [{"topic": "my-kafka-topic5", "partition": 4, "offset": 27}], "version":1 }
c.執行命令刪除
./kafka-delete-records.sh --bootstrap-server 192.168.1.12:9092 --offset-json-file /tmp/offsetfile.json

#########################################
./kafka-topics.sh --describe  --zookeeper  192.168.1.12:2181
Topic:my-kafka-topic5   PartitionCount:5        ReplicationFactor:2     Configs:
        Topic: my-kafka-topic5  Partition: 0    Leader: 3       Replicas: 2,3   Isr: 3,2
        Topic: my-kafka-topic5  Partition: 1    Leader: 3       Replicas: 3,1   Isr: 3,1
        Topic: my-kafka-topic5  Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: my-kafka-topic5  Partition: 3    Leader: 1       Replicas: 2,1   Isr: 1,2
        Topic: my-kafka-topic5  Partition: 4    Leader: 3       Replicas: 3,2   Isr: 3,2
注:Isr(in-sync Replica)在同步的副本,Replicas指副本在哪個代理上
一個分區只能被一個消費者組中的一個消費者消費,
如果一個消費者組中有5個分區一個消費者,那么這個消費者會消費這五個分區
如果一個消費者組中有5個分區2個個消費者,那么第一個消費者會消費3個分區,另一個消費者會消費2個分區
如果一個消費者組中有5個分區6個個消費者,那么其中5個消費者會分別消費一個,而第六個消費者不會消費數據,當前五個中有退出的時候,第六個消費者才會消費數據
生產者和消費者在連接kafka時,既可以連接kafka服務器的地址,也可以連接zookeeper的地址,且地址可以寫多個中間用逗號隔開即可

  

  

  

  

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM