Activiti內部實現中,各主要部件關系
對外,提供Service服務,它是無狀態的。
這些Service包括:
- protected RepositoryService repositoryService = new RepositoryServiceImpl();
- protected RuntimeService runtimeService = new RuntimeServiceImpl();
- protected HistoryService historyService = new HistoryServiceImpl();
- protected IdentityService identityService = new IdentityServiceImpl();
- protected TaskService taskService = new TaskServiceImpl();
- protected FormService formService = new FormServiceImpl();
- protected ManagementService managementService = new ManagementServiceImpl();
對service提供的服務,基本上每一個需要執行具體任務的方法,都有一個對應的Command實現與之對應。
- 一般來說,一個Command對應一個完整事物;
- 調用Command之前,都會經過CommandInterceptor,這個在初始化時就確定了,如:LogInterceptor >> CommandContextInterceptor >> CommandInvoker
如果Command不涉及節點相關內容,而是直接對數據庫進行操作,則直接關聯DB,入DbSqlSession的緩存;
否則,一般會通過ExecutionEntity來完成具體的操作,這里,封裝了一些基本的原子操作,它們都是AtomicOperation的接口實現:
流程實例類:
- AtomicOperationProcessStart
- AtomicOperationProcessStartInitial
- AtomicOperationProcessEnd
流程活動類:
- AtomicOperationActivityStart
- AtomicOperationActivityExecute
- AtomicOperationActivityEnd
流程活動流轉類:
- AtomicOperationTransitionNotifyListenerEnd
- AtomicOperationTransitionDestroyScope
- AtomicOperationTransitionNotifyListenerTake
- AtomicOperationTransitionCreateScope
- AtomicOperationTransitionNotifyListenerStart
流程執行樹清理類:
- AtomicOperationDeleteCascade
- AtomicOperationDeleteCascadeFireActivityEnd
其中,活動執行時,具體動作是由ActivityBehavior的實現類來決定的:
關於默認ID生成器
Activiti的默認生成器是DbIdGenerator,實際是一次從數據庫中取一塊數據,然后慢慢用,用完后再取。
/**
* @author Tom Baeyens
*/
public class DbIdGenerator implements IdGenerator {
protected int idBlockSize;
protected long nextId = 0;
protected long lastId = -1;
protected CommandExecutor commandExecutor;
protected CommandConfig commandConfig;
public synchronized String getNextId() {
if (lastId<nextId) {
getNewBlock();
}
long _nextId = nextId++;
return Long.toString(_nextId);
}
protected synchronized void getNewBlock() {
IdBlock idBlock = commandExecutor.execute(commandConfig, new GetNextIdBlockCmd(idBlockSize));
this.nextId = idBlock.getNextId();
this.lastId = idBlock.getLastId();
}
- 在ProcessEngineConfiguration中定義的塊默認大小100;
- 在ProcessEngineComfigurationImpl中,完成初始化:
// id generator /////////////////////////////////////////////////////////////
protected void initIdGenerator() {
if (idGenerator==null) {
CommandExecutor idGeneratorCommandExecutor = null;
if (idGeneratorDataSource!=null) {
ProcessEngineConfigurationImpl processEngineConfiguration = new StandaloneProcessEngineConfiguration();
processEngineConfiguration.setDataSource(idGeneratorDataSource);
processEngineConfiguration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_FALSE);
processEngineConfiguration.init();
idGeneratorCommandExecutor = processEngineConfiguration.getCommandExecutor();
} else if (idGeneratorDataSourceJndiName!=null) {
ProcessEngineConfigurationImpl processEngineConfiguration = new StandaloneProcessEngineConfiguration();
processEngineConfiguration.setDataSourceJndiName(idGeneratorDataSourceJndiName);
processEngineConfiguration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_FALSE);
processEngineConfiguration.init();
idGeneratorCommandExecutor = processEngineConfiguration.getCommandExecutor();
} else {
idGeneratorCommandExecutor = getCommandExecutor();
}
DbIdGenerator dbIdGenerator = new DbIdGenerator();
dbIdGenerator.setIdBlockSize(idBlockSize);
dbIdGenerator.setCommandExecutor(idGeneratorCommandExecutor);
dbIdGenerator.setCommandConfig(getDefaultCommandConfig().transactionRequiresNew());
idGenerator = dbIdGenerator;
}
}
注意:此處對getNextId()方法加了synchronize關鍵字,它在單機部署下,確定不會出現網上分析的什么ID重復問題。
關於task的start_time_字段取值問題
在TaskEntity中:
/** creates and initializes a new persistent task. */
public static TaskEntity createAndInsert(ActivityExecution execution) {
TaskEntity task = create();
task.insert((ExecutionEntity) execution);
return task;
}
public void insert(ExecutionEntity execution) {
CommandContext commandContext = Context.getCommandContext();
DbSqlSession dbSqlSession = commandContext.getDbSqlSession();
dbSqlSession.insert(this);
if(execution != null) {
execution.addTask(this);
}
commandContext.getHistoryManager().recordTaskCreated(this, execution);
}
/*
* 。。。。
*/
/** Creates a new task. Embedded state and create time will be initialized.
* But this task still will have to be persisted. See {@link #insert(ExecutionEntity)}. */
public static TaskEntity create() {
TaskEntity task = new TaskEntity();
task.isIdentityLinksInitialized = true;
task.createTime = ClockUtil.getCurrentTime();
return task;
}
由此知道,活動的時間是由ClockUtil.getCurrentTime()決定的。再來看看CockUtil的源碼:
/**
* @author Joram Barrez
*/
public class ClockUtil {
private volatile static Date CURRENT_TIME = null;
public static void setCurrentTime(Date currentTime) {
ClockUtil.CURRENT_TIME = currentTime;
}
public static void reset() {
ClockUtil.CURRENT_TIME = null;
}
public static Date getCurrentTime() {
if (CURRENT_TIME != null) {
return CURRENT_TIME;
}
return new Date();
}
}
注意:
因為可能多線程情況,而且只有set,不會執行類似++,--這樣的操作,所以這里用volatile關鍵字完全滿足需要。
默認實現在分布式中問題
在上面介紹了DbIdGenerator以及ClockUtil之后,可以清楚明白他們的原理,那么在分布式部署中,如果還是使用這種默認的實現而不加以改善,會出現什么問題。
1.DbIdGenerator的getNextId()/getNewBlock()兩個方法,在分布式主機中,synchronize不能順利實現鎖控制;
2.ClockUtil嚴重依賴容器所在服務器時間,但是分布式主機的時間不可能達到完全的同步;
3.在分布式主機中,對同一個任務,可以同時執行,因為他們都是DbSqlSession緩存,不會立馬入庫;也就是說,可能存在一個任務被同時自行兩次的情況。
對1,2兩點分析,基本上是確定會存在的,至於第三點,限於猜想,不知道實際是否有相關的解決策略,目前對activiti關於此處的設置猜想還沒有完全弄清楚。
其實之所以那么在乎任務Id以及任務執行時間,主要是在流程跟蹤圖中,需要根據有序的歷史任務結果集模仿重現走過的路徑,而做到有序,這兩個要素是非常關鍵的。
對分布式應用,如果同庫,那ID的生成問題都會是一個問題,常見的解決方式是把他扔給數據庫去解決,比如一個序列、一個類似序列的自定義函數等都是不錯的選擇。
當然,放到DB中以后,那么頻繁訪問數據庫,activiti中設計的blocksize也就基本失效了,這個也算是衡量的結果吧。
實際問題分析
上述數據,是在生產上出現的問題數據,環境是為了負載均衡做了兩個應用Server,同時連接一個DB;
從數據可以分析出“活動節點會在雙機中運行”;
61011(A) >> 60852(B) >> 60866(B) >> 61022(A) >> 61028(A) >> 60889(B) >> 60893(B)
A機器上的61011執行完畢以后,事件如何轉到B機器上的60852,這個還不明白,待解決!!



