Apache Flink is a new-generation distributed stream-processing framework whose unified engine can process both batch data and streaming data. In practice, Flink is very commonly paired with Apache Kafka as its upstream source and downstream sink, so this article walks through a runnable example that integrates the two.
1. Goal
This example integrates Kafka and Flink: Flink consumes messages from Kafka in real time, computes the machine's currently available memory every 10 seconds, and writes the result to a local file.
2. Environment Setup
- Apache Kafka 0.11.0.0
- Apache Flink 1.3.1
- Gradle 3.5 (the exact version is not a hard requirement)
This example runs on Windows, but it can easily be ported to other platforms.
3. Create the Flink Streaming Project
This example uses IntelliJ IDEA as the development IDE. First create a Gradle project with group 'huxihx.flink.demo', artifact id 'flink-kafka-demo', and version '1.0-SNAPSHOT'. The overall project structure is shown below:
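The original screenshot is not reproduced here; assuming Gradle's default source layout and the package implied by the Main-Class used later (huxihx), the project would look roughly like this:

flink-kafka-demo
├── build.gradle
├── settings.gradle
└── src
    └── main
        └── java
            └── huxihx
                ├── KafkaMessageStreaming.java
                ├── KafkaProducerTest.java
                ├── MemoryUsageExtrator.java
                ├── MessageSplitter.java
                └── MessageWaterEmitter.java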
4. Add the Kafka and Kafka Connector Dependencies
Add the following Gradle dependencies:
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
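These compile lines belong inside the dependencies block of build.gradle. A minimal skeleton that would accommodate them might look like the following (the java plugin and the mavenCentral() repository are assumptions, not taken from the original project); the jar configuration from the next step goes into the same file:

apply plugin: 'java'

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
}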
Configure the jar task so the build produces a fat jar that bundles all dependencies:
jar {
    manifest {
        attributes(
                "Manifest-Version": 1.0,
                "Main-Class": "huxihx.KafkaMessageStreaming")
    }
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    into('assets') {
        from 'assets'
    }
}
5. Start the Flink Environment (this example uses a local test environment)
F:\SourceCode\flink-1.3.1> bin\start-local.bat
Starting Flink job manager. Webinterface by default on http://localhost:8081/.
Don't close this batch window. Stop job manager by pressing Ctrl+C.
6. Start a Single-Node Kafka Cluster
Start Zookeeper:
> cd F:\SourceCode\zookeeper
> bin\zkServer.cmd
Start the Kafka broker:
> cd F:\SourceCode\kafka_1
> set JMX_PORT=9999
> bin\windows\kafka-server-start.bat F:\\SourceCode\\configs\\server.properties
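The Flink job below consumes from a topic named test. If the broker is not configured to auto-create topics, create the topic first; a sketch assuming Zookeeper is listening on its default port 2181:

> bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test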
7. Code Development
The code consists of two parts:
- MessageSplitter, MessageWaterEmitter, and KafkaMessageStreaming: the Flink streaming classes that process the Kafka messages in real time
- KafkaProducerTest and MemoryUsageExtrator: the classes that produce the Kafka test messages
In this example the Kafka message format is fixed as: timestamp, hostname, currently available memory in bytes. The hostname is hard-coded to machine-1, while the timestamp and the available memory are obtained dynamically. Since only a single Kafka producer is started to simulate messages from one machine, the final statistics only cover the memory of machine-1 (a sample message is shown right below). With that in mind, let's look at the Flink side of the code first.
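For illustration only (the values are made up), a single produced message looks like:

1502368695000,machine-1,3942129078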
MessageSplitter (splits each Kafka message on "," and extracts the hostname and memory figure):
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        if (value != null && value.contains(",")) {
            // message format: timestamp,hostname,free memory in bytes
            String[] parts = value.split(",");
            out.collect(new Tuple2<>(parts[1], Long.parseLong(parts[2])));
        }
    }
}
MessageWaterEmitter (derives Flink watermarks from the Kafka messages):
import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        if (lastElement != null && lastElement.contains(",")) {
            // emit a watermark based on the timestamp carried in the message itself
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0L;
    }
}
KafkaMessageStreaming (the Flink entry class that wraps the processing logic for the Kafka messages; in this example it aggregates the results every 10 seconds and writes them to a local file):
import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class KafkaMessageStreaming {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // crucial: be sure to enable checkpointing!!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-group");

        // args[0] is the Kafka topic to consume from
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple2<String, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input,
                                      Collector<Tuple2<String, Long>> out) throws Exception {
                        // average the free-memory values observed in this 10-second window
                        long sum = 0L;
                        int count = 0;
                        for (Tuple2<String, Long> record : input) {
                            sum += record.f1;
                            count++;
                        }
                        Tuple2<String, Long> result = input.iterator().next();
                        result.f1 = sum / count;
                        out.collect(result);
                    }
                });

        // args[1] is the path of the local output file
        keyedStream.writeAsText(args[1]);
        env.execute("Flink-Kafka demo");
    }
}
With this code in place we could already package and deploy, but before doing so let's look at the Kafka producer test class, which sends one message in the format above every second for the downstream Flink job to consume.
MemoryUsageExtrator (a very simple utility class that extracts the number of currently free memory bytes):
import java.lang.management.ManagementFactory;

// com.sun.management.OperatingSystemMXBean is required for getFreePhysicalMemorySize()
import com.sun.management.OperatingSystemMXBean;

public class MemoryUsageExtrator {

    private static OperatingSystemMXBean mxBean =
            (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    /**
     * Get current free memory size in bytes
     * @return free RAM size
     */
    public static long currentFreeMemorySizeInBytes() {
        return mxBean.getFreePhysicalMemorySize();
    }
}
KafkaProducerTest (sends the Kafka messages):
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        int totalMessageCount = 10000;
        for (int i = 0; i < totalMessageCount; i++) {
            // message format: timestamp,hostname,free memory in bytes
            String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
            producer.send(new ProducerRecord<>("test", value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("Failed to send message with exception " + exception);
                    }
                }
            });
            Thread.sleep(1000L); // send one message per second
        }
        producer.close();
    }

    private static long currentMemSize() {
        return MemoryUsageExtrator.currentFreeMemorySizeInBytes();
    }
}
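One quick way to confirm that these messages actually reach the broker, before involving Flink at all, is Kafka's bundled console consumer; a sketch against the same localhost:9092 broker and test topic:

> bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning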
8. Deploy the Flink Jar
8.1 Build the Flink jar
> cd flink-kafka-demo
> gradle clean build
The generated jar is under build/libs/ in the project directory; in this example it is flink-kafka-demo-1.0-SNAPSHOT.jar.
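Optionally, you can double-check that the entry class made it into the fat jar, for example (findstr plays the role of grep on Windows):

> jar tf build\libs\flink-kafka-demo-1.0-SNAPSHOT.jar | findstr KafkaMessageStreaming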
8.2 Deploy the jar
> bin\flink.bat run -c huxihx.KafkaMessageStreaming F:\\Projects\\flink-kafka-demo\\build\\libs\\flink-kafka-demo-1.0-SNAPSHOT.jar test F:\\temp\\result.txt
KafkaMessageStreaming takes two command-line arguments: the first is the Kafka topic name, the second is the output file path.
Once deployment succeeds, you can see in the Flink web UI (http://localhost:8081/ in this example) that the job has been deployed successfully, as shown below:
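The running job can also be checked from the command line with the Flink CLI's list command (run from the Flink installation directory; the exact output will vary from setup to setup):

> bin\flink.bat list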
9. Run KafkaProducerTest
Run the Kafka producer to create input data for the Flink job, then open a terminal and monitor the changes to the output file:
> cd F:\temp
> tail -f result.txt
(machine-1,3942129078)
(machine-1,3934864179)
(machine-1,4044071321)
(machine-1,4091437056)
(machine-1,3925701836)
(machine-1,3753678438)
(machine-1,3746314649)
......
As you can see, Flink appends a new record to result.txt every 10 seconds; each record is the average number of free memory bytes over the past 10 seconds for the machine with hostname machine-1.
10. Summary
This article presented a runnable Flink + Kafka project configuration and code. Note that the Flink Kafka connector used above is based on the new Kafka consumer API, so it no longer needs any Zookeeper connection information.
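Since checkpointing is enabled, the connector also commits its offsets back to Kafka under the flink-group group id, so the job's consumption progress can be inspected with Kafka's own tooling, for example (the exact output depends on the Kafka version):

> bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group flink-group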