Integrating Kafka with Flink


Apache Flink is a new-generation distributed stream processing framework whose unified engine can handle both batch data and streaming data. In practice it is very common for Flink to use Apache Kafka as its upstream input and downstream output, and this article walks through a runnable example that integrates the two.

1. Goal

This example integrates Kafka with Flink: Flink consumes messages from Kafka in real time, computes the machine's currently available memory every 10 seconds, and writes the result to a local file.

2. Environment Setup

  • Apache Kafka 0.11.0.0
  • Apache Flink 1.3.1
  • Gradle 3.5 (the exact version is not a strict requirement)

This example runs on Windows, but it can easily be ported to other platforms.

3. Create the Flink Streaming Project

This example uses IntelliJ IDEA as the IDE. First, create a Gradle project with group 'huxihx.flink.demo', artifact id 'flink-kafka-demo', and version '1.0-SNAPSHOT'. The overall project structure is shown in the figure below:

4. Add the Kafka and Kafka Connector Dependencies

Add the following Gradle dependencies:

    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'

Configure the Gradle jar packaging:

jar {
    manifest {
        attributes(
                "Manifest-Version": 1.0,
                "Main-Class": "huxihx.KafkaMessageStreaming")
    }
    from { configurations.compile.collect { it.isDirectory() ? it : zipTree(it) } }
    into('assets') {
        from 'assets'
    }
}
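
For reference, a minimal complete build.gradle for this project might look roughly as follows. This is only a sketch under stated assumptions: it assumes the java plugin, Java 8, and Maven Central as the repository; adjust it to your own setup.

apply plugin: 'java'

group 'huxihx.flink.demo'
version '1.0-SNAPSHOT'
sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.11', version: '1.3.1'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.3.1'
    compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.11.0.0'
}

// plus the jar { ... } packaging block shown above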

5. Start the Flink Environment (this example uses a local test setup)

> cd F:\SourceCode\flink-1.3.1
> bin\start-local.bat
Starting Flink job manager. Webinterface by default on http://localhost:8081/.
Don't close this batch window. Stop job manager by pressing Ctrl+C.

6. Start a Single-Node Kafka Cluster

Start Zookeeper:

> cd F:\SourceCode\zookeeper
> bin\zkServer.cmd

Start the Kafka broker:

> cd F:\SourceCode\kafka_1
> set JMX_PORT=9999 
> bin\windows\kafka-server-start.bat F:\\SourceCode\\configs\\server.properties
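
The producer and the Flink job in this example use a topic named test (passed on the command line later). If your broker is not configured to auto-create topics, you can create it first. The command below is a sketch that assumes Zookeeper is listening on the default localhost:2181:

> bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test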

7. Code Development

The code consists of two main parts:

  • The MessageSplitter, MessageWaterEmitter, and KafkaMessageStreaming classes: the Flink streaming logic that processes the Kafka messages in real time
  • The KafkaProducerTest and MemoryUsageExtrator classes: build the Kafka test messages

In this example the Kafka message format is fixed as: timestamp,hostname,currently available memory in bytes (so a message might look like 1502854714771,machine-1,7080841216). The hostname is hard-coded to machine-1, while the timestamp and the available memory are obtained dynamically. Since the example starts only one Kafka producer to simulate messages coming from a single machine, the final statistics only cover the memory of machine-1. Let's look at the Flink side of the code first.

The MessageSplitter class (splits each received Kafka message on "," and extracts the hostname and the memory value):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        if (value != null && value.contains(",")) {
            // fields: [0] timestamp, [1] hostname, [2] free memory in bytes
            String[] parts = value.split(",");
            out.collect(new Tuple2<>(parts[1], Long.parseLong(parts[2])));
        }
    }
}
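
To illustrate what the splitter does, the following throwaway snippet feeds it one record outside of Flink and prints the resulting tuple. The class name MessageSplitterDemo and the sample message are made up for illustration only:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class MessageSplitterDemo {
    public static void main(String[] args) throws Exception {
        // a message in the "timestamp,hostname,freeBytes" format described above
        String sample = "1502854714771,machine-1,7080841216";
        new MessageSplitter().flatMap(sample, new Collector<Tuple2<String, Long>>() {
            @Override
            public void collect(Tuple2<String, Long> record) {
                System.out.println(record); // expected output: (machine-1,7080841216)
            }

            @Override
            public void close() {
            }
        });
    }
}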

The MessageWaterEmitter class (derives the Flink watermark from the Kafka messages):

import javax.annotation.Nullable;

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class MessageWaterEmitter implements AssignerWithPunctuatedWatermarks<String> {
    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        if (lastElement != null && lastElement.contains(",")) {
            // emit a watermark at the event time carried in the message itself
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (element != null && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0L;
    }
}

The KafkaMessageStreaming class (the Flink entry point, which encapsulates the processing logic for the Kafka messages; in this example a result is computed every 10 seconds and written to a local file):

import java.util.Properties;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class KafkaMessageStreaming {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // very important: be sure to enable checkpointing!!
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-group");

        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<>(args[0], new SimpleStringSchema(), props);
        consumer.assignTimestampsAndWatermarks(new MessageWaterEmitter());

        DataStream<Tuple2<String, Long>> keyedStream = env
                .addSource(consumer)
                .flatMap(new MessageSplitter())
                .keyBy(0)
                .timeWindow(Time.seconds(10))
                .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<String, Long>> out) throws Exception {
                        // average the free-memory readings that fall into this 10-second window
                        long sum = 0L;
                        int count = 0;
                        for (Tuple2<String, Long> record: input) {
                            sum += record.f1;
                            count++;
                        }
                        Tuple2<String, Long> result = input.iterator().next();
                        result.f1 = sum / count;
                        out.collect(result);
                    }
                });

        keyedStream.writeAsText(args[1]);
        env.execute("Flink-Kafka demo");
    }
}

With this code in place we could already package and deploy the job, but before doing so let's look at the Kafka producer test class, which sends one message in the format above every second for the downstream Flink job to consume.

The MemoryUsageExtrator class (a simple utility class that extracts the currently available memory in bytes):

import java.lang.management.ManagementFactory;

import com.sun.management.OperatingSystemMXBean;

public class MemoryUsageExtrator {

    private static OperatingSystemMXBean mxBean =
            (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();

    /**
     * Get current free memory size in bytes
     * @return  free RAM size
     */
    public static long currentFreeMemorySizeInBytes() {
        return mxBean.getFreePhysicalMemorySize();
    }
}
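
A quick way to sanity-check the utility is to print the value once; the class name MemoryUsageExtratorDemo below is made up for illustration:

public class MemoryUsageExtratorDemo {
    public static void main(String[] args) {
        // prints the currently free physical memory in bytes on the machine running this code
        System.out.println(MemoryUsageExtrator.currentFreeMemorySizeInBytes() + " bytes free");
    }
}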

The KafkaProducerTest class (sends the Kafka messages):

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class KafkaProducerTest {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        int totalMessageCount = 10000;
        for (int i = 0; i < totalMessageCount; i++) {
            String value = String.format("%d,%s,%d", System.currentTimeMillis(), "machine-1", currentMemSize());
            producer.send(new ProducerRecord<>("test", value), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.out.println("Failed to send message with exception " + exception);
                    }
                }
            });
            Thread.sleep(1000L);
        }
        producer.close();
    }

    private static long currentMemSize() {
        return MemoryUsageExtrator.currentFreeMemorySizeInBytes();
    }
}

8. Package and Deploy the Flink Jar

8.1 Build the jar

> cd flink-kafka-demo
> gradle clean build

The generated jar is located under build/libs/ in the project directory; in this example it is flink-kafka-demo-1.0-SNAPSHOT.jar.

8.2 Deploy the jar

> bin\flink.bat run -c huxihx.KafkaMessageStreaming F:\\Projects\\flink-kafka-demo\\build\\libs\\flink-kafka-demo-1.0-SNAPSHOT.jar test F:\\temp\\result.txt

The KafkaMessageStreaming class takes two command-line arguments: the first is the Kafka topic name and the second is the output file path.

Once the job has been submitted successfully, you can see it in the Flink web console (http://localhost:8081/ in this example), as shown below:

9. Run KafkaProducerTest

Run the Kafka producer to generate input data for the Flink job, then open a terminal and watch the output file for changes:

> cd F:\temp
> tail -f result.txt
(machine-1,3942129078)
(machine-1,3934864179)
(machine-1,4044071321)
(machine-1,4091437056)
(machine-1,3925701836)
(machine-1,3753678438)
(machine-1,3746314649)
......

As you can see, every 10 seconds Flink writes a new record to result.txt, giving the average available memory (in bytes) of the machine named machine-1 over the past 10 seconds.

10. Summary

This article walked through a runnable Flink + Kafka project, from configuration to code. Note that the Flink Kafka connector used above is based on the new Kafka consumer API, so it no longer needs Zookeeper connection information.

