FlinkKafkaConsumer
package pers.aishuang.producer;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class FlinkKafkaReader {
    public static void main(String[] args) {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // -- Set parallelism
        env.setParallelism(1);
        // -- Enable checkpointing (interval in milliseconds)
        env.enableCheckpointing(20);
        // 2. Consume from Kafka
        // 2.1 Consumer configuration
        Properties props = new Properties();
        // -- Kafka brokers to connect to
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        // -- Consumer group id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "_vehicle_data_consumer");
        // 2.2 Create a FlinkKafkaConsumer to read from Kafka
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "vehicledata_copy",
                new SimpleStringSchema(), // deserialization schema for the record value
                props
        );
        // -- Start position strategy
        //consumer.setStartFromEarliest();
        consumer.setStartFromGroupOffsets();
        // 3. Load the data source
        DataStreamSource<String> source = env.addSource(consumer);
        // 4. Print the output
        source.printToErr();
        // 5. Trigger execution
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
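The 20 ms checkpoint interval above keeps the demo responsive but is far too aggressive for a real job. As a rough sketch (reusing the env from FlinkKafkaReader; the interval, pause, and retry values are illustrative only), a more typical checkpoint and restart-strategy setup might look like this:

// Additional imports this sketch assumes:
// import org.apache.flink.api.common.restartstrategy.RestartStrategies;
// import org.apache.flink.api.common.time.Time;
// import org.apache.flink.streaming.api.CheckpointingMode;
// import java.util.concurrent.TimeUnit;

// Checkpoint every 5 seconds with exactly-once state guarantees,
// leaving at least 500 ms between the end of one checkpoint and the start of the next.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Restart the job up to 3 times, waiting 10 seconds between attempts.
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));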
FlinkKafkaProducer
package pers.aishuang.producer;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

public class FlinkKafkaWriter {
    public static void main(String[] args) {
        // 1. Execution environment (env)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // -- Set parallelism (checkpointing, restart strategy, etc. would also go here)
        env.setParallelism(1);
        // 2. Source: read the sample data file
        DataStreamSource<String> inputStream = env
                .readTextFile("SourceDataProcess/datas/sourcedata.txt");
        // 3. Transformation (none in this demo)
        // 4. Sink: write to Kafka
        // 4.1 Producer configuration
        Properties props = new Properties();
        // -- Kafka brokers to connect to
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.88.161:9092,192.168.88.161:9092,192.168.88.161:9092");
        // -- Batch size (in bytes)
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "5");
        // -- ACK setting
        props.setProperty(ProducerConfig.ACKS_CONFIG, "0");
        // 4.2 Create a FlinkKafkaProducer to write to Kafka
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "vehicledata_copy",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String line, @Nullable Long timestamp) {
                        return new ProducerRecord<>(
                                "vehicledata_copy",
                                line.getBytes()
                        );
                    }
                },
                props,
                FlinkKafkaProducer.Semantic.NONE
        );
        // 5. Attach the sink
        inputStream.addSink(producer);
        inputStream.printToErr();
        // 6. Trigger execution
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
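Semantic.NONE provides no delivery guarantee, which is acceptable for this file-replay demo. As a rough sketch only (reusing the props and inputStream defined in FlinkKafkaWriter; the timeout value is illustrative), switching the same sink to exactly-once delivery would rely on Kafka transactions, which require acks=all and a producer transaction timeout no larger than the broker's transaction.max.timeout.ms (15 minutes by default):

// EXACTLY_ONCE commits Kafka transactions on Flink checkpoints,
// so env.enableCheckpointing(...) must also be called (see FlinkKafkaReader above).
props.setProperty(ProducerConfig.ACKS_CONFIG, "all");                    // transactions require acks=all
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "900000");  // must not exceed broker transaction.max.timeout.ms
FlinkKafkaProducer<String> exactlyOnceProducer = new FlinkKafkaProducer<>(
        "vehicledata_copy",
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String line, @Nullable Long timestamp) {
                return new ProducerRecord<>("vehicledata_copy", line.getBytes());
            }
        },
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
inputStream.addSink(exactlyOnceProducer);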