1 Kafka
1.1 定義
Kafka
是一個分布式流媒體平台,kafka
官網:http://kafka.apache.org/
Kafka
是一種高吞吐量、分布式、基於發布/訂閱
的消息系統,最初由 LinkedIn
公司開發,使用Scala
語言編寫,目前是Apache
的開源項目。
流媒體平台有三個關鍵功能:
- 發布和訂閱記錄流,類似於消息隊列或企業消息傳遞系統。
- 以容錯的持久方式存儲記錄流。
- 記錄發生時處理流。
Kafka
通常用於兩大類應用:
- 構建可在系統或應用程序之間可靠獲取數據的實時流數據管道
- 構建轉換或響應數據流的實時流應用程序
1.1.1 Kafka名詞
下面是Kafka
中涉及到的相關概念:
broker
:Kafka
服務器,負責消息存儲和轉發topic
:消息類別,Kafka
按照topic
來分類消息(即使如此,kafka仍然有點對點和廣播發布類型)partition
:topic
的分區,一個topic
可以包含多個partition
,topic
消息保存在各個partition
上offset
:消息在日志中的位置,可以理解是消息在partition
上的偏移量,也是代表該消息的唯一序號Producer
:消息生產者Consumer
:消息消費者Consumer Group
:消費者分組,每個Consumer
必須屬於一個group
Zookeeper
:保存着集群broker、topic、partition
等meta
數據;另外,還負責broker
故障發現,partition leader
選舉,負載均衡等功能
1.1.2 Kafka核心API
Kafka
有四個核心API:
-
Producer API
(生產者API)允許應用程序發布記錄流至一個或多個kafka
的topics
(主題) -
Consumer API
(消費者API)允許應用程序訂閱一個或多個topics
(主題),並處理所產生的對他們記錄的數據流。 -
Streams API
(流API)允許應用程序充當流處理器,從一個或多個topics
(主題)消耗的輸入流,並產生一個輸出流至一個或多個輸出的topics
(主題),有效地變換所述輸入流,以輸出流。 -
Connector API
(連接器API)允許構建和運行kafka topics
(主題)連接到現有的應用程序或數據系統中重用生產者或消費者。例如,關系數據庫的連接器可能捕獲對表的每個更改。
在Kafka
中,客戶端和服務器之間的通信是通過簡單,高性能,語言無關的TCP
協議完成的。此協議已版本化並保持與舊版本的向后兼容性。Kafka
提供Java客戶端,但客戶端有多種語言版本。
1.2 相關組件介紹
1.2.1 Topic
Topic
是生產者發送消息的目標地址,是消費者的監聽目標
一個服務可以監聽、發送多個 Topics
Kafka 中有一個consumer-group(消費者組)
的概念。
這是一組服務,扮演一個消費者
如果是消費者組接收消息,Kafka
會把一條消息路由到組中的某一個服務
這樣有助於消息的負載均衡,也方便擴展消費者。
Topic
扮演一個消息的隊列。
首先,一條消息發送了
然后,這條消息被記錄和存儲在這個隊列中,不允許被修改
接下來,消息會被發送給此 Topic
的消費者。
但是,這條消息並不會被刪除,會繼續保留在隊列中
像之前一樣,這條消息會發送給消費者、不允許被改動、一直呆在隊列中。
(消息在隊列中能呆多久,可以修改 Kafka 的配置)
1.2.2 Partitions分區
上面 Topic
的描述中,把 Topic
看做了一個隊列,實際上,一個 Topic
是由多個隊列組成的,被稱為Partition(分區)
。
這樣可以便於 Topic
的擴展
生產者發送消息的時候,這條消息會被路由到此 Topic
中的某一個 Partition
消費者監聽的是所有分區
生產者發送消息時,默認是面向 Topic
的,由 Topic
決定放在哪個 Partition
,默認使用輪詢策略
也可以配置 Topic
,讓同類型的消息都在同一個 Partition
。
例如,處理用戶消息,可以讓某一個用戶所有消息都在一個 Partition
。
例如,用戶1發送了3條消息:A、B、C,默認情況下,這3條消息是在不同的 Partition 中(如 P1、P2、P3)。
在配置之后,可以確保用戶1的所有消息都發到同一個分區中(如 P1)
這個功能有什么用呢?
這是為了提供消息的【有序性】。
消息在不同的 Partition 是不能保證有序的,只有一個 Partition 內的消息是有序的
1.2.3 Topics主題 和 partitions分區
一個Topic
可以認為是一類消息,每個topic
將被分成多個partition
(區),每個partition
在存儲層面是append log
文件
主題是發布記錄的類別或訂閱源名稱。Kafka
的主題總是多用戶; 也就是說,一個主題可以有零個,一個或多個消費者訂閱寫入它的數據。
對於每個主題,Kafka
集群都維護一個如下所示的分區日志:
每個分區都是一個有序的,不可變的記錄序列,不斷附加到結構化的提交日志中。分區中的記錄每個都分配了一個稱為偏移的順序ID
號,它唯一地標識分區中的每個記錄。
Kafka
集群持久保存所有已發布的記錄 - 無論是否已使用 - 使用可配置的保留期。例如,如果保留策略設置為兩天,則在發布記錄后的兩天內,它可供使用,之后將被丟棄以釋放空間。Kafka
的性能在數據大小方面實際上是恆定的,因此長時間存儲數據不是問題。
實際上,基於每個消費者保留的唯一元數據是該消費者在日志中的偏移或位置。這種偏移由消費者控制:通常消費者在讀取記錄時會線性地提高其偏移量,但事實上,由於該位置由消費者控制,因此它可以按照自己喜歡的任何順序消費記錄。例如,消費者可以重置為較舊的偏移量來重新處理過去的數據,或者跳到最近的記錄並從“現在”開始消費。
這些功能組合意味着Kafka
消費者consumers
非常cheap
- 他們可以來來往往對集群或其他消費者沒有太大影響。例如,可以使用我們的命令行工具“tail”任何主題的內容,而無需更改任何現有使用者所消耗的內容。
日志中的分區有多種用途。首先,它們允許日志擴展到超出適合單個服務器的大小。每個單獨的分區必須適合托管它的服務器,但主題可能有許多分區,因此它可以處理任意數量的數據。其次,它們充當了並行性的單位 - 更多的是它
1.2.4 Distribution分配
一個Topic
的多個partitions
,被分布在kafka
集群中的多個server
上;每個server
(kafka實例)負責partitions
中消息的讀寫操作;此外kafka
還可以配置partitions
需要備份的個數(replicas
),每個partition
將會被備份到多台機器上,以提高可用性.
基於replicated
方案,那么就意味着需要對多個備份進行調度;每個partition
都有一個server
為leader
;leader
負責所有的讀寫操作,如果leader
失效,那么將會有其他follower
來接管(成為新的leader);follower
只是單調的和leader
跟進,同步消息即可..由此可見作為leader
的server
承載了全部的請求壓力,因此從集群的整體考慮,有多少個partitions就意味着有多少個"leader",kafka會將"leader"均衡的分散在每個實例上,來確保整體的性能穩定
1.2.5 Producers生產者 和 Consumers消費者
1.2.5.1 Producers生產者
Producers
將數據發布到指定的topics
主題。同時Producer
也能決定將此消息歸屬於哪個partition
;比如基於round-robin
方式或者通過其他的一些算法等。
1.2.5.2 Consumers
本質上kafka
只支持Topic
.每個consumer
屬於一個consumer group
;反過來說,每個group
中可以有多個consumer
.發送到Topic
的消息,只會被訂閱此Topic
的每個group
中的一個consumer
消費。
如果所有使用者實例具有相同的使用者組,則記錄將有效地在使用者實例上進行負載平衡。
如果所有消費者實例具有不同的消費者組,則每個記錄將廣播到所有消費者進程。
分析:兩個服務器Kafka群集,托管四個分區(P0-P3),包含兩個使用者組。消費者組A有兩個消費者實例,B組有四個消費者實例。
在Kafka
中實現消費consumption
的方式是通過在消費者實例上划分日志中的分區,以便每個實例在任何時間點都是分配的“公平份額”的獨占消費者。維護組中成員資格的過程由Kafka協議動態處理。如果新實例加入該組,他們將從該組的其他成員接管一些分區; 如果實例死亡,其分區將分發給其余實例。
Kafka
僅提供分區內記錄的總訂單,而不是主題中不同分區之間的記錄。對於大多數應用程序而言,按分區排序與按鍵分區數據的能力相結合就足夠了。但是,如果您需要對記錄進行總訂單,則可以使用僅包含一個分區的主題來實現,但這將意味着每個使用者組只有一個使用者進程。
1.2.5.3 Consumers kafka確保
發送到partitions
中的消息將會按照它接收的順序追加到日志中。也就是說,如果記錄M1
由與記錄M2
相同的生成者發送,並且首先發送M1
,則M1
將具有比M2
更低的偏移並且在日志中更早出現。
消費者實例按照它們存儲在日志中的順序查看記錄。對於消費者而言,它們消費消息的順序和日志中消息順序一致。
如果Topic
的replicationfactor
為N,那么允許N-1個kafka實例失效,我們將容忍最多N-1個服務器故障,而不會丟失任何提交到日志的記錄。
1.2.6 架構和zookeeper關系
Kafka
是集群架構的,ZooKeeper
是重要組件。
ZooKeeper
管理者所有的 Topic
和 Partition
。
Topic
和 Partition
存儲在 Node
物理節點中,ZooKeeper
負責維護這些 Node
有2個 Topic,各自有2個 Partition
這是邏輯上的形式,但在 Kafka 集群中的實際存儲可能是這樣的
Topic A
的 Partition #1
有3份,分布在各個 Node
上。
這樣可以增加 Kafka
的可靠性和系統彈性。
3個 Partition #1
中,ZooKeeper
會指定一個 Leader
,負責接收生產者發來的消息
其他2個 Partition #1 會作為 Follower,Leader 接收到的消息會復制給 Follower
這樣,每個 Partition 都含有了全量消息數據。
即使某個 Node 節點出現了故障,也不用擔心消息的損壞。
Topic A 和 Topic B 的所有 Partition 分布可能就是這樣的
轉載於:https://mp.weixin.qq.com/s/k7DJJGmImcpnaSy9AhAmmQ
1.3 kafka是如何保證消息的有序性
kafka
這樣保證消息有序性的:
一個 topic
,一個 partition
,一個 consumer
,內部單線程消費,單線程吞吐量太低,一般不會用這個。(全局有序性)
寫 N
個內存 queue
,具有相同 key
的數據都到同一個內存 queue
;然后對於 N
個線程,每個線程分別消費一個內存 queue
即可,這樣就能保證順序性。
大家可以看下消息隊列的有序性是怎么推導的:
消息的有序性,就是指可以按照消息的發送順序來消費。有些業務對消息的順序是有要求的,比如先下單再付款,最后再完成訂單,這樣等。假設生產者先后產生了兩條消息,分別是下單消息(M1),付款消息(M2),M1比M2先產生,如何保證M1比M2先被消費呢。
為了保證消息的順序性,可以將將M1、M2發送到同一個Server上,當M1發送完收到ack后,M2再發送。如圖:
這樣還是可能會有問題,因為從MQ服務器到服務端,可能存在網絡延遲,雖然M1先發送,但是它比M2晚到。
那還能怎么辦才能保證消息的順序性呢?將M1和M2發往同一個消費者,且發送M1后,等到消費端ACK成功后,才發送M2就得了。
消息隊列保證順序性整體思路就是這樣啦。比如Kafka
的全局有序消息,就是這種思想的體現: 就是生產者發消息時,1個Topic
只能對應1個Partition
,一個 Consumer
,內部單線程消費。
但是這樣吞吐量太低,一般保證消息局部有序即可。在發消息的時候指定Partition Key
,Kafka
對其進行Hash
計算,根據計算結果決定放入哪個Partition
。這樣Partition Key
相同的消息會放在同一個Partition
。然后多消費者單線程消費指定的Partition
參考鏈接:https://mp.weixin.qq.com/s/gHjuYH6R6Fgfn3WZ8W79Zg
1.4 消息重復
1.4.1 引言
數據重復這個問題其實也是挺正常,全鏈路都有可能會導致數據重復
通常,消息消費時候都會設置一定重試次數來避免網絡波動造成的影響,同時帶來副作用是可能出現消息重復。
整理下消息重復的幾個場景:
生產端
: 遇到異常,基本解決措施都是重試
- 場景一:
leader
分區不可用了,拋LeaderNotAvailableException
異常,等待選出新leader
分區。 - 場景二:
Controller
所在Broker
掛了,拋NotControllerException
異常,等待Controller
重新選舉。 - 場景三:網絡異常、斷網、網絡分區、丟包等,拋
NetworkException
異常,等待網絡恢復。
- 場景一:
消費端
:poll
一批數據,處理完畢還沒提交offset
,機子宕機重啟了,又會poll
上批數據,再度消費就造成了消息重復。
1.4.2 解決重復消息
了解下消息的三種投遞語義:
- 最多一次(
at most once
): 消息只發一次,消息可能會丟失,但絕不會被重復發送。例如:mqtt 中 QoS = 0。 - 至少一次(
at least once
): 消息至少發一次,消息不會丟失,但有可能被重復發送。例如:mqtt 中 QoS = 1 - 精確一次(
exactly once
): 消息精確發一次,消息不會丟失,也不會被重復發送。例如:mqtt 中 QoS = 2。
了解了這三種語義,再來看如何解決消息重復,即如何實現精准一次,可分為三種方法:
Kafka
冪等性Producer
: 保證生產端發送消息冪等。局限性,是只能保證單分區且單會話(重啟后就算新會話)Kafka
事務: 保證生產端發送消息冪等。解決冪等Producer
的局限性。- 消費端冪等: 保證消費端接收消息冪等
1.4.2.1 Kafka 冪等性 Producer
冪等性指:無論執行多少次同樣的運算,結果都是相同的。即一條命令,任意多次執行所產生的影響均與一次執行的影響相同
冪等性使用示例:在生產端添加對應配置即可
- 設置冪等,啟動冪等。
- 配置
acks
,注意
:一定要設置 acks=all,否則會拋異常。 - 配置 max.in.flight.requests.per.connection 需要 <= 5,否則會拋異常
OutOfOrderSequenceException
。- 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
- Kafka >= 1.1, max.in.flight.request.per.connection <= 5
Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 設置冪等
props.put("acks", "all"); // 2. 當 enable.idempotence 為 true,這里默認為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 注意
為了更好理解,需要了解下 Kafka
冪等機制:
Producer
每次啟動后,會向Broker
申請一個全局唯一的pid
。(重啟后 pid 會變化,這也是弊端之一)Sequence Numbe
:針對每個<Topic, Partition>
都對應一個從0開始單調遞增的Sequence
,同時Broker
端會緩存這個seq num
- 判斷是否重復: 拿
<pid, seq num>
去Broker
里對應的隊列ProducerStateEntry.Queue
(默認隊列長度為 5)查詢是否存在
如果nextSeq == lastSeq + 1
,即服務端seq + 1 == 生產傳入seq
,則接收。
如果nextSeq == 0 && lastSeq == Int.MaxValue
,即剛初始化,也接收。
反之,要么重復,要么丟消息,均拒絕。
這種設計針對解決了兩個問題:
消息重復
: 場景Broker
保存消息后還沒發送ack
就宕機了,這時候Producer
就會重試,這就造成消息重復。消息亂序
: 避免場景,前一條消息發送失敗而其后一條發送成功,前一條消息重試后成功,造成的消息亂序。
那什么時候該使用冪等:
- 如果已經使用
acks=all
,使用冪等也可以。 - 如果已經使用
acks=0 或者 acks=1
,說明系統追求高性能,對數據一致性要求不高。不要使用冪等
使用例子
啟動消息者:可以用 Kafka
提供的腳本
# 舉個栗子:topic 需要自己去修改
$ cd ./kafka-2.7.1-src/bin
$ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
創建 topic : 1副本,2 分區
$ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2
# 查看
$ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
生產者代碼
public class KafkaProducerApplication {
private final Producer<String, String> producer;
final String outTopic;
public KafkaProducerApplication(final Producer<String, String> producer,
final String topic) {
this.producer = producer;
outTopic = topic;
}
public void produce(final String message) {
final String[] parts = message.split("-");
final String key, value;
if (parts.length > 1) {
key = parts[0];
value = parts[1];
} else {
key = null;
value = parts[0];
}
final ProducerRecord<String, String> producerRecord
= new ProducerRecord<>(outTopic, key, value);
producer.send(producerRecord,
(recordMetadata, e) -> {
if(e != null) {
e.printStackTrace();
} else {
System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
}
}
);
}
public void shutdown() {
producer.close();
}
public static void main(String[] args) {
final Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final String topic = "myTopic";
final Producer<String, String> producer = new KafkaProducer<>(props);
final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);
String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
try {
List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
linesToProduce.stream().filter(l -> !l.trim().isEmpty())
.forEach(producerApp::produce);
System.out.println("Offsets and timestamps committed in batch from " + filePath);
} catch (IOException e) {
System.err.printf("Error reading file %s due to %s %n", filePath, e);
} finally {
producerApp.shutdown();
}
}
}
啟動消費者:
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
1.4.2.2 Kafka 事務
使用 Kafka
事務解決冪等的弊端:單會話且單分區冪等。
事務使用示例:分為生產端 和 消費端
Properties props = new Properties();
props.put("enable.idempotence", ture); // 1. 設置冪等
props.put("acks", "all"); // 2. 當 enable.idempotence 為 true,這里默認為 all
props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待數
props.put("transactional.id", "my-transactional-id"); // 4. 設定事務 id
Producer<String, String> producer = new KafkaProducer<String, String>(props);
// 初始化事務
producer.initTransactions();
try{
// 開始事務
producer.beginTransaction();
// 發送數據
producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
// 數據發送及 Offset 發送均成功的情況下,提交事務
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// 數據發送或者 Offset 發送出現異常時,終止事務
producer.abortTransaction();
} finally {
// 關閉 Producer 和 Consumer
producer.close();
consumer.close();
}
這里消費端 Consumer
需要設置下配置:isolation.level
參數
read_uncommitted
: 這是默認值,表明Consumer
能夠讀取到Kafka
寫入的任何消息,不論事務型Producer
提交事務還是終止事務,其寫入的消息都可以讀取。如果用了事務型Producer
,那么對應的Consumer
就不要使用這個值。read_committed
: 表明Consumer
只會讀取事務型Producer
成功提交事務寫入的消息。當然了,它也能看到非事務型Producer
寫入的所有消息。
1.4.2.3 消費端冪等
只要消費端具備了冪等性,那么重復消費消息的問題也就解決了。
典型的方案是使用:消息表,來去重:
上述demo中,消費端拉取到一條消息后,開啟事務,將消息Id 新增到本地消息表中,同時更新訂單信息。
如果消息重復,則新增操作 insert
會異常,同時觸發事務回滾。