作者:小傅哥
博客:https://bugstack.cn
沉淀、分享、成長,讓自己和他人都能有所收獲!😄
一、前言
不卷了,能用就行!
哈哈哈,說好的不卷了,能湊活用就行了。但每次接到新需求時都手癢,想結合着上一次的架構設計和落地經驗,在這一次需求上在迭代更新,或者找到完全顛覆之前的更優方案。卷完代碼的那一刻總是神清氣爽
其實大部分喜歡寫代碼的一類純粹碼農,都是比較卷的,就比如一個需求在實現上是能用大概是P5
、如果這個做出來的功能不只是能用還非常好用是P6
、除了好用還凝練共性需求開發成通用的組件服務是P7
。每一個成長過來的碼農,都是在造輪子的路上一次次驗證自己的想法和加以實踐,絕對不是一篇篇的八股文就能累出來一個高級的技術大牛。
二、延遲任務場景
什么是延遲任務?
當我們的實際業務需求場景中,有一些活動開始前的狀態變更、訂單結算后的T+1對賬、貸款單息費的產生,都是需要使用到延遲任務來進行觸達。實際的操作一般會有 Quartz、Schedule 來對你的庫表數據進行定時掃描和處理,當條件滿足后做數據狀態的變更或者產生新的數據插入到表中。
這樣一個簡單的需求就是延遲任務最初需求,如果需求前期內容較少、使用方不多,可能在實際開發中就只是一個單台機器直接對着表一頓輪訓就完事了。但隨着業務需求的發展和功能的復雜度提升,往往反饋到研發設計和實現,就不那么簡單了,比如:你需要保障盡可能低延遲完成較大規模的數據量掃描處理,否則就像貸款單息費的產生,已經到了第二天用戶還沒看到自己的息費信息或者是還款后的重新對賬,可能就這個時候就要產生客訴了。
那么,類似這樣的場景該如何設計呢?
三、延遲任務設計
通常的任務中心處理流程主要,主要是由定時任務掃描任務庫表,把即將達到超時時間的任務信息掃描到處理隊列(內存/MQ消息
),再由業務系統進行處理任務,處理完成后更新庫表中的任務狀態。
問題:
- 海量數據規模較大的任務列表數據,在分庫分表下該需要快速掃描。
- 任務掃描服務與業務邏輯處理,耦合在一起,不具有通用性和復用性。
- 細分任務體系有些是需要低延遲處理的,不能等待過長時間。
1. 任務表方式
除了一些較小的狀態變更場景,例如在各自業務的庫表中,就包含了一個狀態字段,這個字段一方面有程序邏輯處理變更的狀態,也有到達指定到期
時間后由任務服務自動變更處理的操作,一般這類功能,直接設計到自己的庫表中即可。
那么還有一些較大也較為頻繁使用的場景,如果都是在每個系統的各自所需的N多個表中,都添加這樣的字段進行維護,就顯得非常冗余了,也不那么易於維護。所以針對這樣的場景就很適合做一個通用的任務延時系統,各業務系統把需要被延時執行的動作提交到延時系統中,再有延時系統在指定時間下進行回調,回調的動作可以是接口或者MQ消息進行觸達。例如可以設計這樣一個任務調度表:
- 抽取的任務調度表,主要是拿到什么任務,在什么時間發起動作,具體的動作處理仍交給業務工程處理。
- 大批量的各自業務的任務進行集中處理,則需要設計一個分庫分表,滿足於后續業務體量的增長。
- 門牌號設計,針對一張表的掃描,如果數據量較大,又不希望只是一個任務掃描一個表,可以多個任務掃描一個表,加到掃描的體量。這個時候就需要一個門牌號來隔離不同任務掃描的范圍,避免掃描出重復的任務數據。
2. 低延遲方式
低延遲處理方案,是在任務表方式的基礎上,新增加的時間把控處理。它可以把即將到期的前一段時間的任務,放置到 Redis 集群隊里中,在消費的時候再從隊列中 pop 出來,這樣可以更快的接近任務的處理時效,避免因為掃庫間隔較大延遲任務執行。
- 在接收業務系統提交進來的延遲任務時,按照執行時間的長短放置到任務庫或者也同步到 Redis 集群中,一些執行時間較晚的任務則可以先放到任務庫,再通過掃描的方式添加到超時任務執行隊列中。
- 那么關於這塊的設計核心在於 Redis 隊列的使用,以及為了保證消費的可靠性需要引入二階段消費、注冊 ZK 注冊中心至少保證一次消費的處理。本文重點主要放在 Redis 隊列的設計,其他更多的邏輯處理,可以按照業務需求進行擴展和完善
Redis 消費隊列
- 按照消息體計算對應數據所屬的槽位
index = CRC32 & 7
- StoreQueue 采用 Slot 按照 SlotKey =
#{topic}_#{index}
和 Sorted Set 的數據結構按執行任務分數排序,存放任務執行信息。定時消息將時間戳作為分數,消費時每次彈出分數小於當前時間戳的一個消息 - 為了保障每條消息至少可消費一次,消費者不是直接 pop 有序集合中的元素,而是將元素從 StoreQueue 移動到 PrepareQueue 並返回消息給消費者。消費成功后再從 PrepareQueue 從刪除,如果消費失敗則從PreapreQueue 重新移動到 StoreQueue,這樣二階段消費的方式進行處理。
- 參考文檔:2021 阿里技術人的百寶黑皮書PDF文,
低延遲的超時中心實現方式
簡單案例
@Test
public void test_delay_queue() throws InterruptedException {
RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("TASK");
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
new Thread(() -> {
try {
while (true){
Object take = blockingQueue.take();
System.out.println(take);
Thread.sleep(10);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
int i = 0;
while (true){
delayedQueue.offerAsync("測試" + ++i, 100L, TimeUnit.MILLISECONDS);
Thread.sleep(1000L);
}
}
測試數據
2022-02-13 WARN 204760 --- [ Finalizer] i.l.c.resource.DefaultClientResources : io.lettuce.core.resource.DefaultClientResources was not shut down properly, shutdown() was not called before it's garbage-collected. Call shutdown() or shutdown(long,long,TimeUnit)
測試1
測試2
測試3
測試4
測試5
Process finished with exit code -1
- 源碼:https://github.com/fuzhengwei/TimeOutCenter
- 描述:使用 redisson 中的 DelayedQueue 作為消息隊列,寫入后等待消費時間進行 POP 消費。
四、總結
- 調度任務的使用在實際的場景中非常頻繁,例如我們經常使用 xxl-job,也有一些大廠自研的分布式任務調度組件,這些可能原本都是很小很簡單的功能,但經過抽象、整合、提煉,變成了一個個核心通用的中間件服務。
- 當我們在考慮使用任務調度的時候,無論哪種方式的設計和實現,都需要考慮這個功能使用時候的以為迭代和維護性,如果僅僅是一個非常小的場景,又沒多少人使用的話,那么在自己機器上折騰就可以。過渡的設計和使用有時候也會把研發資源代入泥潭
- 其實各項技術的知識點,都像是一個個工具,刀槍棍棒斧鉞鈎,那能怎么結合各自的特點,把這些兵器用起來,才是一個程序員不斷成長的過程。如果你希望了解更多此類有深度的技術內容,可以加入 Lottery 分布式抽獎秒殺系統 學習更有價值的更抗用的實戰手段。