前言
之前文章 《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch 寫了如何將 Kafka 中的數據存儲到 ElasticSearch 中,里面其實就已經用到了 Flink 自帶的 Kafka source connector(FlinkKafkaConsumer)。存入到 ES 只是其中一種情況,那么如果我們有多個地方需要這份通過 Flink 轉換后的數據,是不是又要我們繼續寫個 sink 的插件呢?確實,所以 Flink 里面就默認支持了不少 sink,比如也支持 Kafka sink connector(FlinkKafkaProducer),那么這篇文章我們就講講如何將數據寫入到 Kafka。
准備
添加依賴
Flink 里面支持 Kafka 0.8、0.9、0.10、0.11 ,以后有時間可以分析下源碼的實現。
這里我們需要安裝下 Kafka,請對應添加對應的 Flink Kafka connector 依賴的版本,這里我們使用的是 0.11 版本:
1 |
<dependency> |
Kafka 安裝
這里就不寫這塊內容了,可以參考我以前的文章 Kafka 安裝及快速入門。
這里我們演示把其他 Kafka 集群中 topic 數據原樣寫入到自己本地起的 Kafka 中去。
配置文件
1 |
kafka.brokers=xxx:9092,xxx:9092,xxx:9092 |
目前我們先看下本地 Kafka 是否有這個 metric-test topic 呢?需要執行下這個命令:
1 |
bin/kafka-topics.sh --list --zookeeper localhost:2181 |
可以看到本地的 Kafka 是沒有任何 topic 的,如果等下我們的程序運行起來后,再次執行這個命令出現 metric-test topic,那么證明我的程序確實起作用了,已經將其他集群的 Kafka 數據寫入到本地 Kafka 了。
程序代碼
Main.java
1 |
public class Main { |
運行結果
啟動程序,查看運行結果,不段執行上面命令,查看是否有新的 topic 出來:
執行命令可以查看該 topic 的信息:
1 |
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic metric-test |
分析
上面代碼我們使用 Flink Kafka Producer 只傳了三個參數:brokerList、topicId、serializationSchema(序列化)
其實也可以傳入多個參數進去,現在有的參數用的是默認參數,因為這個內容比較多,后面可以抽出一篇文章單獨來講。
總結
本篇文章寫了 Flink 讀取其他 Kafka 集群的數據,然后寫入到本地的 Kafka 上。我在 Flink 這層沒做什么數據轉換,只是原樣的將數據轉發了下,如果你們有什么其他的需求,是可以在 Flink 這層將數據進行各種轉換操作,比如這篇文章中的一些轉換:《從0到1學習Flink》—— Flink Data transformation(轉換),然后將轉換后的數據發到 Kafka 上去。
打開支付寶掃一掃,即可進行掃碼打賞哦