前言
本文針對解決Kafka不同Topic之間存在一定的數據關聯時的順序消費問題。
如存在Topic-insert和Topic-update分別是對數據的插入和更新,當insert和update操作為同一數據時,應保證先insert再update。
1、問題引入
kafka的順序消費一直是一個難以解決的問題,kafka的消費策略是對於同Topic同Partition的消息可保證順序消費,其余無法保證。如果一個Topic只有一個Partition,那么這個Topic對應consumer的消費必然是有序的。不同的Topic的任何情況下都無法保證consumer的消費順序和producer的發送順序一致。
如果不同Topic之間存在數據關聯且對消費順序有要求,該如何處理?本文主要解決此問題。
2、解決思路
現有Topic-insert和Topic-update,數據唯一標識為id,對於id=1的數據而言,要保證Topic-insert消費在前,Topic-update消費在后。
兩個Topic的消費為不同線程處理,所以為了保證在同一時間內的同一數據標識的消息僅有一個業務邏輯在處理,需要對業務添加鎖操作。
使用synchronized進行加鎖的話,會影響無關聯的insert和update的數據消費能力,如id=1的insert和id=2的update,在synchronized的情況下,無法並發處理,這是沒有必要的,我們需要的是對於id=1的insert和id=1的update在同一時間只有一個在處理,所以使用細粒度鎖來完成加鎖的操作。
細粒度鎖實現:https://blog.csdn.net/qq_38245668/article/details/105891161
PS:如果為分布式系統,細粒度鎖需要使用分布式鎖的對應實現。
在對insert和update加鎖之后,其實還是沒有解決消費順序的問題,只是確保了同一時間只有一個業務在處理。 對於消費順序異常的問題,也就是先消費了update再消費insert的情況。
處理方式:消費到update數據,校驗庫中是否存在當前數據(也就是是否執行insert),如果沒有,就將當前update數據存入緩存,key為數據標識id,在insert消費時檢查是否存在id對應的update緩存,如果有,就證明當前數據的消費順序異常,需執行update操作,再將緩存數據移除。
3、實現方案
消息發送:
kafkaTemplate.send("TOPIC_INSERT", "1");
kafkaTemplate.send("TOPIC_UPDATE", "1");
監聽代碼示例:
KafkaListenerDemo.java
@Component
@Slf4j
public class KafkaListenerDemo {
// 消費到的數據緩存
private Map<String, String> UPDATE_DATA_MAP = new ConcurrentHashMap<>();
// 數據存儲
private Map<String, String> DATA_MAP = new ConcurrentHashMap<>();
private WeakRefHashLock weakRefHashLock;
public KafkaListenerDemo(WeakRefHashLock weakRefHashLock) {
this.weakRefHashLock = weakRefHashLock;
}
@KafkaListener(topics = "TOPIC_INSERT")
public void insert(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
// 模擬順序異常,也就是insert后消費,這里線程sleep
Thread.sleep(1000);
String id = record.value();
log.info("接收到insert :: {}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {
log.info("開始處理 {} 的insert", id);
// 模擬 insert 業務處理
Thread.sleep(1000);
// 從緩存中獲取 是否存在有update數據
if (UPDATE_DATA_MAP.containsKey(id)){
// 緩存數據存在,執行update
doUpdate(id);
}
log.info("處理 {} 的insert 結束", id);
}finally {
lock.unlock();
}
acknowledgment.acknowledge();
}
@KafkaListener(topics = "TOPIC_UPDATE")
public void update(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws InterruptedException{
String id = record.value();
log.info("接收到update :: {}", id);
Lock lock = weakRefHashLock.lock(id);
lock.lock();
try {
// 測試使用,不做數據庫的校驗
if (!DATA_MAP.containsKey(id)){
// 未找到對應數據,證明消費順序異常,將當前數據加入緩存
log.info("消費順序異常,將update數據 {} 加入緩存", id);
UPDATE_DATA_MAP.put(id, id);
}else {
doUpdate(id);
}
}finally {
lock.unlock();
}
acknowledgment.acknowledge();
}
void doUpdate(String id) throws InterruptedException{
// 模擬 update
log.info("開始處理update::{}", id);
Thread.sleep(1000);
log.info("處理update::{} 結束", id);
}
}
日志(代碼中已模擬必現消費順序異常的場景):
接收到update ::1
消費順序異常,將update數據 1 加入緩存
接收到insert ::1
開始處理 1 的insert
開始處理update::1
處理update::1 結束
處理 1 的insert 結束
觀察日志,此方案可正常處理不同Topic再存在數據關聯的消費順序問題。
版權聲明:本文為CSDN博主「方片龍」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/qq_38245668/article/details/105900011
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2022最新版)
4.別再寫滿屏的爆爆爆炸類了,試試裝飾器模式,這才是優雅的方式!!
覺得不錯,別忘了隨手點贊+轉發哦!