Flink + Kafka producer and consumer in practice: sinking the consumed data into MongoDB


Building a Kafka producer and consumer by hand takes a fair amount of boilerplate, but with the Flink Kafka connector it becomes very easy.

My scenario: listen to a topic, have the consumer store every message from that topic in a database and show it in the front end; when a test needs it, the message is edited in the front end and re-sent. Because of that, the producer adds an extra field named test to every message it sends, so the consumer can recognize messages that originated from our own service and skip storing them again.

When testing the producer and consumer, you can first start a local Kafka: produce locally and let the service consume, to check that the code works; or produce from the service and consume locally.
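For the "produce locally, let the service consume" direction, a plain kafka-clients producer is enough for a smoke test. A minimal sketch, assuming a local broker on localhost:9092 and a placeholder topic name (kafka-clients is brought in transitively by the Flink connector):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class LocalTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder local broker
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // a message shaped like the ones the consumer below expects (database/table fields)
            String message = "{\"database\":\"demo\",\"table\":\"user\",\"id\":1}";
            producer.send(new ProducerRecord<>("test-topic", message)); // placeholder topic
            producer.flush();
        }
    }
}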

1. The back end is a Spring Boot project, so first add the dependencies to the pom file:

<properties>
    <java.version>1.8</java.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.7.0</version>
</dependency>

2. Without further ado, the producer first. (Besides the Flink artifacts above, the code below also uses fastjson, Lombok, the MongoDB Java driver, spring-data-mongodb and a commons-lang StringUtils, which are assumed to be on the classpath.)

public void sendKafka(String topic, String server, String message) throws Exception {
        log.info("start producing");
        // mark the message as one sent by our own service, so our consumer can skip it
        JSONObject obj = JSONObject.parseObject(message);
        obj.put("test", "test");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", server);
        // a single-parallelism source that emits the one message we want to send
        DataStreamSource<String> text = env.addSource(new MyNoParalleSource(obj.toString())).setParallelism(1);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(topic, new SimpleStringSchema(), properties);
        text.addSink(producer);
        env.execute("send kafka ok");
    }

You can see it uses a MyNoParalleSource class, whose job is to build a data stream with parallelism 1 that emits the message to be produced:

public class MyNoParalleSource implements SourceFunction<String> {

    String message;

    public MyNoParalleSource() {
    }

    public MyNoParalleSource(String message) {
        this.message = message;
    }

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        // emit the single message and finish
        sourceContext.collect(this.message);
    }

    @Override
    public void cancel() {
    }
}

At this point the producer is done. Pretty neat, and very simple.
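Since in my scenario the front end edits a message and re-sends it, the producer is naturally triggered from the web layer. A minimal sketch of how such an endpoint might look, where KafkaService is a hypothetical bean holding the sendKafka method above and the URL path is made up:

import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaSendController {

    private final KafkaService kafkaService; // hypothetical service holding sendKafka

    public KafkaSendController(KafkaService kafkaService) {
        this.kafkaService = kafkaService;
    }

    // re-send an edited message to the given topic on the given broker
    @PostMapping("/kafka/send")
    public String send(@RequestParam String topic,
                       @RequestParam String server,
                       @RequestBody String message) throws Exception {
        kafkaService.sendKafka(topic, server, message);
        return "ok";
    }
}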

3. The consumer (since my goal is to store what the producer sends into MongoDB on the consumer side, it is slightly more involved than the producer):

public void consumeKafka(String topic, String server) throws Exception {
        log.info("start consuming");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", server);
        // the Flink Kafka consumer also expects a consumer group id ("flink-consumer" is a placeholder)
        properties.setProperty("group.id", "flink-consumer");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
        // start consuming from the latest offset
        consumer.setStartFromLatest();

        DataStream<String> stream = env.addSource(consumer);
        DataStream<Tuple4<String, String, String, String>> sourceStreamTra = stream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                if (StringUtils.isBlank(value)) {
                    return false;
                }
                // drop messages carrying the "test" marker, i.e. messages sent out by our own service
                JSONObject obj = JSONObject.parseObject(value);
                return !obj.containsKey("test");
            }
        }).map(new MapFunction<String, Tuple4<String, String, String, String>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple4<String, String, String, String> map(String value) throws Exception {
                // build (server, topic, database->table, raw message) for the MongoDB sink
                JSONObject obj = JSONObject.parseObject(value);
                String dataBase = null;
                String table = null;
                if (obj.containsKey("database")) {
                    dataBase = obj.getString("database");
                    table = obj.getString("table");
                }
                return new Tuple4<>(server, topic, dataBase + "->" + table, obj.toString());
            }
        });
        sourceStreamTra.addSink(new MongoSink());
        env.execute();
    }
The MongoSink used above is a custom RichSinkFunction that writes each record into MongoDB:

public class MongoSink extends RichSinkFunction<Tuple4<String, String, String, String>> {
    private static final long serialVersionUID = 1L;
    MongoClient mongoClient = null;

    @Override
    public void invoke(Tuple4<String, String, String, String> value) throws Exception {
        KafkaRecord kafkaRecord = new KafkaRecord("", value.f0, value.f1, value.f2, value.f3,
                new Date(new Timestamp(System.currentTimeMillis()).getTime()));

        // reconnect only if the connection created in open() has been lost
        if (mongoClient == null) {
            mongoClient = MongoDBUtil.getConnect();
        }
        MongoDatabase db = mongoClient.getDatabase("databBase"); // replace with your own database name
        MongoCollection<Document> mongoCollection = db.getCollection("kafkaRecord");
        mongoCollection.insertOne(new Document(CommonMethod.objectToMap(kafkaRecord)));
    }

    @Override
    public void open(Configuration parms) throws Exception {
        super.open(parms);
        mongoClient = MongoDBUtil.getConnect();
    }

    @Override
    public void close() throws Exception {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }
}
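MongoDBUtil.getConnect() and CommonMethod.objectToMap() are small helpers that are referenced above but not shown in this post. A minimal sketch of what they might look like, assuming a MongoDB instance on localhost:27017 with no authentication (the class names come from the sink above; everything else is an assumption):

import com.mongodb.MongoClient;

import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class MongoDBUtil {

    // assumption: a single MongoDB instance on localhost:27017 with no authentication
    public static MongoClient getConnect() {
        return new MongoClient("localhost", 27017);
    }
}

class CommonMethod {

    // convert a POJO such as KafkaRecord into a Map so it can be wrapped in a Bson Document
    public static Map<String, Object> objectToMap(Object obj) throws IllegalAccessException {
        Map<String, Object> map = new HashMap<>();
        for (Field field : obj.getClass().getDeclaredFields()) {
            field.setAccessible(true);
            map.put(field.getName(), field.get(obj));
        }
        return map;
    }
}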
Finally, the KafkaRecord entity that gets stored in MongoDB:

import lombok.Data;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;

import java.io.Serializable;
import java.util.Date;

@Data
@Document(collection = "kafkaRecord")
public class KafkaRecord implements Serializable {
    @Field("_id")
    String id;
    // the raw message body
    String msg;
    // topic name
    String topic;
    // kafka bootstrap server
    String server;
    // source, in the form database->table
    String source;
    // time the record was written
    Date time;

    public KafkaRecord(){

    }

    public KafkaRecord(String id, String server, String topic, String source, String msg, Date time){
        this.id = id;
        this.server = server;
        this.msg = msg;
        this.topic = topic;
        this.source = source;
        this.time = time;
    }
}

With that, the consumer is done as well.

Start the backend service: when the producer sends a message, the consumer picks it up and stores it in the database.
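One thing to keep in mind is that env.execute() blocks the calling thread until the job is cancelled, so inside a Spring Boot service the consumer is best started asynchronously. A minimal sketch of one way to do that at application startup (the KafkaService bean, topic and broker values are hypothetical):

import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerStarter implements CommandLineRunner {

    private final KafkaService kafkaService; // hypothetical service holding the consumeKafka method above

    public KafkaConsumerStarter(KafkaService kafkaService) {
        this.kafkaService = kafkaService;
    }

    @Override
    public void run(String... args) {
        // run the blocking Flink job in its own thread so application startup can finish
        new Thread(() -> {
            try {
                kafkaService.consumeKafka("canal-topic", "localhost:9092"); // placeholder topic and broker
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "flink-kafka-consumer").start();
    }
}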

