使用Guava EventBus構建publish/subscribe系統


Google的Guava類庫提供了EventBus,用於提供一套組件內publish/subscribe的解決方案.事件總線EventBus,用於管理事件的注冊和分發。在系統中,Subscribers會向EventBus注冊自己感興趣的事件,而publishers會將自己產生的事件發布給EventBus。在系統中,EventBus事件分發默認設置為串行的(可設置),我們在Subscribers中的事件處理速度要快,不要阻塞當前的事件紛發進程。

創建EventBus實例

EventBus提供兩個構造函數,可用於創建Evnet實例,如下所示。

    @Test
    public void should_create_event_bus_instance() throws Exception {
        EventBus eventBus = new EventBus();
        //string構造參數,用於標識EventBus
        EventBus eventBus1 = new EventBus("My Event Bus");
    }

Subscribe事件

  1. Subsciber對象需要定義handler method,用於接受並處理一個通知事件對象
  2. 使用Subscribe標簽標識事件handler method
  3. Subscriber向EvenetBus注冊,通過EventBus.register方法進行注冊

Post事件

post一個事件很簡單,只需要調用EventBus.post方法即可以實現。EventBus會調用Subscriber的handler method處理事件對象。

定義handler Method

方法接受一個事件類型對象,當publisher發布一個事件,eventbus會串行的處理event-handling method, 所以我們需要讓event-handing method處理的速度快一些,通常我們可以通過多線程手段來解決延遲的問題。

Concurrency

EventBus可以通過使用AllowConcurrentEvent注解來實現並發調用handle method。當handler method被標記為AllowConcurrentEvent(replace Subscribe標簽),我們認為handler Method是線程安全的。

Code Sample

例子中,我們使用cookie店為例,為了簡單起見,系統中只定義了五個對象:

  1. EmptyEvent對象:用於表明CookieContaier cookie數量為0
  2. CookieContaier對象:用於Cookie的存儲,當cookie數量為0時,會發布EmptyEvent事件
  3. CookieSeller: EmptyEvent事件訂閱者
  4. CookieMailBoss:EmptyEvent事件訂閱者
  5. HandlerService: 定義handler-method接口,使用@Subscribe標注
public interface HandlerService {
    @Subscribe
    void handler(EmptyEvent emptyEvent);
}
public class CookieSeller implements HandlerService {

    public CookieSeller(EventBus eventBus) {
        eventBus.register(this);
    }

    public void handler(EmptyEvent emptyEvent) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getClass().getName() + ":" + "receiving empty event");
    }
}
public class CookieMallBoss implements HandlerService {

    public CookieMallBoss(EventBus eventBus) {
        eventBus.register(this);
    }

    public void handler(EmptyEvent emptyEvent) {
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(this.getClass().getName() + ":" + "receiving empty event");
    }
}

getCookie函數,會計算事件觸發publish時間


public class CookieContainer {
    private EventBus eventBus;
    private AtomicInteger numberOfCookie = new AtomicInteger();

    public CookieContainer(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    public void setNumberOfCookie(int intenger) {
        numberOfCookie.set(intenger);
    }

    public void getACookie() {
        if (numberOfCookie.get() == 0) {
            long start = System.currentTimeMillis();
            eventBus.post(new EmptyEvent());
            System.out.println("Publishing event time: " + (System.currentTimeMillis() - start) + " ms");
            return;
        }
        numberOfCookie.decrementAndGet();
        System.out.println("retrieve a cookie");
    }
}

public class EmptyEvent {

}

Code Test Case

設置cookie數量為三,當第四次取cookie會觸發empty事件,EventBus會串行的發布事件。


    @Test
    public void should_recv_event_message() throws Exception {
        EventBus eventBus = new EventBus();
        CookieContainer cookieContainer=new CookieContainer(eventBus);
        HandlerService cookieSeller = new CookieSeller(eventBus);
        HandlerService cookieMallBoss = new CookieMallBoss(eventBus);

        //設置cookie的數量為3
        cookieContainer.setNumberOfCookie(3);
        //用戶取三次之后cookie數量為空
        cookieContainer.getACookie();
        cookieContainer.getACookie();
        cookieContainer.getACookie();
        System.out.println("=======再次取cookie, 觸發Empty事件發布============");
        cookieContainer.getACookie();
    }
    

測試結果如下所示,當第四次getCookie時,觸發EmptyEvent事件發布。耗時為4013ms

retrieve a cookie
retrieve a cookie
retrieve a cookie
=======觸發事件發布============
com.mj.ele.guava.CookieMallBoss:receiving empty event
com.mj.ele.guava.CookieSeller:receiving empty event
Publishing event time: 4013 ms

使用AllowConcurrentEvents標簽取Subscribe,設置cookie數量為三,當第四次取cookie會觸發empty事件,EventBus會並行的發布事件。

修改Handler method接口,標記為@AllowConcurrentEvents

public interface HandlerService {
    @AllowConcurrentEvents
    void handler(EmptyEvent emptyEvent);
}

測試代碼和串行代碼一致,測試結果如下所示,事件發布耗時只需1ms

retrieve a cookie
retrieve a cookie
retrieve a cookie
=======觸發事件發布============
Publishing event time: 1 ms

Conclusion

本文講解了如何使用Guava的EventBus來構建publish/subscribe系統,分別給出了串行和並行發布的使用方法,希望能夠給讀者帶來一些幫助和啟發。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM