Flink Stream Processing: Integrating Flink with Kafka


FlinkKafkaConsumer

The example below reads string records from the vehicledata_copy topic with FlinkKafkaConsumer and prints them to stderr.

package pers.aishuang.producer;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

public class FlinkKafkaReader {
    public static void main(String[] args) {
        //1. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //-- set parallelism
        env.setParallelism(1);
        //-- enable checkpointing; the interval argument is in milliseconds
        env.enableCheckpointing(20);

        //2. Consume from Kafka
        //2.1 Consumer configuration
        Properties props = new Properties();
        //-- brokers to connect to
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        //-- consumer group id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "_vehicle_data_consumer");
        //2.2 Create a FlinkKafkaConsumer to read from Kafka
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                "vehicledata_copy",
                new SimpleStringSchema(), // deserialize records as plain strings
                props
        );
        //-- start position strategy
        //consumer.setStartFromEarliest();
        consumer.setStartFromGroupOffsets();

        //3. Add the source to the environment
        DataStreamSource<String> source = env.addSource(consumer);

        //4. Print the stream
        source.printToErr();

        //5. Trigger execution
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
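
Besides setStartFromGroupOffsets(), FlinkKafkaConsumer exposes several other start-position strategies. The snippet below is a minimal sketch of the alternatives, reusing the consumer variable from the example above; exactly one strategy should be chosen before the consumer is passed to env.addSource, and the timestamp value is only illustrative.

//-- read the topic from the earliest available offset
consumer.setStartFromEarliest();
//-- read only records produced after the job starts
consumer.setStartFromLatest();
//-- start from records whose timestamps are at or after the given epoch milliseconds
consumer.setStartFromTimestamp(1600000000000L);
//-- start from the committed group offsets (the default behavior)
consumer.setStartFromGroupOffsets();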

FlinkKafkaProducer

The example below reads lines from a local text file and writes them to the vehicledata_copy topic with FlinkKafkaProducer.

package pers.aishuang.producer;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;
import java.util.Properties;

public class FlinkKafkaWriter {
    public static void main(String[] args) {
        //1. Execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //-- set parallelism; checkpointing, restart strategy, etc. could also be configured here
        env.setParallelism(1);

        //2. Source: read lines from a local text file
        DataStreamSource<String> inputStream = env
                .readTextFile("SourceDataProcess/datas/sourcedata.txt");

        //3. Transformation (none in this example)

        //4. Sink: write to Kafka
        //4.1 Producer configuration
        Properties props = new Properties();
        //-- brokers to connect to
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.88.161:9092,192.168.88.161:9092,192.168.88.161:9092");
        //-- batch.size is measured in bytes (a value of 5 effectively disables batching)
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "5");
        //-- acknowledgement level
        props.setProperty(ProducerConfig.ACKS_CONFIG, "0");
        //4.2 Create a FlinkKafkaProducer to write to Kafka
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "vehicledata_copy",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String line, @Nullable Long timestamp) {
                        return new ProducerRecord<>(
                                "vehicledata_copy",
                                line.getBytes()
                        );
                    }
                },
                props,
                FlinkKafkaProducer.Semantic.NONE
        );

        //5. Attach the sink
        inputStream.addSink(producer);
        inputStream.printToErr();

        //6. Trigger execution
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
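
The example above uses Semantic.NONE, which gives no delivery guarantees. If end-to-end exactly-once delivery into Kafka is needed, FlinkKafkaProducer can instead be created with Semantic.EXACTLY_ONCE; this requires checkpointing to be enabled and a Kafka transaction timeout at least as large as the checkpoint interval (and no larger than the broker's transaction.max.timeout.ms). The sketch below assumes the same imports and env as the example above; the broker address, checkpoint interval, and timeout are illustrative values.

// enable checkpointing so Kafka transactions are committed when checkpoints complete
env.enableCheckpointing(5000);

Properties txProps = new Properties();
txProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");
// transaction.timeout.ms must cover the checkpoint interval plus some slack
txProps.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "600000");

FlinkKafkaProducer<String> exactlyOnceProducer = new FlinkKafkaProducer<>(
        "vehicledata_copy",
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String line, @Nullable Long timestamp) {
                return new ProducerRecord<>("vehicledata_copy", line.getBytes());
            }
        },
        txProps,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE
);
inputStream.addSink(exactlyOnceProducer);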

