Flink custom source and sink: reading the Kafka message key and writing records with a specified key


-------- Update 2019-09-05 --------

Turns out I was overcomplicating things: just use JSONKeyValueDeserializationSchema and consume the data as ObjectNode; if the record has a key, it is placed into the ObjectNode as well:

// Excerpt from JSONKeyValueDeserializationSchema#deserialize: key, value and (optionally)
// metadata all end up as fields of the returned ObjectNode.
if (record.key() != null) {
    node.set("key", mapper.readValue(record.key(), JsonNode.class));
}
if (record.value() != null) {
    node.set("value", mapper.readValue(record.value(), JsonNode.class));
}
if (includeMetadata) {
    node.putObject("metadata")
        .put("offset", record.offset())
        .put("topic", record.topic())
        .put("partition", record.partition());
}
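
For completeness, here is a minimal sketch of that approach (not from the original post). The topic name and the Common.getProp helper follow the other examples below, and depending on the Kafka connector version the ObjectNode import may need to point at Flink's shaded Jackson package instead.

import com.fasterxml.jackson.databind.node.ObjectNode // or the flink-shaded Jackson variant, depending on the connector version
import com.venn.common.Common
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

object JsonKeyValueDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // second constructor argument = includeMetadata (offset / topic / partition)
    val source = new FlinkKafkaConsumer[ObjectNode](
      "kafka_key", new JSONKeyValueDeserializationSchema(true), Common.getProp)

    env.addSource(source)
      .map(node => "key : " + node.get("key") + ", value : " + node.get("value"))
      .print()

    env.execute("JsonKeyValueDemo")
  }
}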

 

-------------------

With the usual FlinkKafkaConsumer and FlinkKafkaProducer setup, there is no access to the message key when consuming and no way to set it when producing, yet sometimes we need that key. The typical usage looks like this:

val kafkaSource = new FlinkKafkaConsumer[ObjectNode]("kafka_demo", new JsonNodeDeserializationSchema(), Common.getProp)
val sink = new FlinkKafkaProducer[String]("kafka_demo_out", new SimpleStringSchema(), Common.getProp)
sink.setWriteTimestampToKafka(true)

env.addSource(kafkaSource)
  .map(node => {
    node.put("token", System.currentTimeMillis())
    node.toString
  })
  .addSink(sink)
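
All of the snippets in this post hand Common.getProp to the Kafka clients, but the helper itself is not shown. Purely as a hypothetical stand-in (the real com.venn.common.Common may differ), it would just bundle the standard Kafka client settings the examples need:

import java.util.Properties

// Hypothetical stand-in for com.venn.common.Common, which is not shown in this post.
// It only collects standard Kafka client settings; adjust bootstrap.servers etc. for your cluster.
object Common {
  def getProp: Properties = {
    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "localhost:9092")
    prop.setProperty("group.id", "kafka_key_demo")
    // used by KafkaConsumer
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // used by KafkaProducer
    prop.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    prop.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    prop
  }
}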

Below, a custom Flink source and sink are used so that we can read each record's key when consuming and write records with different keys when producing.

Idea: use the native Kafka API, KafkaConsumer and KafkaProducer, to consume and produce the data; that gives direct access to the key.

Kafka producer:

import com.venn.common.Common
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
// assuming scala.util.parsing.json.JSONObject, since it is constructed directly from a Scala Map
import scala.util.parsing.json.JSONObject

object KafkaKeyMaker {
  val topic = "kafka_key"
  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](Common.getProp)
    while (true) {
      val map = Map("user" -> "venn", "name" -> "venn", "pass" -> System.currentTimeMillis())
      val jsonObject: JSONObject = new JSONObject(map)
      println(jsonObject.toString())
      // key : "msgKey" + current timestamp
      val msg = new ProducerRecord[String, String](topic, "msgKey" + System.currentTimeMillis(), jsonObject.toString())
      producer.send(msg)
      producer.flush()
      Thread.sleep(3000)
    }
  }
}

Kafka consumer:

import java.util
import com.venn.common.Common
import org.apache.kafka.clients.consumer.KafkaConsumer

object KafkaKeyReceive {
  val topic = "kafka_key"
  def main(args: Array[String]): Unit = {
    val consumer = new KafkaConsumer[String, String](Common.getProp)
    consumer.subscribe(util.Arrays.asList(topic + "_out"))
    while (true) {
      val records = consumer.poll(500)
      val tmp = records.iterator()
      while (tmp.hasNext){
        val record = tmp.next()
        val key = record.key()
        val value = record.value()
        println("receive -> key : " + key + ", value : " + value)
      }
      Thread.sleep(3000)
    }
  }
}

Flink job with the custom source and sink:

import com.venn.common.Common
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.collection.JavaConversions._
/**
  * Created by venn on 19-4-26.
  */
object KafkaSourceKey {

  def main(args: Array[String]): Unit = {
    // environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    
    env.addSource(new RichSourceFunction[String] {
      // kafka consumer object
      var consumer: KafkaConsumer[String, String] = null

      // initialization: create the consumer and subscribe to the topic
      override def open(parameters: Configuration): Unit = {
        consumer = new KafkaConsumer[String, String](Common.getProp)
        // subscribe topic
        val list = List("kafka_key")
        consumer.subscribe(list)
      }

      // run method: poll records in a loop; everything collected goes into the source buffer
      override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
        println("run")
        while (true) {
          val records = consumer.poll(500)
          val tmp = records.iterator()
          while (tmp.hasNext) {
            val record = tmp.next()
            val key = record.key()
            val value = record.value()
            ctx.collect("key : " + key + ", value " + value)
          }
        }
      }

      override def cancel(): Unit = {
        println("cancel")
      }
    }).map(s => s + "map")
      .addSink(new RichSinkFunction[String] {
        // kafka producer object
        var producer: KafkaProducer[String, String] = null

        // initialization: create the producer
        override def open(parameters: Configuration): Unit = {
          producer = new KafkaProducer[String, String](Common.getProp)
        }

        override def close(): Unit = {
          if (producer != null) {
            producer.flush()
            producer.close()
          }
        }

        // called once per record; under high throughput, flush can be batched as needed
        override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
          println("flink : " + value)
          val msg = new ProducerRecord[String, String](
            "kafka_key_out", "key" + System.currentTimeMillis(), value)
          producer.send(msg)
          producer.flush()
        }
      })
    // execute job
    env.execute("KafkaToKafka")
  }

}

Data emitted by the Kafka producer:

{"user" : "venn", "name" : "venn", "pass" : 1561355358148}
{"user" : "venn", "name" : "venn", "pass" : 1561355361271}
{"user" : "venn", "name" : "venn", "pass" : 1561355364276}
{"user" : "venn", "name" : "venn", "pass" : 1561355367279}
{"user" : "venn", "name" : "venn", "pass" : 1561355370283}

Flink output:

run
flink : key : msgKey1561355358180, value {"user" : "venn", "name" : "venn", "pass" : 1561355358148}map
flink : key : msgKey1561355361271, value {"user" : "venn", "name" : "venn", "pass" : 1561355361271}map
flink : key : msgKey1561355364276, value {"user" : "venn", "name" : "venn", "pass" : 1561355364276}map
flink : key : msgKey1561355367279, value {"user" : "venn", "name" : "venn", "pass" : 1561355367279}map
flink : key : msgKey1561355370283, value {"user" : "venn", "name" : "venn", "pass" : 1561355370283}map
flink : key : msgKey1561355373289, value {"user" : "venn", "name" : "venn", "pass" : 1561355373289}map
flink : key : msgKey1561355376293, value {"user" : "venn", "name" : "venn", "pass" : 1561355376293}map

Output of the Kafka consumer:

receive -> key : key1561355430411, value : key : msgKey1561355430356, value {"user" : "venn", "name" : "venn", "pass" : 1561355430356}map
receive -> key : key1561355433427, value : key : msgKey1561355433359, value {"user" : "venn", "name" : "venn", "pass" : 1561355433359}map
receive -> key : key1561355436441, value : key : msgKey1561355436364, value {"user" : "venn", "name" : "venn", "pass" : 1561355436364}map
receive -> key : key1561355439456, value : key : msgKey1561355439367, value {"user" : "venn", "name" : "venn", "pass" : 1561355439367}map
receive -> key : key1561355442473, value : key : msgKey1561355442370, value {"user" : "venn", "name" : "venn", "pass" : 1561355442370}map
receive -> key : key1561355445391, value : key : msgKey1561355445374, value {"user" : "venn", "name" : "venn", "pass" : 1561355445374}map

 

Note: this design has a drawback: it cannot provide exactly-once semantics.

  1. For the source, exactly-once can be achieved with Kafka's low-level API: read from a specified offset each time, commit the new offset, and keep the current offset in Flink state, so that even if the job fails and restarts from the last checkpoint, no data is duplicated.

  2. The sink is trickier: follow the "two-phase commit" approach described in the official documentation to commit the producer's data. In short, after each record is processed it is written to Kafka without being actually committed, and is only recorded in predefined state; when the checkpoint succeeds, Flink commits those writes, otherwise the transaction is aborted and the writes are discarded. A structural sketch follows below.
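
To make point 2 concrete, here is a structural sketch only (it is not the author's code, and not a working Kafka sink): it extends Flink's TwoPhaseCommitSinkFunction, using a plain transaction-id String and println placeholders where a real implementation would drive a transactional KafkaProducer. It only shows where the begin / pre-commit / commit / abort hooks sit relative to checkpoints.

import org.apache.flink.api.common.typeutils.base.{StringSerializer, VoidSerializer}
import org.apache.flink.streaming.api.functions.sink.{SinkFunction, TwoPhaseCommitSinkFunction}

// Structural sketch: TXN is just a String id here; a real sink would hold a transactional producer.
class TwoPhaseCommitSketchSink
  extends TwoPhaseCommitSinkFunction[String, String, Void](StringSerializer.INSTANCE, VoidSerializer.INSTANCE) {

  // called at job start and after every checkpoint: open a fresh transaction
  override def beginTransaction(): String = "txn-" + System.currentTimeMillis()

  // called per record: write into the open transaction, nothing is visible to consumers yet
  override def invoke(txn: String, value: String, context: SinkFunction.Context[_]): Unit =
    println(s"[$txn] write: $value")

  // phase one: flush everything written so far as part of the checkpoint
  override def preCommit(txn: String): Unit = println(s"[$txn] pre-commit (flush)")

  // phase two: runs only after the checkpoint has completed successfully
  override def commit(txn: String): Unit = println(s"[$txn] commit")

  // the checkpoint failed or the job was cancelled: discard the pending writes
  override def abort(txn: String): Unit = println(s"[$txn] abort")
}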

 

References: zhisheng's blog posts: 《從0到1學習Flink》—— 如何自定義 Data Source ?

《從0到1學習Flink》—— 如何自定義 Data Sink ?

A translated article on the two-phase commit protocol: 【譯】Flink + Kafka 0.11端到端精確一次處理語義的實現

