Kafka及集群部署


Kafka介紹

    官網:http://kafka.apache.org

    Kafka是一款性能非常好的並且支持分布式的消息隊列中間件。由於它的高吞吐特性,Kafka通常使用在大數據領域,如日志收集平台。其實Kafka是一個流處理平台,這個概念不太好理解,之所以叫做流,是因為它在工作中就像是一個可以支撐高吞吐量的管道,數據像水一樣流進去,然后另外一端再去讀取這些數據。我們就可以把Kafka看作是一種特殊的消息隊列中間件。

    kafka對消息保存時根據Topic進行歸類,發送消息者成為Producer,消息接受者成為Consumer,此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴於zookeeper來保證系統可用性集群保存一些meta信息。 kafka集群幾乎不需要維護任何consumer和producer狀態信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實現非常輕量級,它們可以隨意離開,而不會對集群造成額外的影響。

    Kafka與傳統消息系統相比,有以下不同:

1)它被設計為一個分布式系統,易於向外擴展;
2)它同時為發布和訂閱提供高吞吐量;
3)它支持多訂閱者,當失敗時能自動平衡消費者;
4)它將消息持久化到磁盤,因此可用於批量消費,例如ETL,以及實時應用程序。

1、Kafka中有幾個關鍵角色和概念

1)Producer

    消息生產者,是消息的產生源頭,負責生成消息並發送給Kafka。

    生產者創建消息。在其他發布與訂閱系統中,生產者可能被稱為發布者或寫入者。一般情況下,一個消息會被發布到一個特定的主題上。生產者在默認情況下把消息均衡地分布到主題的所有分區上,而並不關心特定消息會被寫到哪個分區。不過,在某些情況下,生產者會把消息直接寫到指定的分區。這通常是通過消息鍵和分區器來實現的,分區器為鍵生成一個散列值,並將其映射到指定的分區上。這樣可以保證包含同一個鍵的消息會被寫到同一個分區上。生產者也可以使用自定義的分區器,根據不同的業務規則將消息映射到分區。

2)Consumer

    消息消費者,是消息的使用方,負責消費Kafka服務器上的消息。

    消費者讀取消息。在其他發布與訂閱系統中,消費者可能被稱為訂閱者或讀者。消費者訂閱一個或多個主題,並按照消息生成的順序讀取它們。消費者通過檢查消息的偏移量來區分已經讀取過的消息。偏移量是另一種元數據,它是一個不斷遞增的整數值,在創建消息時,Kafka 會把它添加到消息里。在給定的分區里,每個消息的偏移量都是唯一的。消費者把每個分區最后讀取的消息偏移量保存在 Zookeeper 或 Kafka 上,如果消費者關閉或重啟,它的讀取狀態不會丟失。

    消費者是消費者群組的一部分,也就是說,會有一個或多個消費者共同讀取一個主題。群組保證每個分區只能被一個消費者使用。

3)Topic

    主題,由用戶自定義,並配置在Kafka服務器,用於建立生產者和消費者之間的訂閱關系,生產者將消息發送到指定的Topic,然后消費者再從該Topic下去取消息。

     一個Topic可以認為是一類消息,每個topic將被分成多個partition(區),每個partition在存儲層面是append log文件。任何發布到此partition的消息都會被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個long型數字,它是唯一標記一條消息。它唯一的標記一條消息。kafka並沒有提供其他額外的索引機制來存儲offset,因為在kafka中幾乎不允許對消息進行“隨機讀寫”。

4)Partition

消息分區,一個Topic下面會有多個Partition,每個Partition都是一個有序隊列,Partition中的每條消息都會被分配一個有序的id。

     物理上的分區,topic中的數據分割為一個或多個partition。每個topic至少有一個partition。每個partition中的數據使用多個segment文件存儲。partition中的數據是有序的,partition間的數據丟失了數據的順序。如果topic有多個partition,消費數據時就不能保證數據的順序。在需要嚴格保證消息的消費順序的場景下,需要將partition數目設為1。

    Kafka 通過分區來實現數據冗余和伸縮性。分區可以分布在不同的服務器上

5)Broker

    這個其實就是Kafka服務器了,無論是單台Kafka還是集群,被統一叫做Broker。

    一個獨立的 Kafka 服務器被稱為 broker。broker 接收來自生產者的消息,為消息設置偏移量,並提交消息到磁盤保存。broker 為消費者提供服務,對讀取分區的請求作出響應,返回已經提交到磁盤上的消息。根據特定的硬件及其性能特征,單個 broker 可以輕松處理數千個分區以及每秒百萬級的消息量。

  • Kafka 集群包含一個或多個服務器,服務器節點稱為broker。
  • broker存儲topic的數據。如果某topic有N個partition,集群有N個broker,那么每個broker存儲該topic的一個partition。
  • 如果某topic有N個partition,集群有(N+M)個broker,那么其中有N個broker存儲該topic的一個partition,剩下的M個broker不存儲該topic的partition數據。
  • 如果某topic有N個partition,集群中broker數目少於N個,那么一個broker存儲該topic的一個或多個partition。在實際生產環境中,盡量避免這種情況的發生,這種情況容易導致Kafka集群數據不均衡。

    broker 是集群的組成部分。每個集群都有一個 broker 同時充當了集群控制器的角色(自動從集群的活躍成員中選舉出來)。控制器負責管理工作,包括將分區分配給 broker 和監控broker。在集群中,一個分區從屬於一個 broker,該 broker 被稱為分區的首領。一個分區可以分配給多個 broker,這個時候會發生分區復制(見圖 )。這種復制機制為分區提供了消息冗余,如果有一個 broker 失效,其他 broker 可以接管領導權。不過,相關的消費者和生產者都要重新連接到新的首領。

6)Group

     消費者分組,將同一類的消費者歸類到一個組里。在Kafka中,多個消費者共同消費一個Topic下的消息,每個消費者消費其中的部分消息,這些消費者就組成了一個分組,擁有同一個組名。

kafka通過zookeeper管理集群配置,選舉leader

2、kafka特點

1)Kafka:內存、磁盤、數據庫、支持大量堆積

    kafka的最小存儲單元是分區,一個topic包含多個分區,kafka創建主題時,這些分區會被分配在多個服務器上,通常一個broker一台服務器。

    分區首領會均勻地分布在不同的服務器上,分區副本也會均勻的分布在不同的服務器上,確保負載均衡和高可用性,當新的broker加入集群的時候,部分副本會被移動到新的broker上。

    根據配置文件中的目錄清單,kafka會把新的分區分配給目錄清單里分區數最少的目錄。

    默認情況下,分區器使用輪詢算法把消息均衡地分布在同一個主題的不同分區中,對於發送時指定了key的情況,會根據key的hashcode取模后的值存到對應的分區中。

2)Kafka:支持負載均衡
a)一個broker通常就是一台服務器節點。對於同一個Topic的不同分區,Kafka會盡力將這些分區分布到不同的Broker服務器上,zookeeper保存了broker、主題和分區的元數據信息。分區首領會處理來自客戶端的生產請求,kafka分區首領會被分配到不同的broker服務器上,讓不同的broker服務器共同分擔任務。
     每一個broker都緩存了元數據信息,客戶端可以從任意一個broker獲取元數據信息並緩存起來,根據元數據信息知道要往哪里發送請求。
b)kafka的消費者組訂閱同一個topic,會盡可能地使得每一個消費者分配到相同數量的分區,分攤負載。
c)當消費者加入或者退出消費者組的時候,還會觸發再均衡,為每一個消費者重新分配分區,分攤負載。
   kafka的負載均衡大部分是自動完成的,分區的創建也是kafka完成的,隱藏了很多細節,避免了繁瑣的配置和人為疏忽造成的負載問題。
d)發送端由topic和key來決定消息發往哪個分區,如果key為null,那么會使用輪詢算法將消息均衡地發送到同一個topic的不同分區中。如果key不為null,那么會根據key的hashcode取模計算出要發往的分區。
3)集群方式, 天然的‘Leader-Slave’無狀態集群,每台服務器既是Master也是Slave
    分區首領均勻地分布在不同的kafka服務器上,分區副本也均勻地分布在不同的kafka服務器上,所以每一台kafka服務器既含有分區首領,同時又含有分區副本,每一台kafka服務器是某一台kafka服務器的Slave,同時也是某一台kafka服務器的leader。
     kafka的集群依賴於zookeeper,zookeeper支持熱擴展,所有的broker、消費者、分區都可以動態加入移除,而無需關閉服務,與不依靠zookeeper集群的mq相比,這是最大的優勢。

