Flink筆記(二十三):Flink 整合 Kafka (實現 Exactly-Once)


原文鏈接:https://blog.csdn.net/lzb348110175/article/details/104363792

本文開頭附:Flink 學習路線系列 ^ _ ^

Flink 整合 Kafka 基本步驟,請參考:Flink 基礎整合 Kafka。本文僅用來介紹 Flink 整合 Kafka 實現 Exactly-Once。

1.什么是Exactly-Once
       恰好處理一次的意思。不管在處理的時候是否有異常發生,計算的結果都一樣。即使在發現機器或者軟件故障時,都不會出現數據丟失以及重復處理的情況。(就是每條數據只會被處理一次)

       Flink 中哪些 Source 、Sink支持 Exactly-Once 呢,Flink官方文檔(鏈接)為我們做了描述。如下圖所示:

Source:

Sink:

       我們發現很多都是 at least once(至少一次),我們可以基於冪等操作(冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同),只要是偏移量記錄正確,如果你記錄了2次數據,可以將原來的數據覆蓋掉,以此來實現 exactly-once 操作

2.場景(Demo)
       Flink 實時讀取 Kafka 中的數據,並保證 Exaclty-Once。即:當任務出現異常,觸發重啟策略任務被重啟后,對已經消費過的消息不進行重復消費。

2.1 代碼
/**
* TODO Flink整合 Kafka,實現 Exactly-Once
*
* @author liuzebiao
* @Date 2020-2-15 9:53
*/
public class RestartStrategyDemo {

public static void main(String[] args) throws Exception {

/**1.創建流運行環境**/
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

/**請注意此處:**/
//1.只有開啟了CheckPointing,才會有重啟策略
env.enableCheckpointing(5000);
//2.默認的重啟策略是:固定延遲無限重啟
//此處設置重啟策略為:出現異常重啟3次,隔5秒一次
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(2)));
//系統異常退出或人為 Cancel 掉,不刪除checkpoint數據
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//設置Checkpoint模式(與Kafka整合,一定要設置Checkpoint模式為Exactly_Once)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);


/**2.Source:讀取 Kafka 中的消息**/
//Kafka props
Properties properties = new Properties();
//指定Kafka的Broker地址
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.204.210:9092,192.168.204.211:9092,192.168.204.212:9092");
//指定組ID
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
//如果沒有記錄偏移量,第一次從最開始消費
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
//Kafka的消費者,不自動提交偏移量
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer("testTopic", new SimpleStringSchema(), properties);

//Checkpoint成功后,還要向Kafka特殊的topic中寫偏移量(此處不建議改為false )
//設置為false后,則不會向特殊topic中寫偏移量。
//kafkaSource.setCommitOffsetsOnCheckpoints(false);
//通過addSource()方式,創建 Kafka DataStream
DataStreamSource<String> kafkaDataStream = env.addSource(kafkaSource);

/**3.Transformation過程**/
SingleOutputStreamOperator<Tuple2<String, Integer>> streamOperator = kafkaDataStream.map(str -> Tuple2.of(str, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));

/**此部分讀取Socket數據,只是用來人為出現異常,觸發重啟策略。驗證重啟后是否會再次去讀之前已讀過的數據(Exactly-Once)*/
/*************** start **************/
DataStreamSource<String> socketTextStream = env.socketTextStream("localhost", 8888);

SingleOutputStreamOperator<String> streamOperator1 = socketTextStream.map(new MapFunction<String, String>() {
@Override
public String map(String word) throws Exception {
if ("error".equals(word)) {
throw new RuntimeException("Throw Exception");
}
return word;
}
});
/************* end **************/

//對元組 Tuple2 分組求和
SingleOutputStreamOperator<Tuple2<String, Integer>> sum = streamOperator.keyBy(0).sum(1);

/**4.Sink過程**/
sum.print();

/**5.任務執行**/
env.execute("RestartStrategyDemo");
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
2.2 測試
2.2.1 Kafka創建Topic
創建 Topic 命令如下:
      bin/kafka-topics.sh --create --zookeeper 192.168.204.210:2181,192.168.204.211:2181,192.168.204.212:2181 --replication-factor 1 --partitions 3 --topic testTopic

Topic創建成功:


2.2.2 寫入數據到 Kafka
       我們使用命令的方式,向Kafka集群中的某一台寫入消息(本文向192.168.204.210節點 Kafka 寫入數據)。命令如下:
      bin/kafka-console-producer.sh --broker-list 192.168.204.210:9092 --topic testTopic

寫入部分數據:


2.2.2 測試動圖


2.2.3 測試結果解析
當輸入 3個AAA 、1個BBB,返回計算結果(AAA,3)、(BBB,1);
此時在 socket 端口號 8888 中輸入“error”,人為原因導致實時任務異常。通過動圖可以看到任務已經出發重啟策略開始重啟;
在任務重啟前,動圖中我已經清空了控制台日志。當任務重啟完成,我們搜索 AAA,發現並沒有搜到內容,說明之前已經消費的消息沒有被重復消費,已經實現了 Exactly-Once;
當再次 1個AAA 、1個BBB時,任務接着異常前的結果繼續累加,返回計算結果(AAA,4)、(BBB,2)。
2.3 Flink整合Kafka發現的其他問題
       1.kafkaSource.setCommitOffsetsOnCheckpoints(boolean);方法,是用來干什么的?

      官方文檔有介紹,你可參考:Flink官方文檔。代碼中不建議將kafkaSource.setCommitOffsetsOnCheckpoints(boolean);方法設置為 false。此處,我們對該方法來做一個書面介紹:

      ①你如果禁用CheckPointing,則Flink Kafka Consumer依賴於內部使用的Kafka客戶端的自動定期偏移量提交功能。該偏移量會被記錄在 Kafka 中的 _consumer_offsets 這個特殊記錄偏移量的 Topic 中。

      ②你如果啟用CheckPointing,偏移量則會被記錄在 StateBackend 中。該方法kafkaSource.setCommitOffsetsOnCheckpoints(boolean);設置為 ture 時,偏移量會在 StateBackend 和 Kafka 中的 _consumer_offsets Topic 中都會記錄一份;設置為 false 時,偏移量只會在 StateBackend 中的 存儲一份。

附:查看 Kafka 中 _consumer_offsets Topic 的偏移量命令:

/usr/local/env/kafka/bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server 192.168.204.210:9092,192.168.204.211:9092,192.168.204.212:9092 --formatter “kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter” --consumer.config /usr/local/env/kafka/config/consumer.properties --from-beginning | grep 你的Topic名稱


執行代碼如下圖所示:

       2. 動圖中,Flink任務重啟時,並未指定 savePoint 路徑,為什么還能夠恢復數據?
     ①如果任務重啟時,指定savePoint路徑(Checkpoint路徑),它則會從指定的savePoint路徑恢復數據

     ②如果不指定 savePoint 路徑,任務會從 Kafka 中的_consumer_offsets這個 topic 中,查看有沒有相同group.id 的 topic 的偏移量,如果有的話就會接着之前寫入的偏移量來讀。

     這就是Flink任務重啟時,並未指定 savePoint 路徑,還能夠恢復數據的原因↑↑↑

Flink 整合 Kafka 實現 Exactly-Once,介紹到此為止


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM