Activiti源碼分析


Activiti是業界很流行的java工作流引擎,關於Activiti與JBPM5的關系和如何選擇不是本文要討論的話題,相關內容可以baidu一下。Activiti從架構角度看是比較優秀的,是很面向對象的,是我所閱讀過的代碼結構很棒的開源軟件,個人認為比Spring,Hibernate的要好。


Activiti的基礎編程框架

 


Activiti基於Spring,ibatis等開源中間件作為軟件平台,在此之上構建了非常清晰的開發框架。上圖列出了Activiti的核心組件。

1.ProcessEngine:流程引擎的抽象,對於開發者來說,它是我們使用Activiti的facade,通過它可以獲得我們需要的一切服務。

2.XXService(TaskService,RuntimeService,RepositoryService...):Activiti按照流程的生命周期(定義,部署,運行)把不同階段的服務封裝在不同的Service中,用戶可以非常清晰地使用特定階段的接口。通過ProcessEngine能夠獲得這些Service實例。TaskService,RuntimeService,RepositoryService是非常重要的三個Service:

TaskService:流程運行過程中,與每個任務節點相關的接口,比如complete,delete,delegate等等

RepositoryService:流程定義和部署相關的存儲服務。

RuntimeService:流程運行時相關服務,如startProcessInstanceByKey.

 

關於ProcessEngine和XXService的關系,可以看下面這張圖:

 

 

3.CommandContextIntercepter(CommandExecutor):Activiti使用命令模式作為基礎開發模式,上面Service中定義的各個方法都對應相應的命令對象(xxCmd), Service把各種請求委托給xxCmd,xxCmd來決定命令的接收者,接收者執行后返回結果。而CommandContextIntercepter顧名思義,它是一個攔截器,攔截所有命令,在命令執行前后執行一些公共性操作。比如CommandContextIntercepter的核心方法:

 

Java代碼   收藏代碼
  1.   public <T> T execute(Command<T> command) {  
  2.     CommandContext context = commandContextFactory.createCommandContext(command);  
  3.   
  4.     try {  
  5. //執行前保存上下文  
  6.       Context.setCommandContext(context);  
  7.       Context.setProcessEngineConfiguration(processEngineConfiguration);  
  8.       return next.execute(command);//執行命令  
  9.         
  10.     } catch (Exception e) {  
  11.       context.exception(e);  
  12.         
  13.     } finally {  
  14.       try {  
  15. //關閉上下文,內部會flush session,把數據持久化到db等  
  16.         context.close();  
  17.       } finally {  
  18. //釋放上下文  
  19.         Context.removeCommandContext();  
  20.         Context.removeProcessEngineConfiguration();  
  21.       }  
  22.     }  
  23.       
  24.     return null;  
  25.   }  

 

關於命令模式的細節說明,網上有很多資料,這里不展開。我只是想說一下我看到Activiti的這種設計之后的兩點感受:

1)一個產品或者一個項目,從技術上必須有一個明確的、唯一的開發模型或者叫開發樣式(真不知道怎么說恰當),我們常說希望一個團隊的所有人寫出的代碼都有統一的風格,都像是一個人寫出來的,很理想化,但做到很難,往往我們都是通過“規范”去約束大家這樣做,而規范畢竟是程序之外的東西,主觀性很強,不遵守規范的情況屢屢發生。而如果架構師給出了明確的開發模型,並使用一些基礎組件加以強化,把程序員要走的路規定清楚,那你想不遵守規范都會很難,因為那意味着你寫的東西沒發工作。就像Activiti做的這樣,明確以Command作為基本開發模型,輔之以Event-Listener,這樣編程風格的整體性得到了保證。

2)使用命令模式的好處,我這里體會最深的就是 職責分離,解耦。有了Command,各個Service從角色上說只是一些協調者或者控制者,他不需要知道具體怎么做,他只是把任務交給了各個命令。直接的好處是臃腫的、萬能的大類沒有了。而這往往是我們平時開發中最深惡痛絕的地方。

 

 

4.核心業務對象(Task,ProcessInstance,Execution...):org.activiti.engine.impl.persistence.entity包下的類是Activiti的核心業務對象。它們是真正的對象,而不是只有數據沒有行為的假對象,搞java企業級開發的人也許已經習慣了下面的層次划分:controller->service->dao->entity, entity只是ORMapping中數據表的java對象體現,沒有任何行為(getter/setter不能算行為),對於面向對象來說,這當然是有問題的,記得曾聽人說過這樣的話“使用面向對象語言進行設計和開發 與 面向對象的設計和開發 是兩回事”,面向對象講究的是“封裝”,“多態”,追求的是滿足“開-閉”原則的、職責單一的對象社會。如果你認同上述觀點,那么相信Activiti會讓你感覺舒服一些,以TaskEntity為例,其UML類圖如下:

(圖2:TaskEntity類)

TaskEntity實現了3個接口:Task,DelegateTask和PersistentObject。其中PersistentObject是一個聲明接口,表明TaskEntity需要持久化。接口是一種角色的聲明,是一份職責的描述而TaskEntity就是這個角色的具體扮演者,因此TaskEntity必須承擔如complete,delegate等職責。

但是這里有些遺憾的是像complete這么重要的行為居然沒有在3個接口中描述(難道是因為工期緊張?^_^),因此“面向抽象”編程對於TaskEntity來說還沒有完全做到。但至少Activiti告訴我們:

1)牢記面向抽象編程,把職責拆分為不同的接口,讓接口來體現對象的職責,而不用去關心這份職責具體由哪個對象實現;

2)entity其實可以也應該是真正的對象。

 

5.Activiti的上下文組件(Context)

上下文(Context)用來保存生命周期很長的、全局性的信息。Activiti的Context類(在org.activiti.engine.impl.context包下)保持如下三類信息:

 

(圖3:Context類)

CommandContext:命令上下文,保持每個命令需要的必要資源,如持久化需要的session。

ProcessEngineConfigurationImpl:流程引擎相關的配置信息。它是整個引擎的全局配置信息,mailServerHost,DataSource等。單例。該實例在流程引擎創建時被實例化,其調用stack如下圖:

(圖4:ProcessEngineConfiguration的初始化)

ExecutionContext:剛看到這個類感覺有些奇怪,不明白其作用是什么。看其代碼持有ExecutionEntity這個實例。而ExecutionEntity是Activiti中非常重要的一個類,//TODO

 

 

6.Activiti的持久化框架(組件)

Activiti使用ibatis作為ORMapping工具。在此基礎之上Activiti設計了自己的持久化框架,看一張圖:

 

(圖5:Activiti持久化框架)

 

頂層接口是Session和SessionFactory,這都非常好理解了。

Session有兩個實現類:

DbSqlSession:簡單點說,DbSqlSession負責sql表達式的執行。

AbstractManager:簡單點說,AbstractManager及其子類負責面向對象的持久化操作

同理DbSqlSessionFactory與GenericManagerFactory的區別就很好理解了。

 

持久化框架也是在流程引擎建立時初始化的,具體見圖4.

 

7.Event-Listener 組件

Activiti允許客戶端代碼介入流程的執行。為此提供了一個基礎組件,看圖:

(圖6:用戶代碼介入流程的基礎組件)

用戶可以介入的代碼類型包括:TaskListener,JavaDelegate,Expression,ExecutionListener。

ProcessEngineConfigurationImpl持有DelegateInterceptor的某個實例,這樣就可以隨時非常方便地調用handleInvocation

 

8.Cache 組件

對Activiti的cache實現很感興趣,但現在我了解到的情況(也許還沒有了解清楚)其cache的實現還是很簡單的,在DbSqlSession中有cache實現:

 

Java代碼   收藏代碼
  1. protected List<PersistentObject> insertedObjects = new ArrayList<PersistentObject>();  
  2. protected Map<Class<?>, Map<String, CachedObject>> cachedObjects = new HashMap<Class<?>, Map<String,CachedObject>>();  
  3. protected List<DeleteOperation> deletedObjects = new ArrayList<DeleteOperation>();  
  4. protected List<DeserializedObject> deserializedObjects = new ArrayList<DeserializedObject>();  

 也就是說Activiti就是基於內存的List和Map來做緩存的。具體怎么用的呢?以DbSqlSession.selectOne方法為例:

 

Java代碼   收藏代碼
  1. public Object selectOne(String statement, Object parameter) {  
  2.   statement = dbSqlSessionFactory.mapStatement(statement);  
  3.   Object result = sqlSession.selectOne(statement, parameter);  
  4.   if (result instanceof PersistentObject) {  
  5.     PersistentObject loadedObject = (PersistentObject) result;  
  6. 緩存處理  
  7.     result = cacheFilter(loadedObject);  
  8.   }  
  9.   return result;  
  10. }  

 

 

Java代碼   收藏代碼
  1.   protected PersistentObject cacheFilter(PersistentObject persistentObject) {  
  2.     PersistentObject cachedPersistentObject = cacheGet(persistentObject.getClass(), persistentObject.getId());  
  3.     if (cachedPersistentObject!=null) {  
  4. //如果緩存中有就直接返回  
  5.       return cachedPersistentObject;  
  6.     }  
  7. //否則,就先放入緩存  
  8.     cachePut(persistentObject, true);  
  9.     return persistentObject;  
  10.   }  

 

 

 

Java代碼   收藏代碼
  1. protected CachedObject cachePut(PersistentObject persistentObject, boolean storeState) {  
  2.   Map<String, CachedObject> classCache = cachedObjects.get(persistentObject.getClass());  
  3.   if (classCache==null) {  
  4.     classCache = new HashMap<String, CachedObject>();  
  5.     cachedObjects.put(persistentObject.getClass(), classCache);  
  6.   }  
  7.   //這里是關鍵:一個CachedObject包含被緩存的對象本身:persistentObject和緩存的狀態:storeState  
  8.   //Activiti正是根據storeState來判別緩存中的數據是否被更新是否與db保持一致的。  
  9.   CachedObject cachedObject = new CachedObject(persistentObject, storeState);  
  10.   classCache.put(persistentObject.getId(), cachedObject);  
  11.   return cachedObject;  
  12. }  

 

看了Activiti的緩存設計,我現在最大的疑問是Activiti貌似不支持cluster,因為其緩存設計是基於單機內存的,這個問題還需要進一步調查。

 

9.異步執行組件

Activiti可以異步執行job(具體例子可以看一下ProcessInstance startProcessInstanceByKey(String processDefinitionKey);),下面簡單分析一下其實現過程,還是先看圖:

(圖7:異步執行組件核心類)

JobExecutor是異步執行組件的核心類,其包含三個主要屬性:

1)JobAcquisitionThread jobAccquisitionThread:執行任務的線程 extends java.lang.Thread

2)BlockingQueue<Runnable> threadPoolQueue

3)ThreadPoolExecutor threadPoolExecutor:線程池

 

方法ProcessEngines在引擎啟動時調用JobExecutor.start,JobAcquisitionThread 線程即開始工作,其run方法不斷循環執行AcquiredJobs中的job,執行一次后線程等待一定時間直到超時或者JobExecutor.jobWasAdded方法因為有新任務而被調用。

 

這里發現有一處設計的不夠好:JobAcquisitionThread 與JobExecutor之間的關系是如此緊密(你中有我,我中有你),那么可以把JobAcquisitionThread 作為JobExecutor的內部類來實現,同時把ThreadPoolExecutor threadPoolExecutor交給JobAcquisitionThread 來管理,JobExecutor只負責接受任務以及啟動、停止等更高級的工作,具體細節委托給JobAcquisitionThread ,責任分解,便於維護,JobExecutor的代碼也會看起來更清晰。

 

 

10.PVM

PVM:Process Virtal Machine,流程虛擬機API暴露了流程虛擬機的POJO核心,流程虛擬機API描述了一個工作流流程必備的組件,這些組件包括:

PvmProcessDefinition:流程的定義,形象點說就是用戶畫的那個圖。靜態含義。

PvmProcessInstance:流程實例,用戶發起的某個PvmProcessDefinition的一個實例,動態含義。

PvmActivity:流程中的一個節點

PvmTransition:銜接各個節點之間的路徑,形象點說就是圖中各個節點之間的連接線。

PvmEvent:流程執行過程中觸發的事件

 

以上這些組件很好地對一個流程進行了建模和抽象。每個組件都有很清晰的角色和職責划分。另外,有了這些API,我們可以通過編程的方式,用代碼來“畫”一個流程圖並讓他run起來,例如:

 

Java代碼   收藏代碼
  1. PvmProcessDefinition processDefinition = new ProcessDefinitionBuilder()  
  2.         .createActivity("a").initial().<strong style="#ff0000;">behavior</strong>(new WaitState())  
  3.         .transition("b").endActivity().createActivity("b")  
  4.         .behavior(new WaitState()).transition("c").endActivity()  
  5.         .createActivity("c").behavior(new WaitState()).endActivity()  
  6.         .buildProcessDefinition();  
  7. PvmProcessInstance processInstance = processDefinition  
  8.         .createProcessInstance();  
  9. processInstance.start();  
  10. PvmExecution activityInstance = processInstance.findExecution("a");  
  11. assertNotNull(activityInstance);  
  12. activityInstance.signal(null, null);  
  13. activityInstance = processInstance.findExecution("b");  
  14. assertNotNull(activityInstance);  
  15. activityInstance.signal(null, null);  
  16. activityInstance = processInstance.findExecution("c");  
  17. assertNotNull(activityInstance);  

 

以上代碼都很簡單,很好理解,只有一點需要說明一下,粗體紅色背景的behavior方法,為一個PvmActivity增加ActivityBehavior,這是干什么呢?ActivityBehavior是一個interface,其接口聲明很簡單:

 

Java代碼   收藏代碼
  1. /** 
  2.  * @author Tom Baeyens 
  3.  */  
  4. public interface ActivityBehavior {  
  5.   
  6.   void execute(ActivityExecution execution) throws Exception;  
  7. }  

 

我的理解:Activiti把完成一個PvmActivity的行為單獨建模封裝在ActivityBehavior中。execute方法只有一個參數ActivityExecution,為啥這么設計?

 

 

為了更好地理解ActivityBehavior的作用,我們以TaskEntity.complete方法為例,分析其執行過程,先看complete的代碼:

 

Java代碼   收藏代碼
  1. public void complete() {  
  2.   fireEvent(TaskListener.EVENTNAME_COMPLETE);  
  3.   
  4.   Context  
  5.     .getCommandContext()  
  6.     .getTaskManager()  
  7.     .deleteTask(this, TaskEntity.DELETE_REASON_COMPLETED, false);  
  8.     
  9.   if (executionId!=null) {  
  10.     getExecution().signal(null, null);  
  11.   }  
  12. }  

 

代碼很簡單,也很好理解(可能出乎我們的意料,因為完成一個task,其實有很多事情要做的):

1.fireEvent:通知Listener,本任務完成了。

2.數據持久化相關的動作

3.getExecution().signal(null, null):發信號,這里面隱藏的東西就多了,總體來說,完成了當前任務流程怎么走,怎么生成新的任務都是在這里完成的。

進去看看:

 

Java代碼   收藏代碼
  1. public void signal(String signalName, Object signalData) {  
  2.   ensureActivityInitialized();  
  3.   SignallableActivityBehavior activityBehavior = (SignallableActivityBehavior) activity.getActivityBehavior();  
  4.   try {  
  5.     activityBehavior.signal(this, signalName, signalData);  
  6.   } catch (RuntimeException e) {  
  7.     throw e;  
  8.   } catch (Exception e) {  
  9.     throw new PvmException("couldn't process signal '"+signalName+"' on activity '"+activity.getId()+"': "+e.getMessage(), e);  
  10.   }  
  11. }  

 

ExecutionEntity.signal方法核心工作就是把發信號的工作委托給PvmActivity的activityBehavior. 這里的設計存在問題,很顯然其觸犯了一個代碼的壞味道:消息鏈。它讓ExceutionEntity沒有必要地與SignallableActivityBehavior 產生了耦合,更好的做法應該是PvmActivity提供signal方法,其內部調用ActivityBehavior完成發信號工作。

 

其實看看PvmActivity的接口聲明,我不免也有疑問,本來屬於PvmActivity的很重要的職責在其接口聲明中都沒有體現,why??

 

Java代碼   收藏代碼
  1. /** 
  2.  * @author Tom Baeyens 
  3.  */  
  4. public interface PvmActivity extends PvmScope {  
  5.     
  6.   boolean isAsync();  
  7.   
  8.   PvmScope getParent();  
  9.   
  10.   List<PvmTransition> getIncomingTransitions();  
  11.   
  12.   List<PvmTransition> getOutgoingTransitions();  
  13.     
  14.   PvmTransition findOutgoingTransition(String transitionId);  
  15. }  

 

把思路拉回來,我們繼續看activityBehavior.signal方法內部的具體實現。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM