Flink 讀取 Kafka 數據 (極簡版)


本文講述:本地 Flink 1.7.0 (Java SDK) 讀取本地 Kafka 數據,不做任何處理直接打印輸出到控制台

環境:win10 + WSL

步驟略過

1. 啟動 Kafka 並創建 topic

以下命令都在解壓后的 Kafka 文件夾內執行

1.1 啟動 Kafka

啟動 zookeeper

 ./bin/zookeeper-server-start.sh config/zookeeper.properties

啟動 kafka 服務

./bin/kafka-server-start.sh config/server.properties

1.2 創建 topic

./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

1.3 啟動生產者和消費者

啟動生產者

./bin/kafka-console-producer.sh --topic topic2 --broker-list localhost:9092

啟動消費者

./bin/kafka-console-consumer.sh --topic topic2 --from-beginning --bootstrap-server localhost:9092

此時,在生產者窗口中輸入任意字符,即可在消費者窗口中看到相應的輸出

進入解壓后的 flink 文件夾內

./bin/start-cluster.sh

3. 使用 JAVA 中讀取 Kafka 內容

代碼如下:

package com.xjr7670;


import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

public class StreamingJob {

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
		properties.setProperty("group.id", "g2");    // 第 1 個參數是固定值 group.id,第 2 個參數是自定義的組 ID
		DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
		String topic = "topic2";
		DataStream<String> text = env.addSource(new FlinkKafkaConsumer011<String>(topic, deserializationSchema, properties));
		text.print();
		
		env.execute("Flink-Kafka demo");
	}
}

POM 依賴配置略過。以上代碼可直接執行,執行后,在 Kafka 生產者窗口中輸入消息,即可在代碼輸出窗口中看到同樣的消息輸出
如果執行時遇到報錯:

java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema

需要添加 flink/lib 下的包到項目內


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM