kafka集群部署及使用記錄


 

 

一、 簡介

Kafkascala語言編寫,由Linkedin公司2010年貢獻給Apache成為一個開源的消息系統,它主要用於處理活躍的流式數據。遵從一般的MQ結構。Kafka對消息保存時根據Topic進行歸類,此外kafka集群有多個kafka實例組成,每個實例(server)稱為broker。Kafka依賴於zookeeper集群保存一些meta信息,來保證系統可用性。

注意:官方在Kafka 3.0版本中宣布下一次的升級將棄用java8,選擇 Java 11 作為最低支持版本。

 

二、 應用場景

 

 

 

 

三、 消息傳輸流程

 

 

 

 

 

 

kafka集群的負載均衡:

 

 

 

1.一partition只能被一個消費者消費,一個消費者可以消費多個partition,所以如果設置的partition數量小於consumer的數量,就會導致空閑的consumer沒有消費,所以partition的數量一定要大於或等於consumer的數量。

( 注意:topic有多個分區,才能實現多個consumer消費一個topic。)

 

2.多副本冗余的高可用機制,比如Partition1有一個副本是Leader,另外一個副本是Follower,Leader和Follower兩個副本是分布在不同機器上的。

四、 API

  1. 基礎API
  2. Producer :消息生產者,決定消息發送到指定Topic的哪個分區上。

 

  1. Consumer :消息消費者,向 Kafka broker 取消息的客戶端;

 

  1. Consumer Group(CG):消費者組,由多個 consumer 組成。消費者組內每個消費者負責消費不同分區的數據,一個分區只能由一個組內消費者消費;消費者組之間互不影響。所有的消費者都屬於某個消費者組,即消費者組是邏輯上的一個訂閱者。

 

  1. Broker :經紀人,一台Kafka服務器就是一個broker。一個集群由多個broker 組成。一個broker可以容納多個topic。

 

  1. Topic :主題,可以理解為一個隊列,生產者和消費者面向的都是一個 topic。通俗來講的話,就是放置“消息”的地方,是消息投遞的一個容器。假如把消息看作是信封的話,那么 Topic 就是一個郵箱 。

 

  1. Partition:為了實現水平擴展和負載均衡,一個非常大的 topic 可以分布到多個 broker(即服務器)上,每台服務器上又可以分為多個 partition,每個 partition 是一個有序的隊列;

 

  1. Replica:副本(Replication),為保證集群中的某個節點發生故障時,該節點上的 partition 數據不丟失,且 Kafka仍然能夠繼續工作,Kafka 提供了副本機制,一個 topic 的每個分區都有若干個副本,一個 leader 和若干個 follower。

 

  1. Leader:每個分區多個副本的“主”,生產者發送數據的對象,以及消費者消費數據的對象都是 leader。

 

  1. Follower:每個分區多個副本中的“從”,實時從 leader 中同步數據,保持和 leader 數據的同步。 leader 發生故障時,某個 Follower 會成為新的 leader。

 擴展API

  1. Consumer Group消費者組是 Kafka 提供的可擴展且具有容錯性的消費者機   制。組內可以有多個消費者,共享一個公共ID,這個ID 被稱為 Group ID。組內的所有消費者協調在一起來消費訂閱主題(Topics)的所有分區(Partition)。當然,每個分區只能由同一個消費者組內的一個 Consumer 實例來消費。(新版本的Consumer Group將位移保存在Broker端的內部主題中)

       三大特性:

      1. 組內可以有一個或多個 Consumer 實例。這里的實例可以是一個單獨的進程,也可以是同一進程下的線程。在實際場景中,使用進程更為常見一些。

      2. Group ID 是一個字符串,在一個 Kafka 集群中,它標識唯一的一個 Consumer Group。

      3. 組內所有主題的單個分區,只能分配給組內的某個 Consumer 實例消費。這個分區當然也可以被其他的 Group 消費

    兩大模型:

      Kafka僅僅使用Consumer Group這一種機制,卻同時實現了傳統消息引擎系統的兩大模型:

    1. 如果所有消費者實例都屬於同一個Group,那么它實現的是消息隊列模型; (隊列的處理方式是一組消費者從服務器讀取消息,一條消息只由其中的一個消費者來處理。)

      2. 如果所有消費者實例分別屬於不同的Group,那么它實現的就是發布/訂閱模型; (發布-訂閱模型中,消息被廣播給所有的消費者,接收到消息的消費者都可以處理此消息。)

 

    

   2. KRaft模式: kafka之前一直使用Zookeeper來進行所有Broker的管理,每個Broker服務器啟動時都會到連接到Zookeeper注冊,創建brokers節點,寫入IP,端口等信息。當Broker發生狀態變化,比            如下線,對應Broker節點也就被刪除。2.8版本以后使用 Raft 模式,可以不再依賴Zookeeper,將之前存放在Zookeeper的元數據(元數據將被視為日志)、配置信息都會保存在 @metadata 這個                Topic          中,自動在kafka集群中復制。這樣 Kafka 就會簡單輕巧很多。

 

 

