本文講述:本地 Flink 1.7.0 (Java SDK) 讀取本地 Kafka 數據,不做任何處理直接打印輸出到控制台
環境:win10 + WSL
0. 下載 Flink 及 Kafka 並解壓
步驟略過
1. 啟動 Kafka 並創建 topic
以下命令都在解壓后的 Kafka 文件夾內執行
1.1 啟動 Kafka
啟動 zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties
啟動 kafka 服務
./bin/kafka-server-start.sh config/server.properties
1.2 創建 topic
./bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
1.3 啟動生產者和消費者
啟動生產者
./bin/kafka-console-producer.sh --topic topic2 --broker-list localhost:9092
啟動消費者
./bin/kafka-console-consumer.sh --topic topic2 --from-beginning --bootstrap-server localhost:9092
此時,在生產者窗口中輸入任意字符,即可在消費者窗口中看到相應的輸出
2. 啟動 flink
進入解壓后的 flink 文件夾內
./bin/start-cluster.sh
3. 使用 JAVA 中讀取 Kafka 內容
代碼如下:
package com.xjr7670;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Properties;
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty("group.id", "g2"); // 第 1 個參數是固定值 group.id,第 2 個參數是自定義的組 ID
DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
String topic = "topic2";
DataStream<String> text = env.addSource(new FlinkKafkaConsumer011<String>(topic, deserializationSchema, properties));
text.print();
env.execute("Flink-Kafka demo");
}
}
POM 依賴配置略過。以上代碼可直接執行,執行后,在 Kafka 生產者窗口中輸入消息,即可在代碼輸出窗口中看到同樣的消息輸出
如果執行時遇到報錯:
java.lang.ClassNotFoundException: org.apache.flink.api.common.serialization.DeserializationSchema
需要添加 flink/lib 下的包到項目內