消息隊列之kafka


消息隊列之activeMQ

消息隊列之RabbitMQ

1.kafka介紹

kafka是由scala語言開發的一個多分區,多副本的並且居於zookeeper協調的分布式的發布-訂閱消息系統。具有高吞吐、可持久化、可水平擴展、支持流處理等特性;能夠支撐海量數據的數據傳遞;並且將消息持久化到磁盤中,並對消息創建了備份保證了數據的安全。kafka在保證了較高的處理速度的同時,又能保證數據處理的低延遲和數據的零丟失。

kafka的特性:

  1. 高吞吐量,低延遲:kafka每秒可以處理幾十萬條消息,延遲最低大概毫秒,每個主題可以分為多個分區,消費組對分區進行消費操作
  2. 可擴展性:支持熱擴展
  3. 持久性,可靠性:消息被持久化到本地磁盤,並且支持數據備份
  4. 容錯性:允許集群中節點失敗,如副本的數量為n,則允許n-1個節點失敗
  5. 高並發:允許上千個客戶端同時讀寫
  6. 可伸縮性:kafka在運行期間可以輕松的擴展或者收縮;可以擴展一個kafka主題來包含更多的分區

kafka的主要應用場景:

  • 消息處理
  • 網站跟蹤
  • 指標存儲
  • 日志聚合
  • 流式處理
  • 事件朔源

基本流程:

kafka的關鍵角色:

  • Producer:生產者即數據的發布者,該角色將消息發布到kafka的topic中
  • Consumer:消費者,可以從broker中讀取數據
  • Consumer Group:每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於默認的group)
  • Topic:划分數據的所屬類的一個類別屬性
  • Partition:topic中的數據分割為一個或多個partition,每個topic中至少含有一個partition
  • Partition offset:每條消息都有一個當前partition下的唯一的64字節的offset,它指名了這條消息的起始位置
  • Replicas of Partition:副本,是一個分區的備份
  • Broker:kafka集群中包含一個或多個服務器 ,服務器的節點稱為broker
  • Leader:每個partition由多個副本,其中有且僅有一個作為leader,leader是當前負責數據的讀寫的partition
  • Follower:Follower跟隨Leader,所有的寫請求都是通過leader路由,數據變更會廣播到所有的follower上,follower與leader的數據保持同步
  • AR:分區中所有的副本統稱為AR
  • ISR:所有與leader部分保持一定程度的副本組成ISR
  • OSR:與leader副本同步滯后過多的副本
  • HW:高水位,標識了一個特定的offset,消費者只能拉去到這個offset之前的消息
  • LEO:即日志末端位移,記錄了該副本底層日志中的下一條消息的位移值

2.kafka的安裝

安裝kafka的前提是安裝zookeeper以及jdk環境。我這里安裝的版本是jdk1.8.0_20,kafka_2.11-1.0.0,zookeeper-3.4.14。kafka與jdk的版本一定要對應。我之前用的kafka_2.12_2.3.0,就不行

1.將kafka的文件上傳到home目錄下並解壓縮到/usr/local目錄下

root@localhost home]# tar -xvzf kafka_2.11-1.0.0.tgz -C /usr/local

2.進入kafka的config

[root@localhost local]# cd /usr/local/kafka_2.11-1.0.0/config

3.編輯server.properties文件

# 如果是集群環境中,則每個broker.id要設置為不同
broker.id=0
# 將下面這一行打開,這相當於kafka對外提供服務的入口
listeners=PLAINTEXT://192.168.189.150:9092
# 日志存儲位置:log.dirs=/tmp/kafka_logs 改為
log.dirs=/usr/local/kafka_2.11-1.0.0/logs
# 修改zookeeper的地址
zookeeper.connect=192.168.189.150:2181
# 修改zookeeper的連接超時時長,默認為6000(可能會超時)
zookeeper.connection.timeout.ms=10000

3.啟動zookeeper

因為我是配置的zookeeper集群,所以需要將三台zookeeper都啟動。只啟動單台服務器zookeeper在選舉的時候將不可進行(當整個集群超過半數機器宕機,zookeeper會認為集群處於不可用狀態)

[root@localhost ~]# zkServer.sh start
# 查看狀態
[root@localhost ~]# zkServer.sh status

4.啟動kafka