五、 代碼實例

1.配置kafka.properties:

 

 

 

 

  1. 封裝的生產者:                                                                         

 /**
     * kafka封裝生產者發送消息
     * @param reqDTO  入參:1. topic(主題名)
     *                     2. mesgKey(消息key)
     *                     3. mesgData(消息體)
     */
    public KafkaResDTO ProviderService(KafkaReqDTO reqDTO) throws Exception {
        KafkaResDTO respDTO = new KafkaResDTO();
        if (logger.isDebugEnabled()) {
            logger.debug(">>>>kafka發送消息入參->{}", reqDTO);
        }
        if (reqDTO == null) {
            logger.error("請求參數為空,kafka消息發送失敗。");
            respDTO.setRespMesg("請求參數為空,消息發送失敗。");
            respDTO.setRespCode("500");
            return respDTO;
        } else {
            if (StringUtils.isBlank(reqDTO.getTopic())) {
                logger.error("主題為空,kafka消息發送失敗。");
                respDTO.setRespMesg("主題為空,kafka消息發送失敗。");
                respDTO.setRespCode("500");
                return respDTO;
            } else {
                try {
                    kafkaTemplate.send(reqDTO.getTopic(),JSONUtil.toJsonStr(reqDTO.getMesgKey()),JSONUtil.toJsonStr(reqDTO.getMesgData()));
                    logger.info("kafka消息發送成功:{}", reqDTO);
                    respDTO.setRespCode("200");
                    respDTO.setRespMesg("kafka消息發送成功");
                } catch (Exception e) {
                    //e.printStackTrace();
                    logger.error("kafka發送消息失敗:{}", e.getMessage());
                    respDTO.setRespMesg("kafka發送消息失敗:{}" + e);
                    respDTO.setRespCode("500");
                }
            }
        }
        return respDTO;
    }

  1. 消費者(需要引入該項目依賴:opp-code-kafka-project)

@KafkaListener(topics = {"t3"},groupId = "group1",containerFactory="kafkaListenerContainerFactory")
public void m1(ConsumerRecord<String, String> record){
    try {
        System.out.println("消費key和value "+record.key()+" : "+record.value());
        //ack.acknowledge();
        logger.info("kafka消息消費成功:{}",record);

    } catch (Exception e) {
        //e.printStackTrace();
        logger.error("kafka消息消費失敗:{}",record,e.getMessage());
    }

}

六、 消息確認機制

1.生產者:ack確認機制

2.消費者:

  1.自動提交確認

  2.手動提交確認

       #禁止自動提交
  spring.kafka.consumer.enable-auto-commit=false
  spring.kafka.consumer.auto-offset-reset=earliest
  #手動確認消息
  spring.kafka.listener.ack-mode=manual_immediate
  # 批量一次最大拉取數據量
  max-poll-records: 3
  # 自動提交時間間隔,這種直接拉到數據就提交 容易丟數據
  auto-commit-interval: 2000
  # 批量拉取間隔,要大於批量拉取數據的處理時間,時間間隔太小會有重復消費
  max.poll.interval.ms: 5000

 

七、 重復消費問題

  1. 重復消費的原因:

    1:消費者端宕機、重啟或者被強行kill進程,導致消費者消費的offset沒有提交。

    2:設置enable.auto.commit為true,如果在關閉消費者進程之前,取消了消費者的訂閱,則有可能部分offset沒提交,下次重啟會重復消費。

         3:客戶端消費超時被判定掛掉而消費者重新分配分區, 導致重復消費。

 

 

  1. 解決方法:
    1. 提高消費者的處理速度。例如:對消息處理中比較耗時的步驟可通過異步的方式進行處理、利用多線程處理等。在縮短單條消息消費的同時,根據實際場景可將max.poll.interval.ms值設置大一點,避免不必要的Rebalance。可根據實際消息速率適當調小max.poll.records的值。
    2. 引入消息去重機制。例如:生成消息時,在消息中加入唯一標識符如消息id等。在消費端,可以保存最近的max.poll.records條消息id到redis或mysql表中,這樣在消費消息時先通過查詢去重后,再進行消息的處理。
    3. 保證消費者邏輯冪等性。

 

 

 

八、 可視化工具(Kafka Tool)

  1. 下載地址:http://www.kafkatool.com/download.html
  2. 簡單使用(查看主題消息):

 

 

九、 linux集群部署

1.服務器

18.5.204.164

18.5.204.165

 

2.JDK

  安裝環境所需jdk

 

3.zookeeper

1.使用的是穩定版:zookeeper-3.5.6-bin

  官網下載鏈接:https://zookeeper.apache.org/releases.html

 

2.將zookeeper包解壓在 /home/fwzt/kafka/local文件夾下

 

  3.cd zookeeper-3.5.6/conf/  然后 cp zoo_sample.cfg zoo.cfg復制一份配置文件並重命名zoo.cfg

 

  4.cd zookeeper-3.5.6/  然后 新建data和zlog 兩個同級文件夾

 

  5.vi zoo.cfg  進行編輯:

  修改dataDir路徑: dataDir=/home/fwzt/kafka/local/zookeeper-3.5.6/data

修改日志路徑:dataLogDir=/home/fwzt/kafka/local/zookeeper-3.5.6/zlog

端口默認:clientPort=2181

配置兩台服務器集群: server.0=18.5.204.164:2888:3888
     server.1=18.5.204.165:2888:3888

 

6.cd zookeeper-3.5.6/data 然后 vi myid 輸入0 保存(此處是創建serverID,此處的對應zoo.cfg文件里配置集群時在server.后面的那個0)

 

7.vi /etc/profile 進行編輯:

      #zookeeper

export ZOOKEEPER_HOME=/home/fwzt/kafka/local/zookeeper-3.5.6

export PATH=$PATH:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf

 

  8.執行 source /etc/profile使文件生效

 

9.其他集群服務器使用相同配置,只有第6步需要改變:vi myid 輸入的節點數字和本服務器節點數字對應。

 

10.啟動:sh zkServer.sh start  

(先啟動的服務都會報錯 屬於正常,等最后一台啟動后,就都不會再報錯)

 

  查看是否啟動:zkServer.sh status 

  

此處為啟動成功(Mode: follower 為集群副本的從, Mode: leader為集群副本的主)

 

停止  sh zkServer.sh stop

 

 

 4.Kafka

1.使用版本:kafka_2.12-2.4.0

官網下載鏈接:https://kafka.apache.org/downloads

 

2.將kafka_2.12-2.4.0包解壓在 /home/fwzt/kafka/local文件夾下

 

3.cd kafka_2.12-2.4.0/  然后 mkdir logs

 

4.cd kafka_2.12-2.4.0/config/  然后 vi server.properties 進行編輯:

  修改:broker.id=0 (每台此處設置和zookeepermyid相對應)

修改:listeners=PLAINTEXT://18.5.204.164:9092 (每台服務器使用各自的ip)

修改topic創建分區數:num.partitions=5

修改日志保留時間:log.retention.hours=72

修改日志路徑:log.dirs=/home/fwzt/kafka/local/kafka_2.12-2.4.0/logs

修改對應zookeeper的集群ip:zookeeper.connect=18.5.204.164:2181,18.5.204.165:2182

 

5.vi /etc/profile 進行編輯(覆蓋之前zookeeper配置):

#ZOOKEEPER_HOME

export ZOOKEEPER_HOME=/home/fwzt/kafka/local/zookeeper-3.5.6

#kafka

export KAFKA_HOME=/home/fwzt/kafka/local/kafka_2.12-2.4.0

export PATH=$PATH:${KAFKA_HOME}/bin:$ZOOKEEPER_HOME/bin:$ZOOKEEPER_HOME/conf

 

6.執行 source /etc/profile使文件生效

 

7.其他集群服務器使用相同配置,只需改動 broker.id 和 listeners

 

8.常規啟動:nohup bin/kafka-server-start.sh config/server.properties &

 

進程守護模式啟動:

  nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 & 

 

查看是否啟動:ps -ef | grep kafka

 

關閉:bin/kafka-server-stop.sh

 


 

創建主題:

 .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2182 --replication-factor 1 --partitions 1 --topic test

查看所有topic

.\bin\windows\kafka-topics.bat --list --zookeeper localhost:2182

查看具體topic:

.\bin\windows\kafka-topics.bat --describe --zookeeper localhost:2182 --topic Test

 創建生產者:

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

創建消費者:

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

 

 

 

集群:

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 3 --topic my-test

 

bin\windows\kafka-topics.bat --zookeeper localhost:2181,localhost:2182,localhost:2183 --describe --topic my-test

 

bin\windows\kafka-console-producer.bat --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic test1

 

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test1 --from-beginning

 

 

查詢是否有未消費的消息

kafkabin目錄下,執行命令:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group group1

 

可以看到當前的消費進度(CURRENT-OFFSET)、消息進度(LOG-END-OFFSET)、落后量(LAG:

 

 

 

 

 

 

 

application.properties配置文件版本:

#====================================== kafka 生產者 ==============================================

# 指定kafka server的地址,集群配多個,中間,逗號隔開
spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
# 寫入失敗時,重試次數
spring.kafka.producer.retries=3
#指定創建信息nio-buffer緩沖區大小約1M
spring.kafka.producer.buffer-memory=1024000
#累計約1M條就發發送,必須小於緩沖區大小,否則報錯無法分配內存(減少IO次數,過大則延時高,瞬間IO大)
spring.kafka.producer.batch-size=1024000
#默認0ms立即發送,不修改則上兩條規則相當於無效(這個屬性時個map列表,producer的其它配置也配置在這里,詳細↑官網,這些配置會注入給KafkaProperties這個配置bean中,供#spring自動配置kafkaTemplate這個對象時使用)
#spring.kafka.producer.properties.linger.ms=1000
#acks=0 把消息發送到kafka就認為發送成功
#acks=1 把消息發送到kafka leader分區,並且寫入磁盤就認為發送成功
# acks=all 把消息發送到kafka leader分區,並且leader分區的副本follower對消息進行了同步就任務發送成功spring.kafka.producer.acks=1
#由於網絡傳輸過來的是byte[],生產者端key/value需要序列化(可轉為String和Byte數組兩種選擇)
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#====================================== kafka 消費者 ==============================================

#spring.kafka.bootstrap-servers=127.0.0.1:9092 本項目在生產者處已配置kafka的ip
#指定默認的消費組組名
spring.kafka.consumer.group-id: group-test
#自動偏移量:  1.earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
#           2.latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據
#           3.none :topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
spring.kafka.consumer.auto-offset-reset: earliest
# 設置自動提交offset
spring.kafka.consumer.enable-auto-commit=true
#如果設置自動提交offset,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),默認值為5000。
spring.kafka.consumer.auto-commit-interval=100
#由於網絡傳輸過來的是byte[],消費者端key/value需要反序列化(可轉為String和Byte數組兩種選擇)
#spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

 

消費者手動確認機制:

 #禁止自動提交
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
#手動確認消息
spring.kafka.listener.ack-mode=manual_immediate
# 批量一次最大拉取數據量
max-poll-records: 3
# 自動提交時間間隔,這種直接拉到數據就提交 容易丟數據
auto-commit-interval: 2000
# 批量拉取間隔,要大於批量拉取數據的處理時間,時間間隔太小會有重復消費
max.poll.interval.ms: 5000

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM