Activiti架構分析及源碼詳解
引言
工作流引擎,應用於解決流程審批和流程編排方面等問題,有效的提供了擴展性的支撐。而目前來說,工作流領域也有了相對通行化的標准規范,也就是BPMN2.0。支持這個規范的開源引擎主要有:Activiti,flowable,Jbpm4等。本文着重對Activiti的架構設計進行分析和梳理,同時對流程啟動和原子操作的相關代碼進行完整走讀。
本文的閱讀對象需要對Activiti有一定的理解並且已經能夠初步的使用Activiti進行流程流轉方面開發。
如果對文章內容有疑惑,歡迎加入技術交流群186233599,作者會不定時解答相關問題。
一、Activiti設計解析-架構&領域模型
1.1 架構
Activiti采用了一個分層架構完成自底向上的包裝。架構圖如下
大致包括:
- 核心接口層,被PVM接口定義。PVM會在后面的章節中詳細講述。
- 核心實現層,基於PVM的思想和接口,定義了一些關鍵實體包含ActivityImpl(該類抽象了節點實現),FlowElementBehavior實現(該類抽象了節點指令動作),ExecutionImpl(流程執行實體類)
- 命令層,Activiti在編碼模式上直接限定整體風格為命令模式。也就是將業務邏輯封裝為一個個的Command接口實現類。這樣新增一個業務功能時只需要新增一個Command實現即可。這里需要特別提到的是,命令本身需要運行在命令上下文中,也就是CommandContext類對象。
- 命令攔截層,采用責任鏈模式,通過責任鏈模式的攔截器層,為命令的執行創造條件。諸如開啟事務,創建CommandContext上下文,記錄日志等
- 業務接口層,面向業務,提供了各種接口。這部分的接口就不再面向框架開發者了,而是面向框架的使用者。
- 部署層,嚴格來說,這個與上面說到的並不是一個完整的分層體系。但是為了突出重要性,單獨拿出來說。流程運轉的前提是流程定義。而流程定義解析就是一切的開始。從領域語言解析為Java的POJO對象依靠的就是部署層。后文還會細說這個環節。
- 流程引擎,所有接口的總入口。上面提到的業務接口層,部署層都可以從流程引擎類中得到。因此這里的流程引擎接口其實類似門面模式,只作為提供入口。
1.1.1 命令模式
Activit整體上采用命令模式進行代碼功能解耦。將流程引擎的大部分涉及到客戶端的需求讓外部以具體命令實現類的方式實現。
完成這個編碼模式,有幾個重點類需要關注
Command
命令接口,所有的具體命令都需要實現該類,最終業務就是執行該類的execute方法。CommandContext
命令上下文,為具體命令的執行提供上下文支撐。該上下文的生成是依靠命令攔截器中的上下文攔截器org.activiti.engine.impl.interceptor.CommandContextInterceptor
來生成的。該攔截器會判斷是復用當前的上下文還是生成新的上下文。
引擎內的大部分功能都是通過單獨的命令完成。
1.1.2 責任鏈模式
Activiti的命令模式還需要搭配其對應的責任鏈來完成。具體來說,Activiti中存在一個命令攔截器鏈條,該命令攔截器鏈條由幾大塊的攔截器實現組成,如下
其中重要的默認攔截器有2個:
- 事務攔截器,主要職責是使得后續命令運行在事務環境下。
- CommandContext攔截器,主要職責是在有必要的時候創建CommandContext對象,並在使用完成后關閉該上下文。
1.1.2.1 事務攔截器
事務攔截器是否提供取決於org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl
的子類對方法createTransactionInterceptor
的實現。獨立使用時的org.activiti.engine.impl.cfg.StandaloneProcessEngineConfiguration
該方法返回為空。也就是不提供事務攔截器。此時,命令的運行就無法通過事務攔截器來提供事務環境了。
1.1.2.2 命令上下文攔截器
實現類:org.activiti.engine.impl.interceptor.CommandContextInterceptor。
該攔截器的功能非常重要,可以說是Activiti操作的核心之一。其作用是在后續攔截器執行前檢查當前上下文環境,如果不存在CommandContext對象,則創建一個;在后續攔截器執行后,將CommandContext對象close。CommandContext包含了本次操作中涉及到所有的數據對象。
1.1.3 流程定義解析
Activiti遵循BPMN2.0規范,因此框架中少不了對BPMN2.0規范的定義文件(XML形式)的解析類。Activiti采用的STAX的拉模型進行XML解析。這里先不分析其具體的解析類的內在聯系,而是概念性的闡述下Activiti對解析的概念分層。
首先通過類org.activiti.bpmn.converter.BpmnXMLConverter
進行XML解析,解析為org.activiti.bpmn.model
包下面的與各個XML元素定義對應的POJO類。此時這些POJO類僅僅只是XML文件的一個Java表達。
在通過類org.activiti.engine.impl.bpmn.parser.BpmnParser
聚合不同的解析類,將上面步驟解析出來的POJO類進一步解析為可以在框架中利用的org.activiti.engine.impl.pvm.process
包下面的類。典型的代表就是ActivityImpl類。
三者之間的關系簡單用圖表達就是
1.2 領域模型
Activiti采取了領域中的充血模型作為自己的實現方式。大部分業務邏輯都直接關聯在了org.activiti.engine.impl.persistence.entity.ExecutionEntity
中。由於Activiti采用了MyBatis而非Hibernate這樣的O/R Mapping產品作為持久層,因此Activiti在具體的持久化操作上也有自己的獨特方式。
1.2.1 數據集中提交
Activiti的持久化機制簡單說來就是數據集中提交。集中提交還產生了一個額外的作用:自動提交。換句話說,在內存中的實體,如果更新了屬性但是沒有顯示的執行刷新動作,在一個調用的生命周期結束后也會被持久化其最新的狀態到數據庫。下面來看下詳細解釋下這個集中提交機制。
在Activiti所有運行期生成的對象都需要實現一個接口org.activiti.engine.impl.interceptor.Session
,其定義如下
public interface Session
{
void flush();
void close();
}
而Session對象則是由接口org.activiti.engine.impl.interceptor.SessionFactory
的方法進行生成,其定義如下
public interface SessionFactory {
Class<?> getSessionType();
Session openSession();
}
流程引擎內部持有各種SessionFactory實現,用戶也可以自定義注冊自己的SessionFactory實現,如果用戶希望自定義的對象也可以被集中提交機制處理的話。
在CommandContext中存在一個Map<Class,Session>存儲,存儲着該CommandContext生命周期內新建的所有Session對象。當一個命令執行完畢后,最終命令上下文CommandContext的close方法會被調用。當執行CommandContext.close()方法時,其內部會按照順序執行flushSessions
,closeSessions
方法。從名字可以看到,第一個方法內部就是執行所有Session對象的flush方法,第二個方法內部就是執行所有的Session對象的close方法。
流程引擎內部有一個Session實現是比較特別的。也就是org.activiti.engine.impl.db.DbSqlSession
實現。如果有需要更新,刪除,插入等操作,該操作是需要通過DbSqlSession來實現的,而實際上該實現會將這些操作緩存在內部。只有在執行flush方法時才會真正的提交到數據庫去執行。正是因為如此,所有的數據操作,實際上最終都是要等到CommandContext執行close方法時,才會真正提到到數據庫。
理論上,充血模型是在聚合根這個層面上完成的持久化。但是由於Activiti沒有采用O/R Mapping框架,於是自己完成了一個類似功能的模塊。
1.2.2 PersistentObject
要明白工作流中的數據插入機制,首先要看下實體類的接口org.activiti.engine.impl.db.PersistentObject
,如下
public interface PersistentObject {
String getId();
void setId(String id);
Object getPersistentState();
}
Activiti的數據庫表都是單字符串主鍵,每一個實體類都需要實現該接口。在對實體類進行保存的時候,DbSqlSession會調用getId方法判斷是否存在ID,如果不存在,則使用ID生成器(該生成器根據策略有幾種不同實現,這里不表)生成一個ID並且設置。
而方法getPersistentState
是用來返回一個持久化狀態對象。則方法的使用場合在下一個章節說明
1.2.3 DbSqlSession
該Session內部存在三個重要屬性,如下
//該屬性存儲着所有使用insert方法放入的對象
protected Map<Class<? extends PersistentObject>, List<PersistentObject>> insertedObjects = new HashMap<Class<? extends PersistentObject>, List<PersistentObject>>();
//該Map結構內存儲所有通過該DbSqlSession查詢出來的結果,以及update方法放入的對象
protected Map<Class<?>, Map<String, CachedObject>> cachedObjects = new HashMap<Class<?>, Map<String,CachedObject>>();
//該屬性內存儲着所有將要執行的刪除操作
protected List<DeleteOperation> deleteOperations = new ArrayList<DeleteOperation>();
刪除和新增都比較容易理解,就是要此類操作緩存起來,一次性提交到數據庫,上文曾提到的數據集中提交就體現在這個地方。而cachedObjects就有些不同了。要解析這個Map結構,首先來看下類org.activiti.engine.impl.db.DbSqlSession.CachedObject
的結構屬性,如下
public static class CachedObject {
protected PersistentObject persistentObject;
protected Object persistentObjectState;
}
public CachedObject(PersistentObject persistentObject, boolean storeState) {
this.persistentObject = persistentObject;
if (storeState) {
this.persistentObjectState = persistentObject.getPersistentState();
}
}
通過構造方法可以明白,在新建該對象的時候,通過storeState參數決定是否保存當時的持久化狀態。
該Map的數據來源有2處
- 所有通過該DbSqlSession對象執行查詢的結果對象都會生成一個對應的CachedObject對象,並且storeState參數為true
- 執行該DbSqlSession對象的update類方法,會將參數用CachedObject對象包裝起來,storeState參數為false。
當DbSqlSession執行flush方法時,主要來說是做了數據提交動作
- 將insertObjects列表中的元素插入到數據庫
- 將deleteOperations列表中的元素遍歷執行
- 執行方法getUpdatedObjects獲得要更新的實體對象
方法getUpdatedObjects的邏輯就是遍歷所有的CachedObject,同時滿足以下條件者則放入要更新的實體集合中
- 實體的getPersistentState方法不為空
- 實體的getPersistentState方法返回對象與CachedObject存儲的persistentObjectState執行equal判斷,結果為false
通過上面可以得知,如果一個實體類在DbSqlSession的生命周期被查詢出來,並且其數據內容有了修改,則DbSqlSession刷新時會自動刷新到數據庫。
二、Activiti設計解析-PVM執行樹
2.1 核心理念
任何框架都是核心理念上發展細化而來。Activiti的核心理念就是流程虛擬機(Process Virtual Machine,以下簡稱PVM)。PVM試圖提供一組API,通過API本身來描述工作流方面的各種可能性。沒有了具體實現,也使得PVM本身可以較好的適應各種不同的工作流領域語言,而Activiti本身也是在PVM上的一種實現。
2.1.1 PVM對流程定義期的描述
首先來看下流程定義本身。在工作流中,流程定義可以圖形化的表達為一組節點和連接構成的集合。比如下圖
即使沒有任何知識也能大概明白這張圖表達的是一個流程以及執行順序的意圖。流程定義的表達方式不限,可以使用圖形的方式表達,可以使用領域語言,也可以傳統的XML(比如Activiti用的就是BPMN2.0 Schema下的XML)。特別的,當前已經有了標准化的BPMN2.0規范。
PVM將流程定義描述為流程元素的集合。再將流程元素細分為2個子類:流程節點和連線。
- 流程節點是某一種動作表達的抽象描述。節點本身是可以嵌套的,也就是節點可以擁有子節點。
- 連線表達是不同節點之間的轉移關系。一個連線只能有一個源頭節點和一個目標節點。而節點本身可以有任意多的進入連線和外出連線。
從類圖的角度也能很好的看出這種關系,流程節點PvmActivity和連線PvmTransition都是流程元素PvmProcessElement。
從類圖可以看到PvmActivity繼承於PvmScope。這種繼承關系表明流程節點本身有其歸於的作用域(PvmScope),節點本身也可能是另外一些節點的作用域,這也符合節點可能擁有子節點的原則。關於作用域本身,后文還會再次詳細講解,這里先按下不表。
2.1.2 PVM對流程運行期的描述
通過流程節點和連線,PVM完成了對流程定義的表達。流程定義是一個流程的靜態表達,流程執行則是依照流程定義啟動的一個運行期表達,每一個流程執行都具備自己唯一的生命周期。流程執行需要具備以下要素:
- 流程節點的具體執行動作。
- 流程執行當前處於哪一個流程節點。
- 流程執行是如何從一個節點運行至下一個節點。
- 流程執行如何執行流程節點定義的執行動作
針對要素1,Activiti提供了接口org.activiti.engine.impl.pvm.delegate.ActivityBehavior
。該接口內部僅有一個execute方法。該接口的實現即為不同PvmActivity節點提供了具體動作。ActivityBehavior有豐富的不同實現,對應了流程中豐富的不同功能的節點。每一個PvmActivity對象都會持有一個ActivityBehavior對象。
針對要素2,Activiti提供了接口org.activiti.engine.impl.pvm.PvmExecution
。該接口有一個方法PvmActivity getActivity()
。用以返回當前流程執行所處的流程節點。
針對要素3,Activiti提供了接口org.activiti.engine.impl.pvm.runtime.InterpretableExecution
。接口方法很多,這里取和流程執行運轉最重要的2個方法展開,如下
public interface InterpretableExecution extends ActivityExecution, ExecutionListenerExecution, PvmProcessInstance {
void take(PvmTransition transition);
void take(PvmTransition transition, boolean fireActivityCompletedEvent);
執行方法take,以連線對象作為入參,這會使得流程執行該連線定義的路線。其實現邏輯應該為讓流程執行定位於連線源頭的活動節點,經由連線對象,到達連線目的地的活動節點。
針對要素4,實際上也是由接口org.activiti.engine.impl.pvm.runtime.AtomicOperation
來完成的。通過該接口的調用類,此種情況的實現者需要獲取當前流程執行所處的活動節點的ActivityBehavior
對象,執行其execute
方法來執行節點動作。結合要素3和4,可以看出AtomicOperation
接口用於執行流程運轉中的單一指令,例如根據連線移動,執行節點指令等。分解成單一指令的好處是易於編碼和理解。這也契合接口命名中的原子一意。
2.1.3PVM綜述
從上面對PVM定義期和運行期的解釋可以看出,整個概念體系並不復雜。涉及到的類也不多。正是因為PVM只對工作流中最基礎的部分做了抽象和接口定義,使得PVM的實現上有了很多的可能性。
然而也正是由於定義的簡單性,實際上這套PVM在轉化為實際實現的時候需要額外附加很多的特性才能真正完成框架需求。
2.2 ActivitiImpl與作用域
在解析完成后,一個流程定義中的所有節點都會被解析為ActivityImpl對象。ActivityImpl對象本身可以持有事件訂閱(根據BPMN2.0規范,目前有定時,消息,信號三種事件訂閱類型)。因為ActivityImpl本身可以嵌套並且可以持有訂閱,因此引入作用域概念(Scope)。
一個ActivityImpl在以下兩種情況下會被定義為作用域ActivityImpl。
- 該ActivityImpl是可變范圍,則它是一個作用域。可變范圍可以理解為該節點的內容定義是可變的。比如流程定義、子流程,其內部內容是可變的。根據BPMN定義,可變范圍有:流程定義,子流程,多實例,調用活動。
- 該ActivityImpl定義了一個上下文用於接收事件。比如:具備邊界事件的ActivityImpl,具備事件子流程的ActivityImpl,事件驅動網關,中間事件捕獲ActivityImpl。
作用域是一個很重要的概念,情況1中作用域定義的是復雜節點的生命周期,情況2中作用域定義的是事件的捕獲范圍。
2.3 ExecutionEntity
ExecutionEntity的含義是一個流程定義被啟動后的執行實例,代表着流程的運行期狀態。在Activiti的設計中,事件訂閱,流程變量等都是與一個具體的ExecutionEntity相關的。其本身有幾個重要的屬性:
- isScope:該屬性為真時,意味該執行實例在執行一個具備作用域的ActivityImpl節點或者執行一個流程定義。更簡單一些,意味着該實例正在執行一個作用域活動。
- isConcurrent:該屬性為真時,意味與該執行實例正在執行的活動節點同屬相同作用域的節點可能正並發被其他執行實例執行(比如並行網關后面的2個並行任務)。
- isActive:該屬性為真時,意味該執行實例正在執行一個簡單ActivityImpl(不包含其他ActivityImpl的ActivityImpl)
- isEventScope:該屬性為真時,意味該執行實例是為了后期補償而進行的變量保存所創建的執行實例。由於流程執行中的變量都需要與ExecutionEntity掛鈎,而補償是需要原始變量的快照。為了滿足這個需求,創建出一個專用於此的ExecutionEntity。
- activityId:該ExecutionEntity正在執行的ActivityImpl的id。正在執行意味着幾種情況:進入該節點,執行該節點動作,離開該節點。如果是等待子流程的完成,則該屬性為null。
上面對ExecutionEntity的解釋仍然抽象。如果直觀的看,可以認為ExecutionEntity是某一種生命周期的體現,其內部屬性隨着不同的情況而變化。如下圖所示:
隨着流程的啟動,會創建一個ExecutionEntity
對象。該ExecutionEntity
生命周期與整個流程相同,而其中的isScope
和isConcurrent
在創建之初固定,並且不會改變。而isActive
和activityId
隨着流程的推進則會不斷變化。
ExecutionEntity
是用來反映流程的推進情況的,實際上,往往一個ExecutionEntity
不足以支撐全部的BPMN功能。因此實現上,Activiti是通過一個樹狀結構的ExecutionEntity
結構來反映流程推進情況。創建之初的ExecutionEntity
對象隨着流程的推進會不斷的分裂和合並,ExecutionEntity
樹也會不斷的生長和修剪。在流程的推進過程中會遇到4種基本情況
- 單獨的非作用域節點
- 單獨的作用域節點
- 並發的非作用域節點
- 並發的作用域節點
2.3.1 單獨的非作用域節點
此種情況可以如下圖所示
在流程前進的構成中,遇到單獨的非作用域節點,ExecutionEntity
一直處於激活狀態,只不過隨着流程前進,其activityId指向會不斷變化。如上圖所示,會經歷:開始節點、組員工作、領導審批、結束節點4個不同的值。
實際上,這些節點都是在作用域<流程定義>之下,而ExecutionEntity
代表的正是該作用域,因此其isScope屬性為true。
2.3.2 單獨的作用域節點
如果流程推進中遇到單獨的作用域節點,則當前執行對象ExecutionEntity
應該創建一個作用域子執行(isScope為true的ExecutionEntity)。整個變化過程可以如下圖所示
當准備進入節點<組員工作>時,ExecutionEntity1
凍結,並且創建出子執行ExecutionEntity2
。ExecutionEntity2
的isScope屬性也為true。
ExecutionEntity1
的isScope為true,是因為該執行實例負責整個流程定義的事件訂閱,ExecutionEntity2
的isScope為true,是因為該執行實例負責節點<組員工作>的事件訂閱。
前面提到過,事件訂閱與某一個具體的執行實例相關。當節點<組員工作>完成時,也就是事件訂閱所在的作用域要被摧毀時,對應的事件訂閱也要被刪除。此時額外的ExecutionEntity
就特別方便,只要刪除該ExecutionEntity
,順便刪除相關的事件訂閱即可,在這里就是刪除ExecutionEntity2
。
刪除ExecutionEntity2
,並且激活父執行ExecutionEntity1
。隨着流程推進,ExecutionEntity1
更換指向的activityId。
2.3.3 並發的非作用域節點
流程推進中節點存在多個外出連線,則可以根據需要創建多個並發的子執行,每一個子執行對應一個連線。如下圖所示
當流程節點A、B被激活時,ExecutionEntity1
會有2個並發的子執行ExecutionEntity2
和ExecutionEntity3
。這兩個子執行的isConcurrent
屬性均為true,因為節點A和B都是在相同的作用域(流程定義)下被並發的執行。
當節點A、B執行完畢后,並發的子執行被刪除,父執行重新被激活,繼續后面的節點。
2.3.4 並發的作用域節點
流程推進中遇到並發節點,並且節點為作用域節點,情況就會如下所示
當流程運行至P1節點,P1節點有多個出線。根據出線數目創建2個子執行,此時2個子執行均為並發的,且從P1作為出線的源頭節點,因此2個子執行的activityId均為P1。
當運行到A、B節點時,由於2個節點均為作用域節點,因此還會再創建2個子執行。此時ExecutionEntity3
和ExecutionEntity3
凍結。ExecutionEntity4
和ExecutionEntity5
所執行的節點在各自的作用域下均無並發操作,因此其isScope屬性為true,isConcurrent屬性為false。這5個執行實例構成的執行樹如下
當A、B節點完成時,首先是各自的作用域被刪除,因此ExecutionEntity4
和ExecutionEntity5
首先被刪除,ExecutionEntity3
和ExecutionEntity4
激活。而后匯聚於P2節點,因此ExecutionEntity3
和ExecutionEntity4
刪除,ExecutionEntity1
被激活,繼續執行剩下的節點。
三、代碼解析-流程啟動
3.1 流程說明
流程啟動依靠的是命令類:org.activiti.engine.impl.cmd.StartProcessInstanceCmd
。
該命令的整體流程如下
部署管理器的查詢和運行期關系不大,先忽略。兩個流程着重展開:
- 流程定義實體創建流程實例
- 流程實例執行啟動
3.1.1 流程定義實體創建流程實例
流程如下
其中指定初始滑動節點創建實例本身展開后的流程如下
ExecutionEntity的初始化需要單獨說下,流程如下
在創建流程的邏輯的尾部是一個循環流程。該循環的目的是為了創建正確的ExecutionImpl樹(以下簡稱執行樹)。本質上該方法是創建一個流程實例,並且將流程當前運行節點定位到指定的節點。而工作流的正確執行依賴於執行樹的正確分裂和整合。因此就需要為指定的節點創建其上游的執行樹實例。使得在效果上看起來就和流程自動執行到當前節點類似(執行樹類似,節點運行歷史則無相似,實際上也無歷史節點)。
而如果指定的初始節點就是流程定義的初始節點,則循環就不存在意義了。
3.1.2 流程實例啟動
流程實例的啟動的內容,就是執行原子操作:org.activiti.engine.impl.pvm.runtime.AtomicOperationProcessStart
。關於原子操作單獨闡述。
3.2 額外補充
3.2.1 ActivityImpl的parent屬性
org.activiti.engine.impl.pvm.process.ActivityImpl
類中有一個屬性parent,類型為org.activiti.engine.impl.pvm.process.ScopeImpl
。在解析的時候,該屬性為當前節點的作用域節點。根據作用域節點的定義,該屬性的取值有兩種可能的類型,一種是org.activiti.engine.impl.pvm.process.ActivityImpl
,另外一種是org.activiti.engine.impl.pvm.process.ProcessDefinitionImpl
。
第二種情況意味着該節點是直屬於流程定義的節點了。
四、代碼解析-原子操作
4.1 說明
原子操作是一個接口org.activiti.engine.impl.pvm.runtime.AtomicOperation
。從名字也可以看出,該接口的作用就是執行流程實例中的一個單步操作。下面分階段說明
4.2 AbstractEventAtomicOperation
該抽象類是眾多實現類的基類。其代碼如下
public abstract class AbstractEventAtomicOperation implements AtomicOperation {
public boolean isAsync(InterpretableExecution execution) {
return false;
}
public void execute(InterpretableExecution execution) {
//獲取當前執行對象的作用域對象。具體由子類提供。
ScopeImpl scope = getScope(execution);
//從作用域對象中獲取指定事件的監聽器。事件名稱由子類提供。
List<ExecutionListener> exectionListeners = scope.getExecutionListeners(getEventName());
int executionListenerIndex = execution.getExecutionListenerIndex();
if (exectionListeners.size()>executionListenerIndex) {
execution.setEventName(getEventName());
execution.setEventSource(scope);
ExecutionListener listener = exectionListeners.get(executionListenerIndex);
try {
listener.notify(execution);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
}
execution.setExecutionListenerIndex(executionListenerIndex+1);
execution.performOperation(this);
} else {
execution.setExecutionListenerIndex(0);
execution.setEventName(null);
execution.setEventSource(null);
eventNotificationsCompleted(execution);
}
}
protected abstract ScopeImpl getScope(InterpretableExecution execution);
protected abstract String getEventName();
protected abstract void eventNotificationsCompleted(InterpretableExecution execution);
}
整個抽象類的邏輯概括而言,就是將獲取當前執行實例的作用域對象(具體由子類提供),執行其中特定事件(事件名由子類提供)的監聽器。
在全部的監聽器執行完畢后,執行子類的特定邏輯。
4.3 AtomicOperationProcessStart
該操作用於流程啟動。但是並不執行真正的啟動動作。只是設置了當前執行實例的活動節點為org.activiti.engine.impl.pvm.runtime.StartingExecution
中存儲的活動節點。然后執行原子操作org.activiti.engine.impl.pvm.runtime.AtomicOperationProcessStartInitial
。
本質上來說,只是執行了一個設置的動作。
public class AtomicOperationProcessStart extends AbstractEventAtomicOperation {
@Override
protected ScopeImpl getScope(InterpretableExecution execution) {
return execution.getProcessDefinition();
}
@Override
protected String getEventName() {
return org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_START;
}
@Override
protected void eventNotificationsCompleted(InterpretableExecution execution) {
if (Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Map<String, Object> variablesMap = null;
try {
variablesMap = execution.getVariables();
} catch (Throwable t) {
// In some rare cases getting the execution variables can fail (JPA entity load failure for example)
// We ignore the exception here, because it's only meant to include variables in the initialized event.
}
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createEntityWithVariablesEvent(ActivitiEventType.ENTITY_INITIALIZED,
execution, variablesMap, false));
Context.getProcessEngineConfiguration().getEventDispatcher()
.dispatchEvent(ActivitiEventBuilder.createProcessStartedEvent(execution, variablesMap, false));
}
ProcessDefinitionImpl processDefinition = execution.getProcessDefinition();
StartingExecution startingExecution = execution.getStartingExecution();
List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial());
execution.setActivity(initialActivityStack.get(0));
execution.performOperation(PROCESS_START_INITIAL);
}
}
4.4 AtomicOperationProcessStartInitial
代碼如下
public class AtomicOperationProcessStartInitial extends AbstractEventAtomicOperation {
@Override
protected ScopeImpl getScope(InterpretableExecution execution) {
return (ScopeImpl) execution.getActivity();
}
@Override
protected String getEventName() {
return org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_START;
}
@Override
protected void eventNotificationsCompleted(InterpretableExecution execution) {
ActivityImpl activity = (ActivityImpl) execution.getActivity();
ProcessDefinitionImpl processDefinition = execution.getProcessDefinition();
StartingExecution startingExecution = execution.getStartingExecution();
//從開始節點開始的,該判斷均為真。
if (activity==startingExecution.getInitial()) {
execution.disposeStartingExecution();
execution.performOperation(ACTIVITY_EXECUTE);
} else {
List<ActivityImpl> initialActivityStack = processDefinition.getInitialActivityStack(startingExecution.getInitial());
int index = initialActivityStack.indexOf(activity);
activity = initialActivityStack.get(index+1);
InterpretableExecution executionToUse = null;
if (activity.isScope()) {
executionToUse = (InterpretableExecution) execution.getExecutions().get(0);
} else {
executionToUse = execution;
}
executionToUse.setActivity(activity);
executionToUse.performOperation(PROCESS_START_INITIAL);
}
}
}
4.5 AtomicOperationTransitionNotifyListenerEnd
該原子操作的目的僅是為了執行節點上的end
事件監聽器。監聽器執行完畢后,就執行下一個原子操作AtomicOperationTransitionDestroyScope
4.6 AtomicOperationTransitionNotifyListenerStart
該原子操作的目的是為了執行節點上start事件監聽器。在執行完畢后,會判斷執行實例的當前節點是否可以執行。判斷的依據該節點和連接線節點
4.3 AtomicOperationActivityExecute
該原子操作的作用實際上就是取出該執行實例當前的活動節點,並且執行該活動節點的行為定義。行為定義通過接口org.activiti.engine.impl.pvm.delegate.ActivityBehavior
定義。不同的節點行為由不同的子類完成
public class AtomicOperationActivityExecute implements AtomicOperation {
private static Logger log = LoggerFactory.getLogger(AtomicOperationActivityExecute.class);
public boolean isAsync(InterpretableExecution execution) {
return false;
}
public void execute(InterpretableExecution execution) {
ActivityImpl activity = (ActivityImpl) execution.getActivity();
ActivityBehavior activityBehavior = activity.getActivityBehavior();
if (activityBehavior==null) {
throw new PvmException("no behavior specified in "+activity);
}
log.debug("{} executes {}: {}", execution, activity, activityBehavior.getClass().getName());
try {
if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createActivityEvent(ActivitiEventType.ACTIVITY_STARTED,
execution.getActivity().getId(),
(String) execution.getActivity().getProperty("name"),
execution.getId(),
execution.getProcessInstanceId(),
execution.getProcessDefinitionId(),
(String) activity.getProperties().get("type"),
activity.getActivityBehavior().getClass().getCanonicalName()));
}
activityBehavior.execute(execution);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
LogMDC.putMDCExecution(execution);
throw new PvmException("couldn't execute activity <"+activity.getProperty("type")+" id=\""+activity.getId()+"\" ...>: "+e.getMessage(), e);
}
}
}
4.4 AtomicOperationTransitionDestroyScope
public class AtomicOperationTransitionDestroyScope implements AtomicOperation {
private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionDestroyScope.class);
public boolean isAsync(InterpretableExecution execution) {
return false;
}
@SuppressWarnings("unchecked")
public void execute(InterpretableExecution execution) {
InterpretableExecution propagatingExecution = null;
ActivityImpl activity = (ActivityImpl) execution.getActivity();
/**
* 如果當前的活動節點具備作用域。這就意味着最初的時候,有一個處於激活狀態的執行實例在執行該節點,以下稱初始執行實例。
* 從這樣的節點退出要考慮幾種情況:
* 一、單獨的作用域節點。此時的執行樹情況是:非激活的非作用域執行實例-激活的作用域執行實例(初始執行實例)。此時要離開作用域節點,首先是銷毀激活的作用域執行實例(初始執行實例),激活初始執行實例的父實例,使用該父實例執行后續的出線動作。
* 二、並行的作用域節點。此時的執行樹情況是:非激活的非作用域並發執行實例-非激活的
*/
if (activity.isScope()) {
InterpretableExecution parentScopeInstance = null;
// if this is a concurrent execution crossing a scope boundary
if (execution.isConcurrent() && !execution.isScope()) {
// first remove the execution from the current root
InterpretableExecution concurrentRoot = (InterpretableExecution) execution.getParent();
parentScopeInstance = (InterpretableExecution) execution.getParent().getParent();
log.debug("moving concurrent {} one scope up under {}", execution, parentScopeInstance);
List<InterpretableExecution> parentScopeInstanceExecutions = (List<InterpretableExecution>) parentScopeInstance.getExecutions();
List<InterpretableExecution> concurrentRootExecutions = (List<InterpretableExecution>) concurrentRoot.getExecutions();
// if the parent scope had only one single scope child
if (parentScopeInstanceExecutions.size()==1) {
// it now becomes a concurrent execution
parentScopeInstanceExecutions.get(0).setConcurrent(true);
}
concurrentRootExecutions.remove(execution);
parentScopeInstanceExecutions.add(execution);
execution.setParent(parentScopeInstance);
execution.setActivity(activity);
propagatingExecution = execution;
// if there is only a single concurrent execution left
// in the concurrent root, auto-prune it. meaning, the
// last concurrent child execution data should be cloned into
// the concurrent root.
if (concurrentRootExecutions.size()==1) {
InterpretableExecution lastConcurrent = concurrentRootExecutions.get(0);
if (lastConcurrent.isScope()) {
lastConcurrent.setConcurrent(false);
} else {
log.debug("merging last concurrent {} into concurrent root {}", lastConcurrent, concurrentRoot);
// We can't just merge the data of the lastConcurrent into the concurrentRoot.
// This is because the concurrent root might be in a takeAll-loop. So the
// concurrent execution is the one that will be receiving the take
concurrentRoot.setActivity((ActivityImpl) lastConcurrent.getActivity());
concurrentRoot.setActive(lastConcurrent.isActive());
lastConcurrent.setReplacedBy(concurrentRoot);
lastConcurrent.remove();
}
}
} else if (execution.isConcurrent() && execution.isScope()) {
/**
* 根據算法,這種情況不會出現。源代碼中,這部分也屬於todo的內容。
*/
}
else {
/**
* 這個條件是執行實例的scope屬性為真。此時銷毀當前的執行實例,使用其父執行實例繼續后面的流程
*/
propagatingExecution = (InterpretableExecution) execution.getParent();
propagatingExecution.setActivity((ActivityImpl) execution.getActivity());
propagatingExecution.setTransition(execution.getTransition());
propagatingExecution.setActive(true);
log.debug("destroy scope: scoped {} continues as parent scope {}", execution, propagatingExecution);
execution.destroy();
//刪除與該執行實例相關的一切,包括:定時工作,各種任務,事件訂閱,用戶流程關系,最后刪除自身。
execution.remove();
}
} else {
//如果離開的是一個非作用域節點,則仍然使用當前的執行實例作為下一個節點的執行實例
propagatingExecution = execution;
}
// if there is another scope element that is ended
ScopeImpl nextOuterScopeElement = activity.getParent();
TransitionImpl transition = propagatingExecution.getTransition();
ActivityImpl destination = transition.getDestination();
/**
* 考慮當前的節點可能是子流程或者活動調用中的節點。那么就需要離開當前的作用域范圍,回到更上層的作用域下。因此需要判斷目的地是否和源頭節點處於同一個作用域。如果不是同一個作用域,則不斷向上回溯
*/
if (transitionLeavesNextOuterScope(nextOuterScopeElement, destination)) {
propagatingExecution.setActivity((ActivityImpl) nextOuterScopeElement);
propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_END);
} else {
propagatingExecution.performOperation(TRANSITION_NOTIFY_LISTENER_TAKE);
}
}
public boolean transitionLeavesNextOuterScope(ScopeImpl nextScopeElement, ActivityImpl destination) {
return !nextScopeElement.contains(destination);
}
}
4.5 AtomicOperationTransitionNotifyListenerTake
該原子操作的目的就是為了執行在連接線上的監聽器。在執行完畢后,就准備執行目標活動節點。這里關於目標節點還存在一個選擇的問題。並不是直接執行連接線上的目標活動節點。而是從目標活動節點出發,選擇和執行實例當前活動節點同屬同一個作用域的目標活動節點或其父節點。
public class AtomicOperationTransitionNotifyListenerTake implements AtomicOperation {
private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionNotifyListenerTake.class);
public boolean isAsync(InterpretableExecution execution) {
return false;
}
public void execute(InterpretableExecution execution) {
TransitionImpl transition = execution.getTransition();
List<ExecutionListener> executionListeners = transition.getExecutionListeners();
int executionListenerIndex = execution.getExecutionListenerIndex();
/**
* 整個if的功能就是不斷判斷監聽器是否被執行完畢。都執行完畢后走入到else的部分。
*/
if (executionListeners.size()>executionListenerIndex) {
execution.setEventName(org.activiti.engine.impl.pvm.PvmEvent.EVENTNAME_TAKE);
execution.setEventSource(transition);
ExecutionListener listener = executionListeners.get(executionListenerIndex);
try {
listener.notify(execution);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new PvmException("couldn't execute event listener : "+e.getMessage(), e);
}
execution.setExecutionListenerIndex(executionListenerIndex+1);
execution.performOperation(this);
} else {
if (log.isDebugEnabled()) {
log.debug("{} takes transition {}", execution, transition);
}
execution.setExecutionListenerIndex(0);
execution.setEventName(null);
execution.setEventSource(null);
ActivityImpl activity = (ActivityImpl) execution.getActivity();
ActivityImpl nextScope = findNextScope(activity.getParent(), transition.getDestination());
execution.setActivity(nextScope);
// Firing event that transition is being taken
if(Context.getProcessEngineConfiguration() != null && Context.getProcessEngineConfiguration().getEventDispatcher().isEnabled()) {
Context.getProcessEngineConfiguration().getEventDispatcher().dispatchEvent(
ActivitiEventBuilder.createSequenceFlowTakenEvent(ActivitiEventType.SEQUENCEFLOW_TAKEN, transition.getId(),
activity.getId(), (String) activity.getProperties().get("name") ,(String) activity.getProperties().get("type"), activity.getActivityBehavior().getClass().getCanonicalName(),
nextScope.getId(), (String) nextScope.getProperties().get("name"), (String) nextScope.getProperties().get("type"), nextScope.getActivityBehavior().getClass().getCanonicalName()));
}
execution.performOperation(TRANSITION_CREATE_SCOPE);
}
}
/** finds the next scope to enter. the most outer scope is found first */
public static ActivityImpl findNextScope(ScopeImpl outerScopeElement, ActivityImpl destination) {
ActivityImpl nextScope = destination;
while( (nextScope.getParent() instanceof ActivityImpl)
&& (nextScope.getParent() != outerScopeElement)
) {
nextScope = (ActivityImpl) nextScope.getParent();
}
return nextScope;
}
}
4.6 AtomicOperationTransitionCreateScope
該原子操作是為了確認進入的節點是否具備作用域。如果具備作用域,則將目前的執行實例凍結。並且創建出新的執行實例,用於執行作用域節點;如果不具備作用域,則無效果。
在確認完畢后,執行原子操作AtomicOperationTransitionNotifyListenerStart
public class AtomicOperationTransitionCreateScope implements AtomicOperation {
private static Logger log = LoggerFactory.getLogger(AtomicOperationTransitionCreateScope.class);
public boolean isAsync(InterpretableExecution execution) {
ActivityImpl activity = (ActivityImpl) execution.getActivity();
return activity.isAsync();
}
public void execute(InterpretableExecution execution) {
InterpretableExecution propagatingExecution = null;
ActivityImpl activity = (ActivityImpl) execution.getActivity();
if (activity.isScope()) {
//為作用域活動創建一個新的執行實例,是該原子操作的主要目的
propagatingExecution = (InterpretableExecution) execution.createExecution();
propagatingExecution.setActivity(activity);
propagatingExecution.setTransition(execution.getTransition());
execution.setTransition(null);
execution.setActivity(null);
execution.setActive(false);
log.debug("create scope: parent {} continues as execution {}", execution, propagatingExecution);
//這里是另外一個重點。在一個流程實例初始化的時候,會對當前流程所處的作用域對象(可能是流程定義或者是作用域活動進行處理,具體表現是為該作用域對象上的定時事件,消息事件,信號事件執行注冊動作。分別是放入定時調度器,在數據庫新增事件訂閱)
propagatingExecution.initialize();
} else {
propagatingExecution = execution;
}
propagatingExecution.performOperation(AtomicOperation.TRANSITION_NOTIFY_LISTENER_START);
}
}