activiti主要組件解析


Activiti內部實現中,各主要部件關系

image

對外,提供Service服務,它是無狀態的。

這些Service包括:

  • protected RepositoryService repositoryService = new RepositoryServiceImpl();
  • protected RuntimeService runtimeService = new RuntimeServiceImpl();
  • protected HistoryService historyService = new HistoryServiceImpl();
  • protected IdentityService identityService = new IdentityServiceImpl();
  • protected TaskService taskService = new TaskServiceImpl();
  • protected FormService formService = new FormServiceImpl();
  • protected ManagementService managementService = new ManagementServiceImpl();

 

對service提供的服務,基本上每一個需要執行具體任務的方法,都有一個對應的Command實現與之對應。

  • 一般來說,一個Command對應一個完整事物;
  • 調用Command之前,都會經過CommandInterceptor,這個在初始化時就確定了,如:LogInterceptor >> CommandContextInterceptor >> CommandInvoker

 

如果Command不涉及節點相關內容,而是直接對數據庫進行操作,則直接關聯DB,入DbSqlSession的緩存;

否則,一般會通過ExecutionEntity來完成具體的操作,這里,封裝了一些基本的原子操作,它們都是AtomicOperation的接口實現:

流程實例類:

  • AtomicOperationProcessStart
  • AtomicOperationProcessStartInitial
  • AtomicOperationProcessEnd

流程活動類:

  • AtomicOperationActivityStart
  • AtomicOperationActivityExecute
  • AtomicOperationActivityEnd

流程活動流轉類:

  • AtomicOperationTransitionNotifyListenerEnd
  • AtomicOperationTransitionDestroyScope
  • AtomicOperationTransitionNotifyListenerTake
  • AtomicOperationTransitionCreateScope
  • AtomicOperationTransitionNotifyListenerStart

流程執行樹清理類:

  • AtomicOperationDeleteCascade
  • AtomicOperationDeleteCascadeFireActivityEnd

 

其中,活動執行時,具體動作是由ActivityBehavior的實現類來決定的:

image

關於默認ID生成器

Activiti的默認生成器是DbIdGenerator,實際是一次從數據庫中取一塊數據,然后慢慢用,用完后再取。

/**
 * @author Tom Baeyens
 */
public class DbIdGenerator implements IdGenerator {

  protected int idBlockSize;
  protected long nextId = 0;
  protected long lastId = -1;
  
  protected CommandExecutor commandExecutor;
  protected CommandConfig commandConfig;
  
  public synchronized String getNextId() {
    if (lastId<nextId) {
      getNewBlock();
    }
    long _nextId = nextId++;
    return Long.toString(_nextId);
  }

  protected synchronized void getNewBlock() {
    IdBlock idBlock = commandExecutor.execute(commandConfig, new GetNextIdBlockCmd(idBlockSize));
    this.nextId = idBlock.getNextId();
    this.lastId = idBlock.getLastId();
  }
  • 在ProcessEngineConfiguration中定義的塊默認大小100;
  • 在ProcessEngineComfigurationImpl中,完成初始化:
  // id generator /////////////////////////////////////////////////////////////
  
  protected void initIdGenerator() {
    if (idGenerator==null) {
      CommandExecutor idGeneratorCommandExecutor = null;
      if (idGeneratorDataSource!=null) {
        ProcessEngineConfigurationImpl processEngineConfiguration = new StandaloneProcessEngineConfiguration();
        processEngineConfiguration.setDataSource(idGeneratorDataSource);
        processEngineConfiguration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_FALSE);
        processEngineConfiguration.init();
        idGeneratorCommandExecutor = processEngineConfiguration.getCommandExecutor();
      } else if (idGeneratorDataSourceJndiName!=null) {
        ProcessEngineConfigurationImpl processEngineConfiguration = new StandaloneProcessEngineConfiguration();
        processEngineConfiguration.setDataSourceJndiName(idGeneratorDataSourceJndiName);
        processEngineConfiguration.setDatabaseSchemaUpdate(DB_SCHEMA_UPDATE_FALSE);
        processEngineConfiguration.init();
        idGeneratorCommandExecutor = processEngineConfiguration.getCommandExecutor();
      } else {
        idGeneratorCommandExecutor = getCommandExecutor();
      }
      
      DbIdGenerator dbIdGenerator = new DbIdGenerator();
      dbIdGenerator.setIdBlockSize(idBlockSize);
      dbIdGenerator.setCommandExecutor(idGeneratorCommandExecutor);
      dbIdGenerator.setCommandConfig(getDefaultCommandConfig().transactionRequiresNew());
      idGenerator = dbIdGenerator;
    }
  }

注意:此處對getNextId()方法加了synchronize關鍵字,它在單機部署下,確定不會出現網上分析的什么ID重復問題。

關於task的start_time_字段取值問題

在TaskEntity中:

  /** creates and initializes a new persistent task. */
  public static TaskEntity createAndInsert(ActivityExecution execution) {
    TaskEntity task = create();
    task.insert((ExecutionEntity) execution);
    return task;
  }

  public void insert(ExecutionEntity execution) {
    CommandContext commandContext = Context.getCommandContext();
    DbSqlSession dbSqlSession = commandContext.getDbSqlSession();
    dbSqlSession.insert(this);
    
    if(execution != null) {
      execution.addTask(this);
    }
    
    commandContext.getHistoryManager().recordTaskCreated(this, execution);
  }

/*
* 。。。。
*/

  /**  Creates a new task.  Embedded state and create time will be initialized.
   * But this task still will have to be persisted. See {@link #insert(ExecutionEntity)}. */
  public static TaskEntity create() {
    TaskEntity task = new TaskEntity();
    task.isIdentityLinksInitialized = true;
    task.createTime = ClockUtil.getCurrentTime();
    return task;
  }

由此知道,活動的時間是由ClockUtil.getCurrentTime()決定的。再來看看CockUtil的源碼:

/**
 * @author Joram Barrez
 */
public class ClockUtil {
  
  private volatile static Date CURRENT_TIME = null;
  
  public static void setCurrentTime(Date currentTime) {
    ClockUtil.CURRENT_TIME = currentTime;
  }
  
  public static void reset() {
    ClockUtil.CURRENT_TIME = null;
  } 
  
  public static Date getCurrentTime() {
    if (CURRENT_TIME != null) {
      return CURRENT_TIME;
    }
    return new Date();
  }

}

注意:

因為可能多線程情況,而且只有set,不會執行類似++,--這樣的操作,所以這里用volatile關鍵字完全滿足需要。

默認實現在分布式中問題

在上面介紹了DbIdGenerator以及ClockUtil之后,可以清楚明白他們的原理,那么在分布式部署中,如果還是使用這種默認的實現而不加以改善,會出現什么問題。

1.DbIdGenerator的getNextId()/getNewBlock()兩個方法,在分布式主機中,synchronize不能順利實現鎖控制;

2.ClockUtil嚴重依賴容器所在服務器時間,但是分布式主機的時間不可能達到完全的同步;

3.在分布式主機中,對同一個任務,可以同時執行,因為他們都是DbSqlSession緩存,不會立馬入庫;也就是說,可能存在一個任務被同時自行兩次的情況。

 

對1,2兩點分析,基本上是確定會存在的,至於第三點,限於猜想,不知道實際是否有相關的解決策略,目前對activiti關於此處的設置猜想還沒有完全弄清楚。

 

其實之所以那么在乎任務Id以及任務執行時間,主要是在流程跟蹤圖中,需要根據有序的歷史任務結果集模仿重現走過的路徑,而做到有序,這兩個要素是非常關鍵的。

對分布式應用,如果同庫,那ID的生成問題都會是一個問題,常見的解決方式是把他扔給數據庫去解決,比如一個序列、一個類似序列的自定義函數等都是不錯的選擇。

當然,放到DB中以后,那么頻繁訪問數據庫,activiti中設計的blocksize也就基本失效了,這個也算是衡量的結果吧。

實際問題分析

image

上述數據,是在生產上出現的問題數據,環境是為了負載均衡做了兩個應用Server,同時連接一個DB;

從數據可以分析出“活動節點會在雙機中運行”;

61011(A) >> 60852(B) >> 60866(B) >> 61022(A) >> 61028(A) >> 60889(B) >> 60893(B)

A機器上的61011執行完畢以后,事件如何轉到B機器上的60852,這個還不明白,待解決!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM