A Flink + Kafka Integration Example

1. Create a new Maven project in IntelliJ IDEA and name it kafka01.
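(If you prefer the command line, Flink also ships a quickstart archetype that generates an equivalent project skeleton; the invocation below is an assumed alternative using the same coordinates as the project created above:)

mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.1.4 -DgroupId=com.hrb.lhr -DartifactId=kafka01 -Dversion=1.0-SNAPSHOT -DinteractiveMode=false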

2. My pom.xml is configured as follows.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hrb.lhr</groupId>
    <artifactId>kafka01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.1.4</flink.version>
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- explicitly add a standard logging framework, as Flink does not have
             a hard dependency on one specific framework by default -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

</project>
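To confirm that these dependencies resolve, you can build the project once from its root directory (standard Maven usage):

mvn clean compile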

3. In the project's /src/main/java directory, create two Java classes named KafkaDemo and CustomWatermarkEmitter; their code is shown below.

import java.util.Properties;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;


public class KafkaDemo {

    public static void main(String[] args) throws Exception {

        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is disabled by default. To enable it, call enableCheckpointing(n) on the
        // StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds. With an
        // interval of 5000 ms, the next checkpoint starts 5 seconds after the previous one completes.
        env.enableCheckpointing(5000);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        Properties properties = new Properties();
        // IP addresses or hostnames of the Kafka brokers; separate multiple entries with commas
        properties.setProperty("bootstrap.servers", "10.192.12.106:9092");
        // IP addresses or hostnames of the ZooKeeper nodes; separate multiple entries with commas
        properties.setProperty("zookeeper.connect", "10.192.12.106:2181");
        // group.id of the Flink consumer
        properties.setProperty("group.id", "test-consumer-group");

        // "test0" is the topic that has been created in Kafka
        FlinkKafkaConsumer09<String> myConsumer =
                new FlinkKafkaConsumer09<String>("test0", new SimpleStringSchema(), properties);
        myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());

        // The records sent by the Kafka producer could be processed here; this example applies
        // no transformation and simply prints each record to the console.
        DataStream<String> keyedStream = env.addSource(myConsumer);
        keyedStream.print();

        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
}
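If you want to do more than print the raw records, a transformation can go between the source and the sink. Below is a minimal sketch of my own (not part of the original two classes) that strips the leading timestamp field, assuming the "timestamp,text" message format used later in this example; it needs one extra import, org.apache.flink.api.common.functions.MapFunction:

DataStream<String> textOnly = keyedStream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) {
        // keep everything after the first comma ("timestamp,text" -> "text")
        int comma = value.indexOf(',');
        return comma >= 0 ? value.substring(comma + 1) : value;
    }
});
textOnly.print();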
The second class, CustomWatermarkEmitter, derives event timestamps and watermarks from the incoming records:

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

    private static final long serialVersionUID = 1L;

    // Records are expected in the form "timestamp,text"; the first field is
    // parsed as the event-time timestamp in milliseconds.
    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        if (null != element && element.contains(",")) {
            String[] parts = element.split(",");
            return Long.parseLong(parts[0]);
        }
        return 0;
    }

    // Emit a watermark for every record that carries a timestamp; returning
    // null means no new watermark is generated for this record.
    @Override
    public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
        if (null != lastElement && lastElement.contains(",")) {
            String[] parts = lastElement.split(",");
            return new Watermark(Long.parseLong(parts[0]));
        }
        return null;
    }
}
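As a quick sanity check of the parsing logic, this hypothetical fragment (my own, not part of the tutorial) shows what the emitter returns for a well-formed record:

CustomWatermarkEmitter emitter = new CustomWatermarkEmitter();
long ts = emitter.extractTimestamp("1534730000000,hello flink", -1L);             // 1534730000000
Watermark wm = emitter.checkAndGetNextWatermark("1534730000000,hello flink", ts);
System.out.println(ts + " / " + wm.getTimestamp());                               // 1534730000000 / 1534730000000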

4. Start an Ubuntu VM on which ZooKeeper and Kafka are already configured, then run the following commands (from the ZooKeeper and Kafka installation directories, respectively) to start ZooKeeper, start the Kafka broker, create the topic, and open a console producer. (For the ZooKeeper and Kafka setup itself, see https://www.cnblogs.com/ALittleMoreLove/p/9396745.html.)

bin/zkServer.sh start
bin/kafka-server-start.sh config/server.properties
bin/kafka-topics.sh --create --zookeeper 10.192.12.106:2181 --replication-factor 1 --partitions 1 --topic test0
bin/kafka-console-producer.sh --broker-list 10.192.12.106:9092 --topic test0
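Because CustomWatermarkEmitter parses the first comma-separated field of each record as the event timestamp, the producer input should follow the "timestamp,text" form, for example (sample lines of my own; any text after the comma works):

1534730000000,hello flink
1534730005000,hello kafka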

5. Verify that the Flink program receives the data sent by the Kafka producer: run the KafkaDemo class, then type a line of text in the producer terminal; the same message is printed in the IDEA console.

OK, the messages from the Kafka producer were received successfully ^.^
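If nothing shows up, you can rule out the Flink side by reading the topic directly with Kafka's stock console consumer (the old ZooKeeper-based invocation matching this broker version):

bin/kafka-console-consumer.sh --zookeeper 10.192.12.106:2181 --topic test0 --from-beginning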

