Activiti是業界很流行的java工作流引擎,關於Activiti與JBPM5的關系和如何選擇不是本文要討論的話題,相關內容可以baidu一下。Activiti從架構角度看是比較優秀的,是很面向對象的,是我所閱讀過的代碼結構很棒的開源軟件,個人認為比Spring,Hibernate的要好。
Activiti的基礎編程框架
Activiti基於Spring,ibatis等開源中間件作為軟件平台,在此之上構建了非常清晰的開發框架。上圖列出了Activiti的核心組件。
1.ProcessEngine:流程引擎的抽象,對於開發者來說,它是我們使用Activiti的facade,通過它可以獲得我們需要的一切服務。
2.XXService(TaskService,RuntimeService,RepositoryService...):Activiti按照流程的生命周期(定義,部署,運行)把不同階段的服務封裝在不同的Service中,用戶可以非常清晰地使用特定階段的接口。通過ProcessEngine能夠獲得這些Service實例。TaskService,RuntimeService,RepositoryService是非常重要的三個Service:
TaskService:流程運行過程中,與每個任務節點相關的接口,比如complete,delete,delegate等等
RepositoryService:流程定義和部署相關的存儲服務。
RuntimeService:流程運行時相關服務,如startProcessInstanceByKey.
關於ProcessEngine和XXService的關系,可以看下面這張圖:
3.CommandContextIntercepter(CommandExecutor):Activiti使用命令模式作為基礎開發模式,上面Service中定義的各個方法都對應相應的命令對象(xxCmd), Service把各種請求委托給xxCmd,xxCmd來決定命令的接收者,接收者執行后返回結果。而CommandContextIntercepter顧名思義,它是一個攔截器,攔截所有命令,在命令執行前后執行一些公共性操作。比如CommandContextIntercepter的核心方法:
- public <T> T execute(Command<T> command) {
- CommandContext context = commandContextFactory.createCommandContext(command);
- try {
- //執行前保存上下文
- Context.setCommandContext(context);
- Context.setProcessEngineConfiguration(processEngineConfiguration);
- return next.execute(command);//執行命令
- } catch (Exception e) {
- context.exception(e);
- } finally {
- try {
- //關閉上下文,內部會flush session,把數據持久化到db等
- context.close();
- } finally {
- //釋放上下文
- Context.removeCommandContext();
- Context.removeProcessEngineConfiguration();
- }
- }
- return null;
- }
關於命令模式的細節說明,網上有很多資料,這里不展開。我只是想說一下我看到Activiti的這種設計之后的兩點感受:
1)一個產品或者一個項目,從技術上必須有一個明確的、唯一的開發模型或者叫開發樣式(真不知道怎么說恰當),我們常說希望一個團隊的所有人寫出的代碼都有統一的風格,都像是一個人寫出來的,很理想化,但做到很難,往往我們都是通過“規范”去約束大家這樣做,而規范畢竟是程序之外的東西,主觀性很強,不遵守規范的情況屢屢發生。而如果架構師給出了明確的開發模型,並使用一些基礎組件加以強化,把程序員要走的路規定清楚,那你想不遵守規范都會很難,因為那意味着你寫的東西沒發工作。就像Activiti做的這樣,明確以Command作為基本開發模型,輔之以Event-Listener,這樣編程風格的整體性得到了保證。
2)使用命令模式的好處,我這里體會最深的就是 職責分離,解耦。有了Command,各個Service從角色上說只是一些協調者或者控制者,他不需要知道具體怎么做,他只是把任務交給了各個命令。直接的好處是臃腫的、萬能的大類沒有了。而這往往是我們平時開發中最深惡痛絕的地方。
4.核心業務對象(Task,ProcessInstance,Execution...):org.activiti.engine.impl.persistence.entity包下的類是Activiti的核心業務對象。它們是真正的對象,而不是只有數據沒有行為的假對象,搞java企業級開發的人也許已經習慣了下面的層次划分:controller->service->dao->entity, entity只是ORMapping中數據表的java對象體現,沒有任何行為(getter/setter不能算行為),對於面向對象來說,這當然是有問題的,記得曾聽人說過這樣的話“使用面向對象語言進行設計和開發 與 面向對象的設計和開發 是兩回事”,面向對象講究的是“封裝”,“多態”,追求的是滿足“開-閉”原則的、職責單一的對象社會。如果你認同上述觀點,那么相信Activiti會讓你感覺舒服一些,以TaskEntity為例,其UML類圖如下:
(圖2:TaskEntity類)
TaskEntity實現了3個接口:Task,DelegateTask和PersistentObject。其中PersistentObject是一個聲明接口,表明TaskEntity需要持久化。接口是一種角色的聲明,是一份職責的描述而TaskEntity就是這個角色的具體扮演者,因此TaskEntity必須承擔如complete,delegate等職責。
但是這里有些遺憾的是像complete這么重要的行為居然沒有在3個接口中描述(難道是因為工期緊張?^_^),因此“面向抽象”編程對於TaskEntity來說還沒有完全做到。但至少Activiti告訴我們:
1)牢記面向抽象編程,把職責拆分為不同的接口,讓接口來體現對象的職責,而不用去關心這份職責具體由哪個對象實現;
2)entity其實可以也應該是真正的對象。
5.Activiti的上下文組件(Context)
上下文(Context)用來保存生命周期很長的、全局性的信息。Activiti的Context類(在org.activiti.engine.impl.context包下)保持如下三類信息:
(圖3:Context類)
CommandContext:命令上下文,保持每個命令需要的必要資源,如持久化需要的session。
ProcessEngineConfigurationImpl:流程引擎相關的配置信息。它是整個引擎的全局配置信息,mailServerHost,DataSource等。單例。該實例在流程引擎創建時被實例化,其調用stack如下圖:
(圖4:ProcessEngineConfiguration的初始化)
ExecutionContext:剛看到這個類感覺有些奇怪,不明白其作用是什么。看其代碼持有ExecutionEntity這個實例。而ExecutionEntity是Activiti中非常重要的一個類,//TODO
6.Activiti的持久化框架(組件)
Activiti使用ibatis作為ORMapping工具。在此基礎之上Activiti設計了自己的持久化框架,看一張圖:
(圖5:Activiti持久化框架)
頂層接口是Session和SessionFactory,這都非常好理解了。
Session有兩個實現類:
DbSqlSession:簡單點說,DbSqlSession負責sql表達式的執行。
AbstractManager:簡單點說,AbstractManager及其子類負責面向對象的持久化操作
同理DbSqlSessionFactory與GenericManagerFactory的區別就很好理解了。
持久化框架也是在流程引擎建立時初始化的,具體見圖4.
7.Event-Listener 組件
Activiti允許客戶端代碼介入流程的執行。為此提供了一個基礎組件,看圖:
(圖6:用戶代碼介入流程的基礎組件)
用戶可以介入的代碼類型包括:TaskListener,JavaDelegate,Expression,ExecutionListener。
ProcessEngineConfigurationImpl持有DelegateInterceptor的某個實例,這樣就可以隨時非常方便地調用handleInvocation
8.Cache 組件
對Activiti的cache實現很感興趣,但現在我了解到的情況(也許還沒有了解清楚)其cache的實現還是很簡單的,在DbSqlSession中有cache實現:
- protected List<PersistentObject> insertedObjects = new ArrayList<PersistentObject>();
- protected Map<Class<?>, Map<String, CachedObject>> cachedObjects = new HashMap<Class<?>, Map<String,CachedObject>>();
- protected List<DeleteOperation> deletedObjects = new ArrayList<DeleteOperation>();
- protected List<DeserializedObject> deserializedObjects = new ArrayList<DeserializedObject>();
也就是說Activiti就是基於內存的List和Map來做緩存的。具體怎么用的呢?以DbSqlSession.selectOne方法為例:
- public Object selectOne(String statement, Object parameter) {
- statement = dbSqlSessionFactory.mapStatement(statement);
- Object result = sqlSession.selectOne(statement, parameter);
- if (result instanceof PersistentObject) {
- PersistentObject loadedObject = (PersistentObject) result;
- 緩存處理
- result = cacheFilter(loadedObject);
- }
- return result;
- }
- protected PersistentObject cacheFilter(PersistentObject persistentObject) {
- PersistentObject cachedPersistentObject = cacheGet(persistentObject.getClass(), persistentObject.getId());
- if (cachedPersistentObject!=null) {
- //如果緩存中有就直接返回
- return cachedPersistentObject;
- }
- //否則,就先放入緩存
- cachePut(persistentObject, true);
- return persistentObject;
- }
- protected CachedObject cachePut(PersistentObject persistentObject, boolean storeState) {
- Map<String, CachedObject> classCache = cachedObjects.get(persistentObject.getClass());
- if (classCache==null) {
- classCache = new HashMap<String, CachedObject>();
- cachedObjects.put(persistentObject.getClass(), classCache);
- }
- //這里是關鍵:一個CachedObject包含被緩存的對象本身:persistentObject和緩存的狀態:storeState
- //Activiti正是根據storeState來判別緩存中的數據是否被更新是否與db保持一致的。
- CachedObject cachedObject = new CachedObject(persistentObject, storeState);
- classCache.put(persistentObject.getId(), cachedObject);
- return cachedObject;
- }
看了Activiti的緩存設計,我現在最大的疑問是Activiti貌似不支持cluster,因為其緩存設計是基於單機內存的,這個問題還需要進一步調查。
9.異步執行組件
Activiti可以異步執行job(具體例子可以看一下ProcessInstance startProcessInstanceByKey(String processDefinitionKey);),下面簡單分析一下其實現過程,還是先看圖:
(圖7:異步執行組件核心類)
JobExecutor是異步執行組件的核心類,其包含三個主要屬性:
1)JobAcquisitionThread jobAccquisitionThread:執行任務的線程 extends java.lang.Thread
2)BlockingQueue<Runnable> threadPoolQueue
3)ThreadPoolExecutor threadPoolExecutor:線程池
方法ProcessEngines在引擎啟動時調用JobExecutor.start,JobAcquisitionThread 線程即開始工作,其run方法不斷循環執行AcquiredJobs中的job,執行一次后線程等待一定時間直到超時或者JobExecutor.jobWasAdded方法因為有新任務而被調用。
這里發現有一處設計的不夠好:JobAcquisitionThread 與JobExecutor之間的關系是如此緊密(你中有我,我中有你),那么可以把JobAcquisitionThread 作為JobExecutor的內部類來實現,同時把ThreadPoolExecutor threadPoolExecutor交給JobAcquisitionThread 來管理,JobExecutor只負責接受任務以及啟動、停止等更高級的工作,具體細節委托給JobAcquisitionThread ,責任分解,便於維護,JobExecutor的代碼也會看起來更清晰。
10.PVM
PVM:Process Virtal Machine,流程虛擬機API暴露了流程虛擬機的POJO核心,流程虛擬機API描述了一個工作流流程必備的組件,這些組件包括:
PvmProcessDefinition:流程的定義,形象點說就是用戶畫的那個圖。靜態含義。
PvmProcessInstance:流程實例,用戶發起的某個PvmProcessDefinition的一個實例,動態含義。
PvmActivity:流程中的一個節點
PvmTransition:銜接各個節點之間的路徑,形象點說就是圖中各個節點之間的連接線。
PvmEvent:流程執行過程中觸發的事件
以上這些組件很好地對一個流程進行了建模和抽象。每個組件都有很清晰的角色和職責划分。另外,有了這些API,我們可以通過編程的方式,用代碼來“畫”一個流程圖並讓他run起來,例如:
- PvmProcessDefinition processDefinition = new ProcessDefinitionBuilder()
- .createActivity("a").initial().<strong style="#ff0000;">behavior</strong>(new WaitState())
- .transition("b").endActivity().createActivity("b")
- .behavior(new WaitState()).transition("c").endActivity()
- .createActivity("c").behavior(new WaitState()).endActivity()
- .buildProcessDefinition();
- PvmProcessInstance processInstance = processDefinition
- .createProcessInstance();
- processInstance.start();
- PvmExecution activityInstance = processInstance.findExecution("a");
- assertNotNull(activityInstance);
- activityInstance.signal(null, null);
- activityInstance = processInstance.findExecution("b");
- assertNotNull(activityInstance);
- activityInstance.signal(null, null);
- activityInstance = processInstance.findExecution("c");
- assertNotNull(activityInstance);
以上代碼都很簡單,很好理解,只有一點需要說明一下,粗體紅色背景的behavior方法,為一個PvmActivity增加ActivityBehavior,這是干什么呢?ActivityBehavior是一個interface,其接口聲明很簡單:
- /**
- * @author Tom Baeyens
- */
- public interface ActivityBehavior {
- void execute(ActivityExecution execution) throws Exception;
- }
我的理解:Activiti把完成一個PvmActivity的行為單獨建模封裝在ActivityBehavior中。execute方法只有一個參數ActivityExecution,為啥這么設計?
為了更好地理解ActivityBehavior的作用,我們以TaskEntity.complete方法為例,分析其執行過程,先看complete的代碼:
- public void complete() {
- fireEvent(TaskListener.EVENTNAME_COMPLETE);
- Context
- .getCommandContext()
- .getTaskManager()
- .deleteTask(this, TaskEntity.DELETE_REASON_COMPLETED, false);
- if (executionId!=null) {
- getExecution().signal(null, null);
- }
- }
代碼很簡單,也很好理解(可能出乎我們的意料,因為完成一個task,其實有很多事情要做的):
1.fireEvent:通知Listener,本任務完成了。
2.數據持久化相關的動作
3.getExecution().signal(null, null):發信號,這里面隱藏的東西就多了,總體來說,完成了當前任務流程怎么走,怎么生成新的任務都是在這里完成的。
進去看看:
- public void signal(String signalName, Object signalData) {
- ensureActivityInitialized();
- SignallableActivityBehavior activityBehavior = (SignallableActivityBehavior) activity.getActivityBehavior();
- try {
- activityBehavior.signal(this, signalName, signalData);
- } catch (RuntimeException e) {
- throw e;
- } catch (Exception e) {
- throw new PvmException("couldn't process signal '"+signalName+"' on activity '"+activity.getId()+"': "+e.getMessage(), e);
- }
- }
ExecutionEntity.signal方法核心工作就是把發信號的工作委托給PvmActivity的activityBehavior. 這里的設計存在問題,很顯然其觸犯了一個代碼的壞味道:消息鏈。它讓ExceutionEntity沒有必要地與SignallableActivityBehavior 產生了耦合,更好的做法應該是PvmActivity提供signal方法,其內部調用ActivityBehavior完成發信號工作。
其實看看PvmActivity的接口聲明,我不免也有疑問,本來屬於PvmActivity的很重要的職責在其接口聲明中都沒有體現,why??
- /**
- * @author Tom Baeyens
- */
- public interface PvmActivity extends PvmScope {
- boolean isAsync();
- PvmScope getParent();
- List<PvmTransition> getIncomingTransitions();
- List<PvmTransition> getOutgoingTransitions();
- PvmTransition findOutgoingTransition(String transitionId);
- }
把思路拉回來,我們繼續看activityBehavior.signal方法內部的具體實現。