Kafka Cluster (with ZooKeeper)


 

Environment Preparation

Kafka cluster layout
IP Address      Hostname        Installed Software
10.0.0.131      mcwkafka01      zookeeper, kafka
10.0.0.132      mcwkafka02      zookeeper, kafka
10.0.0.133      mcwkafka03      zookeeper, kafka
10.0.0.131      kafka-manager   kafka-manager

Check the firewall on each node; it is already disabled:
[root@mcwkafka01 ~]$ /etc/init.d/iptables status
-bash: /etc/init.d/iptables: No such file or directory
[root@mcwkafka01 ~]$ getenforce
Disabled
[root@mcwkafka01 ~]$ systemctl status firewalld.service 
● firewalld.service - firewalld - dynamic firewall daemon
   Loaded: loaded (/usr/lib/systemd/system/firewalld.service; enabled; vendor preset: enabled)
   Active: inactive (dead) since Tue 2022-02-08 01:35:17 CST; 35s ago

Configure hosts resolution:
[root@mcwkafka01 ~]$ tail -3 /etc/hosts
10.0.0.131      mcwkafka01
10.0.0.132    mcwkafka02
10.0.0.133    mcwkafka03


Deploy the JDK on all three nodes:
[root@mcwkafka02 ~]$ unzip jdk1.8.0_202.zip
[root@mcwkafka02 ~]$ ls
anaconda-ks.cfg  jdk1.8.0_202  jdk1.8.0_202.zip  mcw4
[root@mcwkafka02 ~]$ 
[root@mcwkafka02 ~]$ mv jdk1.8.0_202 jdk
[root@mcwkafka02 ~]$ ls
anaconda-ks.cfg  jdk  jdk1.8.0_202.zip  mcw4
[root@mcwkafka02 ~]$ mv jdk /opt/
[root@mcwkafka02 ~]$ vim /etc/profile 
[root@mcwkafka02 ~]$ source /etc/profile
[root@mcwkafka02 ~]$ java -version
java version "1.8.0_202"
Java(TM) SE Runtime Environment (build 1.8.0_202-b08)
Java HotSpot(TM) 64-Bit Server VM (build 25.202-b08, mixed mode)
[root@mcwkafka02 ~]$ 
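The `vim /etc/profile` step above is not shown in the capture. With the JDK unpacked to /opt/jdk as in this transcript, the appended lines would typically look like the following sketch (the exact CLASSPATH entries are an assumption inferred from the classpath visible in the ps output later in this document):

```shell
# Hypothetical /etc/profile additions for a JDK at /opt/jdk (adjust the path
# if your JDK lives elsewhere)
export JAVA_HOME=/opt/jdk
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar
```

After `source /etc/profile`, `java -version` should resolve to this JDK, as shown above.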

Install and configure ZooKeeper (perform the same steps on all three machines)

[root@mcwkafka01 /usr/local/src]$ cd /usr/local/src/
[root@mcwkafka01 /usr/local/src]$ wget http://apache.forsale.plus/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz

1) Install ZooKeeper on the three nodes
[root@mcwkafka01 /usr/local/src]$ ls
apache-zookeeper-3.5.9-bin.tar.gz
[root@mcwkafka01 /usr/local/src]$ tar xf apache-zookeeper-3.5.9-bin.tar.gz
[root@mcwkafka01 /usr/local/src]$ mkdir /data
[root@mcwkafka01 /usr/local/src]$ ls
apache-zookeeper-3.5.9-bin  apache-zookeeper-3.5.9-bin.tar.gz
[root@mcwkafka01 /usr/local/src]$ mv apache-zookeeper-3.5.9-bin /data/zk
[root@mcwkafka01 /usr/local/src]$ 

Edit the ZooKeeper configuration file on the three nodes so that it reads as follows:
[root@mcwkafka01 /usr/local/src]$ mkdir -p /data/zk/data
[root@mcwkafka01 /usr/local/src]$ ls /data/zk/
bin  conf  data  docs  lib  LICENSE.txt  NOTICE.txt  README.md  README_packaging.txt
[root@mcwkafka01 /usr/local/src]$ ls /data/zk/conf/
configuration.xsl  log4j.properties  zoo_sample.cfg
[root@mcwkafka01 /usr/local/src]$ ls /data/bin
ls: cannot access /data/bin: No such file or directory
[root@mcwkafka01 /usr/local/src]$ ls /data/zk/bin/
README.txt    zkCli.cmd  zkEnv.cmd  zkServer.cmd            zkServer.sh          zkTxnLogToolkit.sh
zkCleanup.sh  zkCli.sh   zkEnv.sh   zkServer-initialize.sh  zkTxnLogToolkit.cmd
[root@mcwkafka01 /usr/local/src]$ cp /data/zk/conf/zoo_sample.cfg /data/zk/conf/zoo_sample.cfg.bak
[root@mcwkafka01 /usr/local/src]$ cp /data/zk/conf/zoo_sample.cfg /data/zk/conf/zoo.cfg
[root@mcwkafka01 /usr/local/src]$ vim /data/zk/conf/zoo.cfg ## clear the old contents and replace with the configuration below
[root@mcwkafka01 /usr/local/src]$ cat /data/zk/conf/zoo.cfg 
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zk/data/zookeeper
dataLogDir=/data/zk/data/logs
clientPort=2181
maxClientCnxns=60
autopurge.snapRetainCount=3
autopurge.purgeInterval=1
server.1=10.0.0.131:2888:3888
server.2=10.0.0.132:2888:3888
server.3=10.0.0.133:2888:3888
[root@mcwkafka01 /usr/local/src]$ 
===============
Configuration notes:
server.id=host:port:port identifies each ZooKeeper server. As part of the ensemble, every server must know about the others, and that information can be read from the "server.id=host:port:port" entries.
Under each server's data directory (the directory specified by dataDir), create a file named myid whose single line is that server's id. For example, server "1" should write "1" into its myid file. The id must be unique within the ensemble and fall between 1 and 255.
In this configuration, the host is the server's IP address. The first port is used by followers to connect to the leader; the second port is used for leader election.
So three ports matter in a cluster setup: clientPort=2181, port 2888, and port 3888.
===============
 
Note: to change where the logs go, besides adding "dataLogDir=/data/zk/data/logs" to zoo.cfg you also need to modify zkServer.sh. The change goes in at around line 125, as follows:
[root@mcwkafka03 /usr/local/src]$ cp /data/zk/bin/zkServer.sh /data/zk/bin/zkServer.sh.bak
[root@mcwkafka03 /usr/local/src]$ vim /data/zk/bin/zkServer.sh
.......
125 ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"     # add this line
126 if [ ! -w "$ZOO_LOG_DIR" ] ; then
127 mkdir -p "$ZOO_LOG_DIR"
128 fi
 [root@mcwkafka03 /usr/local/src]$ diff /data/zk/bin/zkServer.sh /data/zk/bin/zkServer.sh.bak
125d124
<   ZOO_LOG_DIR="$($GREP "^[[:space:]]*dataLogDir" "$ZOOCFG" | sed -e 's/.*=//')"

Before starting the zookeeper service, create the myid file on each of the three zookeeper nodes:
[root@mcwkafka01 /usr/local/src]$ mkdir /data/zk/data/zookeeper/
[root@mcwkafka01 /usr/local/src]$ echo 1 >  /data/zk/data/zookeeper/myid


The myid on the other two nodes is 2 and 3 respectively (the three nodes' myid values must never be the same; all other configuration is identical):
[root@mcwkafka02 /usr/local/src]$ echo 2 >  /data/zk/data/zookeeper/myid
[root@mcwkafka03 /usr/local/src]$ echo 3 >  /data/zk/data/zookeeper/myid
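The myid written on each host must match that host's server.N index in zoo.cfg. As a self-contained illustration of the mapping (writing under a scratch directory instead of on the real hosts, so this is purely a sketch):

```shell
# Illustration only: derive each node's myid from its position in the
# server.1/2/3 list, writing under a scratch directory rather than real hosts.
base=$(mktemp -d)
i=1
for host in mcwkafka01 mcwkafka02 mcwkafka03; do
    mkdir -p "$base/$host/data/zk/data/zookeeper"
    echo "$i" > "$base/$host/data/zk/data/zookeeper/myid"
    i=$((i + 1))
done
cat "$base/mcwkafka03/data/zk/data/zookeeper/myid"   # prints 3
```

On the real machines the same loop would run over ssh, writing to /data/zk/data/zookeeper/myid as shown in the transcript above.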

Start the zookeeper service on the three nodes:
[root@mcwkafka01 /usr/local/src]$  /data/zk/bin/zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@mcwkafka01 /usr/local/src]$ ps -ef|grep zookeeper
root       6997      1 58 21:53 pts/0    00:00:25 /opt/jdk/bin/java -Dzookeeper.log.dir=/data/zk/bin/../logs -Dzookeeper.log.file=zookeeper-root-server-mcwkafka01.log -Dzookeepe.root.logger=INFO,CONSOLE -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError=kill -9 %p -cp /data/zk/bin/../zookeeper-server/target/classes:/data/zk/bin/../build/classes:/data/zk/bin/../zookeeper-server/target/lib/*.jar:/data/zk/bin/../build/lib/*.jar:/data/zk/bin/../lib/zookeeper-jute-3.5.9.jar:/data/zk/bin/../lib/zookeeper-3.5.9.jar:/data/zk/bin/../lib/slf4j-log4j12-1.7.25.jar:/data/zk/bin/../lib/slf4j-api-1.7.25.jar:/data/zk/bin/../lib/netty-transport-native-unix-common-4.1.50.Final.jar:/data/zk/bin/../lib/netty-transport-native-epoll-4.1.50.Final.jar:/data/zk/bin/../lib/netty-transport-4.1.50.Final.jar:/data/zk/bin/../lib/netty-resolver-4.1.50.Final.jar:/data/zk/bin/../lib/netty-handler-4.1.50.Final.jar:/data/zk/bin/../lib/netty-common-4.1.50.Final.jar:/data/zk/bin/../lib/netty-codec-4.1.50.Final.jar:/data/zk/bin/../lib/netty-buffer-4.1.50.Final.jar:/data/zk/bin/../lib/log4j-1.2.17.jar:/data/zk/bin/../lib/json-simple-1.1.1.jar:/data/zk/bin/../lib/jline-2.14.6.jar:/data/zk/bin/../lib/jetty-util-ajax-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-util-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-servlet-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-server-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-security-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-io-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-http-9.4.35.v20201120.jar:/data/zk/bin/../lib/javax.servlet-api-3.1.0.jar:/data/zk/bin/../lib/jackson-databind-2.10.5.1.jar:/data/zk/bin/../lib/jackson-core-2.10.5.jar:/data/zk/bin/../lib/jackson-annotations-2.10.5.jar:/data/zk/bin/../lib/commons-cli-1.2.jar:/data/zk/bin/../lib/audience-annotations-0.5.0.jar:/data/zk/bin/../zookeeper-*.jar:/data/zk/bin/../zookeeper-server/src/main/resources/lib/*.jar:/data/zk/bin/../conf:.:/opt/jdk/lib:/opt/jdk/jre/lib:/opt/jdk/lib/tools.jar 
-Xmx1000m -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=false org.apache.zookeeper.server.quorum.QuorumPeerMain /data/zk/bin/../conf/zoo.cfg
root       7033   4049  0 21:54 pts/0    00:00:00 grep --color=auto zookeeper
[root@mcwkafka01 /usr/local/src]$ lsof -i:2181
-bash: lsof: command not found

Check each node's ZooKeeper role:
[root@mcwkafka01 /usr/local/src]$ /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@mcwkafka02 /usr/local/src]$ /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

[root@mcwkafka03 /usr/local/src]$ /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
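When scripting health checks across the nodes, the "Mode:" line is the part of the `zkServer.sh status` output worth extracting. A small awk filter does it, shown here against the captured output above rather than a live ensemble:

```shell
# Extract the role from zkServer.sh status output (captured sample below)
status_output='ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader'
mode=$(printf '%s\n' "$status_output" | awk -F': ' '/^Mode/ {print $2}')
echo "$mode"   # leader
```

Against a live cluster you would pipe `/data/zk/bin/zkServer.sh status 2>&1` into the same awk filter.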

2) Install Kafka (same steps on all three nodes)

Download URL: http://mirrors.shu.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ wget http://mirrors.shu.edu.cn/apache/kafka/1.0.1/kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ ls
apache-zookeeper-3.5.9-bin.tar.gz  kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ tar xf kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ mv kafka_2.11-1.0.1 /data/kafka

Go into Kafka's config directory and edit server.properties:
[root@mcwkafka03 /usr/local/src]$ cp /data/kafka/config/server.properties /data/kafka/config/server.properties.bak
[root@mcwkafka03 /usr/local/src]$ ls /data/kafka/
bin  config  libs  LICENSE  NOTICE  site-docs
[root@mcwkafka03 /usr/local/src]$ ls /data/kafka/config/
connect-console-sink.properties    connect-file-sink.properties    connect-standalone.properties  producer.properties    tools-log4j.properties
connect-console-source.properties  connect-file-source.properties  consumer.properties            server.properties      zookeeper.properties
connect-distributed.properties     connect-log4j.properties        log4j.properties               server.properties.bak
[root@mcwkafka03 /usr/local/src]$ ls /data/kafka/bin/
connect-distributed.sh        kafka-consumer-groups.sh             kafka-reassign-partitions.sh    kafka-streams-application-reset.sh  zookeeper-server-start.sh
connect-standalone.sh         kafka-consumer-perf-test.sh          kafka-replay-log-producer.sh    kafka-topics.sh                     zookeeper-server-stop.sh
kafka-acls.sh                 kafka-delete-records.sh              kafka-replica-verification.sh   kafka-verifiable-consumer.sh        zookeeper-shell.sh
kafka-broker-api-versions.sh  kafka-log-dirs.sh                    kafka-run-class.sh              kafka-verifiable-producer.sh
kafka-configs.sh              kafka-mirror-maker.sh                kafka-server-start.sh           trogdor.sh
kafka-console-consumer.sh     kafka-preferred-replica-election.sh  kafka-server-stop.sh            windows
kafka-console-producer.sh     kafka-producer-perf-test.sh          kafka-simple-consumer-shell.sh  zookeeper-security-migration.sh
[root@mcwkafka03 /usr/local/src]$ 
[root@mcwkafka01 /usr/local/src]$ vim /data/kafka/config/server.properties
[root@mcwkafka01 /usr/local/src]$ cat /data/kafka/config/server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://10.0.0.131:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0



On the other two nodes, server.properties only needs the following two lines changed; everything else is identical:
[root@mcwkafka02 /usr/local/src]$ egrep  "listeners|broker" /data/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://10.0.0.132:9092


[root@mcwkafka03 /usr/local/src]$ egrep  "listeners|broker" /data/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://10.0.0.133:9092
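Since only broker.id and listeners differ between nodes, the per-node files can be derived mechanically. A sketch, demonstrated against a minimal two-line stand-in file (/tmp/sp.node1 is a hypothetical name, not the real config):

```shell
# Rewrite broker.id and listeners for a given node; everything else is shared.
make_props() {  # $1 = broker.id, $2 = listener IP, $3 = source properties file
    sed -e "s/^broker\.id=.*/broker.id=$1/" \
        -e "s|^listeners=.*|listeners=PLAINTEXT://$2:9092|" "$3"
}

# Demo against a two-line stand-in for the real server.properties:
printf 'broker.id=0\nlisteners=PLAINTEXT://10.0.0.131:9092\n' > /tmp/sp.node1
make_props 1 10.0.0.132 /tmp/sp.node1
```

On the real cluster you would run `make_props 1 10.0.0.132 server.properties` (and 2/10.0.0.133 for the third node) and ship the result to each host.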


Start the Kafka service on the three nodes:
[root@mcwkafka01 /usr/local/src]$ nohup /data/kafka/bin/kafka-server-start.sh /data/kafka/config/server.properties >/dev/null 2>&1 &
[1] 7224
[root@mcwkafka01 /usr/local/src]$ ls
apache-zookeeper-3.5.9-bin.tar.gz  kafka_2.11-1.0.1.tgz
[root@mcwkafka01 /usr/local/src]$ lsof -i:9092
[root@mcwkafka01 /usr/local/src]$ ss -lntup|grep 9092
[root@mcwkafka01 /usr/local/src]$ 
[root@mcwkafka03 /usr/local/src]$ ss -lntup|grep 9092
tcp    LISTEN     0      50        ::ffff:10.0.0.133:9092                 :::*                   users:(("java",pid=5992,fd=103))
[root@mcwkafka02 /usr/local/src]$ ss -lntup|grep 9092
tcp    LISTEN     0      50        ::ffff:10.0.0.132:9092                 :::*                   users:(("java",pid=6766,fd=103))

Verify the service

Run the following on any one of the nodes:
/data/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181 --replication-factor 1 --partitions 1 --topic test

[root@mcwkafka01 /usr/local/src]$ /data/kafka/bin/kafka-topics.sh --create --zookeeper 10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".

/data/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181

[root@mcwkafka02 /usr/local/src]$ /data/kafka/bin/kafka-topics.sh --list --zookeeper 10.0.0.131:2181,10.0.0.132:2181,10.0.0.133:2181
test

At this point, the Kafka cluster deployment is complete!

Client connection tool

Connection configuration:

After connecting, you can see the test topic that was created earlier from the command line.

A problem once caused by a broken network: "Error contacting service. It is probably not running."

After the virtual machines were rebooted, one node's process had died.

You can see that one node is missing:

[root@mcwkafka02 ~]$ /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Error contacting service. It is probably not running.
[root@mcwkafka02 ~]$ ping mcwkafka02
PING mcwkafka02 (10.0.0.132) 56(84) bytes of data.
^C
--- mcwkafka02 ping statistics ---
9 packets transmitted, 0 received, 100% packet loss, time 8009ms

[root@mcwkafka02 ~]$ systemctl restart network
[root@mcwkafka02 ~]$ vim /etc/resolv.conf 
[root@mcwkafka02 ~]$ ping mcwkafka02
PING mcwkafka02 (10.0.0.132) 56(84) bytes of data.
64 bytes from mcwkafka02 (10.0.0.132): icmp_seq=1 ttl=64 time=0.106 ms
^C
--- mcwkafka02 ping statistics ---
[root@mcwkafka02 ~]$ /data/zk/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zk/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader
[root@mcwkafka02 ~]$ 

There were still only two brokers; looking further, one of the Kafka node processes had died. After restarting Kafka on 132, the broker count went from 2 back to 3.

 

 

 

Kafka command-line usage

1) Create a topic

kafka-topics.sh --create --bootstrap-server hadoop102:9092 --topic second --partitions 2 --replication-factor 3
kafka-topics.sh --create --zookeeper hadoop102:2181 --topic second --partitions 2 --replication-factor 3
--create              the operation to perform; here, create a topic
--bootstrap-server    connect directly to the Kafka brokers (newer versions)
--zookeeper           connect via the ZooKeeper ensemble (older versions)
--topic               the name to give the topic
--partitions          the number of partitions for the topic
--replication-factor  the number of replicas; cannot exceed the number of Kafka brokers
The second form is used below to create the topic:
[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-topics.sh --create  --zookeeper mcwkafka02:2181 --topic mcwtopic1 --partitions 2 --replication-factor 3
Created topic "mcwtopic1".
[root@mcwkafka01 ~]$ 

Right-click refresh in the tool to see the newly created topic.

 

 

 

2) List all topics

kafka-topics.sh --list --bootstrap-server hadoop102:9092
(mcwkafka02 is the zk leader node. Note in the transcript below that --bootstrap-server is rejected: kafka-topics.sh only gained that option in Kafka 2.2, so on this 1.0.x build the --zookeeper form must be used.)

[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-topics.sh --create --zookeeper mcwkafka02:2181 --topic mcwtopic1 --partitions 2 --replication-factor 3
Created topic "mcwtopic1".
[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-topics.sh --zookeeper mcwkafka02:2181 --list
mcwtopic1
test
[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-topics.sh --list --bootstrap-server mcwkafka02:9092
Exception in thread "main" joptsimple.UnrecognizedOptionException: bootstrap-server is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:352)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
at kafka.admin.TopicCommand.main(TopicCommand.scala)

3) View topic details

kafka-topics.sh --describe --bootstrap-server hadoop102:9092 --topic second
# Same function; without --topic, the command below describes every topic
kafka-topics.sh --describe --bootstrap-server hadoop102:9092
# The Isr column lists the in-sync replica broker ids
 
[root@mcwkafka01 ~]$  /data/kafka/bin/kafka-topics.sh --describe --zookeeper  mcwkafka02:2181 --topic mcwtopic1  # describe a specific topic
Topic:mcwtopic1    PartitionCount:2    ReplicationFactor:3    Configs:
    Topic: mcwtopic1    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: mcwtopic1    Partition: 1    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
[root@mcwkafka01 ~]$  /data/kafka/bin/kafka-topics.sh --describe --zookeeper  mcwkafka02:2181 --topic xxx  # describing a nonexistent topic prints nothing
[root@mcwkafka01 ~]$ 
[root@mcwkafka01 ~]$  /data/kafka/bin/kafka-topics.sh --describe --zookeeper mcwkafka02:2181  # with no --topic, all topics are described
Topic:mcwtopic1    PartitionCount:2    ReplicationFactor:3    Configs:
    Topic: mcwtopic1    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: mcwtopic1    Partition: 1    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0
[root@mcwkafka01 ~]$ 

 

4) Alter a topic

# Increase the partition count
kafka-topics.sh --alter --bootstrap-server hadoop102:9092 --topic first --partitions 2
Currently it is 2:
[root@mcwkafka01 ~]$  /data/kafka/bin/kafka-topics.sh --describe --zookeeper  mcwkafka02:2181 --topic mcwtopic1
Topic:mcwtopic1    PartitionCount:2    ReplicationFactor:3    Configs:
    Topic: mcwtopic1    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: mcwtopic1    Partition: 1    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0

Change the partition count to 3 (the alter command itself is missing from the capture; it would be along these lines):

[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-topics.sh --alter --zookeeper mcwkafka02:2181 --topic mcwtopic1 --partitions 3
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[root@mcwkafka01 ~]$  /data/kafka/bin/kafka-topics.sh --describe --zookeeper  mcwkafka02:2181 --topic mcwtopic1
Topic:mcwtopic1    PartitionCount:3    ReplicationFactor:3    Configs:
    Topic: mcwtopic1    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2
    Topic: mcwtopic1    Partition: 1    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    Topic: mcwtopic1    Partition: 2    Leader: 2    Replicas: 2,0,1    Isr: 2,0,1

In the tool, the new partitions only appear after right-click refreshing the partitions view; refreshing Topics alone is not enough.

 

 

 

5) Delete a topic

On older Kafka versions, you must first set the following in server.properties before deleting:

delete.topic.enable=true
kafka-topics.sh --delete --bootstrap-server hadoop102:9092 --topic first

[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-topics.sh --zookeeper mcwkafka02:2181 --list
mcwtopic1
test
[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-topics.sh --delete --zookeeper mcwkafka02:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-topics.sh --zookeeper mcwkafka02:2181 --list
mcwtopic1
[root@mcwkafka01 ~]$

Deleted.

6) Producer

kafka-console-producer.sh --topic second --broker-list hadoop102:9092
--topic        the topic the produced data is sent to
--broker-list  the broker(s) the producer sends data to
Typing a line alone does nothing; you must press Enter to send it. Each line after the > prompt is one message, dispatched when Enter is pressed.
[root@mcwkafka01 ~]$  /data/kafka/bin/kafka-topics.sh --zookeeper mcwkafka02:2181 --list
mcwtopic1
[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-console-producer.sh --topic mcwtopic1 --broker-list mcwkafka02:9092
>wo shi
mac>hangwei^H^H^H^C[root@mcwkafka01 ~]$ ^C
[root@mcwkafka01 ~]$ 
[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-console-producer.sh --topic mcwtopic1 --broker-list mcwkafka02:9092
>wo shi
>machangwei
>
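Because each line at the > prompt is one message, the console producer can also be fed non-interactively from a file or pipeline instead of typed input (sketch; /tmp/mcw_msgs.txt is a hypothetical file, and the producer call itself needs the live cluster, so it is shown commented out):

```shell
# Each input line becomes one message, so the producer can read from a file
# or pipeline instead of the interactive > prompt.
printf 'wo shi\nmachangwei\n' > /tmp/mcw_msgs.txt
wc -l < /tmp/mcw_msgs.txt    # two lines -> two messages

# Against the live cluster this would be:
# /data/kafka/bin/kafka-console-producer.sh --topic mcwtopic1 \
#     --broker-list mcwkafka02:9092 < /tmp/mcw_msgs.txt
```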

The message can be seen in the tool.

In the topic's properties, change the message type to string and click Update.

Then you can see the decoded content of the message.

Selecting a message shows its content below; the full content can be selected and copied out.

The partitions view shows which Kafka node each partition maps to. Even if you Ctrl+C out of the > prompt on the command line, the messages already sent remain inside Kafka.

7) Consumer and tool usage

kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092
--topic             the topic to consume from
--bootstrap-server  the broker(s) to consume from
# The form above only sees data produced after this consumer starts; to read everything, add --from-beginning:
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning

# Consume messages (older ZooKeeper-based form):

kafka-console-consumer.sh --zookeeper hadoop-001:2181 --from-beginning --topic first

Once the consumer is running, if no messages arrive it simply blocks here, listening for new messages.

[root@mcwkafka01 ~]$ /data/kafka/bin/kafka-console-consumer.sh --zookeeper 10.0.0.132:2181 --from-beginning --topic mcwtopic1
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
machangwei
wo shi
wo shi

When the producer produces data here,

the command-line consumer sees the message at the same time and prints it.

Although consumed, the messages still exist; even after Ctrl+C on the command line, they remain inside Kafka.

View only the messages of partition 0:

View only the messages of partition 2:

Click to add a message:

Add

After clicking Add, the message has been added; clicking Yes lets you add the next one.

All current messages. Partition 1 is on the master node; it seems that by default it did not choose the zk leader node to store the messages.

 

 

8) Question: why does a newly started consumer not see the messages already in the topic?

# The form below reads data from all partitions, so the display order may vary
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --from-beginning
Run it this way and the existing messages are retrieved.

 

 

9) Consumer groups

1) Start the consumer with a configuration file; the group name is read from the group.id setting in that file.

The configuration file here refers to kafka-2.4.1/config/consumer.properties.

Inside that file, the group name is set via group.id.

2) Alternatively, specify the group name at startup with --group:
kafka-console-consumer.sh --topic second --bootstrap-server hadoop102:9092 --group testgroup

Run the consumer-group command.

Two new topics appear: one with a single partition, and the underscore-prefixed consumer-offsets topic with partitions 0-49.

Its configuration can be viewed in the tool.

Produce a message into topic2,

and the consumer group receives it.

 

10) Topics from the directory perspective

1) Observed from the storage angle: the offset files are stored on every Kafka node under the data directory (log.dirs, /data/kafka/data in this deployment), with names of the form __consumer_offsets-xxx.

Partition 0 of topic1 has a replica on node 1, so a topic1-0 directory shows up there, meaning that node holds a replica of partition 0.
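The xxx suffix in __consumer_offsets-xxx is not arbitrary: to the best of my knowledge, Kafka assigns a group's offsets to partition |groupId.hashCode()| % 50, where 50 is the default number of __consumer_offsets partitions (this mapping is my reading of Kafka's group-metadata code, so treat it as a sketch). Java's String.hashCode can be re-implemented in shell for illustration:

```shell
# Java String.hashCode in shell: h = h*31 + charCode, kept to signed 32 bits.
# A group's offsets live in __consumer_offsets-(|h| % 50) by default.
java_hash() {
    s=$1 h=0
    while [ -n "$s" ]; do
        c=${s%"${s#?}"}                       # first character
        s=${s#?}                              # rest of the string
        h=$(( (h * 31 + $(printf '%d' "'$c")) & 0xFFFFFFFF ))
    done
    if [ "$h" -gt 2147483647 ]; then h=$((h - 4294967296)); fi
    echo "$h"
}

h=$(java_hash console-consumer-23437)         # the group seen earlier in zk
a=$h; if [ "$a" -lt 0 ]; then a=$((-a)); fi
echo "group hashes to __consumer_offsets-$((a % 50))"
```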

ZooKeeper commands

[root@mcwkafka03 ~]$ /data/zk/bin/zkCli.sh -server 10.0.0.132:2181  # connect to zk
Connecting to 10.0.0.132:2181
2022-02-17 02:10:11,476 [myid:] - INFO  [main:Environment@109] - Client environment:zookeeper.version=3.5.9-83df9301aa5c2a5d284a9940177808c01bc35cef, built on 01/06/2021 19:49 GMT
2022-02-17 02:10:11,481 [myid:] - INFO  [main:Environment@109] - Client environment:host.name=mcwkafka03
2022-02-17 02:10:11,481 [myid:] - INFO  [main:Environment@109] - Client environment:java.version=1.8.0_202
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:java.vendor=Oracle Corporation
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:java.home=/opt/jdk/jre
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:java.class.path=/data/zk/bin/../zookeeper-server/target/classes:/data/zk/bin/../build/classes:/data/zk/bin/../zookeeper-server/target/lib/*.jar:/data/zk/bin/../build/lib/*.jar:/data/zk/bin/../lib/zookeeper-jute-3.5.9.jar:/data/zk/bin/../lib/zookeeper-3.5.9.jar:/data/zk/bin/../lib/slf4j-log4j12-1.7.25.jar:/data/zk/bin/../lib/slf4j-api-1.7.25.jar:/data/zk/bin/../lib/netty-transport-native-unix-common-4.1.50.Final.jar:/data/zk/bin/../lib/netty-transport-native-epoll-4.1.50.Final.jar:/data/zk/bin/../lib/netty-transport-4.1.50.Final.jar:/data/zk/bin/../lib/netty-resolver-4.1.50.Final.jar:/data/zk/bin/../lib/netty-handler-4.1.50.Final.jar:/data/zk/bin/../lib/netty-common-4.1.50.Final.jar:/data/zk/bin/../lib/netty-codec-4.1.50.Final.jar:/data/zk/bin/../lib/netty-buffer-4.1.50.Final.jar:/data/zk/bin/../lib/log4j-1.2.17.jar:/data/zk/bin/../lib/json-simple-1.1.1.jar:/data/zk/bin/../lib/jline-2.14.6.jar:/data/zk/bin/../lib/jetty-util-ajax-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-util-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-servlet-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-server-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-security-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-io-9.4.35.v20201120.jar:/data/zk/bin/../lib/jetty-http-9.4.35.v20201120.jar:/data/zk/bin/../lib/javax.servlet-api-3.1.0.jar:/data/zk/bin/../lib/jackson-databind-2.10.5.1.jar:/data/zk/bin/../lib/jackson-core-2.10.5.jar:/data/zk/bin/../lib/jackson-annotations-2.10.5.jar:/data/zk/bin/../lib/commons-cli-1.2.jar:/data/zk/bin/../lib/audience-annotations-0.5.0.jar:/data/zk/bin/../zookeeper-*.jar:/data/zk/bin/../zookeeper-server/src/main/resources/lib/*.jar:/data/zk/bin/../conf:.:/opt/jdk/lib:/opt/jdk/jre/lib:/opt/jdk/lib/tools.jar
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:java.io.tmpdir=/tmp
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:java.compiler=<NA>
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:os.name=Linux
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:os.arch=amd64
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:os.version=3.10.0-693.el7.x86_64
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:user.name=root
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:user.home=/root
2022-02-17 02:10:11,484 [myid:] - INFO  [main:Environment@109] - Client environment:user.dir=/root
2022-02-17 02:10:11,485 [myid:] - INFO  [main:Environment@109] - Client environment:os.memory.free=13MB
2022-02-17 02:10:11,486 [myid:] - INFO  [main:Environment@109] - Client environment:os.memory.max=247MB
2022-02-17 02:10:11,486 [myid:] - INFO  [main:Environment@109] - Client environment:os.memory.total=15MB
2022-02-17 02:10:11,503 [myid:] - INFO  [main:ZooKeeper@868] - Initiating client connection, connectString=10.0.0.132:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@4c98385c
2022-02-17 02:10:11,523 [myid:] - INFO  [main:X509Util@79] - Setting -D jdk.tls.rejectClientInitiatedRenegotiation=true to disable client-initiated TLS renegotiation
2022-02-17 02:10:11,539 [myid:] - INFO  [main:ClientCnxnSocket@237] - jute.maxbuffer value is 4194304 Bytes
2022-02-17 02:10:11,573 [myid:] - INFO  [main:ClientCnxn@1653] - zookeeper.request.timeout value is 0. feature enabled=
Welcome to ZooKeeper!
2022-02-17 02:10:11,623 [myid:10.0.0.132:2181] - INFO  [main-SendThread(10.0.0.132:2181):ClientCnxn$SendThread@1112] - Opening socket connection to server mcwkafka02/10.0.0.132:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2022-02-17 02:10:12,035 [myid:10.0.0.132:2181] - INFO  [main-SendThread(10.0.0.132:2181):ClientCnxn$SendThread@959] - Socket connection established, initiating session, client: /10.0.0.133:37512, server: mcwkafka02/10.0.0.132:2181
2022-02-17 02:10:12,094 [myid:10.0.0.132:2181] - INFO  [main-SendThread(10.0.0.132:2181):ClientCnxn$SendThread@1394] - Session establishment complete on server mcwkafka02/10.0.0.132:2181, sessionid = 0x200005b7a3d0005, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: 10.0.0.132:2181(CONNECTED) 0] ls /  # list ZooKeeper's top-level contents
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: 10.0.0.132:2181(CONNECTED) 1] help
ZooKeeper -server host:port cmd args
    addauth scheme auth
    close 
    config [-c] [-w] [-s]
    connect host:port
    create [-s] [-e] [-c] [-t ttl] path [data] [acl]
    delete [-v version] path
    deleteall path
    delquota [-n|-b] path
    get [-s] [-w] path
    getAcl [-s] path
    history 
    listquota path
    ls [-s] [-w] [-R] path
    ls2 path [watch]
    printwatches on|off
    quit 
    reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]
    redo cmdno
    removewatches path [-c|-d|-a] [-l]
    rmr path
    set [-s] [-v version] path data
    setAcl [-s] [-v version] [-R] path acl
    setquota -n|-b val path
    stat [-w] path
    sync path
Command not found: Command not found help
[zk: 10.0.0.132:2181(CONNECTED) 2] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: 10.0.0.132:2181(CONNECTED) 3] ls brokers
Path must start with / character
[zk: 10.0.0.132:2181(CONNECTED) 4] ls /brokers
[ids, seqid, topics]
[zk: 10.0.0.132:2181(CONNECTED) 5] ls /brokers/ids
[0, 1, 2]
[zk: 10.0.0.132:2181(CONNECTED) 6] ls /brokers/ids/0
[]
[zk: 10.0.0.132:2181(CONNECTED) 7] get /brokers/ids/0 # view and fetch the znode data
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.131:9092"],"jmx_port":-1,"host":"10.0.0.131","timestamp":"1645063143117","port":9092,"version":4}
[zk: 10.0.0.132:2181(CONNECTED) 8] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.132:9092"],"jmx_port":-1,"host":"10.0.0.132","timestamp":"1645063166889","port":9092,"version":4}
[zk: 10.0.0.132:2181(CONNECTED) 9] get /brokers/ids/3
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/ids/3
[zk: 10.0.0.132:2181(CONNECTED) 10] get /brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.133:9092"],"jmx_port":-1,"host":"10.0.0.133","timestamp":"1645034387402","port":9092,"version":4}
[zk: 10.0.0.132:2181(CONNECTED) 11] get /brokers/ids/
Path must not end with / character
[zk: 10.0.0.132:2181(CONNECTED) 12] ls cluster
Path must start with / character
[zk: 10.0.0.132:2181(CONNECTED) 13] ls /cluster
[id]
[zk: 10.0.0.132:2181(CONNECTED) 14] ls /cluster/id
[]
[zk: 10.0.0.132:2181(CONNECTED) 15] get /cluster/id
{"version":"1","id":"SoDXd6jIQCuEIfnSv_FQYg"}
[zk: 10.0.0.132:2181(CONNECTED) 16] ls /zookeeper
[config, quota]
[zk: 10.0.0.132:2181(CONNECTED) 17] ls /admin 
[delete_topics]
[zk: 10.0.0.132:2181(CONNECTED) 18] ls /admin/delete_topics 
[]
[zk: 10.0.0.132:2181(CONNECTED) 19] get /admin/delete_topics 
null
[zk: 10.0.0.132:2181(CONNECTED) 20] ls /con
config             consumers          controller         controller_epoch   
[zk: 10.0.0.132:2181(CONNECTED) 20] ls /consumers 
[console-consumer-23437]
[zk: 10.0.0.132:2181(CONNECTED) 21] # To create a new znode, use "create /zk myData"; this creates a znode "/zk" with the string "myData" associated with it.
(Typing that comment at the prompt just makes zkCli print its usage text again; the repeated help output is omitted here.)
[zk: 10.0.0.132:2181(CONNECTED) 22] create /zk myData
Created /zk
[zk: 10.0.0.132:2181(CONNECTED) 24] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zk, zookeeper]
[zk: 10.0.0.132:2181(CONNECTED) 25] ls /zk 
[]
[zk: 10.0.0.132:2181(CONNECTED) 26] get /zk
myData
[zk: 10.0.0.132:2181(CONNECTED) 27] ls  /con
config             consumers          controller         controller_epoch   
[zk: 10.0.0.132:2181(CONNECTED) 27] ls  /config 
[changes, clients, topics]
[zk: 10.0.0.132:2181(CONNECTED) 28] ls  /config/topics 
[__consumer_offsets, mcwtopic1, mcwtopic2, test]
[zk: 10.0.0.132:2181(CONNECTED) 29] ls  /config/topics/mcwtopic1 
[]
[zk: 10.0.0.132:2181(CONNECTED) 30] get /config/topics/mcwtopic
mcwtopic1   mcwtopic2   
[zk: 10.0.0.132:2181(CONNECTED) 30] get /config/topics/mcwtopic1
{"version":1,"config":{}}
[zk: 10.0.0.132:2181(CONNECTED) 31] get /zk
myData
[zk: 10.0.0.132:2181(CONNECTED) 32] set /zk mcw220217
[zk: 10.0.0.132:2181(CONNECTED) 33] get /zk
mcw220217
[zk: 10.0.0.132:2181(CONNECTED) 34] delete /zk  # delete the znode created above
[zk: 10.0.0.132:2181(CONNECTED) 35] get /zk
org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /zk
[zk: 10.0.0.132:2181(CONNECTED) 36] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
[zk: 10.0.0.132:2181(CONNECTED) 37] quit  # exit

WATCHER::

WatchedEvent state:Closed type:None path:null
2022-02-17 02:21:43,519 [myid:] - INFO  [main:ZooKeeper@1422] - Session: 0x200005b7a3d0005 closed
2022-02-17 02:21:43,520 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@524] - EventThread shut down for session: 0x200005b7a3d0005
[root@mcwkafka03 ~]$
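A side note on the broker registration payloads read above with `get /brokers/ids/N`: they are plain JSON, so for quick scripting without a JSON tool a sed extraction works. Shown here against the captured payload rather than a live ensemble:

```shell
# Extract the advertised host from a broker registration payload with sed.
payload='{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.0.0.131:9092"],"jmx_port":-1,"host":"10.0.0.131","timestamp":"1645063143117","port":9092,"version":4}'
host=$(printf '%s' "$payload" | sed -n 's/.*"host":"\([^"]*\)".*/\1/p')
echo "$host"   # 10.0.0.131
```

Against a live ensemble, the payload would come from something like `/data/zk/bin/zkCli.sh -server 10.0.0.132:2181 get /brokers/ids/0` piped through the same sed.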

 

 

 

 

References:
https://www.cnblogs.com/kevingrace/p/9021508.html
https://www.cnblogs.com/traveller-hzq/p/14105128.html

 

