activiti源碼分析(一)設計模式


  對activiti有基本了解的朋友都知道,activiti暴露了七個接口來提供工作流的相關服務,這些接口具體是如何實現的呢?查看源碼發現其實現的形式大體如下: 

public class RuntimeServiceImpl extends ServiceImpl implements RuntimeService {
  
  public ProcessInstance startProcessInstanceByKey(String processDefinitionKey) {
    return commandExecutor.execute(new StartProcessInstanceCmd<ProcessInstance>(processDefinitionKey, null, null, null));
  }

  public ProcessInstance startProcessInstanceByKey(String processDefinitionKey, String businessKey) {
    return commandExecutor.execute(new StartProcessInstanceCmd<ProcessInstance>(processDefinitionKey, null, businessKey, null));
  }

  ...
}

  service中的大部分方法都是通過調用commandExecutor.execute()完成的,然而點進去看則會發現什么都沒有:

public class CommandExecutorImpl implements CommandExecutor {

  private final CommandConfig defaultConfig;
  private final CommandInterceptor first;
  
  public CommandExecutorImpl(CommandConfig defaultConfig, CommandInterceptor first) {
    this.defaultConfig = defaultConfig;
    this.first = first;
  }
  
  public CommandInterceptor getFirst() {
    return first;
  }

  @Override
  public CommandConfig getDefaultConfig() {
    return defaultConfig;
  }
  
  @Override
  public <T> T execute(Command<T> command) {
    return execute(defaultConfig, command);
  }

  @Override
  public <T> T execute(CommandConfig config, Command<T> command) {
    return first.execute(config, command);
  }

}

  看到這里就會發現並不能看出這條語句究竟做了什么,那么究竟是如何提供服務的呢?其實activiti中大部分操作都是基於設計模式中的命令模式完成的(這里還使用了職責鏈模式,構造了命令攔截器鏈,用於在命令真正被執行之前做一系列操作)。下面結合源碼詳細介紹一下這些設計思路:

  命令模式的本質在於將命令進行封裝,發出命令和執行命令分離。職責鏈模式只需要將請求放入職責鏈上,其處理細節和傳遞都不需要考慮。activiti將這兩個模式整合在一起,構成了其服務主要的實現方式。其核心只有三個部分:CommandExecutor(命令執行器,用於執行命令),CommandInterceptor(命令攔截器,用於構建攔截器鏈),Command(命令自身)。這三個接口是整個核心的部分,還會涉及到其它的關鍵類,之后會一一說明,這三個類都在activiti-engine.jar這個activiti實現的核心包下,具體位置是:org.activiti.engine.impl.interceptor。下面由這三個接口逐步介紹相關的類和具體實現:

  三個接口源碼:

public interface Command <T> {

  T execute(CommandContext commandContext);
  
}

  

/**
 * The command executor for internal usage.
 */
public interface CommandExecutor {
  
  /**
   * @return the default {@link CommandConfig}, used if none is provided.
   */
  CommandConfig getDefaultConfig();

  /**
   * Execute a command with the specified {@link CommandConfig}.
   */
  <T> T execute(CommandConfig config, Command<T> command);

  /**
   * Execute a command with the default {@link CommandConfig}.
   */
  <T> T execute(Command<T> command);
  
}

  

public interface CommandInterceptor {

  <T> T execute(CommandConfig config, Command<T> command);
 
  CommandInterceptor getNext();

  void setNext(CommandInterceptor next);

}

  Command的接口中只有一個execute方法,這里才是寫命令的具體實現,而CommandExecutor的實現類在上面已經給出,其包含了一個CommandConfig和一個命令攔截器CommandInterceptor,而執行的execute(command)方法,實際上調用的就是commandInterceptor.execute(commandConfig,command)。CommandInterceptor中包含了一個set和get方法,用於設置next(實際上就是下一個CommandInterceptor)變量。想象一下,這樣就能夠通過這種形式找到攔截器鏈的下一個攔截器鏈,就可以將命令傳遞下去。

  簡單梳理一下:Service實現服務的其中一個標准方法是在具體服務中調用commandExecutor.execute(new command())(這里的command是具體的命令)。其執行步驟就是命令執行器commandExecutor.execute調用了其內部變量CommandInterceptor first(第一個命令攔截器)的execute方法(加上了參數commandConfig)。CommandInterceptor類中包含了一個CommandInterceptor對象next,用於指向下一個CommandInterceptor,在攔截器的execute方法中,只需要完成其對應的相關操作,然后執行一下next.execute(commandConfig,command),就可以很簡單的將命令傳遞給下一個命令攔截器,然后在最后一個攔截器中執行command.execute(),調用這個命令最終要實現的內容就行了。

  實現一個自定義的命令只需要實現Command<T>接口,在execute中做相應的操作就行了,而實現一個自定義的命令攔截器需要繼承AbstractCommandInterceptor,在execute中做相應的處理,最后調用next.execute()即可,而命令執行器雖然也可以自己實現,但是沒有多大意義,非常麻煩。前面說過,命令執行器會先執行命令攔截器鏈的execute方法,但命令攔截器鏈是如何構建的,命令又是在哪里調用的,第一個攔截器是如何添加到命令執行器的,這些都要關注於Activiti工作流引擎的初始化。

  初始化的方法主要寫在了org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl類的init()方法中,這里主要關注於其中的initCommandExecutors(),如果對activiti的配置不清楚的,可以好好的了解一下這個初始化過程。

  initCommandExecutors():

 protected void initCommandExecutors() {
    initDefaultCommandConfig();
    initSchemaCommandConfig();
    initCommandInvoker();
    initCommandInterceptors();
    initCommandExecutor();
  }

  這五個方法名很清楚地說明了初始化步驟,前兩步都是初始化CommandConfig,第一個就是命令執行器的defaultConfig,主要用在transaction攔截器。第三步初始化命令執行者,這也是一個攔截器,不過其放在攔截器的尾端,最后一個執行,它的execute方法就是調用了command.execute()。第四步就是初始化命令攔截器了。最后一步初始化命令執行器。

  前三步相關的類:

/**
*  CommandConfig實際就這兩個配置
*/
public class CommandConfig {

  private boolean contextReusePossible;
  private TransactionPropagation propagation;
  
  // DefaultConfig
  public CommandConfig() {
    this.contextReusePossible = true;
    this.propagation = TransactionPropagation.REQUIRED;
  }

  // SchemaCommandConfig
  public CommandConfig transactionNotSupported() {
    CommandConfig config = new CommandConfig();
    config.contextReusePossible = false;
    config.propagation = TransactionPropagation.NOT_SUPPORTED;
    return config;
  }
}

  CommandInvoker:

public class CommandInvoker extends AbstractCommandInterceptor {

  @Override
  public <T> T execute(CommandConfig config, Command<T> command) {
    return command.execute(Context.getCommandContext());
  }

  @Override
  public CommandInterceptor getNext() {
    return null;
  }

  @Override
  public void setNext(CommandInterceptor next) {
    throw new UnsupportedOperationException("CommandInvoker must be the last interceptor in the chain");
  }

}

  接下來看看關鍵的第四步:

protected void initCommandInterceptors() {
    if (commandInterceptors==null) {
      commandInterceptors = new ArrayList<CommandInterceptor>();
      if (customPreCommandInterceptors!=null) {
        commandInterceptors.addAll(customPreCommandInterceptors);
      }
      commandInterceptors.addAll(getDefaultCommandInterceptors());
      if (customPostCommandInterceptors!=null) {
        commandInterceptors.addAll(customPostCommandInterceptors);
      }
      commandInterceptors.add(commandInvoker);
    }
  }

  protected Collection< ? extends CommandInterceptor> getDefaultCommandInterceptors() {
    List<CommandInterceptor> interceptors = new ArrayList<CommandInterceptor>();
    interceptors.add(new LogInterceptor());
    
    CommandInterceptor transactionInterceptor = createTransactionInterceptor();
    if (transactionInterceptor != null) {
      interceptors.add(transactionInterceptor);
    }
    
    interceptors.add(new CommandContextInterceptor(commandContextFactory, this));
    return interceptors;
  }

  這段代碼可以看出,activiti提供了默認的命令攔截器,其順序是LogInterceptor->TransactionInterceptor->CommandContextInterceptor,也能看出activiti提供了配置自定義的攔截器可能,customPreCommandInterceptors和customPostCommandInterceptors,只需要set進入配置就行了。一個在默認攔截器之前,一個在之后,最后一個添加的就是commandInvoker。最終的命令攔截器鏈就是customPreCommandInterceptors->LogInterceptor->TransactionInterceptor->CommandContextInterceptor->customPostCommandInterceptors->commandInvoker。

  最后一步初始化命令執行器代碼包括了構建攔截器鏈:

  protected void initCommandExecutor() {
    if (commandExecutor==null) {
      CommandInterceptor first = initInterceptorChain(commandInterceptors);
      commandExecutor = new CommandExecutorImpl(getDefaultCommandConfig(), first);
    }
  }

  protected CommandInterceptor initInterceptorChain(List<CommandInterceptor> chain) {
    if (chain==null || chain.isEmpty()) {
      throw new ActivitiException("invalid command interceptor chain configuration: "+chain);
    }
    for (int i = 0; i < chain.size()-1; i++) {
      chain.get(i).setNext( chain.get(i+1) );
    }
    return chain.get(0);
  }

  最后我們看一看默認提供的三個攔截器都做了一些什么操作(不包括最后CommandInvoker,上面已給出)。

  LogInterceptor.execute():

    if (!log.isDebugEnabled()) {
      // do nothing here if we cannot log
      return next.execute(config, command);
    }
    log.debug("\n");
    log.debug("--- starting {} --------------------------------------------------------", command.getClass().getSimpleName());
    try {

      return next.execute(config, command);

    } finally {
      log.debug("--- {} finished --------------------------------------------------------", command.getClass().getSimpleName());
      log.debug("\n");
    }

  TransactionInterceptor.execute()(這是一個抽象的方法,需要自己實現,下面以與spring集成后所給的實現為例)

    protected CommandInterceptor createTransactionInterceptor() {
        if (transactionManager == null) {
            throw new ActivitiException("transactionManager is required property for SpringProcessEngineConfiguration, use "
                    + StandaloneProcessEngineConfiguration.class.getName() + " otherwise");
        }

        return new SpringTransactionInterceptor(transactionManager);
    }

  SpringTransactionInterceptor:

public class SpringTransactionInterceptor extends AbstractCommandInterceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpringTransactionInterceptor.class);

    protected PlatformTransactionManager transactionManager;

    public SpringTransactionInterceptor(PlatformTransactionManager transactionManager) {
        this.transactionManager = transactionManager;
    }

    public <T> T execute(final CommandConfig config, final Command<T> command) {
        LOGGER.debug("Running command with propagation {}", config.getTransactionPropagation());

        TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
        transactionTemplate.setPropagationBehavior(getPropagation(config));

        T result = transactionTemplate.execute(new TransactionCallback<T>() {
            public T doInTransaction(TransactionStatus status) {
                return next.execute(config, command);
            }
        });

        return result;
    }

    private int getPropagation(CommandConfig config) {
        switch (config.getTransactionPropagation()) {
            case NOT_SUPPORTED:
                return TransactionTemplate.PROPAGATION_NOT_SUPPORTED;
            case REQUIRED:
                return TransactionTemplate.PROPAGATION_REQUIRED;
            case REQUIRES_NEW:
                return TransactionTemplate.PROPAGATION_REQUIRES_NEW;
            default:
                throw new ActivitiIllegalArgumentException("Unsupported transaction propagation: " + config.getTransactionPropagation());
        }
    }
}

  最后一個CommandContextInterceptor.execute():

  public <T> T execute(CommandConfig config, Command<T> command) {
    CommandContext context = Context.getCommandContext();
    
    boolean contextReused = false;
    // We need to check the exception, because the transaction can be in a rollback state,
    // and some other command is being fired to compensate (eg. decrementing job retries)
    if (!config.isContextReusePossible() || context == null || context.getException() != null) { 
    	context = commandContextFactory.createCommandContext(command);    	
    }  
    else {
    	log.debug("Valid context found. Reusing it for the current command '{}'", command.getClass().getCanonicalName());
    	contextReused = true;
    }

    try {
      // Push on stack
      Context.setCommandContext(context);
      Context.setProcessEngineConfiguration(processEngineConfiguration);
      
      return next.execute(config, command);
      
    } catch (Exception e) {
    	
      context.exception(e);
      
    } finally {
      try {
    	  if (!contextReused) {
    		  context.close();
    	  }
      } finally {
    	  // Pop from stack
    	  Context.removeCommandContext();
    	  Context.removeProcessEngineConfiguration();
    	  Context.removeBpmnOverrideContext();
      }
    }
    
    return null;
  }

  這里值得注意的是context.close()方法,這里將調用session.flush();,真正執行完成數據庫操作。Context也是一個比較重要的類,有興趣可以研究一下。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM