kafka的安裝和使用;kafka常用操作命令


kafka:基於發布/訂閱的分布式消息系統、數據管道;最初用來記錄活動數據--包括頁面訪問量(Page View)、被查看內容方面的信息以及搜索情況等內容和運營數據--服務器的性能數據(CPU、IO使用率、請求時間、服務日志等等數據)。Kafka是一個分布式流數據系統,使用Zookeeper進行集群的管理。kafka自己做為Broker Server

  • scala編寫
  • 水平擴展
  • 高吞吐率

 

1、安裝:

  • 下載:去官網 https://kafka.apache.org/downloads 下載二機制版本 https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.0/kafka_2.12-1.1.0.tgz

直接網頁下載或者命令行下載 wget -c http://mirrors.shu.edu.cn/apache/kafka/1.1.0/kafka_2.12-1.1.0.tgz

  • 解壓:把二機制包放到某個linux centos機器下解壓tar -zxvf kafka_2.12-1.1.0.tgz
  • 啟動zk:kafka依賴zookeeper,需要先啟動zookeeper。安裝包自帶zookeeper,可以直接啟動,如果已經單獨安裝zookeeper的話,就不需要重啟啟動,如果需要再啟動一個zookeeper,修改下zookeeper的配置文件,修改端口,不要造成端口沖突,zk的默認端口是2181。 啟動命令:
    bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  • 啟動kafka:需要先修改kafka配置文件:修改 kafka-server 的配置文件 config/server.properties,修改其中的broker.id和log位置。然后啟動kafak服務。啟動命令:bin/kafka-server-start.sh config/server.properties ,kafka默認端口9092。后台啟動:bin/kafka-server-start.sh  -daemon config/server.properties。配置broker.id必須是一個整數,且不可以和其他的kafak代理節點的值重復,即每個kafka節點為一個broker,擁有唯一的id值。
broker.id=1
log.dir=/data/kafka/logs-1
  • 單機多BROKER 集群配置:新生成幾個配置文件config/server1.properties、config/server2.properties。然后,修改里面的配置文件broker.id、 log.dir、  listeners分別指定不同的broker、日志文件位置、監聽端口。然后分別使用這些配置文件啟動即可。

2、使用:

  • 創建 TOPIC:使用 kafka-topics.sh 創建單分區單副本的 topic test:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

  查看 topic 列表:

bin/kafka-topics.sh --list --zookeeper localhost:2181

 

  • 產生消息:使用 kafka-console-producer.sh 發送消息:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

  向test發送一條包含key的消息:echo '00000,{"name":"Steve", "title":"Captain America"}' | kafka-console-producer.sh --broker-list localhost:9092  --topic test --property parse.key=true --property key.separator=,
  向test發送一條消息:  echo "The first record" | kafka-console-producer.sh --broker-list localhost:9092 --topic test

 

 

  • 消費消息:使用 kafka-console-consumer.sh 接收消息並在終端打印:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    或者
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
       將消息的key也輸出
     
       kafka-console-consumer.sh --zookeeper localhost:2181 --topic test  --from-beginning  --property print.key= true  --property key.separator=,



     
  • 查看consumer group列表,使用--list參數:bin/kafka-consumer-groups.sh --bootstrap-server host:9092 --list 
    查看特定consumer group 詳情,使用--group與--describe參數
    bin/kafka-consumer-groups.sh  --bootstrap-server HOST:9292 --group YOUR_GROUP_ID --describe
  • 看指定topic上每個partition的offset,是用於查看指定topic上相應分區的消息數,並不是consumer消費的偏移量
    bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list host:9092 --topic topic
  • 查看描述 TOPICS 信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

3、錯誤處理:Caused by: java.net.UnknownHostException: hostname: unknown error

在/etc/hosts里添加下hostname和127.0.0.1的映射就可以了:比如 127.0.0.1  localhost

 

啟動時錯誤:[2018-05-14 21:40:33,305] ERROR There was an error in one of the threads during logs loading: kafka.common.KafkaException: Found directory /home/admin/kafka_2.12-1.1.0/bin, 'bin' is not in the form of topic-partition or topic-partition.uniqueId-delete (if marked for deletion).

錯誤原因:server.properties里的配置log.dir路徑不對

 

4、認證和加密

使用ssl加密和認證,這樣使用kafka的時候,必須使用證書進行連接

https://blog.csdn.net/Regan_Hoo/article/details/78770058

https://apereo.github.io/cas/4.0.x/installation/JAAS-Authentication.html#jaas-configuration-file

 

python客戶端使用方法

1、首先安裝客戶端:pip install kafka-python [--user]  :https://github.com/dpkp/kafka-python

2、注意mac和部分linux使用kafka-pyhton,需要指定api_version,參考:https://github.com/dpkp/kafka-python/issues/1308 和 https://github.com/dpkp/kafka-python/pull/1411,正確用法:api_version可以先不指定,不行的話再指定

#! /usr/bin/env python

import time
from kafka import KafkaProducer

producer=KafkaProducer(bootstrap_servers="10.5.9.6:9092")
i=0
while True:
    ts = int(time.time()*1000)
    producer.send("test",value=str(i),key=str(i),timestamp_ms=ts,partition= 0)

    #第1個參數為 topic名稱,必須指定

    #key : 鍵,必須是字節字符串,可以不指定(但key和value必須指定1個),默認為None

    #value : 值,必須是字節字符串,可以不指定(但key和value必須指定1個),默認為None

    #partition : 指定發送的partition,由於kafka默認配置1個partition,固為0


    producer.flush()
    print i
    i=i+1
    time.sleep(1)

 

#! /usr/bin/env python
from kafka import KafkaConsumer

consumer=KafkaConsumer("test",bootstrap_servers=["10.5.9.6:9092"])
# consumer=KafkaConsumer("test",group_id='test_group',bootstrap_servers=["10.5.9.6:9092"],consumer_timeout_ms=1000) # 為topic:test創建group:test_group
# topic可以寫到KafkaConsumer的參數里,也可以是下面的寫法。group_id:指定此消費者實例屬於的組名;若不指定 consumer_timeout_ms,默認一直循環等待接收,若指定,則超時返回,不再等待.consumer_timeout_ms : 毫秒數
# consumer.subscribe(pattern= '^my.*') 使用正則表達式訂閱多個topic
# consumer.subscribe(topics= ['my_topic', 'topic_1']) 訂閱多個topic
for message in consumer:
    print message

 

注意config/config/server.properties,一定要設置為本機的大網IP,不然其他機器的producer和consumer無法訪問這個機器的broker,參見參考4的說明

 

 

Kafka架構

術語:

  • Broker

    Kafka集群包含一個或多個服務器,這種服務器被稱為broker。可以在一個機器上部署多個broker,也可以在不同的機器上部署多個broker。物理概念,指服務於Kafka的一個node。

  • Topic:可以理解為是一個queue序列

    每條發布到Kafka集群的消息都有一個類別,這個類別被稱為Topic。(物理上不同Topic 的 消息分開存儲,邏輯上一個Topic的消息雖然保存於一個或多個broker上但用戶只需指定消息的Topic即可生產或消費數據而不必關心數據存於何處)

  • Partition

    Parition是物理上的概念,每個Topic包含一個或多個Partition。是Kafka下數據存儲的基本單元。同一個topic的數據,會被分散的存儲到多個partition中,這些partition可以在同一台機器上,也可以是在多台機器上。為了做到均勻分布,通常partition的數量通常是Broker Server數量的整數倍。

  • Producer

    負責發布消息到Kafka broker

  • Consumer

    消息消費者,向Kafka broker讀取消息的客戶端。

  • Consumer Group

    每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則會創建默認的group)。同樣是邏輯上的概念,是Kafka實現單播和廣播兩種消息模型的手段。同一個topic的數據,會廣播給不同的group;同一個group中的worker,只有一個worker能拿到這個數據。換句話說,對於同一個topic,每個group都可以拿到同樣的所有數據,但是數據進入group后只能被其中的一個worker消費。group內的worker可以使用多線程或多進程來實現,也可以將進程分散在多台機器上,worker的數量通常不超過partition的數量,且二者最好保持整數倍關系,因為Kafka在設計時假定了一個partition只能被一個worker消費(同一group內)。

    為了便於實現MQ中的多播,重復消費等引入的概念。如果ConsumerA以及ConsumerB同在一個UserGroup,那么ConsumerA消費的數據ConsumerB就無法消費了。

    即:所有usergroup中的consumer使用一套offset。



  • 什么是消費者組(Consumer Group)

    consumer group是kafka提供的可擴展且具有容錯性的消費者機制。既然是一個組,那么組內必然可以有多個消費者或消費者實例(consumer instance),它們共享一個公共的ID,即group ID。組內的所有消費者協調在一起來消費訂閱主題(subscribed topics)的所有分區(partition)。當然,每個分區只能由同一個消費組內的一個consumer來消費。理解consumer group記住下面這三個特性就好了:

    1)consumer group下可以有一個或多個consumer instance,consumer instance可以是一個進程,也可以是一個線程
    2)group.id是一個字符串,唯一標識一個consumer group
    3)consumer group下訂閱的topic下的每個分區只能分配給某個group下的一個consumer(當然該分區還可以被分配給其他group)

  • Offset

    Offset專指Partition以及User Group而言,記錄某個user group在某個partiton中當前已經消費到達的位置。

  • 查看group信息:test1234、test_group是主動創建的

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list 

  • group如何創建?cousumer消費時,為topic指定group名字,就會創建相應的group
  • 一個topic可以同屬於多個group,topic會廣播給所有包含它的group
  • 一個group可以包含多個topic,這個group會收到所有它包含的topic的信息
  • 屬於同一個group的多個客戶端(或者線程、進程),其中只有一個可以收到相同topic的信息,其他收不到
  • 默認從上次的最后一次消費繼續消費(這樣保證不會重復消費),也可以通過設置從頭開始消費
  • 一個topic中partition的數量,就是每個user group中消費該topic的最大並行度數量。

 

Python Kafka的幾個客戶端對比基准測試:https://www.ctolib.com/topics-103354.html

 

kafka-python文檔地址:https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html

使用參考:https://zhuanlan.zhihu.com/p/38330574

更多操作kafka的python三方包比較參考:https://github.com/muscledreamer/Kafka_Demo

 

查看Topic的分區和副本情況
命令:

bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181  --topic test0

運行結果:

Topic:test0   PartitionCount:16       ReplicationFactor:3     Configs:

       Topic: test0  Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 1,0,2

       Topic: test0  Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,0,2

       Topic: test0  Partition: 2    Leader: 2       Replicas: 2,0,1 Isr: 1,0,2

       Topic: test0  Partition: 3    Leader: 1       Replicas: 1,2,0 Isr: 1,0,2

       Topic: test0  Partition: 4    Leader: 2       Replicas: 2,0,1 Isr: 1,0,2

       Topic: test0  Partition: 5    Leader: 0       Replicas: 0,1,2 Isr: 1,0,2

       Topic: test0  Partition: 6    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

       Topic: test0  Partition: 7    Leader: 2       Replicas: 2,1,0 Isr: 1,0,2

       Topic: test0  Partition: 8    Leader: 2      Replicas: 2,0,1 Isr: 0,1,2

       Topic: test0  Partition: 9    Leader: 0       Replicas: 0,2,1 Isr: 0,1,2

       Topic: test0  Partition: 10   Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

       Topic: test0  Partition: 11   Leader: 2       Replicas: 2,1,0 Isr: 1,0,2

       Topic: test0  Partition: 12   Leader: 0       Replicas: 0,2,1 Isr: 0,1,2

       Topic: test0  Partition: 13   Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

       Topic: test0  Partition: 14   Leader: 2       Replicas: 2,1,0 Isr: 1,0,2

       Topic: test0  Partition: 15   Leader: 0       Replicas: 0,1,2 Isr: 0,1,2

 

結果分析:

第一行顯示partitions的概況,列出了Topic名字,partition總數,存儲這些partition的broker數

以下每一行都是其中一個partition的詳細信息:

leader

是該partitons所在的所有broker中擔任leader的broker id,每個broker都有可能成為leader

replicas

顯示該partiton所有副本所在的broker列表,包括leader,不管該broker是否是存活,不管是否和leader保持了同步。

isr

in-sync replicas的簡寫,表示存活且副本都已同步的的broker集合,是replicas的子集

舉例:

比如上面結果的第一行:Topic: test0  Partition:0    Leader: 0       Replicas: 0,2,1 Isr: 1,0,2

Partition: 0

該partition編號是0

Replicas: 0,2,1

代表partition0 在broker0,broker1,broker2上保存了副本

Isr: 1,0,2

代表broker0,broker1,broker2都存活而且目前都和leader保持同步

Leader: 0

代表保存在broker0,broker1,broker2上的這三個副本中,leader是broker0

leader負責讀寫,broker1、broker2負責從broker0同步信息,平時沒他倆什么事

當producer發送一個消息時,producer自己會判斷發送到哪個partiton上,如果發到了partition0上,消息會發到leader,也就是broker0上,broker0處理這個消息,broker1、broker2從broker0同步這個消息

如果這個broker0掛了,那么kafka會在Isr列表里剩下的broker1、broker2中選一個新的leader

Kafka入門之六:Kafka的Consumer實驗:https://blog.yaodataking.com/2016/11/13/kafka-6/

 

參考:

1、http://www.54tianzhisheng.cn/2018/01/04/Kafka/

2、http://www.infoq.com/cn/articles/kafka-analysis-part-1

3、http://www.infoq.com/cn/profile/%E9%83%AD%E4%BF%8A

4、https://blog.csdn.net/cysdxy/article/details/52337364

5、https://www.jianshu.com/p/51a6789b9d39

6、https://www.jianshu.com/p/ede62642a438

7、http://windrocblog.sinaapp.com/?p=1860

8、https://blog.csdn.net/weixin_40596016/article/details/79562023

9、http://alexstocks.github.io/html/kafka.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM