liteFlow源碼解析


liteFlow簡介

liteFlow是一個輕量級微流程框架.liteFlow能夠幫助你的項目實現業務組件化
liteFlow能最大程度上解耦,支持即時調整策略的一個中間件

流程架構圖

image

項目源碼解析

以官網liteflow-example為例

1.自動裝配

解析liteflow的starter中配置文件

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.yomahub.liteflow.springboot.LiteflowAutoConfiguration,\
  com.yomahub.liteflow.springboot.LiteflowExecutorInit
  • LiteflowAutoConfiguration
@Configuration
@EnableConfigurationProperties(LiteflowProperty.class)
@ConditionalOnProperty(prefix = "liteflow", name = "rule-source")
public class LiteflowAutoConfiguration {

    @Bean
    public ComponentScaner componentScaner(){
        return new ComponentScaner();
    }

    @Bean
    public FlowExecutor flowExecutor(LiteflowProperty property){
        if(StringUtils.isNotBlank(property.getRuleSource())){
            List<String> ruleList = Lists.newArrayList(property.getRuleSource().split(","));
            FlowExecutor flowExecutor = new FlowExecutor();
            flowExecutor.setRulePath(ruleList);
            return flowExecutor;
        }else{
            return null;
        }
    }
}

這個類設置了liteflow的配置參數, 關鍵在於判斷liteflow配置文件的condition以及配置了ruleSource

擴展一下ComponentScaner

public class ComponentScaner implements BeanPostProcessor, PriorityOrdered {

	private static final Logger LOG = LoggerFactory.getLogger(ComponentScaner.class);

	public static Map<String, NodeComponent> nodeComponentMap = new HashMap<String, NodeComponent>();

	static {
		LOGOPrinter.print();
	}

	@Override
	public int getOrder() {
		return Ordered.LOWEST_PRECEDENCE;
	}

	@SuppressWarnings("rawtypes")
	@Override
	public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
		Class clazz = bean.getClass();
		if(NodeComponent.class.isAssignableFrom(clazz)){
			LOG.info("component[{}] has been found",beanName);
			NodeComponent nodeComponent = (NodeComponent)bean;
			nodeComponent.setNodeId(beanName);
			nodeComponentMap.put(beanName, nodeComponent);
		}
		return bean;
	}

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
		return bean;
	}
}

關鍵點在於實現了BeanPostProcessor, 在nodeComponentMap中放入了所有的NodeComponent實現類(業務實現組件)

  • LiteflowExecutorInit
public class LiteflowExecutorInit implements InitializingBean {

    @Resource
    private FlowExecutor flowExecutor;

    @Override
    public void afterPropertiesSet() throws Exception {
        flowExecutor.init();
    }
}

init()具體代碼, liteflow支持三種格式配置文件(xml, Java配置類, zk), 本文以xml配置文件為例

public void init() {
		XmlFlowParser parser = null;
		for(String path : rulePath){
			try {
				if(isLocalConfig(path)) {
					parser = new LocalXmlFlowParser();
				}else if(isZKConfig(path)){
					if(StringUtils.isNotBlank(zkNode)) {
						parser = new ZookeeperXmlFlowParser(zkNode);
					}else {
						parser = new ZookeeperXmlFlowParser();
					}
				}else if(isClassConfig(path)) {
					Class c = Class.forName(path);
					parser = (XmlFlowParser)c.newInstance();
				}
				parser.parseMain(path);
			} catch (Exception e) {
				String errorMsg = MessageFormat.format("init flow executor cause error,cannot parse rule file{0}", path);
				LOG.error(errorMsg,e);
				throw new FlowExecutorNotInitException(errorMsg);
			}
		}
	}

這個類初始化了流程執行器的所需的數據到FlowBus, parserMain的解析方法具體不再展開

2.組件介紹

執行器

執行器FlowExecutor用來執行一個流程,用法為

public <T extends Slot> T execute(String chainId,Object param)
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz)
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex,boolean isInnerChain)
  • chainId: 流程流id
  • param: 流程入參
  • slotClazz: Slot的子類(業務中建議自己實現, 默認為DefaultSlot)
  • isInnerChain: 是否節點內執行

數據槽

在執行器執行流程時會分配唯一的一個數據槽給這個請求。不同請求的數據槽是完全隔離的。
默認的數據槽主要是一個ConcurrentHashMap,里面存放着liteFlow的元數據.還有一個雙向隊列ArrayDeque, 里面存放着步驟數據.所以建議根據業務自行實現.

組件節點

繼承NodeComponent
實現process()方法來實現業務
建議實現isAccess()來判斷是否進入該組件
其他幾個可以覆蓋的方法有:
方法isContinueOnError:表示出錯是否繼續往下執行下一個組件
方法isEnd:表示是否立即結束整個流程
在組件節點里,隨時可以通過方法getSlot獲取當前的數據槽,從而可以獲取任何數據。

路由節點

<flow>
    <chain name="chain1">
        <then value="node1, node2, node3" />
        <then value="node4(node5 | node6)" />
        <when value="node7, node8" />
        <then value="node9, chain2" />
    </chain>
    
    <chain name="chain2">
        <then value="node9" />
    </chain>
</flow>

1.then節點代表順序執行, when節點代表異步(同時執行),
2.node4節點是條件組件繼承NodeCondComponent實現processCond()方法, 返回需要執行的節點字符串("node5"或"node6")
3. 2.3.0后支持子流程如在chain1中調用chain2, 也可以在代碼內隱式調用

3.執行器的運作

FlowExecutor的excute方法

public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex,boolean isInnerChain) throws Exception{
		Slot slot = null;
		try{
		    // 判斷FlowBus中chainMap是否為空
			if(FlowBus.needInit()) {
			    // 初始化方法, 在自動裝配中提到了
				init();
			}
            // 根據chainId獲取對應的chain
			Chain chain = FlowBus.getChain(chainId);
            
			if(chain == null){
				String errorMsg = MessageFormat.format("couldn't find chain with the id[{0}]", chainId);
				throw new ChainNotFoundException(errorMsg);
			}
            // 如果不是子流程且數據槽還未獲取(該方法有可能被invoke隱式代用)
			if(!isInnerChain && slotIndex == null) {
			    // 從DataBus中遍歷slots取一個值為null的下標, 賦值一個slot實例后返回
				slotIndex = DataBus.offerSlot(slotClazz);
				LOG.info("slot[{}] offered",slotIndex);
			}

			if(slotIndex == -1){
				throw new NoAvailableSlotException("there is no available slot");
			}

			slot = DataBus.getSlot(slotIndex);
			if(slot == null) {
				throw new NoAvailableSlotException("the slot is not exist");
			}
            // 往當前slot增加一個當前時間的唯一ID
			if(StringUtils.isBlank(slot.getRequestId())) {
				slot.generateRequestId();
				LOG.info("requestId[{}] has generated",slot.getRequestId());
			}
            // 如果不是子流程則加當前入參和chaiId設置到slot
			if(!isInnerChain) {
				slot.setRequestData(param);
				slot.setChainName(chainId);
			}else {
			    // 主流程已有入參, 當前入參加到子流程的key
				slot.setChainReqData(chainId, param);
			}

			// 詳解見Chain的execute()方法
			chain.execute(slotIndex);

			return (T)slot;
		}catch(Exception e){
			String errorMsg = MessageFormat.format("[{0}]executor cause error", slot.getRequestId());
			LOG.error(errorMsg,e);
			throw e;
		}finally{
			if(!isInnerChain) {
				slot.printStep();
				DataBus.releaseSlot(slotIndex);
			}
		}
	}

Chain的execute()方法

	public void execute(Integer slotIndex) throws Exception{
		if(CollectionUtils.isEmpty(conditionList)){
			throw new FlowSystemException("no conditionList in this chain[" + chainName + "]");
		}

		Slot slot = DataBus.getSlot(slotIndex);
        // 遍歷chain中的conditionList(xml中then或者when節點)
		for (Condition condition : conditionList){
		    // 如果condition為then則順序執行value中的node節點
			if(condition instanceof ThenCondition){
				for(Executable executableItem : condition.getNodeList()){
					try{
					    // 詳解見Executable的execute方法
						executableItem.execute(slotIndex);
					}catch (ChainEndException e){
						break;
					}
				}
			// 如果condition為when則創建多個線程執行value中的node節點
			}else if(condition instanceof WhenCondition){
			    // 使用計數器
		    	final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size());
				for(Executable executableItem : condition.getNodeList()){
					// 當前版本有兩個WhenConditionThread, 一個在FlowExecutor中內部類, 一個單獨抽離出來.(猜測內部類應該是抽離后忘記刪了)
					new WhenConditionThread(executableItem,slotIndex,slot.getRequestId(),latch).start();
				}
				// 等待計數器完成或者超過15秒
				latch.await(15, TimeUnit.SECONDS);
			}
		}
	}

Executable的execute方法

	public void execute(Integer slotIndex) throws Exception {
		if(instance == null){
			throw new FlowSystemException("there is no instance for node id " + id);
		}
		instance.setSlotIndex(slotIndex);
		Slot slot = DataBus.getSlot(slotIndex);

		try{
		    // 可以重寫isAccess()決定是否執行
			if(instance.isAccess()){
                // 詳解見NodeComponent的excute方法
				instance.execute();
                // NodeComponent中有一個InheritableThreadLocal<Boolean> isEndTL, 來判斷是否停止整個流程.
                // InheritableThreadLocal可繼承,所以可以影響所有繼承了NodeComponent的組件. 但是ThreadLocal是每個線程獨有的, 所以對多線程的when並行流無效
				if(instance.isEnd()){
					LOG.info("[{}]:component[{}] lead the chain to end",slot.getRequestId(),instance.getClass().getSimpleName());
					// 拋出異常停止當前流程
					throw new ChainEndException("component lead the chain to end");
				}
			}else{
				LOG.info("[{}]:[X]skip component[{}] execution",slot.getRequestId(),instance.getClass().getSimpleName());
			}
		}catch (Exception e){
		    // 重寫isContinueOnError()方法決定發生錯誤后是否繼續執行(會使isEndTL無效)
			if(instance.isContinueOnError()){
				String errorMsg = MessageFormat.format("[{0}]:component[{1}] cause error,but flow is still go on", slot.getRequestId(),id);
				LOG.error(errorMsg,e);
			}else{
				String errorMsg = MessageFormat.format("[{0}]:component[{1}] cause error",slot.getRequestId(),id);
				LOG.error(errorMsg,e);
				throw e;
			}
		}finally {
		    // 移除NodeComponent中的InheritableThreadLocal<Integer> slotIndexTL(用於slotIndex的傳遞)
			instance.removeSlotIndex();
			// 移除isEndTL
			instance.removeIsEnd();
		}
	}

NodeComponent的excute方法

public void execute() throws Exception{
		Slot slot = this.getSlot();
		LOG.info("[{}]:[O]start component[{}] execution",slot.getRequestId(),this.getClass().getSimpleName());
		slot.addStep(new CmpStep(nodeId, CmpStepType.SINGLE));
		StopWatch stopWatch = new StopWatch();
		stopWatch.start();
        // 執行重寫的業務方法(如果是NodeCondComponent組件會將要運行的nodeId綁定到solt中)
		process();

		stopWatch.stop();
		long timeSpent = stopWatch.getTime();

//		slot.addStep(new CmpStep(nodeId, CmpStepType.END));

		//性能統計
		CompStatistics statistics = new CompStatistics();
		statistics.setComponentClazzName(this.getClass().getSimpleName());
		statistics.setTimeSpent(timeSpent);
		MonitorBus.load().addStatistics(statistics);

        // condition組件
		if(this instanceof NodeCondComponent){
		    // 獲取到綁定的nodeId, 並執行
			String condNodeId = slot.getCondResult(this.getClass().getName());
			if(StringUtils.isNotBlank(condNodeId)){
				Node thisNode = FlowBus.getNode(nodeId);
				Executable condExecutor = thisNode.getCondNode(condNodeId);
				if(condExecutor != null){
				    // 執行對應的NodeCompenent
					condExecutor.execute(slotIndexTL.get());
				}
			}
		}

		LOG.debug("[{}]:componnet[{}] finished in {} milliseconds",slot.getRequestId(),this.getClass().getSimpleName(),timeSpent);
	}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM