kafka監聽出現的問題,解決和剖析


問題如下:

  1. kafka為什么監聽不到數據
  2. kafka為什么會有重復數據發送
  3. kafka數據重復如何解決
  4. 為什么kafka會出現倆個消費端都可以消費問題
  5. kafka監聽配置文件

 

一. 解決問題一(kafka監聽不到數據)

  首先kafka監聽不得到數據,檢查如下

  1. 檢查配置文件是否正確(可能會出現改了監聽地址,監聽Topic,監聽的地址的數量問題)
  2. 檢查接收數據的正確性(比如原生的代碼,可能是用byte序列化接收的數據,而你接收使用String。也是配置文件序列化問題,還有與發送者商量問題)
  3. 檢查kafka版本問題(一般的版本其實是沒什么問題的,只有個別版本會出現監聽不到問題)
  4. 沒有加
    @Component    犯了最不應該出差錯的問題

  如果出現監聽不到數據的問題,那么就試試更改方法一二,如果不可以在去試試方法三,之前出現這個問題也是查過 一般查到都會說  “低版本的服務器接收不到高版本的生產者發送的消息”,但是凈由測試使用 用1.0.5RELEASE 和 2.6.3反復測試,並沒有任何的問題。

如果按照版本一致,那么根本就不現實,因為可能不同的項目,springboot版本不一致的話,可能有的springboot版本低,那么你還得要求自己維護項目版本升級?如果出現第四種情況就無話可說了。

 

二. 解決問題二(kafka為什么會有重復數據發送)

  重復數據的發送問題如下

  1. 可能在發送者的那里的事務問題。mysql存儲事務發生異常導致回滾操作,但是kafka消息卻是已經發送到了服務器中。此事肯定會出現重復問題
  2. 生產者設置時間問題,生產發送設置的時間內,消息沒完成發送,生產者以為消費者掛掉,便重新發送一個,導致重復
  3. offset問題,當項目重啟,offset走到某一個位置已扔到kafka服務器中,但是項目被重啟.那么offset會是在原本重啟的那一個點的地方再次發送一次,這是kafka設計的問題,防止出現丟失數據問題

 

三. 解決問題三(kafka數據重復如何解決)

  目前我是使用的Redis進行的排重法,用的是Redis中的set,保證里面不存在重復,保證Redis里面不會存入太多的臟數據。並定期清理

  粘貼一下我的排重(Redis排重法)

        //kafka prefix
            String cache = "kafka_cache";
            //kafka suffix
            Calendar c = Calendar.getInstance();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd  HH:mm:ss");
            //0點,目前是為了設置為這一天的固定時間。這個完全可以去寫個工具類自己弄,為了看的更清楚,麻煩了一點的寫入
            SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd  00:00:00");
            String gtimeStart = sdf2.format(c.getTime());
            long time = sdf.parse(gtimeStart).getTime();


            //此位置為了設置是否是新的一天,新的一天需要設置定時時間,保證redis中不會存儲太多無用數據
            Boolean flag = false;
            //數據接收
            Set<String> range = new HashSet<>();
            //判斷是否存在
            if (redisTemplate.hasKey(cache + time)) {
                //存在則取出這個set
                range = redisTemplate.opsForSet().members(cache + time);
            }else {
                //不存在,則為下面過期時間的設置鋪墊
                flag = true;
            }
            //判斷監聽到的數據是否是重復
            if (range.contains("測試需要")) {
                //重復則排出,根據邏輯自己修改
                continue;
            } else {
                //添加進去
                redisTemplate.opsForSet().add(cache + time, i+"");
                if (flag){
                    //設置為24小時,保證新一天使用,之前使用的存儲會消失掉
                    redisTemplate.expire(cache + time,24,TimeUnit.HOURS);
                    //不會在進入這個里面,如果多次的存入過期時間,那么這個key的過期時間就永遠是24小時,一直就不會過期
                    flag = false;
                }
            }

 

 

四. 解決問題四(為什么kafka會出現倆個消費端都可以消費問題)

  原因是因為在不同group-id之下,kafka接收到以后,會給監聽他的每一個組發送一個他所收到的消息,但是兩個消費端監聽同一個group-id,那么就只有一個消費端可以消費到。

 

 

五. 粘一下我的監聽配置文件

# 指定kafka 代理地址,可以多個,用逗號間隔
spring.kafka.bootstrap-servers= localhost:9092
# 指定默認消費者group id
spring.kafka.consumer.group-id= test
# 是否自動提交
spring.kafka.consumer.enable-auto-commit= true
# 提交間隔的毫秒
spring.kafka.consumer.auto-commit-interval.ms=60000
# 最大輪詢的次數
spring.kafka.consumer.max-poll-records=1
# 將偏移量重置為最新偏移量
spring.kafka.consumer.auto-offset-reset=earliest
# 指定消息key和消息體的編解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

 

 

如有什么地方錯誤或者不明白請下方評論指出,謝謝。討論解決使我們共同進步


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM