理解flink connector


一:連接外部存儲系統的方式

flink是新一代的流式計算引擎,它需要從不同的第三方存儲引擎讀取數據,進行一定的處理,寫出到不同的存儲引擎,Connector就相當於是一個連接器,連接flink系統和外界存儲系統。

常用的連接方式有以下幾種:

  flink內部預定義的source和sink

  flink內部提供了一些Boundled connector

  使用第三方Apache Bahir項目中的連接器

  通過異步IO的方式

二:每種連接方式的簡單說明

2.1 預定義的source和sink

大致分為以下幾類:

基於文件:

  source:readTextFile,readFile        sink:writeAsText,writeAsCsv

基於socket

  source:socketTextStream          sink:writeToSocket

基於Collection,iterators

  source:fromCollection,fromElement     sink:print,printoToError

2.2 Boundled connector

  flink內部提供了一些source和sink,例如:kafka的source和sink,es的sink。

  常用的有以下幾個:

    Apache Kafka(source/sink)

    Apache Cassandra(sink)

    ElasticSearch(sink)

    Hdfs(sink)

    RabbitMQ(source/sink)

  以上connector是flink的一部分,但是不在flink的二進制發布包中,需要從網上下載jar包或者使用Maven依賴。

2.3 Apache Bahir中的連接器

  Apache Bahir 最初是從 Apache Spark 中獨立出來項目提供,以提供不限於 Spark 相關的擴展/插件、連接器和其他可插入組件的實現。通過提供多樣化的流連接器(streaming connectors)和 SQL 數據源擴展分析平台的覆蓋面。如有需要寫到 flume、redis 的需求的話,可以使用該項目提供的 connector。

  常用的有以下幾個:

  Apache ActiveMQ(source/sink)

  Apache Flume(sink)

  Redis(sink)

  akka(sink)

  netty(source)

2.4 Async I/O

  流計算中經常需要與外部存儲系統交互,比如需要關聯 MySQL 中的某個表。一般來說,如果用同步 I/O 的方式,會造成系統中出現大的等待時間,影響吞吐和延遲。為了解決這個問題,異步 I/O 可以並發處理多個請求,提高吞吐,減少延遲。

  主要用於讀取外部數據庫,例如mysql,oracle,hbase等。

三:flink connect kafka

  我們比較常用的就是使用flink讀取kafka,也就是消費kafka的數據,kafka是一個分布式的、分區的、多副本的、 支持高吞吐的、發布訂閱消息系統。生產環境環境中也經常會跟 kafka 進行一些數據的交換,比如利用 kafka consumer 讀取數據,然后進行一系列的處理之后,再將結果寫出到 kafka 中。這里會主要分兩個部分進行介紹,一是 Flink kafka Consumer,一個是 Flink kafka Producer。

3.1 反序列化數據

  因為 kafka 中數據都是以二進制 byte 形式存儲的。讀到 Flink 系統中之后,需要將二進制數據轉化為具體的 java、scala 對象。具體需要實現一個 schema 類,定義如何序列化和反序列數據。反序列化時需要實現 DeserializationSchema 接口,並重寫 deserialize(byte[] message) 函數,如果是反序列化 kafka 中 kv 的數據時,需要實現 KeyedDeserializationSchema 接口,並重寫 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) 函數。

   另外 Flink 中也提供了一些常用的序列化反序列化的 schema 類。例如,SimpleStringSchema,按字符串方式進行序列化、反序列化。TypeInformationSerializationSchema,它可根據 Flink 的 TypeInformation 信息來推斷出需要選擇的 schema。JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息,並返回 ObjectNode,可以使用 .get(“property”) 方法來訪問相應字段。

3.2 消費起始位置設置

  • setStartFromGroupOffsets,也是默認的策略,從 group offset 位置讀取數據,group offset 指的是 kafka broker 端記錄的某個 group 的最后一次的消費位置。但是 kafka broker 端沒有該 group 信息,會根據 kafka 的參數”auto.offset.reset”的設置來決定從哪個位置開始消費。

  • setStartFromEarliest,從 kafka 最早的位置開始讀取。

  • setStartFromLatest,從 kafka 最新的位置開始讀取。

  • setStartFromTimestamp(long),從時間戳大於或等於指定時間戳的位置開始讀取。Kafka 時戳,是指 kafka 為每條消息增加另一個時戳。該時戳可以表示消息在 proudcer 端生成時的時間、或進入到 kafka broker 時的時間。

  • setStartFromSpecificOffsets,從指定分區的 offset 位置開始讀取,如指定的 offsets 中不存某個分區,該分區從 group offset 位置開始讀取。此時需要用戶給定一個具體的分區、offset 的集合。

3.3 topic和partition的動態發現

   實際的生產環境中可能有這樣一些需求,比如場景一,有一個 Flink 作業需要將五份數據聚合到一起,五份數據對應五個 kafka topic,隨着業務增長,新增一類數據,同時新增了一個 kafka topic,如何在不重啟作業的情況下作業自動感知新的 topic。場景二,作業從一個固定的 kafka topic 讀數據,開始該 topic 有 10 個 partition,但隨着業務的增長數據量變大,需要對 kafka partition 個數進行擴容,由 10 個擴容到 20。該情況下如何在不重啟作業情況下動態感知新擴容的 partition?

    針對上面的兩種場景,首先需要在構建 FlinkKafkaConsumer 時的 properties 中設置 flink.partition-discovery.interval-millis 參數為非負值,表示開啟動態發現的開關,以及設置的時間間隔。此時 FlinkKafkaConsumer 內部會啟動一個單獨的線程定期去 kafka 獲取最新的 meta 信息。針對場景一,還需在構建 FlinkKafkaConsumer 時,topic 的描述可以傳一個正則表達式描述的 pattern。每次獲取最新 kafka meta 時獲取正則匹配的最新 topic 列表。針對場景二,設置前面的動態發現參數,在定期獲取 kafka 最新 meta 信息時會匹配新的 partition。為了保證數據的正確性,新發現的 partition 從最早的位置開始讀取。

3.3 commit offset方式

  Flink kafka consumer commit offset 方式需要區分是否開啟了 checkpoint。

    如果 checkpoint 關閉,commit offset 要依賴於 kafka 客戶端的 auto commit。需設置 enable.auto.commit,auto.commit.interval.ms 參數到 consumer properties,就會按固定的時間間隔定期 auto commit offset 到 kafka。
    如果開啟 checkpoint,這個時候作業消費的 offset 是 Flink 在 state 中自己管理和容錯。此時提交 offset 到 kafka,一般都是作為外部進度的監控,想實時知道作業消費的位置和 lag 情況。此時需要 setCommitOffsetsOnCheckpoints 為 true 來設置當 checkpoint 成功時提交 offset 到 kafka。此時 commit offset 的間隔就取決於 checkpoint 的間隔,所以此時從 kafka 一側看到的 lag 可能並非完全實時,如果 checkpoint 間隔比較長 lag 曲線可能會是一個鋸齒狀。

3.4 flink kafka producer

   使用 FlinkKafkaProducer 往 kafka 中寫數據時,如果不單獨設置 partition 策略,會默認使用 FlinkFixedPartitioner,該 partitioner 分區的方式是 task 所在的並發 id 對 topic 總 partition 數取余:parallelInstanceId % partitions.length。

  • 此時如果 sink 為 4,paritition 為 1,則 4 個 task 往同一個 partition 中寫數據。但當 sink task < partition 個數時會有部分 partition 沒有數據寫入,例如 sink task 為2,partition 總數為 4,則后面兩個 partition 將沒有數據寫入。

  • 如果構建 FlinkKafkaProducer 時,partition 設置為 null,此時會使用 kafka producer 默認分區方式,非 key 寫入的情況下,使用 round-robin 的方式進行分區,每個 task 都會輪循的寫下游的所有 partition。該方式下游的 partition 數據會比較均衡,但是缺點是 partition 個數過多的情況下需要維持過多的網絡連接,即每個 task 都會維持跟所有 partition 所在 broker 的連接。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM