liteFlow簡介
liteFlow是一個輕量級微流程框架.liteFlow能夠幫助你的項目實現業務組件化
liteFlow能最大程度上解耦,支持即時調整策略的一個中間件
流程架構圖

項目源碼解析
以官網liteflow-example為例
1.自動裝配
解析liteflow的starter中配置文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.yomahub.liteflow.springboot.LiteflowAutoConfiguration,\
com.yomahub.liteflow.springboot.LiteflowExecutorInit
- LiteflowAutoConfiguration
@Configuration
@EnableConfigurationProperties(LiteflowProperty.class)
@ConditionalOnProperty(prefix = "liteflow", name = "rule-source")
public class LiteflowAutoConfiguration {
@Bean
public ComponentScaner componentScaner(){
return new ComponentScaner();
}
@Bean
public FlowExecutor flowExecutor(LiteflowProperty property){
if(StringUtils.isNotBlank(property.getRuleSource())){
List<String> ruleList = Lists.newArrayList(property.getRuleSource().split(","));
FlowExecutor flowExecutor = new FlowExecutor();
flowExecutor.setRulePath(ruleList);
return flowExecutor;
}else{
return null;
}
}
}
這個類設置了liteflow的配置參數, 關鍵在於判斷liteflow配置文件的condition以及配置了ruleSource
擴展一下ComponentScaner
public class ComponentScaner implements BeanPostProcessor, PriorityOrdered {
private static final Logger LOG = LoggerFactory.getLogger(ComponentScaner.class);
public static Map<String, NodeComponent> nodeComponentMap = new HashMap<String, NodeComponent>();
static {
LOGOPrinter.print();
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
@SuppressWarnings("rawtypes")
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
Class clazz = bean.getClass();
if(NodeComponent.class.isAssignableFrom(clazz)){
LOG.info("component[{}] has been found",beanName);
NodeComponent nodeComponent = (NodeComponent)bean;
nodeComponent.setNodeId(beanName);
nodeComponentMap.put(beanName, nodeComponent);
}
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
}
關鍵點在於實現了BeanPostProcessor, 在nodeComponentMap中放入了所有的NodeComponent實現類(業務實現組件)
- LiteflowExecutorInit
public class LiteflowExecutorInit implements InitializingBean {
@Resource
private FlowExecutor flowExecutor;
@Override
public void afterPropertiesSet() throws Exception {
flowExecutor.init();
}
}
init()具體代碼, liteflow支持三種格式配置文件(xml, Java配置類, zk), 本文以xml配置文件為例
public void init() {
XmlFlowParser parser = null;
for(String path : rulePath){
try {
if(isLocalConfig(path)) {
parser = new LocalXmlFlowParser();
}else if(isZKConfig(path)){
if(StringUtils.isNotBlank(zkNode)) {
parser = new ZookeeperXmlFlowParser(zkNode);
}else {
parser = new ZookeeperXmlFlowParser();
}
}else if(isClassConfig(path)) {
Class c = Class.forName(path);
parser = (XmlFlowParser)c.newInstance();
}
parser.parseMain(path);
} catch (Exception e) {
String errorMsg = MessageFormat.format("init flow executor cause error,cannot parse rule file{0}", path);
LOG.error(errorMsg,e);
throw new FlowExecutorNotInitException(errorMsg);
}
}
}
這個類初始化了流程執行器的所需的數據到FlowBus, parserMain的解析方法具體不再展開
2.組件介紹
執行器
執行器FlowExecutor用來執行一個流程,用法為
public <T extends Slot> T execute(String chainId,Object param)
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz)
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex,boolean isInnerChain)
- chainId: 流程流id
- param: 流程入參
- slotClazz: Slot的子類(業務中建議自己實現, 默認為DefaultSlot)
- isInnerChain: 是否節點內執行
數據槽
在執行器執行流程時會分配唯一的一個數據槽給這個請求。不同請求的數據槽是完全隔離的。
默認的數據槽主要是一個ConcurrentHashMap,里面存放着liteFlow的元數據.還有一個雙向隊列ArrayDeque, 里面存放着步驟數據.所以建議根據業務自行實現.
組件節點
繼承NodeComponent
實現process()方法來實現業務
建議實現isAccess()來判斷是否進入該組件
其他幾個可以覆蓋的方法有:
方法isContinueOnError:表示出錯是否繼續往下執行下一個組件
方法isEnd:表示是否立即結束整個流程
在組件節點里,隨時可以通過方法getSlot獲取當前的數據槽,從而可以獲取任何數據。
路由節點
<flow>
<chain name="chain1">
<then value="node1, node2, node3" />
<then value="node4(node5 | node6)" />
<when value="node7, node8" />
<then value="node9, chain2" />
</chain>
<chain name="chain2">
<then value="node9" />
</chain>
</flow>
1.then節點代表順序執行, when節點代表異步(同時執行),
2.node4節點是條件組件繼承NodeCondComponent實現processCond()方法, 返回需要執行的節點字符串("node5"或"node6")
3. 2.3.0后支持子流程如在chain1中調用chain2, 也可以在代碼內隱式調用
3.執行器的運作
FlowExecutor的excute方法
public <T extends Slot> T execute(String chainId,Object param,Class<? extends Slot> slotClazz,Integer slotIndex,boolean isInnerChain) throws Exception{
Slot slot = null;
try{
// 判斷FlowBus中chainMap是否為空
if(FlowBus.needInit()) {
// 初始化方法, 在自動裝配中提到了
init();
}
// 根據chainId獲取對應的chain
Chain chain = FlowBus.getChain(chainId);
if(chain == null){
String errorMsg = MessageFormat.format("couldn't find chain with the id[{0}]", chainId);
throw new ChainNotFoundException(errorMsg);
}
// 如果不是子流程且數據槽還未獲取(該方法有可能被invoke隱式代用)
if(!isInnerChain && slotIndex == null) {
// 從DataBus中遍歷slots取一個值為null的下標, 賦值一個slot實例后返回
slotIndex = DataBus.offerSlot(slotClazz);
LOG.info("slot[{}] offered",slotIndex);
}
if(slotIndex == -1){
throw new NoAvailableSlotException("there is no available slot");
}
slot = DataBus.getSlot(slotIndex);
if(slot == null) {
throw new NoAvailableSlotException("the slot is not exist");
}
// 往當前slot增加一個當前時間的唯一ID
if(StringUtils.isBlank(slot.getRequestId())) {
slot.generateRequestId();
LOG.info("requestId[{}] has generated",slot.getRequestId());
}
// 如果不是子流程則加當前入參和chaiId設置到slot
if(!isInnerChain) {
slot.setRequestData(param);
slot.setChainName(chainId);
}else {
// 主流程已有入參, 當前入參加到子流程的key
slot.setChainReqData(chainId, param);
}
// 詳解見Chain的execute()方法
chain.execute(slotIndex);
return (T)slot;
}catch(Exception e){
String errorMsg = MessageFormat.format("[{0}]executor cause error", slot.getRequestId());
LOG.error(errorMsg,e);
throw e;
}finally{
if(!isInnerChain) {
slot.printStep();
DataBus.releaseSlot(slotIndex);
}
}
}
Chain的execute()方法
public void execute(Integer slotIndex) throws Exception{
if(CollectionUtils.isEmpty(conditionList)){
throw new FlowSystemException("no conditionList in this chain[" + chainName + "]");
}
Slot slot = DataBus.getSlot(slotIndex);
// 遍歷chain中的conditionList(xml中then或者when節點)
for (Condition condition : conditionList){
// 如果condition為then則順序執行value中的node節點
if(condition instanceof ThenCondition){
for(Executable executableItem : condition.getNodeList()){
try{
// 詳解見Executable的execute方法
executableItem.execute(slotIndex);
}catch (ChainEndException e){
break;
}
}
// 如果condition為when則創建多個線程執行value中的node節點
}else if(condition instanceof WhenCondition){
// 使用計數器
final CountDownLatch latch = new CountDownLatch(condition.getNodeList().size());
for(Executable executableItem : condition.getNodeList()){
// 當前版本有兩個WhenConditionThread, 一個在FlowExecutor中內部類, 一個單獨抽離出來.(猜測內部類應該是抽離后忘記刪了)
new WhenConditionThread(executableItem,slotIndex,slot.getRequestId(),latch).start();
}
// 等待計數器完成或者超過15秒
latch.await(15, TimeUnit.SECONDS);
}
}
}
Executable的execute方法
public void execute(Integer slotIndex) throws Exception {
if(instance == null){
throw new FlowSystemException("there is no instance for node id " + id);
}
instance.setSlotIndex(slotIndex);
Slot slot = DataBus.getSlot(slotIndex);
try{
// 可以重寫isAccess()決定是否執行
if(instance.isAccess()){
// 詳解見NodeComponent的excute方法
instance.execute();
// NodeComponent中有一個InheritableThreadLocal<Boolean> isEndTL, 來判斷是否停止整個流程.
// InheritableThreadLocal可繼承,所以可以影響所有繼承了NodeComponent的組件. 但是ThreadLocal是每個線程獨有的, 所以對多線程的when並行流無效
if(instance.isEnd()){
LOG.info("[{}]:component[{}] lead the chain to end",slot.getRequestId(),instance.getClass().getSimpleName());
// 拋出異常停止當前流程
throw new ChainEndException("component lead the chain to end");
}
}else{
LOG.info("[{}]:[X]skip component[{}] execution",slot.getRequestId(),instance.getClass().getSimpleName());
}
}catch (Exception e){
// 重寫isContinueOnError()方法決定發生錯誤后是否繼續執行(會使isEndTL無效)
if(instance.isContinueOnError()){
String errorMsg = MessageFormat.format("[{0}]:component[{1}] cause error,but flow is still go on", slot.getRequestId(),id);
LOG.error(errorMsg,e);
}else{
String errorMsg = MessageFormat.format("[{0}]:component[{1}] cause error",slot.getRequestId(),id);
LOG.error(errorMsg,e);
throw e;
}
}finally {
// 移除NodeComponent中的InheritableThreadLocal<Integer> slotIndexTL(用於slotIndex的傳遞)
instance.removeSlotIndex();
// 移除isEndTL
instance.removeIsEnd();
}
}
NodeComponent的excute方法
public void execute() throws Exception{
Slot slot = this.getSlot();
LOG.info("[{}]:[O]start component[{}] execution",slot.getRequestId(),this.getClass().getSimpleName());
slot.addStep(new CmpStep(nodeId, CmpStepType.SINGLE));
StopWatch stopWatch = new StopWatch();
stopWatch.start();
// 執行重寫的業務方法(如果是NodeCondComponent組件會將要運行的nodeId綁定到solt中)
process();
stopWatch.stop();
long timeSpent = stopWatch.getTime();
// slot.addStep(new CmpStep(nodeId, CmpStepType.END));
//性能統計
CompStatistics statistics = new CompStatistics();
statistics.setComponentClazzName(this.getClass().getSimpleName());
statistics.setTimeSpent(timeSpent);
MonitorBus.load().addStatistics(statistics);
// condition組件
if(this instanceof NodeCondComponent){
// 獲取到綁定的nodeId, 並執行
String condNodeId = slot.getCondResult(this.getClass().getName());
if(StringUtils.isNotBlank(condNodeId)){
Node thisNode = FlowBus.getNode(nodeId);
Executable condExecutor = thisNode.getCondNode(condNodeId);
if(condExecutor != null){
// 執行對應的NodeCompenent
condExecutor.execute(slotIndexTL.get());
}
}
}
LOG.debug("[{}]:componnet[{}] finished in {} milliseconds",slot.getRequestId(),this.getClass().getSimpleName(),timeSpent);
}
