Connecting Flink to Kafka


Start Kafka and Flink

1. Go to ZooKeeper's bin directory and start ZooKeeper

./zkServer.sh start

2. Go to Kafka's bin directory and start Kafka

./kafka-server-start.sh -daemon /opt/module/kafka-0.11/config/server.properties

3. Go to Flink's bin directory and start Flink

./start-cluster.sh 

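Start a Kafka producer

The Kafka topic is sensor. Run the following from the Kafka installation directory; the broker address must match the bootstrap.servers value used in the Flink job.

./bin/kafka-console-producer.sh --broker-list 192.168.153.202:9092 --topic sensor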
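Broker addresses take the form host:port, and a mistyped port is an easy error to make. As a minimal sketch, the check below rejects addresses whose port is missing, non-numeric, or outside 1 to 65535. The class name BrokerAddressCheck and the method isValid are hypothetical helpers for illustration, not part of Kafka or Flink, and the check does not verify that a broker is actually reachable.

```java
// Hypothetical helper (not part of Kafka or Flink): validates a "host:port" string.
final class BrokerAddressCheck {

    private BrokerAddressCheck() {}

    // Returns true if the address has a non-empty host and a numeric port in 1..65535.
    static boolean isValid(String address) {
        if (address == null) {
            return false;
        }
        int colon = address.lastIndexOf(':');
        // Require at least one character before the colon and one after it.
        if (colon <= 0 || colon == address.length() - 1) {
            return false;
        }
        String portText = address.substring(colon + 1);
        // Accept digits only, so signs and spaces are rejected.
        for (int i = 0; i < portText.length(); i++) {
            if (!Character.isDigit(portText.charAt(i))) {
                return false;
            }
        }
        try {
            int port = Integer.parseInt(portText);
            return port >= 1 && port <= 65535;
        } catch (NumberFormatException e) {
            // Too many digits to fit in an int.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValid("192.168.153.202:9092"));   // prints "true"
        System.out.println(isValid("192.168.153.202:90992"));  // prints "false"
    }
}
```

Running the same check on the address used in the Flink job helps keep the producer and consumer pointed at the same broker.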

Add the pom dependencies

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

Run

The Java code is as follows

package com.test.apitest.souceTest;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;

public class SourceTest02_kafka {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.153.202:9092");
        properties.setProperty("group.id", "consumer-group");
        // Flink's Kafka consumer replaces these two entries with its own
        // byte-array deserializers, so they are optional here.
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // With no committed offset for the group, start from the newest records
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from the Kafka topic "sensor" as plain strings
        DataStreamSource<String> sensor = env.addSource(
                new FlinkKafkaConsumer011<>("sensor", new SimpleStringSchema(), properties));

        // Print each record to standard output
        sensor.print();
        // Execute the job
        env.execute();
    }
}
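The consumer settings in the job above can also be factored into a small helper so the broker address and group id are passed in rather than hard-coded. The sketch below is one possible shape for that, using only java.util.Properties so it runs without Flink or Kafka on the classpath. The class name KafkaConsumerProps and the method build are hypothetical and not part of either library. The two deserializer entries are left out on the assumption that Flink's Kafka consumer supplies its own.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): builds the consumer settings
// used by the job, with the broker address and group id as parameters.
final class KafkaConsumerProps {

    private KafkaConsumerProps() {}

    static Properties build(String bootstrapServers, String groupId) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must not be empty");
        }
        if (groupId == null || groupId.trim().isEmpty()) {
            throw new IllegalArgumentException("groupId must not be empty");
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Same choice as the job above: start from the newest records
        // when the group has no committed offset.
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }

    public static void main(String[] args) {
        // Values taken from the job above.
        Properties props = build("192.168.153.202:9092", "consumer-group");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned object can be passed as the third argument of the FlinkKafkaConsumer011 constructor in place of the inline properties.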

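Produce data in Kafka

Type messages into the console producer; each line is sent to the sensor topic.

Consume data in Flink

The Flink job prints each message it reads from the sensor topic.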

