FlinkKafkaConsumer
package pers.aishuang.producer;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class FlinkKafkaReader {
    public static void main(String[] args) {
        //1. Execution environment - env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //-- Set parallelism
        env.setParallelism(1);
        //-- Enable checkpointing (a 20 ms interval is only suitable for a local demo)
        env.enableCheckpointing(20);
        //2. Consume from Kafka
        //2.1 Consumer configuration
        Properties props = new Properties();
        //-- Kafka brokers to connect to
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        //-- Consumer group id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "_vehicle_data_consumer");
        //2.2 Create a FlinkKafkaConsumer to read from Kafka
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "vehicledata_copy",
                new SimpleStringSchema(), // deserialization schema for the record values
                props
        );
        //-- Start-position strategy
        //consumer.setStartFromEarliest();
        consumer.setStartFromGroupOffsets();
        //3. Attach the source
        DataStreamSource<String> source = env.addSource(consumer);
        //4. Print to stderr
        source.printToErr();
        //5. Trigger execution
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
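Note on checkpointing: once checkpointing is enabled, FlinkKafkaConsumer commits its offsets back to Kafka when a checkpoint completes, so the interval above directly controls how often offsets are persisted. The 20 ms value is demo-only. Below is a minimal sketch of a more typical checkpoint setup; the 60-second interval, minimum pause, and timeout are illustrative assumptions, not values from the original code.

// Sketch (illustrative values): a more production-like checkpoint configuration.
// Requires: import org.apache.flink.streaming.api.CheckpointingMode;
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE); // checkpoint every 60 s
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000); // minimum gap between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(120_000);         // abort checkpoints that take longer than 2 min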
FlinkKafkaProducer
package pers.aishuang.producer;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

public class FlinkKafkaWriter {
    public static void main(String[] args) {
        //1. Execution environment - env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //-- Set parallelism, checkpointing, restart strategy, etc.
        env.setParallelism(1);
        //2. Source: read the sample file
        DataStreamSource<String> inputStream = env
                .readTextFile("SourceDataProcess/datas/sourcedata.txt");
        //3. Transformation (none needed here)
        //4. Sink: write to Kafka
        //4.1 Producer configuration
        Properties props = new Properties();
        //-- Kafka broker to connect to (no leading spaces; list additional host:port pairs if available)
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.88.161:9092");
        //-- batch.size is measured in bytes; 5 bytes effectively disables batching (demo value only)
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "5");
        //-- ACK level: 0 = fire-and-forget, no broker acknowledgement
        props.setProperty(ProducerConfig.ACKS_CONFIG, "0");
        //4.2 Create a FlinkKafkaProducer to write to Kafka
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "vehicledata_copy",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String line, @Nullable Long timestamp) {
                        return new ProducerRecord<>(
                                "vehicledata_copy",
                                line.getBytes(StandardCharsets.UTF_8)
                        );
                    }
                },
                props,
                FlinkKafkaProducer.Semantic.NONE
        );
        //5. Attach the sink
        inputStream.addSink(producer);
        inputStream.printToErr();
        //6. Trigger execution
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
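This writer uses Semantic.NONE together with acks=0, so there is no delivery guarantee at all. If exactly-once writes are required, FlinkKafkaProducer can instead be created with Semantic.EXACTLY_ONCE, which relies on Kafka transactions; checkpointing must be enabled in the environment, acks=0 cannot be used, and the producer's transaction timeout must not exceed the broker's transaction.max.timeout.ms. The sketch below is illustrative; the 15-minute timeout is an assumed value, not taken from the original code.

// Sketch (assumed values): exactly-once writes via Kafka transactions.
props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // transactional writes need full acknowledgement
props.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
        String.valueOf(15 * 60 * 1000));              // must be <= broker's transaction.max.timeout.ms
FlinkKafkaProducer<String> exactlyOnceProducer = new FlinkKafkaProducer<>(
        "vehicledata_copy",
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String line, @Nullable Long timestamp) {
                return new ProducerRecord<>("vehicledata_copy", line.getBytes(StandardCharsets.UTF_8));
            }
        },
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);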