Kafka工作流程

1)生產者定期向主題發送消息。

2)Kafka broker將所有消息存儲在為該特定主題配置的分區中。它確保消息在分區之間平等共享。如果生產者發送兩個消息,並且有兩個分區,則Kafka將在第一個分區中存儲一個消息,在第二個分區中存儲第二個消息。

3)消費者訂閱一個特定的主題。

4)一旦消費者訂閱了一個主題,Kafka將向消費者提供該主題的當前偏移量,並將偏移量保存在ZooKeeper中。

5)消費者將定期請求Kafka新消息。

6)一旦Kafka收到來自生產者的消息,它會將這些消息轉發給消費者。

7)消費者將收到消息並處理它。

8)一旦消息被處理,消費者將向Kafka broker發送確認。

9)一旦Kafka收到確認,它會將偏移量更改為新值,並在ZooKeeper中進行更新。由於ZooKeeper中保留了偏移量,因此即使在服務器出現故障時,消費者也可以正確讀取下一條消息。

kafka集群部署:

(1)Kafka架構是由producer(消息生產者)、consumer(消息消費者)、borker(kafka集群的server,負責處理消息讀、寫請求,存儲消息,在kafka cluster這一層這里,其實里面是有很多個broker)、topic(消息隊列/分類相當於隊列,里面有生產者和消費者模型)、zookeeper(元數據信息存在zookeeper中,包括:存儲消費偏移量,topic話題信息,partition信息) 這些部分組成。

(2)kafka里面的消息是有topic來組織的,簡單的我們可以想象為一個隊列,一個隊列就是一個topic,然后它把每個topic又分為很多個partition,這個是為了做並行的,在每個partition內部消息強有序,相當於有序的隊列,其中每個消息都有個序號offset,比如0到12,從前面讀往后面寫。一個partition對應一個broker,一個broker可以管多個partition,比如說,topic有6個partition,有兩個broker,那每個broker就管3個partition。這個partition可以很簡單想象為一個文件,當數據發過來的時候它就往這個partition上面append,追加就行,消息不經過內存緩沖,直接寫入文件,kafka和很多消息系統不一樣,很多消息系統是消費完了我就把它刪掉,而kafka是根據時間策略刪除,而不是消費完就刪除,在kafka里面沒有一個消費完這么個概念,只有過期這樣一個概念。

(3)producer自己決定往哪個partition里面去寫,這里有一些的策略,譬如如果hash,不用多個partition之間去join數據了。consumer自己維護消費到哪個offset,每個consumer都有對應的group,group內是queue消費模型(各個consumer消費不同的partition,因此一個消息在group內只消費一次),group間是publish-subscribe消費模型,各個group各自獨立消費,互不影響,因此一個消息在被每個group消費一次。

kafka的集群安裝配置:

   1)kafka集群的安裝配置依賴zookeeper,搭建kafka集群之前,需先部署好一個可用的zookeeper集群

   2)需安裝openjdk運行環境

   3)同步kafka拷貝到所有集群主機

   4)修改配置文件

   5)每台服務器的broker.id都不能相同

   6)zookeeper.connect集群地址,不用都列出,寫一部分即可

 zookeeper集群也可通過kafka中自帶的zookeeper來部署,修改自帶zookeeper配置文件可與zookeeper的配置文件一致

啟動:

[root@ecs-1d01 kafka_2.10-0.10.2.1]#nohup  /data/kafka_2.10-0.10.2.1/bin/zookeeper-server-start.sh  /data/kafka_2.10-0.10.2.1/config/zookeeper.properties > /dev/null 2>&1  &

部署

部署環境:

操作系統                    IP                 kafka版本

rhel6.5              192.168.1.234             2.1.1

rhel6.5              192.168.1.206             2.1.1

rhel6.5              192.168.1.45              2.1.1

1、創建用戶,全部下載kafka

[root@kafka-0001]$useradd  wushaoyu

[root@kafka-0001]$su -  wushaoyu

[wushaoyu@kafka-0001]$ wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.11-2.1.1.tgz

2、創建消息目錄,修改配置文件

[wushaoyu@kafka-0001 ~]$ cd kafka_2.11-2.1.1
[wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ mkdir logs
[wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ cd config/
[wushaoyu@kafka-0001 config]$ cat server.properties |egrep -v "^$|^#"
broker.id=1                                         #broker的全局唯一編號不能重復,建議與zookeeper的myid對應
listeners=PLAINTEXT://192.168.1.234:9092 #broker監聽ip和端口
num.network.threads
=3 #borker進行網絡處理的線程數 num.io.threads=8 #borker進行I/O處理的線程數 socket.send.buffer.bytes=102400 #發送緩沖區大小,即發送消息先發送到緩沖區,當緩沖區滿了在一起發出去 socket.receive.buffer.bytes=102400 #接收緩沖區大小,接收消息先放到接收緩沖區,當達到這個數量時同步到磁盤 socket.request.max.bytes=104857600 #向kafka套接字請求的最大字節數量,防止服務器outofmemory,大小最好不要超過java的堆棧大小 log.dirs=/home/wushaoyu/kafka_2.11-2.1.1/logs #消息存放目錄,不是日志目錄 num.partitions=1 #每個topic的默認分區數 num.recovery.threads.per.data.dir=1 #處理消息目錄的線程數,若設置了3個消息路徑,改參數為2,那么一共需要6個線程 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 #消息過期時間,默認為1周 log.segment.bytes=1073741824 #日志文件中每個segment的大小,默認為1G,topic的分區是以一堆segment文件存儲的,超過此限制會建立一個新的日志文件。此參數若在創建topic時的指定,那么參數覆蓋,以指定的為准 log.retention.check.interval.ms=300000 #如上設置了每個segment文件大小為1G,那么此時間間隔就是檢查他的大小有沒有達到1G,檢查的時間間隔 zookeeper.connect=192.168.1.234:2181,192.168.1.206:2181,192.168.1.45:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0

配置文件:

log.dirs:Kafka 把所有消息都保存在磁盤上,存放這些日志片段的目錄是通過 log.dirs 指定的。它是一組用逗號分隔的本地文件系統路徑。如果指定了多個路徑,那么 broker 會根據“最少使用”原則,把同一個分區的日志片段保存到同一個路徑下。要注意,broker 會往擁有最少數目分區的路徑新增分區,而不是往擁有最小磁盤空間的路徑新增分區。

num.recovery.threads.per.data.dir:

對於如下 3 種情況,Kafka 會使用可配置的線程池來處理日志片段:
• 服務器正常啟動,用於打開每個分區的日志片段;
• 服務器崩潰后重啟,用於檢查和截短每個分區的日志片段;
• 服務器正常關閉,用於關閉日志片段

    默認情況下,每個日志目錄只使用一個線程。因為這些線程只是在服務器啟動和關閉時會用到,所以完全可以設置大量的線程來達到並行操作的目的。特別是對於包含大量分區的

服務器來說,一旦發生崩潰,在進行恢復時使用並行操作可能會省下數小時的時間。設置此參數時需要注意,所配置的數字對應的是 log.dirs 指定的單個日志目錄。也就是說,如

果 num.recovery.threads.per.data.dir 被設為 8,並且 log.dir 指定了 3 個路徑,那么總共需要 24 個線程。

3、啟動kafka

[wushaoyu@kafka-0002 kafka_2.11-2.1.1]$ ./bin/kafka-server-start.sh ./config/server.properties 
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.      
# Native memory allocation (mmap) failed to map 1073741824 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/wushaoyu/kafka_2.11-2.1.1/hs_err_pid23272.log
此處出現報錯:無法分配足夠的內存,因為部署環境為雲主機,只有1G內存,所以可添加交換分區解決 查看內存大小 [wushaoyu@kafka
-0002 kafka_2.11-2.1.1]$ free -m total used free shared buffers cached Mem: 995 920 75 0 80 641 -/+ buffers/cache: 198 797 Swap: 0 0 0 創建交換分區 [root@kafka-0001 ~]# dd if=/dev/zero of=/tmp/swap bs=1M count=8192 #創建文件,大小為8G 8192+0 records in 8192+0 records out 8589934592 bytes (8.6 GB) copied, 52.9978 s, 162 MB/s [root@kafka-0001 ~]# mkswap /tmp/swap #創建交換分區 mkswap: /tmp/swap: warning: don't erase bootbits sectors on whole disk. Use -f to force. Setting up swapspace version 1, size = 8388604 KiB no label, UUID=84ea82c7-35a3-46be-926a-73dfc7e18548 [root@kafka-0001 ~]# swapon /tmp/swap #啟用交換分區 [root@kafka-0001 ~]# free -m total used free shared buffers cached Mem: 995 928 67 0 22 719 -/+ buffers/cache: 186 809 Swap: 8191 0 8191 再次啟動 [wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-server-start.sh ./config/server.properties & (默認在前台運行) 或[wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-server-start.sh -daemon ./config/server.properties

 Zookeeper+Kafka集群驗證及消息發布

創建主題

創建一個topic,3個分區,3個副本
[root@ecs-1d01 kafka_2.10-0.10.2.1]# ./bin/kafka-topics.sh --create --zookeeper 192.168.1.160:2181,192.168.1.200:2181,192.168.1.235:2181 -replication-factor 3 --partitions 3 --topic test1
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic "test1"

查看topic,確認topic創建成功
[root@ecs-1d01 kafka_2.10-0.10.2.1]# ./bin/kafka-topics.sh --list --zookeeper 192.168.1.160:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
test1

查看topic,詳細信息
[root@ecs-1d01 kafka_2.10-0.10.2.1]# ./bin/kafka-topics.sh --describe --zookeeper 192.168.1.160:2181 --topic test1
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
可以描述為:Topic分區數/副本數/副本Leader/副本ISR等信息: 
Topic:test1 PartitionCount:
3 ReplicationFactor:3 Configs: Topic: test1 Partition: 0 Leader: 0 Replicas: 0,3,2 Isr: 0,3,2 Topic: test1 Partition: 1 Leader: 2 Replicas: 2,0,3 Isr: 2,0,3 Topic: test1 Partition: 2 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0

“leader”:該節點負責該分區的所有的讀和寫,每個節點的leader都是隨機選擇的。
“replicas”:備份的節點列表,無論該節點是否是leader或者目前是否還活着,只是顯示。
“isr”:同步備份”的節點列表,也就是活着的節點並且正在同步leader
其中Replicas和Isr中的1,2,0就對應着3個broker他們的broker.id屬性!

發布信息

模擬生產者,發布消息
[root@ecs-1d01 kafka_2.10-0.10.2.1]# ./bin/kafka-console-producer.sh --broker-list 192.168.1.160:9092 --topic test1 OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N hello,world i love you 模擬消費者,接收消息 [root@ecs-a0d8-0002 kafka_2.10-0.10.2.1]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092 --topic test1 --from-beginning OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N k;lkl; love you hello,world hkhj i love you

   #在 producer 里輸入消息,consumer 中就會顯示出同樣的內容,表示消費成功   

   # --from-beginning表示從開始接收,否則只接收新產生的消息      

至此,消息生產和消費沒有問題,Kafka集群部署完成。

https://blog.csdn.net/lkforce/article/details/77776684

https://www.cnblogs.com/caoweixiong/p/11060533.html


Kafka常用命令

1)查看topic

[wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-topics.sh --list --zookeeper 192.168.1.234:2181
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
__consumer_offsets
mymsg

2)查看topic msmsg詳情

[wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-topics.sh --describe --zookeeper 192.168.1.234:2181 --topic mymsg
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Topic:mymsg    PartitionCount:1    ReplicationFactor:2    Configs:
    Topic: mymsg    Partition: 0    Leader: 1    Replicas: 1,3    Isr: 1,3

3)刪除topic

[wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-topics.sh --delete --zookeeper 192.168.1.234:2181 --topic mymsg

4)生產者參數查看

[wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-console-producer.sh

5)生成者參數查看

[wushaoyu@kafka-0001 kafka_2.11-2.1.1]$ ./bin/kafka-console-consumer.sh

kafka集群管理工具kafka-manager

    為了簡化開發者和服務工程師維護Kafka集群的工作,yahoo構建了一個叫做Kafka管理器的基於Web工具,叫做 Kafka Manager。kafka-manager 項目地址:https://github.com/yahoo/kafka-manager。這個管理工具可以很容易地發現分布在集群中的哪些topic分布不均勻,或者是分區在整個集群分布不均勻的的情況。它支持管理多個集群、選擇副本、副本重新分配以及創建Topic。這個管理工具也是一個非常好的可以快速瀏覽這個集群的工具,kafka-manager有如下功能:

- 管理多個kafka集群
- 便捷的檢查kafka集群狀態(topics,brokers,備份分布情況,分區分布情況)
- 選擇你要運行的副本
- 基於當前分區狀況進行
- 可以選擇topic配置並創建topic(0.8.1.1和0.8.2的配置不同)
- 刪除topic(只支持0.8.2以上的版本並且要在broker配置中設置delete.topic.enable=true)
- Topic list會指明哪些topic被刪除(在0.8.2以上版本適用)
- 為已存在的topic增加分區
- 為已存在的topic更新配置
- 在多個topic上批量重分區
- 在多個topic上批量重分區(可選partition broker位置)

Githua下載地址:https://github.com/yahoo/kafka-manager/releases

1、部署kafka-manager

[root@ecs-1d01 data]# cd kafka-manager-1.3.3.23/

[root@ecs-1d01 kafka-manager-1.3.3.23]# cd conf/

[root@ecs-1d01 conf]# vim application.conf

#kafka-manager.zkhosts="localhost:2181"

kafka-manager.zkhosts="192.168.1.160:2181,192.168.1.235:200:2181,192.168.1.235:2181"   #把zookeeper都寫進去

2、啟動kafka-manager

[root@ecs-1d01 kafka-manager-1.3.3.23]# nohup ./bin/kafka-manager  -Dhttp.port=9107   > /dev/null 2>&1 &  //默認端口為9000,可自定義指定

[root@ecs-1d01 kafka-manager-1.3.3.23]# ss -tnulp|grep 9107
tcp LISTEN 0 50 :::9107 :::* users:(("java",pid=28816,fd=116))

3、訪問驗證

4、kafka-mamager測試

     如果沒有在 Kafka 中配置過 JMX_PORT,不要選擇第一個復選框。Enable JMX Polling如果選擇了該復選框,Kafka-manager 可能會無法啟動

  可根據實際情況,添加集群中的zookeeper真實數量

   其他broker的配置可以根據自己需要進行配置,默認情況下,點擊【保存】時,會提示幾個默認值為1的配置錯誤,需要配置為>=2的值。

   保存,創建完成

5、查看TOPIC 信息

6、查看broker信息

7、管理kafka-manager

1)新建主題
點擊【Topic】>【Create】可以方便的創建並配置主題。如下顯示。

   由於集群中只有1個節點,所以副本數最多設置為1

2)查看主題

針對Topic->Create新建主題的配置,根據一張圖事宜:

    在上圖一個Kafka集群中,有兩個服務器,每個服務器上都有2個分區。P0,P3可能屬於同一個主題,也可能是兩個不同的主題。
    如果設置的Partitons和Replication Factor都是2,這種情況下該主題的分步就和上圖中Kafka集群顯示的相同,此時P0,P3是同一個主題的兩個分區。P1,P2也是同一個主題的兩個分區,Server1和Server2其中一個會作為Leader進行讀寫操作,另一個通過復制進行同步。
    如果設置的Partitons和Replication Factor都是1,這時只會根據算法在某個Server上創建一個分區,可以是P0~4中的某一個(分區都是新建的,不是先存在4個然后從中取1個)。

kafka參考資料:

http://kafka.apache.org/21/documentation.html

https://www.jianshu.com/p/d3e963ff8b70

https://blog.51cto.com/littledevil/2134694?source=dra

https://www.cnblogs.com/saneri/p/8762168.html

https://www.cnblogs.com/kevingrace/p/9021508.html

https://blog.csdn.net/qq_34834325/article/details/78743490


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM