本文主要是想聊聊flink與kafka結合。當然,單純的介紹flink與kafka的結合呢,比較單調,也沒有可對比性,所以的准備順便幫大家簡單回顧一下Spark Streaming與kafka的結合。
看懂本文的前提是首先要熟悉kafka,然后了解spark Streaming的運行原理及與kafka結合的兩種形式,然后了解flink實時流的原理及與kafka結合的方式。
kafka
kafka作為一個消息隊列,在企業中主要用於緩存數據,當然,也有人用kafka做存儲系統,比如存最近七天的數據。kafka的基本概念請參考:kafka入門介紹
更多kafka的文章請關注浪尖公眾號,閱讀。
首先,我們先看下圖,這是一張生產消息到kafka,從kafka消費消息的結構圖。
當然, 這張圖很簡單,拿這張圖的目的是從中可以得到的跟本節文章有關的消息,有以下兩個:
1,kafka中的消息不是kafka主動去拉去的,而必須有生產者往kafka寫消息。
2,kafka是不會主動往消費者發布消息的,而必須有消費者主動從kafka拉取消息。
spark Streaming結合kafka
Spark Streaming現在在企業中流處理也是用的比較廣泛,但是大家都知道其不是真正的實時處理,而是微批處理。
在spark 1.3以前,SPark Streaming與kafka的結合是基於Receiver方式,顧名思義,我們要啟動1+個Receiver去從kafka里面拉去數據,拉去的數據會每隔200ms生成一個block,然后在job生成的時候,取出該job處理時間范圍內所有的block,生成blockrdd,然后進入Spark core處理。
自Spark1.3以后,增加了direct Stream API,這種呢,主要特點是去掉了Receiver,在生成job,去取rdd的時候,計算每個partition要取數據的offset范圍,然后生成一個kafkardd,該rdd特點是與kafka的分區是一一對應的。
有上面的特點可以看出,Spark Streaming是要生成rdd,然后進行處理的,rdd數據集我們可以理解為靜態的,然每個批次,都會生成一個rdd,該過程就體現了批處理的特性,由於數據集時間段小,數據小,所以又稱微批處理,那么就說明不是真正的實時處理。
flink結合kafka
大家都知道flink是真正的實時處理,他是基於事件觸發的機制進行處理,而不是像spark Streaming每隔若干時間段,生成微批數據,然后進行處理。那么這個時候就有了個疑問,在前面kafka小節中,我們說到了kafka是不會主動往消費者里面吐數據的,需要消費者主動去拉去數據來處理。那么flink是如何做到基於事件實時處理kafka的數據呢?在這里浪尖帶着大家看一下源碼,flink1.5.0為例。
1,flink與kafka結合的demo。
val env=StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer=new FlinkKafkaConsumer010(
?params.getRequired("input-topic"),
?new SimpleStringSchema,
?params.getProperties)
val messageStream=env
?.addSource(kafkaConsumer)
?.map(in=> prefix + in)
// create a Kafka producer for Kafka 0.10.x
val kafkaProducer=new FlinkKafkaProducer010(
?params.getRequired("output-topic"),
?new SimpleStringSchema,
?params.getProperties)
// write data into Kafka
messageStream.addSink(kafkaProducer)
env.execute("Kafka 0.10 Example")
從上面的demo可以看出,數據源的入口就是FlinkKafkaConsumer010,當然這里面只是簡單的構建了一個對象,並進行了一些配置的初始化,真正source的啟動是在其run方法中run方法的調用過程在這里不講解,后面會出教程講解。
首先看一下類的繼承關系
public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T>
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T>
其中,run方法就在FlinkKafkaConsumerBase里,當然其中open方法里面對kafka相關內容進行里初始化。
從輸入到計算到輸出完整的計算鏈條的調用過程,后面浪尖會出文章介紹。在這里只關心flink如何從主動消費數據,然后變成事件處理機制的過程。
由於其FlinkKafkaConsumerBase的run比較長,我這里只看重要的部分,首先是會創建Kafka09Fetcher。
KAFKA_CONSUMER_METRICS_GROUP),
? ? ?useMetrics);
接着下面有段神器,
final AtomicReference<Exception> discoveryLoopErrorRef=new AtomicReference<>();
this.discoveryLoopThread =new Thread(new Runnable() {
? @Override
? public void run() {
? ? ?try {
? ? ? ? // --------------------- partition discovery loop ---------------------
? ? ? ? List<KafkaTopicPartition> discoveredPartitions;
? ? ? ? // throughout the loop, we always eagerly check if we are still running before
? ? ? ? // performing the next operation, so that we can escape the loop as soon as possible
? ? ? ? while (running) {
? ? ? ? ? ?if (LOG.isDebugEnabled()) {
? ? ? ? ? ? ? LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
? ? ? ? ? ?}
? ? ? ? ? ?try {
? ? ? ? ? ? ? discoveredPartitions=partitionDiscoverer.discoverPartitions();
? ? ? ? ? ?} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
? ? ? ? ? ? ? // the partition discoverer may have been closed or woken up before or during the discovery;
? ? ? ? ? ? ? // this would only happen if the consumer was canceled; simply escape the loop
? ? ? ? ? ? ? break;
? ? ? ? ? ?}
? ? ? ? ? ?// no need to add the discovered partitions if we were closed during the meantime
? ? ? ? ? ?if (running && !discoveredPartitions.isEmpty()) {
? ? ? ? ? ? ? kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
? ? ? ? ? ?}
? ? ? ? ? ?// do not waste any time sleeping if we're not running anymore
? ? ? ? ? ?if (running && discoveryIntervalMillis !=0) {
? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ?Thread.sleep(discoveryIntervalMillis);
? ? ? ? ? ? ? } catch (InterruptedException iex) {
? ? ? ? ? ? ? ? ?// may be interrupted if the consumer was canceled midway; simply escape the loop
? ? ? ? ? ? ? ? ?break;
? ? ? ? ? ? ? }
? ? ? ? ? ?}
? ? ? ? }
? ? ?} catch (Exception e) {
? ? ? ? discoveryLoopErrorRef.set(e);
? ? ?} finally {
? ? ? ? // calling cancel will also let the fetcher loop escape
? ? ? ? // (if not running, cancel() was already called)
? ? ? ? if (running) {
? ? ? ? ? ?cancel();
? ? ? ? }
? ? ?}
? }
}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());
它定義了一個線程池對象,去動態發現kafka新增的topic(支持正則形式指定消費的topic),或者動態發現kafka新增的分區。
接着肯定是啟動動態發現分區或者topic線程,並且
kafkaFetcher.runFetchLoop();
// --------------------------------------------------------------------
// make sure that the partition discoverer is properly closed
partitionDiscoverer.close();
discoveryLoopThread.join();
接着,我們進入
// kick off the actual Kafka consumer
consumerThread.start();
這個線程是在構建kafka09Fetcher的時候創建的
LOG,
? ? ?handover,
? ? ?kafkaProperties,
? ? ?unassignedPartitionsQueue,
? ? ?createCallBridge(),
? ? ?getFetcherName() + " for " + taskNameWithSubtasks,
? ? ?pollTimeout,
? ? ?useMetrics,
? ? ?consumerMetricGroup,
? ? ?subtaskMetricGroup);
KafkaConsumerThread 繼承自Thread,然后在其run方法里,首先看到的是
// this is the means to talk to FlinkKafkaConsumer's main thread
final Handover handover=this.handover;
這個handover的作用呢暫且不提,接着分析run方法里面內容
1,獲取消費者
try {
? this.consumer =getConsumer(kafkaProperties);
}
2,檢測分區並且會重分配新增的分區
? }
? else {
? ? ?// if no assigned partitions block until we get at least one
? ? ?// instead of hot spinning this loop. We rely on a fact that
? ? ?// unassignedPartitionsQueue will be closed on a shutdown, so
? ? ?// we don't block indefinitely
? ? ?newPartitions=unassignedPartitionsQueue.getBatchBlocking();
? }
? if (newPartitions !=null) {
? ? ?reassignPartitions(newPartitions);
? }
3,消費數據
? }
? catch (WakeupException we) {
? ? ?continue;
? }
}
4,通過handover將數據發出去
? records=null;
}
由於kafkaFetcher的runFetchLoop方法的分析,我們在這里繼續
1,拉取handover.producer生產的數據
byte[], byte[]> records=handover.pollNext();
2,數據格式整理,並將數據整理好后,逐個Record發送,將循環主動批量拉取kafka數據,轉化為事件觸發。
// get the records for each topic partition
for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitionStates()) {
? List<ConsumerRecord<byte[], byte[]>> partitionRecords=
? ? ? ? records.records(partition.getKafkaPartitionHandle());
? for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
? ? ?final T value=deserializer.deserialize(
? ? ? ? ? ?record.key(), record.value(),
? ? ? ? ? ?record.topic(), record.partition(), record.offset());
? ? ?if (deserializer.isEndOfStream(value)) {
? ? ? ? // end of stream signaled
? ? ? ? running =false;
? ? ? ? break;
? ? ?}
? ? ?// emit the actual record. this also updates offset state atomically
? ? ?// and deals with timestamps and watermark generation
? ? ?emitRecord(value, partition, record.offset(), record);
? }
}
肯定會注意到這行代碼emitRecord(value, partition, record.offset(), record);,英語美文從這里開始flink變成事件觸發的流引擎。
handover-樞紐
handover是在構建kafkaFetcher的時候構建的
this.handover =new Handover();
handover是一個工具,將一組數據或者異常從生產者線程傳輸到消費者線程。它高效的扮演了一個阻塞隊列的特性。該類運行於flink kafka consumer,用來在kafkaConsumer 類和主線程之間轉移數據和異常。
handover有兩個重要方法,分別是:
1,producer
producer是將kafkaConusmer獲取的數據發送出去,在KafkaConsumerThread中調用。代碼如上
2,pollnext
從handover里面拉去下一條數據,會阻塞的,行為很像是從一個阻塞隊列里面拉去數據。
綜述
kafkaConsumer批量拉去數據,flink將其經過整理之后變成,逐個Record發送的事件觸發式的流處理。這就是flink與kafka結合事件觸發時流處理的基本思路。
重要的事情再說一遍:Flink支持動態發現新增topic或者新增partition哦。具體實現思路,前面有代碼為證,后面會對比spark Streaming的這塊(不支持動態發現新增kafka topic或者partition),來詳細講解。
推薦閱讀:
Flink on yarn初步講解
Flink並行度