spring集成guava的event bus


Guava的event bus

guava, https://github.com/google/guava 是一個非常有名的Java類庫,提供了很多在日常開發中常用的集合、函數接口等。此外,guava還提供了一個模塊叫做event bus,生產者往event bus上投遞消息,event bus負責回調訂閱了此類消息的回調函數,實現了消息生產者和消費者之間的解耦和異步處理。以下是一個簡單的例子:

public class SimpleListener {
    @Subscribe
    public void task(String s) {
        System.out.println("do task(" + s + ")");
    }
}
public class SimpleEventBusExample {
    public static void main(String[] args) {
        EventBus eventBus = new EventBus();
        eventBus.register(new SimpleListener());
        System.out.println("Post Simple EventBus Example");
        eventBus.post("Simple EventBus Example");
    }
}

output

Post Simple EventBus Example
do task(Simple EventBus Example)

event bus集成到spring中

在之前的例子和guava的官方文檔里面可以看到,guava的event bus使用方式如下

1. 聲明一個event bus對象(線程安全,所以可以做到全局唯一,而且訂閱者和發布者必須共享這個event bus對象)

2. 對於訂閱者,支持 @Subscribe,定義處理消息的回調函數。

3. 對每一個訂閱者,需要調用event bus的register方法,才能收到消息訂閱。

對於1 和 2,都比較好。但是對於第3步來說,則有一點困難。因為

1. 在spring中,通常的你的訂閱者還會依賴其他的spring管理的bean,於是你的訂閱者也會被納入到spring的生命周期的管理中來,這樣如果用new的方式來初始化一個訂閱者,顯得非常的"不spring"。

2. 對於每個訂閱者,都要顯式的注冊到event bus里面,這樣並沒有做到關注點分離。理想的情況下,訂閱者是不應該去關注如何注冊到event bus中。它只應該申明處理消息的回調函數,以及該回調函數是否能夠並發調用。注冊到event bus中這件事,對於訂閱者應該是被動且自動的(只需要申明自己是否想注冊到event bus,而不需要關心細節)。這一點在多人開發,並且項目人員水平參差不齊的時候,尤其重要。

那么如何做到自動注冊呢?其實答案很簡單,在spring中,ApplicationContext這個類提供了一系列的方法去獲取到當前spring context中的bean,只需要在event bus初始化之后,通過ApplicationContext來獲取當前有哪些訂閱者,並且主動的去注冊就行。由於,guava的實現中,並沒有要求訂閱者實現某個接口,而是用注解的方式來聲明回調函數的,則這篇文章中的實現,也不需要訂閱者去實現某個接口,而是用注解的方式來申明自己是一個訂閱者。代碼如下

先聲明一個注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service
public @interface EventSubscriber {}

對於一個訂閱者,在類的接口上,加上這個注解,並且確定這個bean會在納入spring的生命周期管理中。

@EventSubscriber
public class SimpleSubscriber implements HiDasSubscriber {
@Autowired Somebean somebean; @Subscribe @AllowConcurrentEvents
public Integer logEventToDbAndUpdateStatus(String event) { System.out.println("receive a event:"+event); } }

聲明一個event bus的服務,並且在初始化之后,通過ApplicationContext的 getBeansWithAnnotation 的方法把所有的訂閱者獲取,並且注冊到event bus中。

@Service
public class EventBusService implements InitializingBean{

    private EventBus innerBus;

    @Inject
    private ApplicationContext appContext;

    public void unRegister(Object eventListener){
        innerBus.unregister(eventListener);
    }

    public void postEvent(String event){
        innerBus.post(event);
    }

    public void register(Object eventListener){
        innerBus.register(eventListener);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        innerBus = new AsyncEventBus("Hidas-event-bus", Executors.newCachedThreadPool());
        appContext.getBeansWithAnnotation(EventSubscriber.class).forEach((name, bean) -> {
            innerBus.register(bean);
        });
    }
}

完成了這兩步之后。如果其他的消息生產者要往event bus上發消息,只需要注入這個event bus service,並且調用其post方法就好了。

注:這個只是例子,在實際項目中,對所有的消息都會聲明一個基類或者接口,每個訂閱者對只會處理消息的某個具體實現。這樣event bus會根據消息的具體類型,來調用真正關注此類消息的訂閱者的回調函數。這樣比起讓所有消息訂閱者去實現一個 onEvent(BaseEvent event)的方法, 實際上是避免了一個多路分配的問題。

總結

對於一些類庫在spring中使用,這種方法實際上是一種通用的模式,實現某個接口或者編寫某個注解,然后通過ApplicationContext來獲取對應的bean,之后進行某些注冊或者組裝操作。這樣的話,可以讓業務的代碼,和框架的代碼做到一定程度的關注點分離。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM