Event Sourcing的落地與意義


jsoncat:https://github.com/Snailclimb/jsoncat (仿 Spring Boot 但不同於 Spring Boot 的一個輕量級的 HTTP 框架)

高內聚低耦合一直是程序設計提倡的方式,但是很多人在實際項目中一直再使用面向過程的編碼,導致代碼臃腫不堪,項目維護難度日益增加,在我接觸的初中高級程序員中,很多人曾問我如何從設計階段初期盡量避免日后帶來維護難度,今天就從Event Soucing(事件溯源)模式聊聊如何優化我們的業務系統。

枯燥的理論知識避不可免,下面我盡量以代碼形式去演示事件驅動給在我們業務編程中帶來的好處。

什么是Event Sourcing ? 簡單來說,大家應該都知道mysql 數據同步中的binlog模式,我們在執行一條查詢語句 select * from Person limit 1 時看到的數據可以理解為當前時間的快照,眼前的這條數據,可能經歷過若干update語句之后才得到的結果,事件溯源亦如此,如果我們把某一行數據 看做Person對象,一個對象從開始到消亡會經歷過很多事件(update語句),如果我們要還原某個時間點的對象,只需依據的產生日期,按照順序在初始化對象上依次疊加上去,就能還原這一時期的對象了,舉個例子一個person(張三)對象

Person zs = new Person(); 張三出生了

6歲 👦 學生

25歲 👮 警察

60歲 👴 退休老人

雖然都是張三對象,但是不同時間段里張三的身份截然不同,如果我們要獲取警察時代的zs,我們用初始得到的zs依次累加上學生時代,警察時代就可以得到這一時代的zs對象了。

由此來看,對象好像顯得已經不那么重要,事件溯源更加具有意義,因為它完整描述了這個對象從出生到消亡的全過程,也可以看為不斷在改變對象的狀態,事件是只會增加不會修改,對於現如今大數據時代,事件的產生對於數據挖掘、數據分析更具有意義。

扯了這么多,還是要以代碼來實際說說事件驅動帶來的好處,先看一處經典的代碼

StockService.java

@Service
@AllArgsConstructor
public class StockService extends BaseMapper<Product> {
    //京東服務
    private final JdService jdService;
    //淘寶服務
    private final TaobaoService productService;
    //有贊服務
    private final YouzanService youzanService;
    //拼多多服務
    private final pddService pddService;
    //更多服務
    ...

    //設置商品庫存
    @Override
    public void changeProductStock(ChangeProductStockInputDTO inputDTO) {
        if(inputDTO.getStock<0){
          throw new BusinessException("庫存不能小於0");
        }
        Product product = baseMapper.getById(inputDTO.getId());
        product.setStock(inputDTO.getStock());
        baseMapper.updateById(product);
        //通知京東
        jdService.notify();
        //通知淘寶
        productService.notify();
        //通知有贊
        youzanService.notify();
        //更多需要執行的業務...
    }
}

Product.java

@Data
public class Product {
    //id
    private String id;
    
    //庫存
    private BigDecimal stock;
    
    //...
}

例如比如在電商系統中,在我們自己的商品后台中修改商品庫存后,我們要依次告知在其他第三方平台這個商品庫存信息,我相信很多同學都會這樣寫的吧,這樣的代碼確實可以完成我們的業務功能,但隨着業務功能的復雜度提升,加上我們面向過程的編碼模式,一定會越加復雜,曾看到有將近5000多行的一個訂單類,相信不管誰看見這樣的類都會頭大,接下來我們就要想辦法優化它,安排!

首先存在這樣的代碼是因為沒有划清邊界,沒有保持一個領域中的純粹性,從StockService中注入大量的服務類與標志性的貧血模型Product對象就能看出,既然我們提倡以高內聚低耦合去編寫代碼,那首先去修改我們的Product吧,讓它變得豐富起來。

改變的
Product.java

@Data
public class Product {
    
    public void changeStock(BigDecimal stock){
      if(delStatus == 1){
        throw new BusinessException("商品信息不存在");
      }
      if(stock < 0){
        throw new BusinessException("庫存不能小於0");
      }
      this.stock = stock;
      EventBus.instance().register(new ChangedProductStockDomainEvent(this));
    }
    
    //id
    private String id;
    
    //庫存
    private BigDecimal stock;
    
    //刪除狀態
    private int delStatus;
    
    //...
}

//名字盡量起得生動一些,單詞語法的過去式,現在進行時都具有意義
@Getter
@AllArgsConstructor
public class ChangedProductStockDomainEvent {
    
   private Product product;
}

更改的 StockService.java

@Service
@AllArgsConstructor
public class StockService extends BaseMapper<Product> {

    //設置商品庫存
    @Override
    public void changeProductStock(ChangeProductStockInputDTO inputDTO) {
        Product product = baseMapper.getById(inputDTO.getId());
        product.setProductStock(inputDTO.getProductStock());
    }
}

更改過后的代碼是不是看起來清爽了很多,加上我們賦予了Product對象方法之后,職責看起來就更加明確,充血模型體現出聚合內單一的行為,在Product中我們只描述了此領域范圍的職能,已經充分體現了高內聚低耦合的思想,不參合其他業務邏輯。這時可能有的同學會問那怎么持久化到數據庫呢?在我工作的這些年里,遇到很多程序員,不論初中高級程序員都習慣了先建立數據庫,再去建立模型,但是我們要改變傳統思維,我們寫代碼是面向對象,面向對象,面向對象(重要的事情說三遍),不是面向數據或者過程,在剝離了數據后,其實我們真正就做到了數據與業務代碼的剝離,下面我在說這樣具體的好處。

細心的同學看到我在Product的changeStock方法里,在執行完一些邏輯判斷后,設置完商品庫存后,我們在EventBus 事件總線中注冊了一個事件,這個事件還沒有具體的作用,我們看看EventBus的實現

StockService.java


public class EventBus {

    public static EventBus instance() {
        return new EventBus();
    }

    private static final ThreadLocal<List<DomainEvent>> domainEvents = new ThreadLocal<>();

    public void init() {
        if (domainEvents.get() == null) {
            domainEvents.set(new ArrayList<>());
        }
    }

    public EventBus register(DomainEvent domainEvent) {
        List<DomainEvent> domainEventList = domainEvents.get();
        if (domainEventList == null)
            throw new IllegalArgumentException("domainEventList not init");
        domainEventList.add(domainEvent);
        return this;
    }

    /**
     * 獲取領域事件
     *
     * @return
     */
    public List<DomainEvent> getDomainEvent() {
        return domainEvents.get();
    }


    /**
     * 請空領域事件集合
     */
    public void reset() {
        domainEvents.set(null);
    }
}

在當前線程內內存空間我們吧事件塞了進去,目前只有存儲作用,接下來我們要定義它的處理者

DomainEventProcessor.java

@Aspect
@Component
@Slf4j
public class DomainEventProcessor {

    /**
     * 這里我是我對RocketMq的封裝
     */
    @Autowired
    private EventPublisherExecutor processPublisherExecutor;

    /**
     * 當前上下文內訂閱者
     */
    @Autowired
    protected ApplicationContext applicationContext;

    private static ThreadLocal<AtomicInteger> counter = new ThreadLocal<>();

    @Pointcut("within(com.github.tom.*.application..*)")
    public void aopRule() {

    }

    /**
     * 為當前線程初始化EventBus
     */
    @Before("aopRule()")
    public void initEventBus(JoinPoint joinPoint) {
        log.debug("初始化領域事件槽");
        log.debug("切入切點信息:" + joinPoint.getSignature().toString());
        EventBus.instance().init();
        if (counter.get() == null) {
            counter.set(new AtomicInteger(0));
        }
        counter.get().incrementAndGet();
    }

    /**
     * 發布領域事件
     */
    @AfterReturning("aopRule()")
    public void publish() {
        int count = counter.get().decrementAndGet();
        if (count == 0) {
            try {
                List<DomainEvent> domainEventList = EventBus.instance().getDomainEvent();
                if (domainEventList != null && domainEventList.size() > 0) {
                    //進程內事件
                    domainEventList.forEach(domainEvent -> applicationContext.publishEvent(domainEvent));
                    //進程外事件
                    domainEventList.forEach(domainEvent -> processPublisherExecutor.publish(domainEvent));
                }
            } finally {
                EventBus.instance().reset();
                counter.set(null);
            }
        }
    }

    @AfterThrowing(throwing = "ex", pointcut = "aopRule()")
    public void exception(Throwable ex) {
        log.error(ex.getMessage(), ex);
        EventBus.instance().reset();
        //釋放計數器
        counter.set(null);
    }
}

這里借助了AOP功能,在AOP內我對service進行攔截,在執行方法攔截的出口時,查找當前線程內的EventBus中看是否有存在的領域事件,接下來把事件發送出去,事件的響應分為進程內和進程外(多微服務),剛才的同學問的如何持久化到DB這里可以看到答案

@Slf4j
public abstract class AbstractEventHandler<T extends EventData> implements SmartApplicationListener {

    private Class<?> clazzType;

    public AbstractEventHandler(Class<? extends ApplicationEvent> clazzType) {
        this.clazzType = clazzType;
    }

    @Override
    public boolean supportsEventType(Class<? extends ApplicationEvent> clazzType) {
        return clazzType == this.clazzType;
    }

    @Override
    public boolean supportsSourceType(Class<?> clazz) {
        return true;
    }

    @Override
    public void onApplicationEvent(ApplicationEvent applicationEvent) {
        onApplicationEventHandler((T) applicationEvent);
    }

    protected abstract void onApplicationEventHandler(T eventData);
}


@Slf4j
public abstract class AbstractPersistenceEventHandler<T extends EventData> extends AbstractEventHandler<T> {

    public AbstractPersistenceEventHandler(Class<? extends ApplicationEvent> clazzType) {
        super(clazzType);
    }

    @Override
    public int getOrder() {
        return 0;
    }

}

@Component
public class ChangeProductStockPersistenceEventHandler
        extends AbstractPersistenceEventHandler<ChangedProductStockDomainEvent> {

    @Autowired
    private ProductRepository productRepository;

    public CreatedPortalArticlePersistenceEventHandler() {
        super(CreatedPortalArticleDomainEvent.class);
    }

    @Override
    protected void onApplicationEventHandler(ChangedProductStockDomainEvent eventData) {
        if (portalArticleRepository.updateById(eventData.getProduct()) <= 0) {
            throw new BusinessException("數據操作錯誤");
        }
    }
}

在響應事件的其中一個訂閱者,可以完成數據庫的持久化操作。接下來我們去定義各個響應ChangedProductStockDomainEvent事件的訂閱者就行,例如京東服務

@Component
public class JdStockEventHandler {

    @Autowired
    private JdAppService jdAppService;

    /**
     * 庫存持久化事件
     *
     * @param eventData
     */
    @StreamListener(value = "product-channel")
    public void receive(@Payload ChangedProductStockDomainEvent eventData) {
        jdAppService.changingInventory(eventData);
    }
}

事件驅動的模型大大降低了業務模塊耦合嚴重,在每個聚合的領域內,我們應該着重自身聚合的業務邏輯,事件的消費我們可以通過廣播通知和最終一致性來達成目的。業務代碼的純粹,也更適合TDD只對業務編寫測試代碼,例如我在編寫設置庫存的測試方法時,我只要構造好商品對象,就可以按照測試用例編寫不同情況下的測試代碼了。

@Component
public class ProductStockTest {

    @Before
    public void setUp() {
      EventBus.instance().init();
    }
    
    @Test
    public void testChangeStockError() {
        Product product = new Product();
        product.setStock(BigDecimal.valueOf("-1"));
        product.changeStock();
    }

   @Test
    public void testChangeStockSuccess() {
        Product product = new Product();
        product.setStock(BigDecimal.valueOf("2"));
        product.changeStock();
        assertThat(product.getStock()).isEqualTo("2");
    }

}

好了今天的介紹就先這么多,后面我會介紹如何讓三層架構中的Service層升級,變得充滿業務味道(領域服務)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM