Kafka (1): Installation in Three Modes


Part 1: Setting up a single-node, single-broker Kafka cluster

Note: open a separate terminal for each of the steps below.

1. Copy the installation package to /usr/local, extract it, rename it (or create a symlink), and configure the environment variables

[root@hadoop ~]# cd /usr/local/
[root@hadoop local]# tar xzvf kafka_2.11-2.0.0.tgz 
[root@hadoop local]# mv kafka_2.11-2.0.0 kafka #rename, or...
[root@hadoop local]# ln -s kafka_2.11-2.0.0 kafka #...symlink: run only one of these two commands (the symlink needs the original directory name, so don't rename first)
[root@hadoop local]# vi /etc/profile
Add the variable: export KAFKA_HOME=/usr/local/kafka
Append to PATH: :$KAFKA_HOME/bin
[root@hadoop local]# source /etc/profile

[root@hadoop kafka]# echo $KAFKA_HOME #check the environment variable
/usr/local/kafka
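The two profile lines can be sketched as a self-contained snippet. To make the effect checkable without touching the system, the sketch below appends them to a scratch file (/tmp/profile.kafka.demo is a stand-in for /etc/profile):

```shell
# Sketch of the /etc/profile additions, written to a scratch copy for illustration.
profile=/tmp/profile.kafka.demo
printf 'export KAFKA_HOME=/usr/local/kafka\n' >  "$profile"
printf 'export PATH=$PATH:$KAFKA_HOME/bin\n'  >> "$profile"

# Sourcing the file makes the variables visible in the current shell,
# just like `source /etc/profile` above.
. "$profile"
echo "$KAFKA_HOME"   # /usr/local/kafka
```

Writing PATH as a full `export PATH=$PATH:$KAFKA_HOME/bin` line is equivalent to appending `:$KAFKA_HOME/bin` inside an existing PATH export.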

2. Start the servers

Start ZooKeeper

[root@hadoop kafka]# zookeeper-server-start.sh config/zookeeper.properties
[root@hadoop kafka]# jps #in another terminal, check that it started successfully
3892 Jps
3566 QuorumPeerMain

Start Kafka

[root@hadoop kafka]# kafka-server-start.sh config/server.properties 

3. Create a topic

#create a topic with one partition and one replica
#the replication factor is set at topic creation and cannot be changed here afterwards
[root@hadoop kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".  

[root@hadoop kafka]# kafka-topics.sh --list --zookeeper localhost:2181 #list topics
test

You can inspect the resulting ZooKeeper data structures with the zk client:

[root@hadoop kafka]# zkCli.sh -server localhost:2181 #enter the zk client
Connecting to localhost:2181
2018-07-31 14:27:24,876 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GMT
2018-07-31 14:27:24,879 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=hadoop
2018-07-31 14:27:24,880 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.8.0_11
2018-07-31 14:27:24,882 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Oracle Corporation
2018-07-31 14:27:24,882 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/usr/java/jre
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/usr/local/zookeeper/bin/../build/classes:/usr/local/zookeeper/bin/../build/lib/*.jar:/usr/local/zookeeper/bin/../lib/slf4j-log4j12-1.6.1.jar:/usr/local/zookeeper/bin/../lib/slf4j-api-1.6.1.jar:/usr/local/zookeeper/bin/../lib/netty-3.10.5.Final.jar:/usr/local/zookeeper/bin/../lib/log4j-1.2.16.jar:/usr/local/zookeeper/bin/../lib/jline-0.9.94.jar:/usr/local/zookeeper/bin/../zookeeper-3.4.9.jar:/usr/local/zookeeper/bin/../src/java/lib/*.jar:/usr/local/zookeeper/bin/../conf:.:/usr/java/lib/dt.jar:/usr/java/lib/tools.jar:/usr/java/jre/lib
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/usr/local/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=<NA>
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.10.0-514.el7.x86_64
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=root
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/root
2018-07-31 14:27:24,883 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/usr/local/kafka
2018-07-31 14:27:24,888 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@67424e82
Welcome to ZooKeeper!
2018-07-31 14:27:25,037 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1032] - Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
JLine support is enabled
2018-07-31 14:27:25,131 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@876] - Socket connection established to localhost/127.0.0.1:2181, initiating session
2018-07-31 14:27:25,153 [myid:] - INFO  [main-SendThread(localhost:2181):ClientCnxn$SendThread@1299] - Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x10000ded7830002, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
[zk: localhost:2181(CONNECTED) 1] ls  /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 2] ls /brokers/topics
[test]
[zk: localhost:2181(CONNECTED) 3] get /brokers/topics/test
{"version":1,"partitions":{"0":[0]}}
cZxid = 0x22
ctime = Tue Jul 31 14:22:42 CST 2018
mZxid = 0x22
mtime = Tue Jul 31 14:22:42 CST 2018
pZxid = 0x24
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 36
numChildren = 1
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/test 
[partitions]
[zk: localhost:2181(CONNECTED) 5] ls /brokers/topics/test/partitions
[0]
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/test/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/test/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/test/partitions/0/state
{"controller_epoch":1,"leader":0,"version":1,"leader_epoch":0,"isr":[0]}
cZxid = 0x26
ctime = Tue Jul 31 14:22:42 CST 2018
mZxid = 0x26
mtime = Tue Jul 31 14:22:42 CST 2018
pZxid = 0x26
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 72
numChildren = 0
[zk: localhost:2181(CONNECTED) 9] quit #exit
Quitting...
2018-07-31 15:01:53,761 [myid:] - INFO  [main:ZooKeeper@684] - Session: 0x10000ded7830002 closed
2018-07-31 15:01:53,789 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@519] - EventThread shut down for session: 0x10000ded7830002
[root@hadoop kafka]# 

4. Send messages

[root@hadoop kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello world
>how are you

5. Start a consumer

[root@hadoop kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning 
hello world
how are you

Finally, check the processes:

[root@hadoop kafka]# jps
3905 Kafka
7637 ConsoleConsumer
5702 ConsoleProducer
3566 QuorumPeerMain
9135 Jps

 

Part 2: Setting up a single-node, multi-broker Kafka cluster

First close all of the terminals above, then open fresh terminals for the steps below.

1. Create and configure multiple server configuration files

Note: of the 3 brokers below, one must listen on port 9092.
This is a pitfall I hit myself. I initially created 3 brokers on ports 9093, 9094 and 9095, and then the consumer would not display any results no matter what I tried; I was stuck on this for half a day.
My guess was that consumer.properties, producer.properties, connect-distributed.properties and similar files default to bootstrap.servers=localhost:9092, so if none of the 3 brokers is on 9092, the port in the bootstrap.servers value of those files would need to be changed to one of the actual ports (say 9093), otherwise the consumer cannot consume messages. But I tried modifying those 3 config files and it made no difference, so perhaps my guess is wrong.
In the end, changing one of the brokers to port 9092 fixed it.

[root@hadoop ~]# cd /usr/local/kafka/config/
[root@hadoop config]# cp server.properties server0.properties 
[root@hadoop config]# cp server.properties server1.properties 
[root@hadoop config]# cp server.properties server2.properties 
[root@hadoop config]# vi server0.properties 
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs0
[root@hadoop config]# vi server1.properties 
broker.id=1
listeners=PLAINTEXT://:9093
log.dirs=/tmp/kafka-logs1
[root@hadoop config]# vi server2.properties 
broker.id=2
listeners=PLAINTEXT://:9094
log.dirs=/tmp/kafka-logs2
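The three cp + vi rounds above can also be scripted. Below is a minimal sketch assuming GNU sed; it runs in a scratch directory with a stand-in template instead of the real $KAFKA_HOME/config/server.properties (note that in the stock file the listeners line ships commented out as #listeners=..., so adjust the pattern there):

```shell
# Scratch dir so the sketch runs anywhere; in a real setup: cd $KAFKA_HOME/config
mkdir -p /tmp/kafka-cfg-demo && cd /tmp/kafka-cfg-demo

# Stand-in for the stock server.properties (only the three keys we edit).
printf 'broker.id=0\nlisteners=PLAINTEXT://:9092\nlog.dirs=/tmp/kafka-logs\n' > server.properties

# One config per broker: distinct id, port (9092..9094) and log directory.
for i in 0 1 2; do
  cp server.properties "server$i.properties"
  sed -i \
    -e "s|^broker.id=.*|broker.id=$i|" \
    -e "s|^listeners=.*|listeners=PLAINTEXT://:$((9092 + i))|" \
    -e "s|^log.dirs=.*|log.dirs=/tmp/kafka-logs$i|" \
    "server$i.properties"
done

grep -H '^broker.id' server[0-2].properties
```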

2. Start the servers

Start ZooKeeper

[root@hadoop kafka]# zookeeper-server-start.sh config/zookeeper.properties 
...

Start the 3 Kafka servers separately (you can append & to each command to run it in the background):

[root@hadoop kafka]# kafka-server-start.sh config/server0.properties &
[root@hadoop kafka]# kafka-server-start.sh config/server1.properties &
[root@hadoop kafka]# kafka-server-start.sh config/server2.properties &

Check:

[root@hadoop config]# jps #shows zookeeper and the 3 kafka servers running
12161 QuorumPeerMain
13154 Kafka
15940 Jps
15609 Kafka
12828 Kafka

3. Create a topic (with 3 replicas)

Note: the replication factor must be less than or equal to the number of brokers.

[root@hadoop kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test02
Created topic "test02".

Describe the topic (Leader is the broker currently serving the partition, Replicas lists the brokers holding copies, and Isr is the in-sync subset of those replicas):

[root@hadoop kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic test02
Topic:test02    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: test02    Partition: 0    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0

You can also describe the earlier topic:

[root@hadoop kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0

4. Publish new messages to the topic

[root@hadoop kafka]# netstat -ano | grep 909 #verify that the 3 kafka server ports (9092, 9093, 9094) are listening
tcp6       0      0 :::9092                 :::*                    LISTEN      off (0.00/0/0)
tcp6       0      0 :::9093                 :::*                    LISTEN      off (0.00/0/0)
tcp6       0      0 :::9094                 :::*                    LISTEN      off (0.00/0/0)
tcp6       0      0 192.168.42.133:49292    192.168.42.133:9093     ESTABLISHED keepalive (7058.77/0/0)
tcp6       0      0 192.168.42.133:50262    192.168.42.133:9094     ESTABLISHED keepalive (7075.16/0/0)
tcp6       0      0 192.168.42.133:9092     192.168.42.133:54622    ESTABLISHED keepalive (7058.77/0/0)
tcp6       0      0 192.168.42.133:9093     192.168.42.133:49292    ESTABLISHED keepalive (7058.77/0/0)
tcp6       0      0 192.168.42.133:9094     192.168.42.133:50262    ESTABLISHED keepalive (7075.16/0/0)
tcp6       0      0 192.168.42.133:50268    192.168.42.133:9094     ESTABLISHED keepalive (7157.08/0/0)
tcp6       0      0 192.168.42.133:54622    192.168.42.133:9092     ESTABLISHED keepalive (7058.77/0/0)
tcp6       0      0 192.168.42.133:9094     192.168.42.133:50264    ESTABLISHED keepalive (7075.16/0/0)
tcp6       0      0 192.168.42.133:9094     192.168.42.133:50268    ESTABLISHED keepalive (7157.08/0/0)
tcp6       0      0 192.168.42.133:50264    192.168.42.133:9094     ESTABLISHED keepalive (7075.16/0/0)

[root@hadoop ~]# kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test02
>hello kafka

5. Consume the messages

[root@hadoop kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test02 --from-beginning
hello kafka

Note: older Kafka versions consumed messages with the following command:

[root@hadoop kafka]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic test02 --from-beginning
zookeeper is not a recognized option #newer kafka versions removed this option

After some searching I found that the Kafka consumer configuration changed substantially starting with version 0.9.0.0: many options were added, and the old zookeeper-related options were eventually dropped entirely. If you are interested, compare the old and new consumer configurations in the official documentation.

6. Fault-tolerance test

Before the fault-tolerance test, you can stop the producer and the consumer.

a) Find and kill the server2 process (broker 2 is the leader among the original 3 brokers, so we are killing the leader)

[root@hadoop kafka]# ps -ef|grep server2.properties #the server2 process id is 13154
root 13154 12486 1 11:02 pts/4 00:00:33 ...output omitted.../server2.properties
root 16228 14977 0 11:56 pts/6 00:00:00 grep --color=auto server2.properties
[root@hadoop kafka]# kill 13154 #kill the process
[root@hadoop kafka]# ps -ef|grep server2.properties #check again: the process is gone
root 16239 14977 0 11:57 pts/6 00:00:00 grep --color=auto server2.properties
[root@hadoop kafka]# jps #only 2 kafka processes remain
16240 Jps
12161 QuorumPeerMain
15609 Kafka
12828 Kafka
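The ps/grep/kill sequence can be collapsed into one step with pgrep -f. The sketch below uses a throwaway sleep process as a stand-in for the broker so it can run anywhere; against the real broker you would match server2.properties instead:

```shell
# Stand-in for the broker started from server2.properties.
sleep 300 &
pid=$!

# Against a real broker you would locate it by config name instead:
#   pid=$(pgrep -f server2.properties)
kill "$pid"                      # same effect as the manual ps -ef | grep ...; kill <pid>
wait "$pid" 2>/dev/null || true  # reap the child so the PID is fully gone

kill -0 "$pid" 2>/dev/null || echo "process $pid terminated"
```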

b) Describe the topic

#the topic still exists; the leader just changed from broker 2 to broker 1
[root@hadoop kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic test02
Topic:test02    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: test02    Partition: 0    Leader: 1    Replicas: 2,1,0    Isr: 1,0

c) Start a consumer and read the topic's messages

[root@hadoop kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic test02 --from-beginning
[2018-08-03 12:05:47,143] WARN [Consumer clientId=consumer-1, groupId=console-consumer-16224] Connection to node -3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
hello kafka

As you can see, the consumer can still read the original messages.

And of course, if the producer publishes new messages, the consumer can read those as well.

 

Part 3: Setting up a multi-node, multi-broker Kafka cluster

Suppose we now have 3 servers: s101, s102 and s103.

First build a fully distributed ZooKeeper cluster on the 3 servers (see my earlier blog post for the steps).

Then install Kafka.

1. Install Kafka on s101 first (see step 1 of Part 1 above)

2. Copy the Kafka directory plus the environment variables to s102 and s103.

3. Modify server.properties on each of the 3 servers

[root@hadoop config]# vi server.properties #changes on s101
broker.id=101 #the 3 broker ids just need to be distinct
log.dirs=/usr/local/kafka/kafka-logs #choose your own directory, but avoid the /tmp/ temporary directory
zookeeper.connect=s101:2181,s102:2181,s103:2181
[root@hadoop config]# vi server.properties #changes on s102
broker.id=102
log.dirs=/usr/local/kafka/kafka-logs
zookeeper.connect=s101:2181,s102:2181,s103:2181
[root@hadoop config]# vi server.properties #changes on s103
broker.id=103
log.dirs=/usr/local/kafka/kafka-logs
zookeeper.connect=s101:2181,s102:2181,s103:2181
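Since the three files differ only in broker.id, the edit can be scripted once and run on each host. A hypothetical sketch that derives the id from the hostname (s101 -> 101) and patches a scratch copy of the config; on a real node you would use host=$(hostname) and the real path /usr/local/kafka/config/server.properties:

```shell
host=s102                           # stand-in; on a real node: host=$(hostname)
id=${host#s}                        # strip the leading "s" -> 102

cfg=/tmp/server.properties.demo     # stand-in path for illustration
printf 'broker.id=0\nlog.dirs=/tmp/kafka-logs\nzookeeper.connect=localhost:2181\n' > "$cfg"

# Patch the three keys the same way the manual edits above do (GNU sed assumed).
sed -i \
  -e "s|^broker.id=.*|broker.id=$id|" \
  -e "s|^log.dirs=.*|log.dirs=/usr/local/kafka/kafka-logs|" \
  -e "s|^zookeeper.connect=.*|zookeeper.connect=s101:2181,s102:2181,s103:2181|" \
  "$cfg"

grep '^broker.id' "$cfg"
```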

4. Create the log directory: mkdir /usr/local/kafka/kafka-logs

5. Start the 3 ZooKeeper instances and the 3 Kafka servers, and you are done.

 

