簡介: 訂單狀態流轉是交易系統的最為核心的工作,訂單系統往往都會存在狀態多、鏈路長、邏輯復雜的特點,還存在多場景、多類型、多業務維度等業務特性。在保證訂單狀態流轉穩定性的前提下、可擴展性和可維護性是我們需要重點關注和解決的問題。
作者 | 亮言
來源 | 阿里技術公眾號
一 背景
訂單狀態流轉是交易系統的最為核心的工作,訂單系統往往都會存在狀態多、鏈路長、邏輯復雜的特點,還存在多場景、多類型、多業務維度等業務特性。在保證訂單狀態流轉穩定性的前提下、可擴展性和可維護性是我們需要重點關注和解決的問題。
以高德打車業務的訂單狀態為例,訂單狀態就有乘客下單、司機接單、司機已到達乘車點、開始行程、行程結束、確認費用、支付成功、訂單取消、訂單關閉等;訂單車型有專車、快車、出租車等幾種車型,而專車又分舒適型、豪華型、商務型等;業務場景接送機、企業用車、城際拼車等等場景。
當訂單狀態、類型、場景、以及其他一些維度組合時,每一種組合都可能會有不同的處理邏輯、也可能會存在共性的業務邏輯,這種情況下代碼中各種if-else肯定是不敢想象的。怎么處理這種"多狀態+多類型+多場景+多維度"的復雜訂單狀態流轉業務,又要保證整個系統的可擴展性和可維護性,本文的解決思路和方案同大家一起探討。
二 實現方案
要解決"多狀態+多類型+多場景+多維度"的復雜訂單狀態流轉業務,我們從縱向和橫向兩個維度進行設計。縱向主要從業務隔離和流程編排的角度出發解決問題、而橫向主要從邏輯復用和業務擴展的角度解決問題。
1 縱向解決業務隔離和流程編排
狀態模式的應用
通常我們處理一個多狀態或者多維度的業務邏輯,都會采用狀態模式或者策略模式來解決,我們這里不討論兩種設計模式的異同,其核心其實可以概括為一個詞"分而治之",抽象一個基礎邏輯接口、每一個狀態或者類型都實現該接口,業務處理時根據不同的狀態或者類型調用對應的業務實現,以到達邏輯相互獨立互不干擾、代碼隔離的目的。
這不僅僅是從可擴展性和可維護性的角度出發,其實我們做架構做穩定性、隔離是一種減少影響面的基本手段,類似的隔離環境做灰度、分批發布等,這里不做擴展。
/** * 狀態機處理器接口 */ public interface StateProcessor { /** * 執行狀態遷移的入口 */ void action(StateContext context) throws Exception; }
/** * 狀態A對應的狀態處理器 */ public class StateAProcessor interface StateProcessor { /** * 執行狀態遷移的入口 */ @Override public void action(StateContext context) throws Exception { } }
單一狀態或類型可以通過上面的方法解決,那么"多狀態+多類型+多場景+多維度"這種組合業務呢,當然也可以采用這種模式或思路來解決。首先在開發階段通過一個注解@OrderPorcessor將不同的維度予以組合、開發出多個對應的具體實現類,在系統運行階段,通過判斷上下文來動態選擇具體使用哪一個實現類執行。@OrderPorcessor中分別定義state代表當前處理器要處理的狀態,bizCode和sceneId分別代表業務類型和場景,這兩個字段留給業務進行擴展,比如可以用bizCode代表產品或訂單類型、sceneId代表業務形態或來源場景等等,如果要擴展多個維度的組合、也可以用多個維度拼接后的字符串賦值到bizCode和sceneId上。
受限於Java枚舉不能繼承的規范,如果要開發通用的功能、注解中就不能使用枚舉、所以此處只好使用String。
/** * 狀態機引擎的處理器注解標識 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Component public @interface OrderProcessor { /** * 指定狀態,state不能同時存在 */ String[] state() default {}; /** * 業務 */ String[] bizCode() default {}; /** * 場景 */ String[] sceneId() default {}; }
/** * 創建訂單狀態對應的狀態處理器 */ @OrderProcessor(state = "INIT", bizCode = {"CHEAP","POPULAR"}, sceneId = "H5") public class StateCreateProcessor interface StateProcessor { }
再想一下,因為涉及到狀態流轉,不可能會是一個狀態A只能流轉到狀態B、狀態A可能在不同的場景下流轉到狀態B、狀態C、狀態D;還有雖然都是由狀態A流轉到狀態B、但是不同的場景處理流程也可能不一樣,比如都是將訂單從從待支付狀態進行支付、用戶主動發起支付和系統免密支付的流程可能就不一樣。針對上面這兩種情況、我們把這里的"場景"統一封裝為"事件(event)",以"事件驅動"的方式來控制狀態的流向,一個狀態遇到一個特定的處理事件來決定該狀態的業務處理流程和最終狀態流向。我們可以總結下,其實狀態機模式簡單說就是:基於某些特定業務和場景下,根據源狀態和發生的事件,來執行下一步的流程處理邏輯,並設置一個目標狀態。
這里有人可能有一些疑問,這個"事件"和上面說的"多場景"、"多維度"有什么不一樣。解釋一下,我們這里說的是"事件"是一個具體的業務要執行的動作,比如用戶下單是一個業務事件、用戶取消訂單是一個業務事件、用戶支付訂單也是一個業務事件。而"多場景"、"多維度"則是可交由業務自行進行擴展的維度,比如自有標准模式來源的訂單、通過開放平台API來的訂單、通過第三方標准來源的訂單,某某小程序、某某APP來源可以定義為不同場景,而接送機、企業用車、拼車等可以定義為維度。
public @interface OrderProcessor { /** * 指定狀態 */ String[] state() default {}; /** * 訂單操作事件 */ String event(); ...... }
/** * 訂單狀態遷移事件 */ public interface OrderStateEvent { /** * 訂單狀態事件 */ String getEventType(); /** * 訂單ID */ String getOrderId(); /** * 如果orderState不為空,則代表只有訂單是當前狀態才進行遷移 */ default String orderState() { return null; } /** * 是否要新創建訂單 */ boolean newCreate(); }
狀態遷移流程的封裝
在滿足了上面說的多維度組合的業務場景、開發多個實現類來執行的情況,我們思考執行這些實現類在流程上是否有再次抽象和封裝的地方、以減少研發工作量和盡量的實現通用流程。我們經過觀察和抽象,發現每一個訂單狀態流轉的流程中,都會有三個流程:校驗、業務邏輯執行、數據更新持久化;於是再次抽象,可以將一個狀態流轉分為數據准備(prepare)——>校驗(check)——>獲取下一個狀態(getNextState)——>業務邏輯執行(action)——>數據持久化(save)——>后續處理(after)這六個階段;然后通過一個模板方法將六個階段方法串聯在一起、形成一個有順序的執行邏輯。這樣一來整個狀態流程的執行邏輯就更加清晰和簡單了、可維護性上也得到的一定的提升。
/** * 狀態遷移動作處理步驟 */ public interface StateActionStep<T, C> { /** * 准備數據 */ default void prepare(StateContext<C> context) { } /** * 校驗 */ ServiceResult<T> check(StateContext<C> context); /** * 獲取當前狀態處理器處理完畢后,所處於的下一個狀態 */ String getNextState(StateContext<C> context); /** * 狀態動作方法,主要狀態遷移邏輯 */ ServiceResult<T> action(String nextState, StateContext<C> context) throws Exception; /** * 狀態數據持久化 */ ServiceResult<T> save(String nextState, StateContext<C> context) throws Exception; /** * 狀態遷移成功,持久化后執行的后續處理 */ void after(StateContext<C> context); }
/** * 狀態機處理器模板類 */ @Component public abstract class AbstractStateProcessor<T, C> implements StateProcessor<T, C>, StateActionStep<T, C> { @Override public final ServiceResult<T> action(StateContext<C> context) throws Exception { ServiceResult<T> result = null; try { // 數據准備 this.prepare(context); // 串行校驗器 result = this.check(context); if (!result.isSuccess()) { return result; } // getNextState不能在prepare前,因為有的nextState是根據prepare中的數據轉換而來 String nextState = this.getNextState(context); // 業務邏輯 result = this.action(nextState, context); if (!result.isSuccess()) { return result; } // 持久化 result = this.save(nextState, context); if (!result.isSuccess()) { return result; } // after this.after(context); return result; } catch (Exception e) { throw e; } }
/** * 狀態A對應的狀態處理器 */ @OrderProcessor(state = "INIT", bizCode = {"CHEAP","POPULAR"}, sceneId = "H5") public class StateCreateProcessor extends AbstractStateProcessor<String, CreateOrderContext> { ...... }
(1)校驗器
上面提到了校驗(check),我們都知道任何一個狀態的流轉甚至接口的調用其實都少不了一些校驗規則,尤其是對於復雜的業務、其校驗規則和校驗邏輯也會更加復雜。那么對於這些校驗規則怎么解耦呢,既要將校驗邏輯從復雜的業務流程中解耦出來、同時又需要把復雜的校驗規則簡單化,使整個校驗邏輯更具有可擴展性和可維護性。其實做法也比較簡單、參考上面的邏輯,只需要抽象一個校驗器接口checker、把復雜的校驗邏輯拆開、形成多個單一邏輯的校驗器實現類,狀態處理器在調用check時只需要調用一個接口、由校驗器執行多個checker的集合就可以了。將校驗器checker進行封裝之后,發現要加入一個新的校驗邏輯就十分簡單了,只需要寫一個新的checker實現類加入校驗器就行、對其他代碼基本沒有改動。
/** * 狀態機校驗器 */ public interface Checker<T, C> { ServiceResult<T> check(StateContext<C> context); /** * 多個checker時的執行順序 */ default int order() { return 0; } }
邏輯簡單了、擴展性和維護性解決了、性能問題就會顯現出來。多個校驗器checker串行執行性能肯定性能比較差,此時很簡單的可以想到使用並行執行,是的、此處使用多線程並行執行多個校驗器checker能顯著提高執行效率。但是也應該意識到,有些校驗器邏輯可能是有前后依賴的(其實不應該出現),還有寫業務流程中要求某些校驗器的執行必須有前后順序,還有些流程不要求校驗器的執行順序但是要求錯誤時的返回順序、那么怎么在並行的前提下保證順序呢、此處就可以用order+Future實現了。經過一系列的思考和總結,我們把校驗器分為參數校驗(paramChecker)、同步校驗(syncChecker)、異步校驗(asyncChecker)三種類型,其中參數校驗paramChecker是需要在狀態處理器最開始處執行的,為什么這么做、因為參數都不合法了肯定沒有繼續向下執行的必要了。
/** * 狀態機校驗器 */ public interface Checkable { /** * 參數校驗 */ default List<Checker> getParamChecker() { return Collections.EMPTY_LIST; } /** * 需同步執行的狀態檢查器 */ default List<Checker> getSyncChecker() { return Collections.EMPTY_LIST; } /** * 可異步執行的校驗器 */ default List<Checker> getAsyncChecker() { return Collections.EMPTY_LIST; } }
/** * 校驗器的執行器 */ public class CheckerExecutor { /** * 執行並行校驗器, * 按照任務投遞的順序判斷返回。 */ public ServiceResult<T, C> parallelCheck(List<Checker> checkers, StateContext<C> context) { if (!CollectionUtils.isEmpty(checkers)) { if (checkers.size() == 1) { return checkers.get(0).check(context); } List<Future<ServiceResult>> resultList = Collections.synchronizedList(new ArrayList<>(checkers.size())); checkers.sort(Comparator.comparingInt(Checker::order)); for (Checker c : checkers) { Future<ServiceResult> future = executor.submit(() -> c.check(context)); resultList.add(future); } for (Future<ServiceResult> future : resultList) { try { ServiceResult sr = future.get(); if (!sr.isSuccess()) { return sr; } } catch (Exception e) { log.error("parallelCheck executor.submit error.", e); throw new RuntimeException(e); } } } return new ServiceResult<>(); } }
checkable在模板方法中的使用。
public interface StateActionStep<T, C> { Checkable getCheckable(StateContext<C> context); .... }
public abstract class AbstractStateProcessor<T, C> implements StateProcessor<T>, StateActionStep<T, C> { @Resource private CheckerExecutor checkerExecutor; @Override public final ServiceResult<T> action(StateContext<C> context) throws Exception { ServiceResult<T> result = null; Checkable checkable = this.getCheckable(context); try { // 參數校驗器 result = checkerExecutor.serialCheck(checkable.getParamChecker(), context); if (!result.isSuccess()) { return result; } // 數據准備 this.prepare(context); // 串行校驗器 result = checkerExecutor.serialCheck(checkable.getSyncChecker(), context); if (!result.isSuccess()) { return result; } // 並行校驗器 result = checkerExecutor.parallelCheck(checkable.getAsyncChecker(), context); if (!result.isSuccess()) { return result; } ...... }
checkable在具體狀態處理器中的代碼應用舉例。
@OrderProcessor(state = "INIT", bizCode = {"CHEAP","POPULAR"}, sceneId = "H5") public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> { @Resource private CreateParamChecker createParamChecker; @Resource private UserChecker userChecker; @Resource private UnfinshChecker unfinshChecker; @Override public Checkable getCheckable(StateContext<CreateOrderContext> context) { return new Checkable() { @Override public List<Checker> getParamChecker() { return Arrays.asList(createParamChecker); } @Override public List<Checker> getSyncChecker() { return Collections.EMPTY_LIST; } @Override public List<Checker> getAsyncChecker() { return Arrays.asList(userChecker, unfinshChecker); } }; } ......
checker的定位是校驗器,負責校驗參數或業務的合法性,但實際編碼過程中、checker中可能會有一些臨時狀態類操作,比如在校驗之前進行計數或者加鎖操作、在校驗完成后根據結果進行釋放,這里就需要支持統一的釋放功能。
public interface Checker<T, C> { ...... /** * 是否需求release */ default boolean needRelease() { return false; } /** * 業務執行完成后的釋放方法, * 比如有些業務會在checker中加一些狀態操作,等業務執行完成后根據結果選擇處理這些狀態操作, * 最典型的就是checker中加一把鎖,release根據結果釋放鎖. */ default void release(StateContext<C> context, ServiceResult<T> result) { } }
public class CheckerExecutor { /** * 執行checker的釋放操作 */ public <T, C> void releaseCheck(Checkable checkable, StateContext<C> context, ServiceResult<T> result) { List<Checker> checkers = new ArrayList<>(); checkers.addAll(checkable.getParamChecker()); checkers.addAll(checkable.getSyncChecker()); checkers.addAll(checkable.getAsyncChecker()); checkers.removeIf(Checker::needRelease); if (!CollectionUtils.isEmpty(checkers)) { if (checkers.size() == 1) { checkers.get(0).release(context, result); return; } CountDownLatch latch = new CountDownLatch(checkers.size()); for (Checker c : checkers) { executor.execute(() -> { try { c.release(context, result); } finally { latch.countDown(); } }); } try { latch.await(); } catch (InterruptedException e) { throw new RuntimeException(e); } } } }
2)上下文
從上面代碼可以發現,整個狀態遷移的幾個方法都是使用上下文Context對象串聯的。Context對象中一共有三類對象,(1)訂單的基本信息(訂單ID、狀態、業務屬性、場景屬性)、(2)事件對象(其參數基本就是狀態遷移行為的入參)、(3)具體處理器決定的泛型類。一般要將數據在多個方法中進行傳遞有兩種方案:一個是包裝使用ThreadLocal、每個方法都可以對當前ThreadLocal進行賦值和取值;另一種是使用一個上下文Context對象做為每個方法的入參傳遞。這種方案都有一些優缺點,使用ThreadLocal其實是一種"隱式調用",雖然可以在"隨處"進行調用、但是對使用方其實不明顯的、在中間件中會大量使用、在開發業務代碼中是需要盡量避免的;而使用Context做為參數在方法中進行傳遞、可以有效的減少"不可知"的問題。
不管是使用ThreadLocal還是Context做為參數傳遞,對於實際承載的數據載體有兩種方案,常見的是使用Map做為載體,業務在使用的時候可以根據需要隨意的設置任何kv,但是這種情況對代碼的可維護性和可讀性是極大的挑戰,所以這里使用泛型類來固定數據格式,一個具體的狀態處理流程到底需要對哪些數據做傳遞需要明確定義好。其實原則是一樣的,業務開發盡量用用可見性避免不可知。
public class StateContext<C> { /** * 訂單操作事件 */ private OrderStateEvent orderStateEvent; /** * 狀態機需要的訂單基本信息 */ private FsmOrder fsmOrder; /** * 業務可定義的上下文泛型對象 */ private C context; public StateContext(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) { this.orderStateEvent = orderStateEvent; this.fsmOrder = fsmOrder; } ......
/** * 狀態機引擎所需的訂單信息基類信息 */ public interface FsmOrder { /** * 訂單ID */ String getOrderId(); /** * 訂單狀態 */ String getOrderState(); /** * 訂單的業務屬性 */ String bizCode(); /** * 訂單的場景屬性 */ String sceneId(); }
(3)遷移到的狀態判定
為什么要把下一個狀態(getNextState)抽象為單獨一個步驟、而不是交由業務自己進行設置呢?是因為要遷移到的下一個狀態不一定是固定的,就是說根據當前狀態和發生的事件、再遇到更加細節的邏輯時也可能會流轉到不同的狀態。舉個例子,當前狀態是用戶已下單完成、要發生的事件是用戶取消訂單,此時根據不同的邏輯,訂單有可能流轉到取消狀態、也有可能流轉到取消待審核狀態、甚至有可能流轉到取消待支付費用狀態。當然這里要取決於業務系統對狀態和事件定義的粗細和狀態機的復雜程度,做為狀態機引擎、這里把下一個狀態的判定交由業務根據上下文對象自己來判斷。
getNextState()使用及狀態遷移持久化舉例:
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "BUSINESS") public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> { ........ @Override public String getNextState(StateContext<CreateOrderContext> context) { // if (context.getOrderStateEvent().getEventType().equals("xxx")) { // return OrderStateEnum.INIT; // } return OrderStateEnum.NEW; } @Override public ServiceResult<String> save(String nextState, StateContext<CreateOrderContext> context) throws Exception { OrderInfo orderInfo = context.getContext().getOrderInfo(); // 更新狀態 orderInfo.setOrderState(nextState); // 持久化 // this.updateOrderInfo(orderInfo); log.info("save BUSINESS order success, userId:{}, orderId:{}", orderInfo.getUserId(), orderInfo.getOrderId()); return new ServiceResult<>(orderInfo.getOrderId(), "business下單成功"); } }
狀態消息
一般來說,所有的狀態遷移都應該發出對應的消息,由下游消費方訂閱進行相應的業務處理。
(1)狀態消息內容
對於狀態遷移消息的發送內容通常有兩種形式,一個是只發狀態發生遷移這個通知、舉例子就是只發送"訂單ID、變更前狀態、變更后狀態"等幾個關鍵字段,具體下游業務需要哪些具體內容在調用相應的接口進行反查;還有一種是發送所有字段出去、類似於發一個狀態變更后的訂單內容快照,下游接到消息后幾乎不需要在調用接口進行反查。
(2)狀態消息的時序
狀態遷移是有時序的,因此很多下游依賴方也需要判斷消息的順序。一種實現方案是使用順序消息(rocketmq、kafka等),但基於並發吞吐量的考慮很少采用這種方案;一般都是在消息體中加入"消息發送時間"或者"狀態變更時間"字段,有消費方自己進行處理。
(3)數據庫狀態變更和消息的一致性
狀態變更需要和消息保持一致嗎?
很多時候是需要的,如果數據庫狀態變更成功了、但是狀態消息沒有發送出去、則會導致一些下游依賴方處理邏輯的缺失。而我們知道,數據庫和消息系統是無法保證100%一致的,我們要保證的是主要數據庫狀態變更了、消息就要盡量接近100%的發送成功。
那么怎么保證呢?
其實通常確實有幾種方案:
a)使用rocketmq等支持的兩階段式消息提交方式:
- 先向消息服務器發送一條預處理消息
- 當本地數據庫變更提交之后、再向消息服務器發送一條確認發送的消息
- 如果本地數據庫變更失敗、則向消息服務器發送一條取消發送的消息
- 如果長時間沒有向消息服務器發生確認發送的消息,消息系統則會回調一個提前約定的接口、來查看本地業務是否成功,以此決定是否真正發生消息
b)使用數據庫事務方案保證:
- 創建一個消息發送表,將要發送的消息插入到該表中,同本地業務在一個數據庫事務中進行提交
- 之后在由一個定時任務來輪詢發送、直到發送成功后在刪除當前表記錄
c)還是使用數據庫事務方案保證:
- 創建一個消息發送表,將要發送的消息插入到該表中,同本地業務在一個數據庫事務中進行提交
- 向消息服務器發送消息
- 發送成功則刪除掉當前表記錄
- 對於沒有發送成功的消息(也就是表里面沒有被刪除的記錄),再由定時任務來輪詢發送
還有其他方案嗎?有的。
d)數據對賬、發現不一致時進行補償處理、以此保證數據的最終一致。其實不管使用哪種方案來保證數據庫狀態變更和消息的一致,數據對賬的方案都是"必須"要有的一種兜底方案。
那么、還有其他方案嗎?還是有的,對於數據庫狀態變更和消息的一致性的問題,細節比較多,每種方案又都有相應的優缺點,本文主要是介紹狀態機引擎的設計,對於消息一致性的問題就不過多介紹,后面也許會有單獨的文章對數據庫變更和消息的一致性的問題進行介紹和討論。
2 橫向解決邏輯復用和實現業務擴展
實現基於"多類型+多場景+多維度"的代碼分離治理、以及標准處理流程模板的狀態機模型之后,其實在真正編碼的時候會發現不同類型不同維度對於同一個狀態的流程處理過程,有時多個處理邏輯中的一部分流程一樣的或者是相似的,比如支付環節不管是采用免密還是其他方式、其中核銷優惠券的處理邏輯、設置發票金額的處理邏輯等都是一樣的;甚至有些時候多個類型間的處理邏輯大部分是相同的而差異是小部分,比如下單流程的處理邏輯基本邏輯都差不多,而出租車對比網約車可能就多了出租車紅包、無預估價等個別流程的差異。
對於上面這種情況、其實就是要實現在縱向解決業務隔離和流程編排的基礎上,需要支持小部分邏輯或代碼段的復用、或者大部分流程的復用,減少重復建設和開發。對此我們在狀態機引擎中支持兩種解決方案:
基於插件化的解決方案
插件的主要邏輯是:可以在業務邏輯執行(action)、數據持久化(save)這兩個節點前加載對應到的插件類進行執行,主要是對上下文Context對象進行操作、或者根據Context參數發起不同的流程調用,已到達改變業務數據或流程的目的。
(1)標准流程+差異化插件
上面講到同一個狀態模型下、不同的類型或維度有些邏輯或處理流程是一樣的小部分邏輯是不同的。於是我們可以把一種處理流程定義為標准的或默認的處理邏輯,把差異化的代碼寫成插件,當業務執行到具體差異化邏輯時會調用到不同的插件進行處理,這樣只需要為不同的類型或維度編寫對應有差異邏輯的插件即可、標准的處理流程由默認的處理器執行就行。
(2)差異流程+公用插件
當然對於小部分邏輯和代碼可以公用的場景,也可以用插件化的方案解決。比如對於同一個狀態下多個維修下不同處理器中、我們可以把相同的邏輯或代碼封裝成一個插件,多個處理器中都可以識別加載該插件進行執行,從而實現多個差異的流程使用想用插件的形式。
/** * 插件注解 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Component public @interface ProcessorPlugin { /** * 指定狀態,state不能同時存在 */ String[] state() default {}; /** * 訂單操作事件 */ String event(); /** * 業務 */ String[] bizCode() default {}; /** * 場景 */ String[] sceneId() default {}; }
* 插件處理器
*/
public interface PluginHandler<T, C> extends StateProcessor<T, C> { }
Plug在處理器模板中的執行邏輯。
public abstract class AbstractStateProcessor<T, C> implements StateProcessor<T>, StateActionStep<T, C> { @Override public final ServiceResult<T> action(StateContext<C> context) throws Exception { ServiceResult<T> result = null; try { ...... // 業務邏輯 result = this.action(nextState, context); if (!result.isSuccess()) { return result; } // 在action和save之間執行插件邏輯 this.pluginExecutor.parallelExecutor(context); // 持久化 result = this.save(nextState, context)); if (!result.isSuccess()) { return result; } ...... } catch (Exception e) { throw e; } }
插件使用的例子:
/** * 預估價插件 */ @ProcessorPlugin(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "BUSINESS") public class EstimatePricePlugin implements PluginHandler<String, CreateOrderContext> { @Override public ServiceResult action(StateContext<CreateOrderContext> context) throws Exception { // String price = priceSerive.getPrice(); String price = ""; context.getContext().setEstimatePriceInfo(price); return new ServiceResult(); } }
基於代碼繼承方式的解決方案
當發現新增一個狀態不同維度的處理流程,和當前已存在的一個處理器大部分邏輯是相同的,此時就可以使新寫的這個處理器B繼承已存在的處理器A,只需要讓處理器B覆寫A中不同方法邏輯、實現差異邏輯的替換。這種方案比較好理解,但是需要處理器A已經規划好一些可以擴展的點、其他處理器可以基於這些擴展點進行覆寫替換。當然更好的方案其實是,先實現一個默認的處理器,把所有的標准處理流程和可擴展點進行封裝實現、其他處理器進行繼承、覆寫、替換就好。
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "CHEAP") public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> { @Override public ServiceResult action(String nextState, StateContext<CreateOrderContext> context) throws Exception { CreateEvent createEvent = (CreateEvent) context.getOrderStateEvent(); // 促銷信息信息 String promtionInfo = this.doPromotion(); ...... } /** * 促銷相關擴展點 */ protected String doPromotion() { return "1"; } }
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "TAXI") public class OrderCreatedProcessor4Taxi extends OrderCreatedProcessor<String, CreateOrderContext> { @Override protected String doPromotion() { return "taxt1"; } }
3 狀態遷移流程的執行流程
狀態機引擎的執行過程
通過上面的介紹,大體明白了怎么實現狀態流程編排、業務隔離和擴展等等,但是狀態機引擎具體是怎么把這個過程串聯起來的呢?簡單說、分為兩個階段:初始化階段和運行時階段。
(1)狀態機引擎初始化階段
首先在代碼編寫階段、根據上面的分析,業務通過實現AbstractStateProcessor模板類、並添加@OrderProcessor注解來實現自己的多個需要的特定狀態處理器。
那么在系統初始化階段,所有添加了@OrderProcessor注解的實現類都會被spring所管理成為spring bean,狀態機引擎在通過監聽spring bean的注冊(BeanPostProcessor)來將這些狀態處理器processor裝載到自己管理的容器中。直白來說、這個狀態處理器容器其實就是一個多層map實現的,第一層map的key是狀態(state),第二層map的key是狀態對應的事件(event)、一個狀態可以有多個要處理的事件,第三層map的key是具體的場景code(也就是bizCode和sceneId的組合),最后的value是AbstractStateProcessor集合。
public class DefaultStateProcessRegistry implements BeanPostProcessor { /** * 第一層key是訂單狀態。 * 第二層key是訂單狀態對應的事件,一個狀態可以有多個事件。 * 第三層key是具體場景code,場景下對應的多個處理器,需要后續進行過濾選擇出一個具體的執行。 */ private static Map<String, Map<String, Map<String, List<AbstractStateProcessor>>>> stateProcessMap = new ConcurrentHashMap<>(); @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof AbstractStateProcessor && bean.getClass().isAnnotationPresent(OrderProcessor.class)) { OrderProcessor annotation = bean.getClass().getAnnotation(OrderProcessor.class); String[] states = annotation.state(); String event = annotation.event(); String[] bizCodes = annotation.bizCode().length == 0 ? new String[]{"#"} : annotation.bizCode(); String[] sceneIds = annotation.sceneId().length == 0 ? new String[]{"#"} : annotation.sceneId(); initProcessMap(states, event, bizCodes, sceneIds, stateProcessMap, (AbstractStateProcessor) bean); } return bean; } private <E extends StateProcessor> void initProcessMap(String[] states, String event, String[] bizCodes, String[] sceneIds, Map<String, Map<String, Map<String, List<E>>>> map, E processor) { for (String bizCode : bizCodes) { for (String sceneId : sceneIds) { Arrays.asList(states).parallelStream().forEach(orderStateEnum -> { registerStateHandlers(orderStateEnum, event, bizCode, sceneId, map, processor); }); } } } /** * 初始化狀態機處理器 */ public <E extends StateProcessor> void registerStateHandlers(String orderStateEnum, String event, String bizCode, String sceneId, Map<String, Map<String, Map<String, List<E>>>> map, E processor) { // state維度 if (!map.containsKey(orderStateEnum)) { map.put(orderStateEnum, new ConcurrentHashMap<>()); } Map<String, Map<String, List<E>>> stateTransformEventEnumMap = map.get(orderStateEnum); // event維度 if (!stateTransformEventEnumMap.containsKey(event)) { stateTransformEventEnumMap.put(event, new ConcurrentHashMap<>()); } // bizCode and sceneId Map<String, List<E>> processorMap = stateTransformEventEnumMap.get(event); String bizCodeAndSceneId = bizCode + "@" + sceneId; if (!processorMap.containsKey(bizCodeAndSceneId)) { processorMap.put(bizCodeAndSceneId, new CopyOnWriteArrayList<>()); } processorMap.get(bizCodeAndSceneId).add(processor); } }
(2)狀態機引擎運行時階段
經過初始化之后,所有的狀態處理器processor都被裝載到容器。在運行時,通過一個入口來發起對狀態機的調用,方法的主要參數是操作事件(event)和業務入參,如果是新創建訂單請求需要攜帶業務(bizCode)和場景(sceneId)信息、如果是已存在訂單的更新狀態機引擎會根據oderId自動獲取業務(bizCode)、場景(sceneId)和當前狀態(state)。之后引擎根據state+event+bizCode+sceneId從狀態處理器容器中獲取到對應的具體處理器processor,從而進行狀態遷移處理。
/** * 狀態機執行引擎 */ public interface OrderFsmEngine { /** * 執行狀態遷移事件,不傳FsmOrder默認會根據orderId從FsmOrderService接口獲取 */ <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent) throws Exception; /** * 執行狀態遷移事件,可攜帶FsmOrder參數 */ <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) throws Exception; }
@Component
public class DefaultOrderFsmEngine implements OrderFsmEngine { @Override public <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent) throws Exception { FsmOrder fsmOrder = null; if (orderStateEvent.newCreate()) { fsmOrder = this.fsmOrderService.getFsmOrder(orderStateEvent.getOrderId()); if (fsmOrder == null) { throw new FsmException(ErrorCodeEnum.ORDER_NOT_FOUND); } } return sendEvent(orderStateEvent, fsmOrder); } @Override public <T> ServiceResult<T> sendEvent(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) throws Exception { // 構造當前事件上下文 StateContext context = this.getStateContext(orderStateEvent, fsmOrder); // 獲取當前事件處理器 StateProcessor<T> stateProcessor = this.getStateProcessor(context); // 執行處理邏輯 return stateProcessor.action(context); } private <T> StateProcessor<T, ?> getStateProcessor(StateContext<?> context) { OrderStateEvent stateEvent = context.getOrderStateEvent(); FsmOrder fsmOrder = context.getFsmOrder(); // 根據狀態+事件對象獲取所對應的業務處理器集合 List<AbstractStateProcessor> processorList = stateProcessorRegistry.acquireStateProcess(fsmOrder.getOrderState(), stateEvent.getEventType(), fsmOrder.bizCode(), fsmOrder.sceneId()); if (processorList == null) { // 訂單狀態發生改變 if (!Objects.isNull(stateEvent.orderState()) && !stateEvent.orderState().equals(fsmOrder.getOrderState())) { throw new FsmException(ErrorCodeEnum.ORDER_STATE_NOT_MATCH); } throw new FsmException(ErrorCodeEnum.NOT_FOUND_PROCESSOR); } if (CollectionUtils.isEmpty(processorResult)) { throw new FsmException(ErrorCodeEnum.NOT_FOUND_PROCESSOR); } if (processorResult.size() > 1) { throw new FsmException(ErrorCodeEnum.FOUND_MORE_PROCESSOR); } return processorResult.get(0); } private StateContext<?> getStateContext(OrderStateEvent orderStateEvent, FsmOrder fsmOrder) { StateContext<?> context = new StateContext(orderStateEvent, fsmOrder); return context; } }
檢測到多個狀態執行器怎么處理
有一點要說明,有可能根據state+event+bizCode+sceneId信息獲取到的是多個狀態處理器processor,有可能確實業務需要單純依賴bizCode和sceneId兩個屬性無法有效識別和定位唯一processor,那么我們這里給業務開一個口、由業務決定從多個處理器中選一個適合當前上下文的,具體做法是業務processor通過filter方法根據當前context來判斷是否符合調用條件。
private <T> StateProcessor<T, ?> getStateProcessor(StateContext<?> context) { // 根據狀態+事件對象獲取所對應的業務處理器集合 List<AbstractStateProcessor> processorList = ... ...... List<AbstractStateProcessor> processorResult = new ArrayList<>(processorList.size()); // 根據上下文獲取唯一的業務處理器 for (AbstractStateProcessor processor : processorList) { if (processor.filter(context)) { processorResult.add(processor); } } ...... }
filter在具體狀態處理器processor中的使用舉例:
@OrderProcessor(state = OrderStateEnum.INIT, event = OrderEventEnum.CREATE, bizCode = "BUSINESS") public class OrderCreatedProcessor extends AbstractStateProcessor<String, CreateOrderContext> { ...... @Override public boolean filter(StateContext<CreateOrderContext> context) { OrderInfo orderInfo = (OrderInfo) context.getFsmOrder(); if (orderInfo.getServiceType() == ServiceType.TAKEOFF_CAR) { return true; } return false; } ...... }
當然,如果最終經過業務filter之后,還是有多個狀態處理器符合條件,那么這里只能拋異常處理了。這個需要在開發時,對狀態和多維度處理器有詳細規划。
4 狀態機引擎執行總結
狀態機引擎處理流程
簡易的狀態機引擎的執行流程整理,主要介紹運行時的狀態機執行過程。
狀態處理器的原理
簡易的狀態機處理器的原理和依賴整理,主要介紹狀態處理器的流程和細節。
三 其他
還有其他問題么,想一下。
1 狀態流轉並發問題怎么處理?
如果一個訂單當前是狀態A、此刻從不同的維度或入口分別發起了不同的事件請求,此時怎么處理?
比如當前訂單是新創建完成狀態,用戶發起了取消同時客服也發起了取消,在或者訂單是待支付狀態、系統發起了免密支付同時客服或者用戶發起了改價。這些場景不管是系統照成的並發還是業務操作造成的並發,並發是真實存在的。對於這種情況、原則是同一時刻一個訂單只能有一個狀態變更事件可進行,其他的請求要么排隊、要么返回由上游進行處理或重試等。
我們的做法是:
- 在狀態機OrderFsmEngine的sendEvent入口處,針對同一個訂單維度加鎖(redis分布式鎖)、同一時間只允許有一個狀態變更操作進行,其他請求則進行排隊等待。
- 在數據庫層對當前state做校驗、類似與樂觀鎖方式。最終是將其他請求拋錯、由上游業務進行處理。
2 能不能動態實現狀態流程的切換和編排?
最開始我們有一個版本,狀態處理器的定義不是由注解方式實現、而是將state、event、bizCode、sceneId、processor這些通過數據庫表來保存,初始化時從數據庫加載后進行處理器的裝載。同時通過一個后台可以動態的調整state、event、bizCode、sceneId、processor對應關系、以此來達到動態靈活配置流程的效果,但是隨着業務的上線,基本從來沒有進行動態變更過,其實也不敢操作,畢竟狀態流轉事十分核心的業務、一旦因變更導致故障是不可想象的。
3 通用性的問題
其實不僅僅訂單系統、甚至不僅是狀態機邏輯可以用上面講的這些思路處理,很多日常中其他一些多維度的業務都可以采取這些方案進行處理。
4 與TMF的結合
其實這套狀態機引擎還是比較簡單的、對於業務擴展點處的定義也不是十分友好,目前我們也正在結合TMF框架來定制擴展點,TMF是從執行具體擴展點實現的角度出發,達到標准流程和具體業務邏輯分離的效果。
當然不管那種方案,擴展點的定義是業務需要核心關心和友好封裝的事情。
本文為阿里雲原創內容,未經允許不得轉載。