如何用Flink把數據sink到kafka多個不同(成百上千)topic中


需求與場景

上游某業務數據量特別大,進入到kafka一個topic中(當然了這個topic的partition數必然多,有人肯定疑問為什么非要把如此龐大的數據寫入到1個topic里,歷史留下的問題,現狀就是如此龐大的數據集中在一個topic里)。這就需要根據一些業務規則把這個大數據量的topic數據分發到多個(成百上千)topic中,以便下游的多個job去消費自己topic的數據,這樣上下游之間的耦合性就降低了,也讓下游的job輕松了很多,下游的job只處理屬於自己的數據,避免成百上千的job都去消費那個大數據量的topic。數據被分發之后再讓下游job去處理 對網絡帶寬、程序性能、算法復雜性都有好處。

這樣一來就需要 這么一個分發程序,把上下游job連接起來。

分析與思考

  1. Flink中有connect算子,可以連接2個流,在這里1個就是上面數據量龐大的業務數據流,另外1個就是規則流(或者叫做配置流,也就是決定根據什么樣的規則分發業務數據)

  2. 但是問題來了,根據規則分發好了,如何把這些數據sink到kafka多個(成百上千)topic中呢?

  3. 首先想到的就是添加多個sink,每分發到一個topic,就多添加1個addSink操作,這對於如果只是分發到2、3個topic適用的,我看了一下項目中有時候需要把數據sink到2個topic中,同事中就有人添加了2個sink,完全ok,但是在這里要分發到幾十個、成百上千個topic,就肯定不現實了,不需要解釋吧。

  4. sink到kafka中,其實本質上就是用KafkaProducer往kafka寫數據,那么不知道有沒有想起來,用KafkaProducer寫數據的時候api是怎樣的,public Future<RecordMetadata> send(ProducerRecord<K, V> record); 顯然這里需要一個ProducerRecord對象,再看如何實例化ProducerRecord對象,public ProducerRecord(String topic, V value), 也就是說每一個message都指定topic,標明是寫到哪一個topic的,而不必說 我們要寫入10個不同的topic中,我們就一定new 10 個 KafkaProducer

  5. 到上面這一步,如果懂的人就會豁然開朗了,我本來想着可能需要稍微改改flink-connector-kafka實現,讓我驚喜的是flink-connector-kafka已經留有了接口,只要實現KeyedSerializationSchema這個接口的String getTargetTopic(T element);就行

代碼實現

先看一下KeyedSerializationSchema接口的定義,我們知道kafka中存儲的都是byte[],所以由我們自定義序列化key、value

/**
 * The serialization schema describes how to turn a data object into a different serialized
 * representation. Most data sinks (for example Apache Kafka) require the data to be handed
 * to them in a specific format (for example as byte strings).
 *
 * @param <T> The type to be serialized.
 */
@PublicEvolving
public interface KeyedSerializationSchema<T> extends Serializable {

	/**
	 * Serializes the key of the incoming element to a byte array
	 * This method might return null if no key is available.
	 *
	 * @param element The incoming element to be serialized
	 * @return the key of the element as a byte array
	 */
	byte[] serializeKey(T element);

	/**
	 * Serializes the value of the incoming element to a byte array.
	 *
	 * @param element The incoming element to be serialized
	 * @return the value of the element as a byte array
	 */
	byte[] serializeValue(T element);

	/**
	 * Optional method to determine the target topic for the element.
	 *
	 * @param element Incoming element to determine the target topic from
	 * @return null or the target topic
	 */
	String getTargetTopic(T element);
}

重點來了,實現這個String getTargetTopic(T element);就可以決定這個message寫入到哪個topic里。

於是 我們可以這么做,拿到業務數據(我們用的是json格式),然后根據規則分發的時候,就在這條json格式的業務數據里添加一個寫到哪個topic的字段,比如說叫topicKey
然后我們實現getTargetTopic()方法的時候,從業務數據中取出topicKey字段就行了。

實現如下(這里我是用scala寫的,java類似):

class OverridingTopicSchema extends KeyedSerializationSchema[Map[String, Any]] {

  override def serializeKey(element: Map[String, Any]): Array[Byte] = null

  override def serializeValue(element: Map[String, Any]): Array[Byte] = JsonTool.encode(element) //這里用JsonTool指代json序列化的工具類

  /**
    * kafka message value 根據 topicKey字段 決定 往哪個topic寫
    * @param element
    * @return
    */
  override def getTargetTopic(element: Map[String, Any]): String = {
    if (element != null && element.contains(“topicKey”)) {
      element(“topicKey”).toString
    } else null
  }
}

之后在new FlinkKafkaProducer對象的時候 把上面我們實現的這個OverridingTopicSchema傳進去就行了。

public FlinkKafkaProducer(
		String defaultTopicId,  // 如果message沒有指定寫往哪個topic,就寫入這個默認的topic
		KeyedSerializationSchema<IN> serializationSchema,//傳入我們自定義的OverridingTopicSchema
		Properties producerConfig,
		Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
		FlinkKafkaProducer.Semantic semantic,
		int kafkaProducersPoolSize) {
                    //....
}

至此,我們只需要把上面new 出來的FlinkKafkaProducer添加到addSink中就能實現把數據sink到kafka多個(成百上千)topic中。

下面簡單追蹤一下FlinkKafkaProducer源碼,看看flink-connector-kafka是如何將我們自定義的KeyedSerializationSchema作用於最終的ProducerRecord

        /**  這個是用戶可自定義的序列化實現
	 * (Serializable) SerializationSchema for turning objects used with Flink into.
	 * byte[] for Kafka.
	 */
	private final KeyedSerializationSchema<IN> schema;

        @Override
	public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
		checkErroneous();
// 調用我們自己的實現的schema序列化message中的key
		byte[] serializedKey = schema.serializeKey(next);

// 調用我們自己的實現的schema序列化message中的value
		byte[] serializedValue = schema.serializeValue(next);
                
// 調用我們自己的實現的schema取出寫往哪個topic
		String targetTopic = schema.getTargetTopic(next);

		if (targetTopic == null) {
// 如果沒有指定寫往哪個topic,就寫往默認的topic
// 這個默認的topic是我們new  FlinkKafkaProducer時候作為第一個構造參數傳入(見上面的注釋)
			targetTopic = defaultTopicId;
		}

		Long timestamp = null;
		if (this.writeTimestampToKafka) {
			timestamp = context.timestamp();
		}
		ProducerRecord<byte[], byte[]> record;
		int[] partitions = topicPartitionsMap.get(targetTopic);
		if (null == partitions) {
			partitions = getPartitionsByTopic(targetTopic, transaction.producer);
			topicPartitionsMap.put(targetTopic, partitions);
		}
		if (flinkKafkaPartitioner != null) {
			record = new ProducerRecord<>(
				targetTopic, // 這里看到了我們上面一開始分析的ProducerRecord
				flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
				timestamp,
				serializedKey,
				serializedValue);
		} else {
			record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
		}
		pendingRecords.incrementAndGet();
		transaction.producer.send(record, callback);
	}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM