一:連接外部存儲系統的方式
flink是新一代的流式計算引擎,它需要從不同的第三方存儲引擎讀取數據,進行一定的處理,寫出到不同的存儲引擎,Connector就相當於是一個連接器,連接flink系統和外界存儲系統。
常用的連接方式有以下幾種:
flink內部預定義的source和sink
flink內部提供了一些Boundled connector
使用第三方Apache Bahir項目中的連接器
通過異步IO的方式
二:每種連接方式的簡單說明
2.1 預定義的source和sink
大致分為以下幾類:
基於文件:
source:readTextFile,readFile sink:writeAsText,writeAsCsv
基於socket
source:socketTextStream sink:writeToSocket
基於Collection,iterators
source:fromCollection,fromElement sink:print,printoToError
2.2 Boundled connector
flink內部提供了一些source和sink,例如:kafka的source和sink,es的sink。
常用的有以下幾個:
Apache Kafka(source/sink)
Apache Cassandra(sink)
ElasticSearch(sink)
Hdfs(sink)
RabbitMQ(source/sink)
以上connector是flink的一部分,但是不在flink的二進制發布包中,需要從網上下載jar包或者使用Maven依賴。
2.3 Apache Bahir中的連接器
Apache Bahir 最初是從 Apache Spark 中獨立出來項目提供,以提供不限於 Spark 相關的擴展/插件、連接器和其他可插入組件的實現。通過提供多樣化的流連接器(streaming connectors)和 SQL 數據源擴展分析平台的覆蓋面。如有需要寫到 flume、redis 的需求的話,可以使用該項目提供的 connector。
常用的有以下幾個:
Apache ActiveMQ(source/sink)
Apache Flume(sink)
Redis(sink)
akka(sink)
netty(source)
2.4 Async I/O
流計算中經常需要與外部存儲系統交互,比如需要關聯 MySQL 中的某個表。一般來說,如果用同步 I/O 的方式,會造成系統中出現大的等待時間,影響吞吐和延遲。為了解決這個問題,異步 I/O 可以並發處理多個請求,提高吞吐,減少延遲。
主要用於讀取外部數據庫,例如mysql,oracle,hbase等。
三:flink connect kafka
我們比較常用的就是使用flink讀取kafka,也就是消費kafka的數據,kafka是一個分布式的、分區的、多副本的、 支持高吞吐的、發布訂閱消息系統。生產環境環境中也經常會跟 kafka 進行一些數據的交換,比如利用 kafka consumer 讀取數據,然后進行一系列的處理之后,再將結果寫出到 kafka 中。這里會主要分兩個部分進行介紹,一是 Flink kafka Consumer,一個是 Flink kafka Producer。
3.1 反序列化數據
因為 kafka 中數據都是以二進制 byte 形式存儲的。讀到 Flink 系統中之后,需要將二進制數據轉化為具體的 java、scala 對象。具體需要實現一個 schema 類,定義如何序列化和反序列數據。反序列化時需要實現 DeserializationSchema 接口,並重寫 deserialize(byte[] message) 函數,如果是反序列化 kafka 中 kv 的數據時,需要實現 KeyedDeserializationSchema 接口,並重寫 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函數。
3.2 消費起始位置設置
-
setStartFromGroupOffsets,也是默認的策略,從 group offset 位置讀取數據,group offset 指的是 kafka broker 端記錄的某個 group 的最后一次的消費位置。但是 kafka broker 端沒有該 group 信息,會根據 kafka 的參數”auto.offset.reset”的設置來決定從哪個位置開始消費。
-
setStartFromEarliest,從 kafka 最早的位置開始讀取。
-
setStartFromLatest,從 kafka 最新的位置開始讀取。
-
setStartFromTimestamp(long),從時間戳大於或等於指定時間戳的位置開始讀取。Kafka 時戳,是指 kafka 為每條消息增加另一個時戳。該時戳可以表示消息在 proudcer 端生成時的時間、或進入到 kafka broker 時的時間。
-
setStartFromSpecificOffsets,從指定分區的 offset 位置開始讀取,如指定的 offsets 中不存某個分區,該分區從 group offset 位置開始讀取。此時需要用戶給定一個具體的分區、offset 的集合。
3.3 topic和partition的動態發現
實際的生產環境中可能有這樣一些需求,比如場景一,有一個 Flink 作業需要將五份數據聚合到一起,五份數據對應五個 kafka topic,隨着業務增長,新增一類數據,同時新增了一個 kafka topic,如何在不重啟作業的情況下作業自動感知新的 topic。場景二,作業從一個固定的 kafka topic 讀數據,開始該 topic 有 10 個 partition,但隨着業務的增長數據量變大,需要對 kafka partition 個數進行擴容,由 10 個擴容到 20。該情況下如何在不重啟作業情況下動態感知新擴容的 partition?
3.3 commit offset方式
Flink kafka consumer commit offset 方式需要區分是否開啟了 checkpoint。
3.4 flink kafka producer
使用 FlinkKafkaProducer 往 kafka 中寫數據時,如果不單獨設置 partition 策略,會默認使用 FlinkFixedPartitioner,該 partitioner 分區的方式是 task 所在的並發 id 對 topic 總 partition 數取余:parallelInstanceId % partitions.length。
-
此時如果 sink 為 4,paritition 為 1,則 4 個 task 往同一個 partition 中寫數據。但當 sink task < partition 個數時會有部分 partition 沒有數據寫入,例如 sink task 為2,partition 總數為 4,則后面兩個 partition 將沒有數據寫入。
-
如果構建 FlinkKafkaProducer 時,partition 設置為 null,此時會使用 kafka producer 默認分區方式,非 key 寫入的情況下,使用 round-robin 的方式進行分區,每個 task 都會輪循的寫下游的所有 partition。該方式下游的 partition 數據會比較均衡,但是缺點是 partition 個數過多的情況下需要維持過多的網絡連接,即每個 task 都會維持跟所有 partition 所在 broker 的連接。