前言:
最近做的一個項目是一個數據庫服務化的管控平台,用時髦一點的名詞來說是一個DBaaS產品。這種面向雲化的產品,呈現給最終用戶的體驗是提供一個管理頁面,把數據庫的生命周期,監控等功能通過WEB頁面或者Open API暴露給用戶或者第三方的程序,常見的產品類似於阿里雲或者AWS的RDS。而我們的做的產品實際上是一個分布式的數據庫服務平台,除了底層的存儲,還有上層的proxy去完成分庫分表,讀寫分離等操作。
對於終端用戶來說,使用的是一個數據庫連接,但實際上在后面,會有很多系統一同工作。例如當用戶創建一個RDS的時候,會去創建底層的數據庫實例(MySQL,SQL Server等),Loader Blance,Proxy等,而這些組件其實也是由其他系統通過Open API或者RPC的方式暴露給上層應用。作為Paas或者DBaaS的最上層的產品,免不了會調用其他的系統接口去申請資源。那么在代碼實現上會碰到一個問題,當依賴多個系統的時候,依次調用各個系統的的過程中,如果中途出錯,錯誤處理比較難。
之前狀況:
之前的項目代碼中,沒有統一的任務執行框架,由各個開發人員自行去編碼,那么經常看到這種代碼
try{ resultA = call_system_a; resultB = call_system_b; }catch(Exception e){ if ( ! resultA ){ do_some_clean_work_sysytem_a; }else ( result && ! reusltB ) {
do_some_clean_word_system_a; do_some_clean_work_system_b; } }
為什么要這么寫呢,是因為當出了異常以后,你需要判斷是a系統調用失敗了,還是b系統調用失敗了。並且把之前調用其他系統的資源給及時釋放掉。
上面這段代碼只是大致演示了當流程涉及到兩個系統的時候,代碼是怎么樣,實際上,完成一個雲資源的申請,會涉及到 5 6 個系統,如果中間一步出錯,需要在代碼里面控制如何回滾,是非常難的。因為系統間的調用多數是通過HTTP或者RPC的方式,而不像數據庫可用事物控制。
統一的任務執行API
為了在新的項目里面規避之前實現不合理的帶來的問題,我新寫了一個package,將原來樹形的流程控制(即多個if else嵌套去判斷是否系統調用出問題),改成了線性的流程控制,這樣簡化的編程模型,統一了項目組成員編寫代碼的風格。
對於要執行的任務,抽象出任務和步驟兩個核心領域模型,對於每一次任務的執行抽象出任務執行實例這個模型,整個系統的模型如下圖所示:
這樣,之前在一個方法里面去調用多個系統,變成多個方法的組合,各個方法只關注單個系統的調用和出錯處理,簡化了編程模型。
為了實現錯誤處理,在step中還定義了onError接口,在出錯的時候,框架按照先入后出的順序會調用各個step的onError方法。Step的接口定義如下:
public interface Step { String getStepName(); void beforeExecute(TaskExecution taskExecution); /** * 該步驟主要的業務邏輯實現,如果拋出任何異常,表示改步驟執行失敗,會調用該步驟和已經執行完的步驟的onError方法 * @param taskExecution */ void onExecute(TaskExecution taskExecution); void onComplete(TaskExecution taskExecution); /** * 該步驟出錯時會調用,如果拋出異常,會被框架給忽略掉。已完成其他步驟的onError方法調用。 * @param execution * @param throwable */ void onError(TaskExecution execution, Throwable throwable); }
之前提到了按照先入后出的順序去回調onError方法,自然就想到用stack去保存已經執行過的步驟,任務執行引擎的主要過程和代碼如下:
private TaskExecution doExecute(Task Task, TaskExecution taskExecution) { Stack<Step> StepStack = new Stack<>(); for (Step step : Task.getSteps()) { StepStack.push(step); StepExecution StepExecution = executeSingleStep(taskExecution, step); if (StepExecution.getStatus() == StepStatus.FAILED) { rollBack(StepStack, taskExecution); taskExecution = taskExecutionRepository.updateStatus(taskExecution, FAILED); return taskExecution; } else { taskExecution.setPercent(calculatePercent(StepStack.size(), Task.getSteps().size())); } } return taskExecutionRepository.updateStatus(taskExecution, COMPLETED); }
其中TaskExecutionRepository主要用來做任務執行狀態的持久化。
其他:
除去統一了任務執行的編程模型,在整個系統的領域層,也抽取出Task這個領域對象,前端通過rest接口的方式暴露一個統一的任務提交接口,返回一個TaskExecution對象之后由后端去異步執行任務。前端根據該對象的唯一ID可以查詢當前任務執行的狀態,耗時時間,完成百分比等信息。這類信息對於雲產品的運維提供了方便。在以后新加業務功能的時候,只要新加任務類型即可。