[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh config/server.properties
# 也可以使用后台啟動的方式,如果不使用后台啟動,則在啟動后操作需要新開一個窗口才能操作
[root@localhost kafka_2.11-1.0.0]# bin/kafka-server-start.sh -daemon config/server.properties

5.創建一個主題

# --zookeeper: 指定了kafka所連接的zookeeper的服務地址
# --partitions: 指定了分區的個數
# --replication-factor: 指定了副本因子
[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic charon --partitions 2 --replication-factor 1
Created topic "charon".

6.展示所有的主題(驗證創建的主題是否有問題)

[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --list
charon

7.查看某個主題的詳情

[root@localhost kafka_2.11-1.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic charon
Topic:charon	PartitionCount:2	ReplicationFactor:1	Configs:
	Topic: charon	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
	Topic: charon	Partition: 1	Leader: 0	Replicas: 0	Isr: 0

8.新開一個窗口啟動消費者接收消息.

--bootstrap-server:指定連接kafka集群的地址,9092是kafka服務的端口。因為我的配置文件中配置的是具體地址,所以需要寫明具體地址。否則會報 [Producer clientId=console-producer] Connection to node -1 could not be established. Broker may not be available.的錯

[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.189.150:9092 --topic charon

9.新開一個窗口啟動生產者產生消息

--bootstrap-server:指定連接kafka集群的地址,9092是kafka服務的端口。因為我的配置文件中配置的是地址。

[root@localhost kafka_2.11-1.0.0]# bin/kafka-console-producer.sh --broker-list 192.168.189.150:9092 --topic charon

10.產生消息並消費消息

# 生產者生產消息
>hello charon good evening
# 消費者這邊接收到的消息
hello charon good evening

當然上面這種方式,只有在同一個網段才能實現。

3.生產者和消費者

kafka生產流程:

1)producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader

2)producer將消息發送給該leader

3)leader將消息寫入本地log

4)followers從leader pull消息,寫入本地log后向leader發送ACK

5)leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)並向producer發送ACK

消費組:

kafka消費者是消費組的一部分,當多個消費者形成一個消費組來消費主題的時候,每個消費者都會收到來自不同分區的消息。假如消費者都在同一個消費者組里面,則是工作-隊列模型。假如消費者在不同的消費組里面,則是發布-訂閱模型。

當單個消費者無法跟上數據的生成速度時,就可以增加更多的消費者來分擔負載,每個消費者只處理部分分區的消息,從而實現單個應用程序的橫向伸縮。但是千萬不要讓消費者的數量少於分區的數量,因為此時會有多余的消費者空閑。

當有多個應用程序都需要從kafka獲取消息時,讓每個應用程序對應一個消費者組,從而使每個應用程序都能獲取一個或多個topic的全部消息。每個消費者對應一個線程,如果要在同一個消費者組中運行多個消費者,需要讓每個消費者運行在自己的線程中。

4.代碼實踐

1.添加依賴:

<!--添加kafka的依賴-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>

生產者代碼:

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * @className: Producer
 * @description: kafka的生產者
 * @author: charon
 * @create: 2021-01-18 08:52
 */
public class Producer {

    /**topic*/
    private static final String topic = "charon";

    public static void main(String[] args) {
        // 配置kafka的屬性
        Properties properties = new Properties();
        // 設置地址
        properties.put("bootstrap.servers","192.168.189.150:9092");
        // 設置應答類型,默認值為0。(0:生產者不會等待kafka的響應;1:kafka的leader會把這條消息寫到本地日志文件中,但不會等待集群中其他機器的成功響應;
        // -1(all):leader會等待所有的follower同步完成,確保消息不會丟失,除非kafka集群中的所有機器掛掉,保證可用性)
        properties.put("acks","all");
        // 設置重試次數,大於0,客戶端會在消息發送失敗是重新發送
        properties.put("reties",0);
        // 設置批量大小,當多條消息需要發送到同一個分區時,生產者會嘗試合並網絡請求,提交效率
        properties.put("batch.size",10000);
        // 生產者設置序列化方式,默認為:org.apache.kafka.common.serialization.StringSerializer
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 創建生產者
        KafkaProducer producer = new KafkaProducer(properties);
        for (int i = 0; i < 5; i++) {
            String message = "hello,charon message "+ i ;
            producer.send(new ProducerRecord(topic,message));
            System.out.println("生產者發送消息:" + message);
        }
        producer.close();
    }
}

消費者代碼:

package kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * @className: Consumer
 * @description: kafka的消費者
 * @author: charon
 * @create: 2021-01-18 08:53
 */
public class Consumer implements Runnable{

    /**topic*/
    private static final String topic = "charon";

    /**kafka消費者*/
    private static KafkaConsumer kafkaConsumer;

    /**消費消息*/
    private static ConsumerRecords<String,String> msgList;

    public static void main(String[] args) {
        // 配置kafka的屬性
        Properties properties = new Properties();
        // 設置地址
        properties.put("bootstrap.servers","192.168.189.150:9092");
        // 消費者設置反序列化方式
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        // 設置消費組
        properties.put("group.id","test01");
        // 設置允許自動提交
        properties.put("enable.auto.commit","true");
        // 設置自動提交的時間間隔
        properties.put("auto.commit.interval.ms","1000");
        // 設置連接的超時市場
        properties.put("session.timeout.ms","30000");
        // 創建消費者
        kafkaConsumer = new KafkaConsumer(properties);
        // 指定分區
        kafkaConsumer.subscribe(Arrays.asList(topic));
        Consumer consumer = new Consumer();
        new Thread(consumer).start();
        // kafkaConsumer.close();
    }

    @Override
    public void run() {
        for (;;){
            // 獲取數據的超時1000ms
            msgList = kafkaConsumer.poll(1000);
            if(null != msgList && msgList.count() > 0){
                for (ConsumerRecord<String,String> consumerRecord: msgList ) {
                    System.out.println("消費者接受到消息,開始消費:" + consumerRecord);
                    System.out.println("topic= "+consumerRecord.topic()+" ,partition= "+consumerRecord.partition()+" ,offset= "+consumerRecord.offset()+" ,value="+consumerRecord.value()+"\n");
                }
            }else{
                // 如果沒有接受到數據,則阻塞一段時間
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

5.提交和偏移量

kafka不會像activemq那樣需要得到消費者確認,所以消費者需要追蹤kafka的消息消費到分區中的哪個位置了,這個位置就叫偏移量。把更新分區當前位置的操作叫做提交。如果消費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡,完成再均衡之后,每個消費者可能分配到新的分區上,而不是之前處理的那個,為了能夠繼續之前的工作,消費者需要讀取每個分區最后一次提交的偏移量,然后從偏移量指定的地方繼續處理。

這樣的話就可能會有以下兩種情況:

1.提交的偏移量小於客戶端處理的偏移量

如果提交的偏移量小於客戶端處理的最后一個消息的偏移量,那么處於兩個偏移量之間的消息就會被重新處理。

2.提交的偏移量大於客戶端處理的偏移量

如果提交的偏移量大於客戶端處理的最后一個消息的偏移量,那么處於兩個偏移量之間的消息就會丟失。

kafka的提交方式:

  • 自動提交模式:消費者拉取數據之后自動提交偏移量,不關心后續對消息的處理是否正確。優點是:消費快,適用於數據一致性弱的業務場景,缺點為:消息容易丟失或者重復消費

    將enable.auto.commit被設為 true

  • 手動提交模式:消費者拉取數據之后做業務處理,而且需要業務處理完成才算真正消費成功。缺點:在broker對提交請求做出回應之前,應用程序會一直阻塞,會限制應用程序的吞吐量

    將enable.auto.commit被設為 false;

    在消息處理完成后手動調用consumer.commitSync();

  • 異步提交:只需要發送提交請求,無需等待broker的響應

    在消息處理完成后手動調用consumer.commitAsync();這個方法也支持回調,在broker作出響應時會執行回調,回調經常被用於記錄提交失敗將錯誤信息和偏移量記錄下來,如果重新提交,則需要注意提交的順序。

6.再均衡監聽器

在為消費者分配新的分區或者移除舊的分區時,可以通過消費者API執行一些應用程序代碼,在調用subscribe(Pattern pattern, ConsumerRebalanceListener listener)時,可以傳入一個再均衡監聽器。

需要實現的兩個方法:

  • public void onPartitionRevoked(Collection partitions);

    在再均衡開始之前和消費者停止讀取消息之后被調用,如果在這里提交偏移量,下一個接管分區的消費者就知道從哪里開始讀取了,要注意提交的是最近處理過的偏移量,而不是批次中還在處理的最后一個偏移量。

  • public void onPartitionAssigned(Collection partitions)

    在重新分配分區之后和消費者開始夫區消息之前被調用

7.kafka消息重復和丟失分析

首先來看看kafka的應答類型:

  • ack=0:生產者無需等待來自broker的確認而繼續發送下一批消息(數據傳輸效率最高,但可靠性最低)
  • ack=1:生產者在ISR中的leader已成功收到數據並寫入到本地日志文件中,但不會等待集群中其他follower的成功響應
  • ack=-1:生產者需要等待ISR中的所有的follower同步完成,確保消息不會丟失,除非kafka集群中的所有機器掛掉,保證可用性(可靠性最高,但也不能保證數據不丟失)

如果是單機環境中,三者沒有區別。

kafka的消息重復和丟失可能發生在三個階段:

1.生產者階段的原因為:生產者發送的消息沒有收到正確的broker的響應,導致生產者重試。

生產者發送一條消息,broker羅盤以后因為網絡等種種原因,發送端得到一個發送失敗的響應或者網絡中斷,然后prodcuer收到一個可恢復的exception重試消息導致消息重試。

重試過程:

  1. new KafkaProducer()后創建一個后台線程KafkaThread掃描RecordAccumulator中是否有消息;
  2. 調用KafkaProducer.send()發送消息,實際上只是把消息保存到RecordAccumulator中;
  3. 后台線程KafkaThread掃描到RecordAccumulator中有消息后,將消息發送到kafka集群;
  4. 如果發送成功,那么返回成功;
  5. 如果發送失敗,那么判斷是否允許重試。如果不允許重試,那么返回失敗的結果;如果允許重試,把消息再保存到RecordAccumulator中,等待后台線程KafkaThread掃描再次發送;

解決方式:

1.啟動kafka的冪等性。要啟動kafka的冪等性,需要修改配置文件中的:enable.idempotenmce=true,同時要求ack=all且retries>1。如果要提高數據的可靠性,還需要min.insync.replicas這個參數配合,如果ISR的副本數少於min.insync.replicas則會產生異常,原因:消息被拒絕,同步副本數量少於所需的數量

冪等性的原理:

每個生產者都有一個PID,服務端回通過PID關聯記錄每個生產者的狀態,每個生產者的每個消息會帶上一個遞增的序列(sequence),服務端會記錄每個生產者對應的當前的最大的序列(PID+seq),如果新的消息帶上的序列不大於當前的最大的seq就拒絕這條消息,如果消息落盤會同時更新最大的seq,這個時候重發的消息會唄服務器拒掉從而避免了消息重復。

2.設置ack=0,即不需要確認,不重試。但可能會丟失數據,所以適用於吞吐量指標重要性高於數據丟失,例如:日志收集。

2.生產者和broker階段的原因:

  1. ack=0,不重試。生產者發送消息后,不管結果如何,如果發送失敗數據也就丟失了。

  2. ack=1,leader宕機(crash)了,生產者發送消息完,只等待leader寫入成功就返回了,leader宕機了,這是follower還沒來得及同步,那么消息就丟失了。

  3. unclean.leader.election.enable 配置為true。允許選舉ISR以外的副本作為leader,會導致數據丟失,默認為fase(非ISR中的副本不能參與選舉)。

    生產者發送完異步消息,只等待leader寫入成功就返回了,leader宕機了,這時ISR中沒有follower,leader從OSR中選舉,因為OSR中本來落后於leader而造成數據丟失。

解決方式:

1.配置:ack=-1,retries>1,unclean.leader.election.enable=false

生產者發送完消息,等待follower同步完在返回,如果異常則重試,這時副本的數量可能影響吞吐量,最大不超過5個,一般三個就夠了。

2.配置:min.insync.replicas > 1

當生產者將ack設置為all或-1時,min.insync副本指定必須確認寫操作成功的最小副本數量,如果不能滿足這個最小值,則生產者將引發一個異常。當一起使用時,min.insync.replicas和ack允許執行更大的持久性保證。

3.失敗的offset單獨記錄

生產者發送消息,回自動重試,遇到不可恢復的異常會拋出,這時可以捕獲異常記錄到數據庫或緩存中,進行單獨處理。

3.消費階段的原因:數據消費完沒有及時提交offset到broker。消息消費端在消費過程中掛掉沒有及時提交offset到broker,另一個消費者啟動拿到之前記錄的offset開始消費,由於offset的滯后性可能會導致新啟動的客戶端有少量重復消費。

解決方式:

1.取消自動提交,每次消費完或者程序退出時手動提交,這也沒有辦法保證不會有重復。

2.做冪等性,盡量讓下游做冪等或者盡量每消費一條消息都記錄offset。對於少書嚴格的場景可能需要吧offset或唯一ID和下游狀態更新放在同一個數據庫里做事務來保證精確的一次更新或者在下游數據庫表里同時記錄消費的offset。然后更新數據的時候用消費位點做樂觀鎖拒絕掉舊的位點的數據更新。

參考文章:

https://www.cnblogs.com/qingyunzong/p/9004509.html

https://www.cnblogs.com/frankdeng/p/9310704.html

https://www.jianshu.com/p/6845469d99e6

https://www.cnblogs.com/wangzhuxing/p/10124308.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM