需求與場景
上游某業務數據量特別大,進入到kafka一個topic中(當然了這個topic的partition數必然多,有人肯定疑問為什么非要把如此龐大的數據寫入到1個topic里,歷史留下的問題,現狀就是如此龐大的數據集中在一個topic里)。這就需要根據一些業務規則把這個大數據量的topic數據分發到多個(成百上千)topic中,以便下游的多個job去消費自己topic的數據,這樣上下游之間的耦合性就降低了,也讓下游的job輕松了很多,下游的job只處理屬於自己的數據,避免成百上千的job都去消費那個大數據量的topic。數據被分發之后再讓下游job去處理 對網絡帶寬、程序性能、算法復雜性都有好處。
這樣一來就需要 這么一個分發程序,把上下游job連接起來。
分析與思考
-
Flink中有connect算子,可以連接2個流,在這里1個就是上面數據量龐大的業務數據流,另外1個就是規則流(或者叫做配置流,也就是決定根據什么樣的規則分發業務數據)
-
但是問題來了,根據規則分發好了,如何把這些數據sink到kafka多個(成百上千)topic中呢?
-
首先想到的就是添加多個sink,每分發到一個topic,就多添加1個addSink操作,這對於如果只是分發到2、3個topic適用的,我看了一下項目中有時候需要把數據sink到2個topic中,同事中就有人添加了2個sink,完全ok,但是在這里要分發到幾十個、成百上千個topic,就肯定不現實了,不需要解釋吧。
-
sink到kafka中,其實本質上就是用
KafkaProducer往kafka寫數據,那么不知道有沒有想起來,用KafkaProducer寫數據的時候api是怎樣的,public Future<RecordMetadata> send(ProducerRecord<K, V> record);顯然這里需要一個ProducerRecord對象,再看如何實例化ProducerRecord對象,public ProducerRecord(String topic, V value), 也就是說每一個message都指定topic,標明是寫到哪一個topic的,而不必說 我們要寫入10個不同的topic中,我們就一定new 10 個 KafkaProducer -
到上面這一步,如果懂的人就會豁然開朗了,我本來想着可能需要稍微改改flink-connector-kafka實現,讓我驚喜的是flink-connector-kafka已經留有了接口,只要實現
KeyedSerializationSchema這個接口的String getTargetTopic(T element);就行
代碼實現
先看一下KeyedSerializationSchema接口的定義,我們知道kafka中存儲的都是byte[],所以由我們自定義序列化key、value
/**
* The serialization schema describes how to turn a data object into a different serialized
* representation. Most data sinks (for example Apache Kafka) require the data to be handed
* to them in a specific format (for example as byte strings).
*
* @param <T> The type to be serialized.
*/
@PublicEvolving
public interface KeyedSerializationSchema<T> extends Serializable {
/**
* Serializes the key of the incoming element to a byte array
* This method might return null if no key is available.
*
* @param element The incoming element to be serialized
* @return the key of the element as a byte array
*/
byte[] serializeKey(T element);
/**
* Serializes the value of the incoming element to a byte array.
*
* @param element The incoming element to be serialized
* @return the value of the element as a byte array
*/
byte[] serializeValue(T element);
/**
* Optional method to determine the target topic for the element.
*
* @param element Incoming element to determine the target topic from
* @return null or the target topic
*/
String getTargetTopic(T element);
}
重點來了,實現這個String getTargetTopic(T element);就可以決定這個message寫入到哪個topic里。
於是 我們可以這么做,拿到業務數據(我們用的是json格式),然后根據規則分發的時候,就在這條json格式的業務數據里添加一個寫到哪個topic的字段,比如說叫topicKey,
然后我們實現getTargetTopic()方法的時候,從業務數據中取出topicKey字段就行了。
實現如下(這里我是用scala寫的,java類似):
class OverridingTopicSchema extends KeyedSerializationSchema[Map[String, Any]] {
override def serializeKey(element: Map[String, Any]): Array[Byte] = null
override def serializeValue(element: Map[String, Any]): Array[Byte] = JsonTool.encode(element) //這里用JsonTool指代json序列化的工具類
/**
* kafka message value 根據 topicKey字段 決定 往哪個topic寫
* @param element
* @return
*/
override def getTargetTopic(element: Map[String, Any]): String = {
if (element != null && element.contains(“topicKey”)) {
element(“topicKey”).toString
} else null
}
}
之后在new FlinkKafkaProducer對象的時候 把上面我們實現的這個OverridingTopicSchema傳進去就行了。
public FlinkKafkaProducer(
String defaultTopicId, // 如果message沒有指定寫往哪個topic,就寫入這個默認的topic
KeyedSerializationSchema<IN> serializationSchema,//傳入我們自定義的OverridingTopicSchema
Properties producerConfig,
Optional<FlinkKafkaPartitioner<IN>> customPartitioner,
FlinkKafkaProducer.Semantic semantic,
int kafkaProducersPoolSize) {
//....
}
至此,我們只需要把上面new 出來的FlinkKafkaProducer添加到addSink中就能實現把數據sink到kafka多個(成百上千)topic中。
下面簡單追蹤一下FlinkKafkaProducer源碼,看看flink-connector-kafka是如何將我們自定義的KeyedSerializationSchema作用於最終的ProducerRecord
/** 這個是用戶可自定義的序列化實現
* (Serializable) SerializationSchema for turning objects used with Flink into.
* byte[] for Kafka.
*/
private final KeyedSerializationSchema<IN> schema;
@Override
public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException {
checkErroneous();
// 調用我們自己的實現的schema序列化message中的key
byte[] serializedKey = schema.serializeKey(next);
// 調用我們自己的實現的schema序列化message中的value
byte[] serializedValue = schema.serializeValue(next);
// 調用我們自己的實現的schema取出寫往哪個topic
String targetTopic = schema.getTargetTopic(next);
if (targetTopic == null) {
// 如果沒有指定寫往哪個topic,就寫往默認的topic
// 這個默認的topic是我們new FlinkKafkaProducer時候作為第一個構造參數傳入(見上面的注釋)
targetTopic = defaultTopicId;
}
Long timestamp = null;
if (this.writeTimestampToKafka) {
timestamp = context.timestamp();
}
ProducerRecord<byte[], byte[]> record;
int[] partitions = topicPartitionsMap.get(targetTopic);
if (null == partitions) {
partitions = getPartitionsByTopic(targetTopic, transaction.producer);
topicPartitionsMap.put(targetTopic, partitions);
}
if (flinkKafkaPartitioner != null) {
record = new ProducerRecord<>(
targetTopic, // 這里看到了我們上面一開始分析的ProducerRecord
flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions),
timestamp,
serializedKey,
serializedValue);
} else {
record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue);
}
pendingRecords.incrementAndGet();
transaction.producer.send(record, callback);
}
