重點issue - kafka分區技術選型迭代+kafka亂序問題


選型一:為了保證消息有序,只有一個分區partition

優點:

(前提:生產者發送消息的順序,和消息到達kafka的順序一致)...

為了避免多partition,導致消費者消費某些順序敏感的數據,會出現亂序的現象。

 

缺點:

當數據量巨大的時候,所有該topic的數據都擠在同一個partition。這樣子就無法發揮kafka的並發度。

如果是這種情況,同一個消費group里,哪怕有多個消費者,但每個消費者只能選擇某個topic的一個partition消費。該哪怕消費者再多,也無法加快消費速度。

 

 

選型二:為提升並發度,增加partition

選型二下方描述的問題,是發生在結構圖的下半部分。

RiskSummary服務充當發布者,計划把結果發布到kafka時, 原本沒有“優化點:根據UK分區”的時候,會將同一個UK的數據,隨機放在partition 0/1, 繼而有可能導致最終結果出現了亂序。

(前提依舊成立:生產者發送消息的順序,和消息到達kafka的順序一致)...

 

問題表象

由於kafka亂序,導致bridge也亂序,導致綠色正確的數字提前pub,然后被錯誤的黃色數字覆蓋。

 

問題真正原因

上圖1黃1綠的數據,被pub到partition 0

上圖另1黃的數據,被pub到partition 1

 

盡管這三個數據的uk一致,但是被錯誤的pub到不一致的分區。。。而不同的分區之間是不保證順序性的,是各自的線程在消費,因為出現了亂序的問題。如果要解決,就必須保證同一個uk被分到同一個partition...那就需要修改kafka的分區策略。

 

kafka 默認的分區策略

參考:https://blog.csdn.net/qq_38262266/article/details/107356824 

  • 指明 partition 的情況下,直接將數據放在對應的 partiton ;
  • 沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數進行取余得到 partition 值;
  • 既沒有 partition 值又沒有 key 值的情況下,第一次調用時隨機生成一個整數(后面每次調用在這個整數上自增),將這個值與 topic 可用的 partition 總數取余得到 partition 值,也就是常說的 round-robin 算法。

 

通過自定義序列化類,給kafka消息定義key值

通常我們都是用默認的序列化類(例如SimpleStringSchema)來發送一條消息,並沒有指定key值。

xxxString.addSink(new FlinkKafkaProducer<>("XXX-XXX-TOPIC-1", new SimpleStringSchema(), properties)).name("flink-connectors-kafka");

 

有時候我們需要執行發送消息的key,value值,就需要自定義序列化類。由於需要key值,那需要實現KeyedSerializationSchema接口,其有個簡單的實現類KeyedSerializationSchemaWrapper,我們只需extends KeyedSerializationSchemaWrapper即可。

package com.huatai.quant.service.flink.source;

import com.huatai.quant.utils.PartitionUtil;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.*;

public class KafkaCustomSerializationSchema extends KeyedSerializationSchemaWrapper<String> {
    private static Logger logger = LoggerFactory.getLogger(KafkaCustomSerializationSchema.class);

    public KafkaCustomSerializationSchema(SerializationSchema serializationSchema) {
        super(serializationSchema);
    }

    @Override
    public byte[] serializeKey(String element){
        /**
         *  step 1: convert element to hashmap
         */
        Map<String, String> hashMap = PartitionUtil.convertKafkaJsonToMap(element);

        /**
         *  step 2: use hashmap to build uniqueKey
         *  eg: RiskSummaryBO uk --> ATBOOKTRADE12-prop-20220323-FI-FUTURES-2-3
         *   RiskSummaryBO uk partten is : bookName- bookProp- asOfDate- displayType- displaySubType- calcDataSource- summaryType
         */
        List<String> ukAttribute = Arrays.asList("bookName","bookProp","asOfDate","displayType","displaySubType","calcDataSource","summaryType");
        StringBuilder finalKey = new StringBuilder();
        for(int i = 0; i < ukAttribute.size(); i++){
            if(i == ukAttribute.size() -1){
                finalKey.append(hashMap.get(ukAttribute.get(i)));
            }else {
                finalKey.append(hashMap.get(ukAttribute.get(i))).append("-");
            }
        }

        logger.info("The KEY for kafka message is = " + finalKey.toString());
        return finalKey.toString().getBytes(StandardCharsets.UTF_8);
    }
}

注意:

1.其構造參數必須傳入一個SerializationSchema,就可以傳入之前提到的普通序列化類SimpleStringSchema實例即可。

2.KeyedSerializationSchema接口其實也可以serializeValue (猜測:可以對原始消息做修改再pub),再以key, value pub出去。如需要,顯示override即可。

 

如何自定義分區策略

參數含義(按順序):

  • 原始消息
  • 提取出來key
  • 提取出來value(猜測:可以對原始消息做一次修改再pub)
  • topic
  • 分區的數組

 

package com.huatai.quant.service.flink.source;

import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

public class KafkaCustomPartitioner  extends FlinkKafkaPartitioner <String> {
    private static Logger logger = LoggerFactory.getLogger(KafkaCustomPartitioner.class);

    @Override
    public int partition(String s, byte[] bytes, byte[] bytes1, String s2, int[] ints) {
        if(ints != null && ints.length > 0){
            // 按key分配分區
            int partition = Math.abs(Arrays.toString(bytes).hashCode()) % ints.length;
            logger.info("Current element will send to partition = " + partition + " , and totally have " + ints.length + " partitions");
            return partition;
        }
        return 0;
    }
}

 

應用新的序列化策略+分區策略

//old version        
//riskSummaryString.addSink(new FlinkKafkaProducer<>("FICC-NATS-RISKSUMMARY-1", new SimpleStringSchema(), properties)).name("flink-connectors-kafka");

//new version       
riskSummaryString.addSink(new FlinkKafkaProducer<>("FICC-NATS-RISKSUMMARY-1", new KafkaCustomSerializationSchema(new SimpleStringSchema()), properties, Optional.of(new KafkaCustomPartitioner()))).name("flink-connectors-kafka");
    

 

 

參考文獻

Flink實戰:寫入Kafka自定義序列化類和自定義分區器 http://cache.baiducontent.com/c?m=p6eKCqLTI3i2O7VuI0SVEqWZj0N37MUI6zeq7Qnltp8YPeG7yfsRAYGX0ngSSI3ZAD8fUVBaFQY4s-8WmnQkPtpm2S79tmaeUQuzri1HcSjRCE5RifvCnCRWoDM2hgRGce7_FqedQnXJWgTXVEpk3517Iq4waEBZqR0xdHXHP_VMYLcSFBLwxXbllc6oHK9J&p=98769a4799b11cff57eb92204d08&newp=8565841f86cc47a901fcc7710f4492695803ed6339d3d301298ffe0cc4241a1a1a3aecbe25271604d6c37a6002a54a56eafa3770350834f1f689df08d2ecce7e7699&s=fc490ca45c00b124&user=baidu&fm=sc&query=FlinkKafkaProducer+%D7%D4%B6%A8%D2%E5+partitioner&qid=f76a5e9f000004d2&p1=3

kafka分區Partitioner使用 https://blog.csdn.net/u012129558/article/details/80075597

Kafka生產者分區partition策略 https://blog.csdn.net/qq_38262266/article/details/107356824

kafka分區策略 https://www.cnblogs.com/lincf/p/11985026.html

 

選型三:繼續優化 -- 指定分區消費;避免rebalance; 保證消息順序到達

consumer如何指定消費的partition

看下方結構圖的上半部分:

當生產者的數據根據品種進行了分區(債券,非債券),消費者此時也是拆成了多個實例(債券,非債券)。

為了讓“債券”消費者,指定消費“債券”partition0,需要使用這個方法:consumer.assign()直接消息指定分區

 

如何避免消費者rebalance,導致消費者消費的分區發生變化

設置consumer端參數partition.assignment.strategy=Sticky (再平衡時需要放棄原有分區) / CooperativeSticky (再平衡時不必放棄原有分區),這是因為Sticky算法會最大化保證消費分區方案的不變更。假設你的因果消息都有相同的key,那么結合Sticky算法有可能保證即使出現rebalance,要消費的分區依然有原來的consumer負責。

 

如何保證消費到達kafka順序,消息發送順序一致

一個生產者,發兩次消息,但是網絡原因,消息到達的順序和消息發送的順序不一致.
防止亂序可以通過設置max.in.flight.requests.per.connection=1來保證

 

 

 

選型四:繼續優化 -- 不同品種的消息,生產速錄不一致怎么辦

假如我現在的業務數據定義了三個key,但是這三個key對應的消息生產速率不一致,按照老師上面的示意圖展示的是,特定的key只會存儲在特定的一個分區中,那豈不是犧牲了拓展性么,如果其中一個key的生產速率非常大,而另外2個key沒那么大,卻會一直占用分區,不會造成分區的空間浪費嗎?

 

其實在生產環境中用key做邏輯區分並不太常見。如果不同key速率相差很大,可以考慮使用不同的topic

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM