傳統要構建一個kafka的生產者和消費者,還是比較費勁的,但是在引入flink插件后,就會變的非常容易;
我的場景:監聽一個topic, 然后消費者將該topic的消息存放到數據庫中,展示在前端,然后在測試需要的時候在前端修改消息,然后將消息重新發送出去;因此在生產者和消費者里面加了一個字段test, 來表示是從自己的服務這里發出去的消息,因此不需要消費並入庫;
在測試生產者和消費者的時候,可以先在自己本地起一個kafka,然后本地生產,服務消費,看代碼是否ok; 或者在服務生產,本地消費,看代碼是否ok
1. 后端是一個springboot工程,首先需要在pom文件中引入依賴
<properties>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
</properties>
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId> <version>1.7.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.7.0</version> </dependency>
2. 話不多說,直接開始先寫生產者
public void sendKafka(String topic, String server, String message) throws Exception { log.info("開始生產"); JSONObject obj = JSONObject.parseObject(message); obj.put("test","test"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", server); DataStreamSource<String> text = env.addSource(new MyNoParalleSource(obj.toString())).setParallelism(1); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer(topic, new SimpleStringSchema(), properties); text.addSink(producer); env.execute("send kafka ok"); }
可以看到里面用到了MyNoParalleSource類,其作用是構建一個並行度為1的數據流,來生產數據
public class MyNoParalleSource implements SourceFunction<String> { String message; public MyNoParalleSource(){ } public MyNoParalleSource(String message) { this.message = message; } @Override public void run(SourceContext<String> sourceContext) throws Exception { sourceContext.collect(this.message); } @Override public void cancel() { } }
此時生產者就寫完了,是不是很優秀,超級簡單;
3. 消費者(由於我的目的是將生產者生產的東西在消費者端存入mongdb數據庫中,因此會比生產者稍微復雜一點)
public void consumeKafka(String topic, String server) throws Exception { log.info("開始消費"); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", server); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties); //從最早開始消費 consumer.setStartFromLatest(); DataStream<String> stream = env.addSource(consumer); DataStream<Tuple4<String, String, String, String>> sourceStreamTra = stream.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { Boolean flag = true; JSONObject obj = JSONObject.parseObject(value); if(obj.containsKey("test")){ flag = false; } return StringUtils.isNotBlank(value) && flag; } }).map(new MapFunction<String, Tuple4<String, String, String, String>>() { private static final long serialVersionUID = 1L; @Override public Tuple4<String, String, String, String> map(String value) throws Exception { JSONObject obj = JSONObject.parseObject(value); String dataBase = null; String table = null; if(obj.containsKey("database")){ dataBase = obj.getString("database"); table = obj.getString("table"); } return new Tuple4<String, String, String, String>(server ,topic, dataBase+"->"+table, obj.toString()); } }); sourceStreamTra.addSink(new MongoSink()); env.execute(); }
public class MongoSink extends RichSinkFunction<Tuple4<String, String, String, String>> { private static final long serialVersionUID = 1L; MongoClient mongoClient = null; // MongoCollection mongoCollection = null; @Override public void invoke(Tuple4<String, String, String, String> value) throws Exception { KafkaRecord kafkaRecord = new KafkaRecord("", value.f0 , value.f1, value.f2, value.f3, new Date(new Timestamp(System.currentTimeMillis()).getTime())); if(mongoClient != null){ mongoClient = MongoDBUtil.getConnect(); MongoDatabase db = mongoClient.getDatabase("databBase"); // 是自己的數據庫 MongoCollection mongoCollection = db.getCollection("kafkaRecord"); mongoCollection.insertOne(new Document(CommonMethod.objectToMap(kafkaRecord))); } } @Override public void open(Configuration parms) throws Exception { super.open(parms); mongoClient = MongoDBUtil.getConnect(); } @Override public void close() throws Exception { if (mongoClient != null) { mongoClient.close(); } } }
import lombok.Data; import org.springframework.data.mongodb.core.mapping.Document; import org.springframework.data.mongodb.core.mapping.Field; import java.io.Serializable; import java.util.Date; @Data @Document(collection = "kafkaRecord") public class KafkaRecord implements Serializable { @Field("_id") String id; // 具體信息 String msg; //topic String topic; String server; String source; //操作時間 Date time; public KafkaRecord(){ } public KafkaRecord(String id, String server, String topic, String source, String msg, Date time){ this.id = id; this.server = server; this.msg = msg; this.topic = topic; this.source = source; this.time = time; } }
此時消費者也完事了;
啟動后端服務,生產者發送一條消息,消費者則拿到該消息存到數據庫中;