- Flink 里面預定義了一些 source 和 sink。
- Flink 內部也提供了一些 Boundled connectors。
- 通過異步 IO 方式。
- 自定義 Source & Sink。
一、Flink 預定義 & 自定義 Source 和 Sink
https://www.cnblogs.com/xiexiandong/p/12770187.html
二、Flink 綁定的connectors
需要專門引入對應的依賴,主要是實現外部數據進出Flink。
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
- Apache Bahir
- Apache ActiveMQ (source/sink)
- Apache Flume (sink)
- Redis (sink)
- Akka (sink)
- Netty (source)
三、Flink kafka Connector
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.0</version>
</dependency>
3.1、Flink-Kafka-Consumer

def main(args: Array[String]): Unit = {
// kafka 配置
val kafkaConsumerProps: Properties = new Properties()
kafkaConsumerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
kafkaConsumerProps.setProperty("group.id", KafkaConfig.CONSUMER_GROUP)
// 獲取 kafkaConsumer
val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](KafkaConfig.SOURCE_TOPIC, new SimpleStringSchema(), kafkaConsumerProps)
val logPath: String = "/tmp/logs/flink_log"
val conf: Configuration = new Configuration()
// 開啟spark-webui
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
//配置webui的日志文件
conf.setString("web.log.path", logPath)
// 配置 taskManager 的日志文件,否則打印日志到控制台
conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
// 獲取本地運行環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
// 添加數據源
val sourceDataStream: DataStream[String] = env.addSource(flinkKafkaConsumer)
sourceDataStream.print()
// 提交運行
env.execute("FlinkKafkaExample")
}
創建 FlinkKafkaConsumer:
三個構造參數:
- 要消費的topic(topic name / topic list/正表達式)
- DeserializationSchema / KeyedDeserializationSchema(反序列化Kafka中的數據))
- Kafka consumer的屬性,其中三個屬性必須提供:
- bootstrap.servers(逗號分隔的Kafka broker列表)
- group.id(consumer group id)
1、反序列化數據
- 因為 kafka 中數據都是以二進制 byte 形式存儲的。讀到 Flink 系統中之后,需要將二進制數據轉化為具體的 java、scala 對象。具體需要實現一個 schema 類,定義如何序列化和反序列數據。
- 反序列化時需要實現 DeserializationSchema 接口,並重寫 deserialize(byte[] message) 函數,如果是反序列化 kafka 中 kv 的數據時,需要實現 KeyedDeserializationSchema 接口。
-
// kafka 數據類型樣例類 case class KafkaEvent(message: String, eventTime: Long) //實現 DeserializationSchema 用來反序列化kafka中的數據,kafka 中的數據按照二進制存儲,系列化為 java/scala對象 //重寫 deserialize(message: Array[Byte]) 方法 //如果 kafka 中的數據是按照 KV 格式存儲的,需要實現 KeyedDeserializationSchema class KafkaEventDeserializationSchema extends DeserializationSchema[KafkaEvent] { // 標記為無界流 override def isEndOfStream(nextElement: KafkaEvent): Boolean = false override def deserialize(message: Array[Byte]): KafkaEvent = { KafkaEvent(new String(message), System.currentTimeMillis()) } // 得到自定義序列化類型 override def getProducedType: TypeInformation[KafkaEvent] = TypeInformation.of(new TypeHint[KafkaEvent] {}) }
-
- 另外 Flink 中也提供了一些常用的序列化反序列化的 schema 類。
- 例如,SimpleStringSchema,按字符串方式進行序列化、反序列化。
- TypeInformationSerializationSchema,它可根據 Flink 的 TypeInformation 信息來推斷出需要選擇的 schema。
- JsonDeserializationSchema 使用 jackson 反序列化 json 格式消息,並返回 ObjectNode,可以使用 .get(“property”) 方法來訪問相應字段。
2、消費起始位置設置
在構造好的 FlinkKafkaConsumer 類后面調用如下相應函數,設置合適的起始位置。
- setStartFromGroupOffsets,也是默認的策略,從 group offset 位置讀取數據,group offset 指的是 kafka broker 端記錄的某個 group 的最后一次的消費位置。但是 kafka broker 端沒有該 group 信息,會根據 kafka 的參數”auto.offset.reset”的設置來決定從哪個位置開始消費。
- setStartFromEarliest,從 kafka 最早的位置開始讀取。
- setStartFromLatest,從 kafka 最新的位置開始讀取。
- setStartFromTimestamp(long),從時間戳大於或等於指定時間戳的位置開始讀取。Kafka 時戳,是指 kafka 為每條消息增加另一個時戳。該時戳可以表示消息在 proudcer 端生成時的時間、或進入到 kafka broker 時的時間。
- setStartFromSpecificOffsets,從指定分區的 offset 位置開始讀取,如指定的 offsets 中不存某個分區,該分區從 group offset 位置開始讀取。此時需要用戶給定一個具體的分區、offset 的集合。
- 需要注意的是,因為 Flink 框架有容錯機制,如果作業故障,如果作業開啟 checkpoint,會從上一次 checkpoint 狀態開始恢復。或者在停止作業的時候主動做 savepoint,啟動作業時從 savepoint 開始恢復。這兩種情況下恢復作業時,作業消費起始位置是從之前保存的狀態中恢復,與上面提到跟 kafka 這些單獨的配置無關。
情形 | 誰決定起始位置 |
---|---|
第一次啟動, 無savepoint(常規情況) | 由消費模式決定 |
通過savepoint啟動(應用升級,比如加 大並行度) | 由savepoint記錄的offset決定 |
有checkpoint,失敗后,job恢復的情況 | 由checkpoint的snapshoot中記錄的offset決定 |
無checkpoint,失敗后,job恢復的情況 | 由消費模式決定 |
3、topic 和 partition 動態發現
實際的生產環境中可能有這樣一些需求,比如:
- 場景一,有一個 Flink 作業需要將五份數據聚合到一起,五份數據對應五個 kafka topic,隨着業務增長,新增一類數據,同時新增了一個 kafka topic,如何在不重啟作業的情況下作業自動感知新的 topic。
- 場景二,作業從一個固定的 kafka topic 讀數據,開始該 topic 有 10 個 partition,但隨着業務的增長數據量變大,需要對 kafka partition 個數進行擴容,由 10 個擴容到 20。該情況下如何在不重啟作業情況下動態感知新擴容的 partition。
針對上面的兩種場景,首先需要在構建 FlinkKafkaConsumer 時的 properties 中設置 flink.partition-discovery.interval-millis 參數為非負值,表示開啟動態發現的開關,以及設置的時間間隔。此時 FlinkKafkaConsumer 內部會啟動一個單獨的線程定期去 kafka 獲取最新的 meta 信息。
kafkaConsumerProps.setProperty("flink.partition-discovery.interval-millis", "30000")
val topicPattern: Pattern = java.util.regex.Pattern.compile("test-topic-[0-9]")
val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topicPattern, new SimpleStringSchema(), kafkaConsumerProps)
- 針對場景一,還需在構建 FlinkKafkaConsumer 時,topic 的描述可以傳一個正則表達式描述的 pattern。每次獲取最新 kafka meta 時獲取正則匹配的最新 topic 列表。
- 針對場景二,設置前面的動態發現參數,在定期獲取 kafka 最新 meta 信息時會匹配新的 partition。為了保證數據的正確性,新發現的 partition 從最早的位置開始讀取。
4、commit offset 方式
Flink kafka consumer commit offset 方式需要區分是否開啟了 checkpoint。
- 如果 checkpoint 關閉,commit offset 要依賴於 kafka 客戶端的 auto commit。需設置 enable.auto.commit,auto.commit.interval.ms 參數到 consumer properties,就會按固定的時間間隔定期 auto commit offset 到 kafka。
enable.auto.commit 的默認值是 true;就是默認采用自動提交的機制。
auto.commit.interval.ms 的默認值是 5000,單位是毫秒。
- 如果開啟 checkpoint,這個時候作業消費的 offset 是 Flink 在 state 中自己管理和容錯。此時提交 offset 到 kafka,一般都是作為外部進度的監控,想實時知道作業消費的位置和 lag 情況。此時需要 setCommitOffsetsOnCheckpoints 為 true 來設置當 checkpoint 成功時提交 offset 到 kafka。此時 commit offset 的間隔就取決於 checkpoint 的間隔,所以此時從 kafka 一側看到的 lag 可能並非完全實時,如果 checkpoint 間隔比較長 lag 曲線可能會是一個鋸齒狀。
3.2、Flink-Kafka-Producer
def main(args: Array[String]): Unit = {
// 獲取 kafkaConsumer
val kafkaConsumerProps: Properties = new Properties()
kafkaConsumerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
kafkaConsumerProps.setProperty("group.id", KafkaConfig.CONSUMER_GROUP)
val flinkKafkaConsumer: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](KafkaConfig.SOURCE_TOPIC, new SimpleStringSchema(), kafkaConsumerProps)
// 獲取 KafkaProducer
var kafkaProducerProps: Properties = new Properties()
kafkaProducerProps.setProperty("bootstrap.servers", KafkaConfig.BOOTSTRAP_SERVERS)
val flinkKafkaProducer: FlinkKafkaProducer011[String] = new FlinkKafkaProducer011[String](KafkaConfig.PRODUCER_TOPIC, new SimpleStringSchema(), kafkaProducerProps)
val logPath: String = "/tmp/logs/flink_log"
val conf: Configuration = new Configuration()
// 開啟spark-webui
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
//配置webui的日志文件
conf.setString("web.log.path", logPath)
// 配置 taskManager 的日志文件,否則打印日志到控制台
conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)
// 獲取本地運行環境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
val sourceDataStream: DataStream[String] = env.addSource(flinkKafkaConsumer)
// 結果輸出
sourceDataStream.print()
// 結果存入 kafka
sourceDataStream.addSink(flinkKafkaProducer)
// 開啟任務
env.execute("FlinkKafkaProducerExample")
}

1、Producer 分區
- 使用 FlinkKafkaProducer 往 kafka 中寫數據時,如果不單獨設置 partition 策略,會默認使用 FlinkFixedPartitioner(此時無論是帶 key 的數據,還是不帶 key 都采用),該 partitioner 分區的方式是 task 所在的並發 id 對 topic 總 partition 數取余:parallelInstanceId % partitions.length。
- 此時如果 sink 為 4,paritition 為 1,則 4 個 task 往同一個 partition 中寫數據。
- 但當 sink task < partition 個數時會有部分 partition 沒有數據寫入,例如 sink task 為2,partition 總數為 4,則后面兩個 partition 將沒有數據寫入。
-
- 如果構建 FlinkKafkaProducer 時,partition 設置為 null,此時會使用 kafka producer 默認分區方式
- 非 key 寫入的情況下,使用 round-robin 的方式進行分區,每個 task 都會輪循的寫下游的所有 partition。該方式下游的 partition 數據會比較均衡,但是缺點是 partition 個數過多的情況下需要維持過多的網絡連接,即每個 task 都會維持跟所有 partition 所在 broker 的連接。
- 帶 key 的數據會根據 key,相同 key 數據分區的相同的 partition,如果 key 為 null,再輪詢寫。
-
2、容錯
Flink kafka 09、010 版本下:
- 能達到 at-least-once 語義(數據有可能重復),通過 setLogFailuresOnly 為 false,setFlushOnCheckpoint 為 true 來控制。
- setLogFailuresOnly,默認為 false,是控制寫 kafka 失敗時,是否只打印失敗的 log 不拋異常讓作業停止。
- setFlushOnCheckpoint,默認為 true,是控制是否在 checkpoint 時 fluse 數據到 kafka,保證數據已經寫到 kafka。否則數據有可能還緩存在 kafka 客戶端的 buffer 中,並沒有真正寫出到 kafka,此時作業掛掉數據即丟失,不能做到至少一次的語義。
Flink kafka 011 版本下:
- 通過兩階段提交的 sink 結合 kafka 事務的功能,可以保證端到端精准一次。
- 詳細原理參考:https://www.ververica.com/blog/end-to-end-exactly-once-processing-apache-flink-apache-kafka