事件處理業務的簡易組件編排框架


背景

很多入侵事件類型的處理流程非常相似,只有少部分的差異。 按目前的方式,每種入侵事件類型都要開發一遍,很多相似的模板代碼,可復用性和可維護性都一般。這種情況下,可以采用業務編排的方式,來提升事件處理的可復用性和可維護性。

業務的組件編排,意味着需要將業務邏輯抽象成組件,然后將組件編排成業務流程。 基於組件編程思想。

業務的組件編排和工作流引擎有點類似。但我們的目標不是實現類似審批、采購這樣的流程,而是為了讓業務邏輯能夠更加可復用和可擴展,提升研發效率。因此,最終目標不是實現一個工作流引擎,而是實現一個相對靈活的業務組件編排和執行框架。


目標

  • 將通用和差異分離,最大程度復用已有邏輯,降低新入侵事件處理的開發量,提升其研發效率;
  • 業務邏輯組件化,改動只局限在新增的組件,遵循開閉原則,提升軟件可維護性。

適用場景

  • 具有相似性的模板流程的事件處理。

設計思路

要設計和實現一個事件處理的業務組件編排,需要解決如下問題:

  • 定義一個事件處理流程的基本概念集: 事件數據(EventData)、基於事件數據的事件處理上下文語境( FlowContext) 及構建者(FlowContextBuilder)、事件處理流程定義(EventFlow)、事件處理組件聲明和定義( ComponentProperties 和 FlowComponent);

  • 定義事件處理流程的基本模板(或者可以自定義);

  • 定義具體的業務事件處理的流程所需要的組件,並能根據指定業務類型來選擇合適的流程;

  • 定義具體的業務組件,並能根據組件的功能和類型來選擇合適的組件來執行。


整體框圖如下:

本文源代碼在 “ALLIN” 下的 cc.lovesq.flows 包下。

基本概念

事件數據 IEventData

  • 表示任何事件數據;不同的事件處理可以定義不同的具體類 EventData。

public interface EventData {

    /**
     * 獲取事件數據
     */
    String getData();

    /**
     * 獲取事件類型
     */
    String getType();
}


事件處理語境 FlowContext 和 FlowContextBuilder

  • 基於 EventData 構建,用於處理事件過程中的上下文語境。

public abstract class AbstractFlowContext<E extends EventData> {

    /** 事件對象 */
    protected E eventData;

    /** 事件對象中所含的業務 DO 對象 */
    protected DetectDO detectDO;

    /** 事件對象中所含的業務 DTO 對象 */
    protected DetectDTO detectDTO;

    /** 擴展信息 */
    private Map<String, Holder> extendInfo;

    public AbstractFlowContext() {
    }

    public AbstractFlowContext(E eventData, DetectDO detectDO, DetectDTO detectDTO) {
        this.eventData = eventData;
        this.detectDO = detectDO;
        this.detectDTO = detectDTO;
    }

    public E getEventData() {
        return eventData;
    }

    public String getData() {
        return eventData.getData();
    }

    public DetectDO getDetectDO() {
        return detectDO;
    }

    public DetectDTO getDetectDTO() {
        return detectDTO;
    }

    public <T> void put(String name, Holder<T> holder) {
        extendInfo.put(name, holder);
    }

    public <T> T get(String name, Class<T> cls) {
        return (T)extendInfo.get(name).getData();
    }


}

public interface FlowContextBuilder<E extends EventData> {

    /**
     * 根據事件數據構建事件處理的上下文語境
     */
    AbstractFlowContext build(E eventData);
}

public class DefaultDetectFlowBuilder implements FlowContextBuilder<DetectEventDataWrapper> {

    @Override
    public AbstractFlowContext build(DetectEventDataWrapper eventData) {
        String detectType = eventData.getDetectType();

        Class<DetectDO> doCls = DetectEventEnum.getDoClass(detectType);
        DetectDO detectDO = parse(eventData.getData(), doCls);

        Class<DetectDTO> dtoCls = DetectEventEnum.getDtoClass(detectType);
        DetectDTO detectDTO = parse(eventData.getData(), dtoCls);

        return new DefaultDetectFlowContext(eventData, detectDO, detectDTO);
    }

    private <T> T parse(String text, Class<T> cls) {
        JSONObject jsonObject = JSONObject.fromObject(text);
        return (T)JSONObject.toBean(jsonObject, cls);
    }
}

事件處理流程定義EventFlow

  • 定義處理事件數據的處理流程模板,這個流程是通過執行多個流程組件來實現的。

public interface EventFlow<E extends EventData> {

    /**
     * 處理事件的流程
     */
    void process(E eventData);
}

流程組件FlowComponent

  • 在事件處理流程中處理業務邏輯的業務組件;

  • 組件包含組件意圖和組件業務類型,作為選擇組件的依據;

  • 統一用 AbstractFlowContext 作為參數;

  • FlowComponent 的執行結果使用 FlowResult 來表示;FlowResult 包含一個可能的數據值,以及必要的一個枚舉 FlowDecision 決定繼續或結束流程。


@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface ComponentProperties {

    /** 組件的意圖或功能 */
    String purpose();

    /** 組件的業務類型選擇 */
    String biz() default "common";

}

/**
 * 事件處理流程中的一個業務組件
 */
public interface FlowComponent<I extends AbstractFlowContext> {
    FlowResult process(I flowContext);

    // 滿足此條件才能執行該組件,默認可以執行

    default boolean needAccess(I flowContext) {
        return true;
    }
}

定義一個具體組件:


public interface BigDataSender<I extends AbstractFlowContext> extends FlowComponent<I> {
    void sendEventMessage(I context);

    default FlowResult process(I context) {
        sendEventMessage(context);
        return new FlowResult(null, FlowDecision.Continue);
    }
}

@ComponentProperties(purpose = "BigDataSender")
@Component
public class DefaultDetectBigDataSender implements BigDataSender<DefaultDetectFlowContext> {

    private static Log log = LogFactory.getLog(DefaultDetectBigDataSender.class);

    @Override
    public void sendEventMessage(DefaultDetectFlowContext context) {
        String bigDataType = DetectEventEnum.getBigDataType(context.getEventData().getDetectType());
        String info = String.format("send: bigDataType=%s, msg=%s", bigDataType, JSON.toJSONString(context.getEventData().getData()));
        log.info(info);
    }
}


事件處理流程

事件處理流程的基本模板

STEP1:對於某一類事件,初始化 FlowContextBuilder flowContextBuilder 和 FlowComponents ;

STEP2:使用 flowContextBuilder 將 EventData 構建成 FlowContext ;

STEP3: 循環或並發調用 FlowComponent.process 進行 事件處理 。


public abstract class AbstractEventFlow<E extends EventData> implements EventFlow<E> {

    @Resource
    protected FlowComponentFactory flowComponentFactory;
    @Resource
    protected ComponentExecutor componentExecutor;

    public AbstractEventFlow(FlowComponentFactory flowComponentFactory) {
        this.flowComponentFactory = flowComponentFactory;
    }

    @Override
    public void process(E eventData) {

        AbstractFlowContext flowContext = builder().build(eventData);
        List<FlowComponent> flowComponents = flowComponents(eventData.getType());
        componentExecutor.execMayExit(flowContext, flowComponents);
    }

    /** 可以覆寫此方法來構建自己的 FlowContext */
    abstract public FlowContextBuilder builder();

    /** 可以覆寫此方法來指明流程所需要的業務處理組件 */
    abstract public List<FlowComponent> flowComponents(String detectType);
}

@Component
public class DefaultDetectEventFlower extends AbstractEventFlow<DetectEventDataWrapper> {

    private FlowContextBuilder flowContextBuilder = new DefaultDetectFlowBuilder();

    @Autowired
    public DefaultDetectEventFlower(FlowComponentFactory flowComponentFactory) {
        super(flowComponentFactory);
    }

    @Override
    public FlowContextBuilder builder() {
        return flowContextBuilder;
    }

    @Override
    public List<FlowComponent> flowComponents(String detectType) {
        return flowComponentFactory.getComponents(DetectEventFlowDefinitions.getNeedComponents(detectType));
    }
}

業務的事件處理流程定義和選擇


public enum DetectEventFlowDefinitions {

    bounce_detect_flow(BOUNCE_SHELL.getType(),
            Arrays.asList(DefaultNotifySender, DefaultBigDataSender, DefaultThreatInfoSender)),

    ;

    /** 入侵事件業務類型 */
    String detectType;

    /** 入侵事件類型 */
    BizEventTypeEnum bizEventTypeEnum;

    /** 入侵事件處理流程的全限定類名 */
    String eventClassName;

    /** 事件處理所需要的組件 */
    List<ComponentType> componentTypes;

    DetectEventFlowDefinitions(String detectType, List<ComponentType> componentTypes) {
        this(detectType, BizEventTypeEnum.CREATE, DefaultDetectEventFlower.class.getName(), componentTypes);
    }

    DetectEventFlowDefinitions(String detectType, String eventClassName, List<ComponentType> componentTypes) {
        this(detectType, BizEventTypeEnum.CREATE, eventClassName, componentTypes);
    }

    DetectEventFlowDefinitions(String detectType, BizEventTypeEnum bizEventTypeEnum, String eventClassName, List<ComponentType> componentTypes) {
        this.detectType = detectType;
        this.bizEventTypeEnum = bizEventTypeEnum;
        this.eventClassName = eventClassName;
        this.componentTypes = componentTypes;
    }

    private static Map<String, DetectEventFlowDefinitions> flowDefinitionsMap = new HashMap<>();
    private static Set<String> flows = new HashSet<>();

    static {
        for (DetectEventFlowDefinitions detectEventFlowDefinitions : DetectEventFlowDefinitions.values()) {
            flowDefinitionsMap.put(getKey(detectEventFlowDefinitions.detectType, detectEventFlowDefinitions.bizEventTypeEnum.getType()), detectEventFlowDefinitions);
            flows.add(detectEventFlowDefinitions.detectType);
        }
    }

    public static String getKey(String detectModelType, String eventType) {
        return detectModelType + "_" + eventType;
    }

    public static String getEventFlowClassName(String detectModelType, String eventType) {
        return flowDefinitionsMap.get(getKey(detectModelType, eventType)).eventClassName;
    }

    public static List<ComponentType> getNeedComponents(String detectType) {
        return flowDefinitionsMap.get(getKey(detectType, BizEventTypeEnum.CREATE.getType())).componentTypes;
    }

    public static List<ComponentType> getNeedComponents(String detectType, String bizEventType) {
        if (StringUtils.isBlank(bizEventType)) {
            bizEventType = BizEventTypeEnum.CREATE.getType();
        }
        return flowDefinitionsMap.get(getKey(detectType, bizEventType)).componentTypes;
    }
}


使用工廠模式來選擇和獲取具體的某個業務事件處理流程:


public abstract class AbstractEventFlowFactory {

    private final Logger logger = LoggerFactory.getLogger(AbstractComponentFactory.class);

    protected Map<String, EventFlow> eventFlowMap = new HashMap<>();

    private volatile boolean initialized = false;

    /*
     *  初始化事件處理流程Beans
     */
    protected void initBeans() {
        if (!initialized) {
            Map<String, EventFlow> eventFlowBeans = getEventFlowBeans();
            for (EventFlow eventFlow: eventFlowBeans.values()) {
                eventFlowMap.put(eventFlow.getClass().getName(), eventFlow);
            }
            logger.info("init-success: eventFlowMap: {}", eventFlowMap);
            initialized = true;
        }
    }

    /** 返回的事件處理流程 bean 映射 Map[BeanName,IEventFlowBean] */
    abstract public Map<String, EventFlow> getEventFlowBeans();

    /**
     * 根據指定的事件處理流程全限定性類名獲取指定的 Bean 實例
     */
    public EventFlow get(String eventFlowClassName) {
        return eventFlowMap.get(eventFlowClassName);
    }

    public EventFlow getEventFlow(String bizEventType, String detectType) {
        String eventFlowClassName = DetectEventFlowDefinitions.getEventFlowClassName(detectType, bizEventType);
        return eventFlowMap.get(eventFlowClassName);
    }
}

@Component
public class DetectEventFlowFactory extends AbstractEventFlowFactory implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @PostConstruct
    public void init() {
        initBeans();
    }

    @Override
    public Map<String, EventFlow> getEventFlowBeans() {
        return applicationContext.getBeansOfType(EventFlow.class);
    }
}

組件工廠與定位

根據組件意圖和組件業務類型,使用工廠模式來選擇和獲取某個具體的組件:


/**
 * 可定制的組件工廠類
 */
public abstract class AbstractComponentFactory {

    private static Logger logger = LoggerFactory.getLogger(AbstractComponentFactory.class);

    Map<String, FlowComponent> componentBizMap = new HashMap<>();
    Map<String, FlowComponent> componentClassMap = new HashMap<>();

    private volatile boolean initialized = false;

    /*
     * 初始化組件Beans
     */
    protected void initBeans() {
        if (!initialized) {
            Map<String, FlowComponent> beansOfType = getComponentBeans();
            beansOfType.values().forEach(
                    component -> {
                        ComponentProperties annotations = component.getClass().getAnnotation(ComponentProperties.class);
                        if (annotations != null) {
                            componentBizMap.put(ComponentType.getKey(annotations.purpose(), annotations.biz()), component);
                        }
                        componentClassMap.put(component.getClass().getName(), component);
                    }
            );
            logger.info("init-success: ComponentBizMap: {}", componentBizMap);
            logger.info("init-success: componentClassMap: {}", componentClassMap);
            initialized = true;
        }
    }

    /**
     *  返回所有對應的組件實例映射 Map[BeanName, ComponentBean]
     */
    abstract public Map<String, FlowComponent> getComponentBeans();

    /**
     * 根據指定的組件全限定性類名來獲取對應的組件實例
     * @param qualifiedClassName 組件全限定性類名
     * @return 組件實例
     */
    public FlowComponent getComponent(String qualifiedClassName) {
        return componentClassMap.get(qualifiedClassName);
    }

    /**
     * 根據指定的組件意圖和業務類型來獲取對應的組件實例
     * @param purpose 組件意圖標識
     * @param biz 組件業務類型
     * @return 組件實例
     */
    public FlowComponent getComponent(String purpose, String biz) {
        return componentBizMap.get(ComponentType.getKey(purpose, biz));
    }

    /**
     * 根據指定的組件類型來獲取對應的組件實例
     * @param componentType 組件類型
     * @return 組件實例
     */
    public FlowComponent getComponent(ComponentType componentType) {
        return componentBizMap.get(ComponentType.getKey(componentType.getPurpose(), componentType.getBiz()));
    }

    /**
     * 根據組件類型集合批量獲取對應的組件實例集合
     * @param componentTypeList 組件類型集合
     * @return 組件實例集合
     */
    public List<FlowComponent> getComponents(List<ComponentType> componentTypeList) {
        return StreamUtil.map(componentTypeList, com -> getComponent(com.getPurpose(), com.getBiz()));
    }
}

@Component
public class FlowComponentFactory extends AbstractComponentFactory implements ApplicationContextAware {

    private static Log log = LogFactory.getLog(FlowComponentFactory.class);

    private ApplicationContext applicationContext;

    private volatile boolean initialized = false;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @PostConstruct
    public void init() {
        initBeans();
    }

    @Override
    public Map<String, FlowComponent> getComponentBeans() {
        return applicationContext.getBeansOfType(FlowComponent.class);
    }
}

組件執行器


@Component
public class ComponentExecutor {

    @Resource
    private FlowComponentFactory flowComponentFactory;

    // 如果要支持並發,這里可以有一個組件執行線程池

    /**
     * 執行指定流程組件集,若任一組件要結束流程,則流程結束
     */
    public <I extends AbstractFlowContext> FlowResult execMayExit(I flowContext, List<FlowComponent> flowComponents) {
        for (FlowComponent flowComp: flowComponents) {
            if (!flowComp.needAccess(flowContext)) {
                continue;
            }
            FlowResult flowResult = flowComp.process(flowContext);
            if (flowResult.getFlowDecision() == null || FlowDecision.Termination.equals(flowResult.getFlowDecision())) {
                return FlowResultHelper.terminateResult();
            }
        }
        return FlowResultHelper.continueResult();
    }

    /**
     * 單個普通業務組件執行
     */
    public <I extends AbstractFlowContext> FlowResult exec(I param, FlowComponent<I> c) {
        if (c.needAccess(param)) {
            return c.process(param);
        }
        return FlowResultHelper.continueResult();
    }

    public <I extends AbstractFlowContext> FlowResult exec(I context, String compClassName) {
        FlowComponent c = flowComponentFactory.getComponent(compClassName);
        return exec(context, c);
    }

    /**
     * 任意多個流程組件執行(提供更靈活的組件執行能力)
     * 任一組件不影響后續組件的執行(拋出異常除外)
     *
     * NOTE: 這些業務組件的參數是一樣的
     */
    public <I extends AbstractFlowContext> FlowResult execBatch(I context, List<FlowComponent> components) {
        for (FlowComponent c: components) {
            if (c.needAccess(context)) {
                c.process(context);
            }
        }
        return FlowResultHelper.continueResult();
    }

}


設計說明

為什么要用泛型聲明 E extends EventData 而不是直接用 EventData

使用泛型,則不同的業務可以定義不同的 EventData , 覆寫方法時,就可以用具體的 EventData 子類,而不是用 EventData (需要做類型轉換)。 用泛型之后,可以處理不同類型的事件數據。


組件參數傳參

有兩種方式:

  • 像 linux 管道那樣, 上一個組件的輸入作為下一個組件的輸出, 這樣縮小了變量和對象的作用域,能夠及時回收,但是設計需要更加精巧,開發理解也不大容易;

  • 使用一個 Context 或者 ThreadLocal ,簡單粗暴,最大的缺點是這個類容易膨脹臃腫。 目前是先實現第二種。如果要防止膨脹,有一種辦法,就是把一組相互有關聯的變量組織成一個參數對象,通過參數對象去安全地獲取。這樣略有麻煩,但能更清楚地看到組件依賴的變量,要移除也更加容易一點。

提前結束流程

在事件處理流程的任何一步,根據新獲得的信息,都可能做出判斷,提前結束流程。每個組件執行之后,無論是否出錯,既有可能需要結束流程,也有可能需要繼續流程。此時,可以讓 process 返回一個枚舉值 FlowDecision ,這個枚舉值來告訴主控流程,到底是結束還是繼續。


更完善的組件流程

更完善的組件流程應當是:

  • 一個業務流程通過若干的流程 STEP 來實現;流程 STEP 之間是有順序的;

  • 每個流程 STEP 內可能包含多個組件, 這些組件可能是相互可替換的;

  • 業務編排系統應當對流程 STEP 和 特定 STEP 里的組件類型進行必要的檢查。


組件執行順序

組件執行順序既可以用有序列表來表達,也可以用有序鏈表來表達。采用列表的方式,則流程的主控在 IEventFlow 的實現類里,采用鏈表方式表達,則流程的主控在組件里。

多個組件還可以組合成一個組件塊一起執行(組合模式);也可能在某種條件下才去執行某個組件或在組件內執行組件塊。可以在 FlowComponent 里添加執行多個組件的默認執行方法 exec(FlowComponent... flowComponents)。 flowComponents 可以從組件工廠里通過組件的全限定性類名來拿到。

此外,互不依賴互不影響的不同組件塊可以並發執行。 目前業務沒有這種需求,限於時間因素,暫未實現。

事件處理流程配置

事件處理流程的組件配置目前是采用枚舉方式,寫死在枚舉類 DetectAgentEventFlowDefinitions 【前置部分】和 DetectBizEventFlowDefinitions 【后置部分】里了 。采用枚舉方式直觀可靠(寫死在代碼里),但是不夠靈活。如果組件集需要靈活執行,比如 A, B, C, D, E 五個組件,執行順序是: A, (B,C 並發), C, (D,E 並發),就難以支持了。

可以將組件執行順序(即事件處理流程配置)改為 JSON 或 YML 形式,更加靈活。只要編寫一個組件執行器(支持串行或並發地執行組件集),就可以支持。目前入侵事件處理流程暫不需要這樣的功能,限於時間因素,因此暫未實現,但可以留下設計空間。

采用枚舉方式的事件處理流程配置,可以作為降級兜底策略。


可嵌入

如果要從全局來改造原有流程,可能重構成本比較大。比如 AcceptFlow 部分,從全局來改造,需要從 AgentSensor 開始。實際上 AgentSensor 和 Handler 的邏輯並不多,可以從 AcceptFlow 部分嵌入這個事件處理流程的業務編排,只改動 flow.execute 的那部分,能夠降低重構成本(難以重構的或者暫時沒精力重構的暫時保持現狀不影響),同時對於確實有模板特征的事件處理流程能夠利用上業務編排的優勢。這要求,事件處理流程的業務編排框架是小型的可嵌入式的,能夠靈活地使用上。

小結

本文主要給出了一個事件處理流程的簡易業務編排框架,適用於具有相似的模板流程的事件處理。

設計一個小型框架,能夠處理一類相似業務,更能體現性價比。比如做入侵檢測業務,最開始打算將入侵業務都組件化,但這樣重構成本非常高;因此,后面將目標縮小為:做一個小型業務編排系統,能夠將具有相似性處理的入侵事件處理流程組件化和可編排化。做設計,不能一開始就想着弄一個大而全的精巧的框架。制定合理的設計目標,更有利於達成。

此外,設計的一大目標是使團隊開發成員自然地遵循設計約束。初始設計既重要也不能看得太重。如果缺乏初始設計,整個系統很快就會腐化成一坨坨無章法的邏輯堆砌;而初始設計太精巧,隨着更多的成員加入,團隊成員不一定能很好地遵循初始設計約束,系統也會逐漸地腐化。

有一點經驗小結下:

  • 設計和實現組件的基本能力,首先要有一個基本的概念設計和接口設計(概念定義及概念關聯,概念對應的接口及交互);

  • 以一個典型的業務場景為例,驗證事件處理的組件編排框架的可行性;

  • 以一個復雜的業務場景為例,驗證事件處理的組件編排框架的完備性;

  • 完善組件編排框架的配置能力;

  • 多多聽取團隊成員的建議並改進,比如成亮提出包結構重新組織下,對做共享庫就很有幫助;

  • 確立合理的設計目標和設計范圍,盡可能達到好的ROI。


做共享庫的一點經驗:

  • 明確和聚焦事件處理的組件編排框架的核心能力: 組件及組件序列的配置、定位、執行;

  • 共享庫里的類盡量做成抽象的可定制的,減少具體實現的工作量(Javadoc中可以有示例注釋);

  • 共享庫里的代碼注釋要完善,代碼要簡潔,單測要完善;

  • 去掉業務相關的,勿以業務相關的來充實共享庫。




免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM