Java事件總線


    在平時寫代碼的過程中,我們需要實現這樣一種功能:當執行某個邏輯時,希望能夠進行其他邏輯的處理。最粗暴的方法是直接依賴其他模塊,調用該模塊的相應函數或者方法。但是,這樣做帶來一些問題。

  • 模塊間相互依賴,耦合度高。以下訂單為例,訂單提交后需要進行支付以及進行一些其他處理,如發郵件等操作。相關的代碼可能是這樣。可以看到:訂單模塊依賴了支付服務以及用戶服務。
  • 維護困難。由於模塊間相互依賴,當需要修改訂單邏輯時則需要修改submitOrder方法的源代碼,而某些時候可能無法修改。再者,如果有多個這種邏輯,修改時可能涉及到多處操作。
public class OrderPage {
 
        private PaymentService paymentService;
        private UserService userService;
         
        public void submitOrder() {
                Integer userId = 1;
                BigDecimal amount = BigDecimal.TEN;
                 
                paymentService.doPayment(userId, amount);
                userService.registerPayment(userId, amount);
        }
}
public class PaymentService {
 
        private MailService mailService;
         
        public void doPayment(Integer userId, BigDecimal amount) {
                //Do payment...
                mailService.sendPaymentEmail(userId, amount);
        }
}
public class UserService {
 
        public String getEmailAddress(Integer userId) {
                return "foo@bar.com";
        }
         
        public void registerPayment(Integer userId, BigDecimal amount) {
                //Register payment in database...
        }
}
 
public class MailService {
 
        private UserService userService;
         
        public void sendPaymentEmail(Integer userId, BigDecimal amount) {
                String emailAddress = userService.getEmailAddress(userId);
                //Send email...
        }
}

 一、觀察者模式

   有時被稱作發布/訂閱模式,觀察者模式定義了一種一對多的依賴關系,讓多個觀察者對象同時監聽某一個主題對象。這個主題對象在狀態發生變化時,會通知所有觀察者對象,使它們能夠自動更新自己。

  通過觀察者模式來進行解耦,當對象發生變化時,通知其觀察者,由觀察者進行相應的處理。體現在訂單邏輯中時即為,定義多個觀察者觀察下訂單這個主題,當下訂單的動作發生時,通知其所有觀察者。再由每個觀察者進行處理。依據觀察者模式的實現,以上邏輯可改為如下代碼:

interface OrderListener {

	public void onSubmitOrder(Integer userId, BigDecimal amount);
}


public class OrderPage {

	private List<OrderListener> orderListeners=new ArrayList<OrderListener>();

	public void submitOrder() {
		Integer userId = 1;
		BigDecimal amount = BigDecimal.TEN;

		for (OrderListener orderListener : orderListeners) {
			orderListener.onSubmitOrder(userId, amount);
		}
	}
	
	public void addOrderListener(OrderListener orderListener){
		this.orderListeners.add(orderListener);
	}
}

class PaymentService implements OrderListener {

	private MailService mailService;

	public void doPayment(Integer userId, BigDecimal amount) {
		// Do payment...
		mailService.sendPaymentEmail(userId, amount);
	}

	@Override
	public void onSubmitOrder(Integer userId, BigDecimal amount) {

		doPayment(userId, amount);
	}
}

class UserService implements OrderListener {

	public String getEmailAddress(Integer userId) {
		return "foo@bar.com";
	}

	public void registerPayment(Integer userId, BigDecimal amount) {
		// Register payment in database...
	}

	@Override
	public void onSubmitOrder(Integer userId, BigDecimal amount) {

		registerPayment(userId, amount);

	}
}

class MailService {

	private UserService userService;

	public void sendPaymentEmail(Integer userId, BigDecimal amount) {
		String emailAddress = userService.getEmailAddress(userId);
		// Send email...
	}
}

   可以看到,首先定義了OrderListener接口,接口中有一個onSubmitOrder方法。原始的實現中的PayService和UserService實現了該接口。OrderPage中維護了一個OrderListener列表,當提交訂單時調用所有監聽者的onSubmitOrder方法。可以看到此實現的訂單邏輯沒有直接依賴付款模塊和用戶模塊。 主程序通過添加監聽器來使其得到通知

public static void main(String[] args) {
    	PaymentService paymentService=new PaymentService();
    	UserService userService=new UserService();
    	
    	OrderPage orderPage=new OrderPage();
    	
    	orderPage.addOrderListener(paymentService);
    	orderPage.addOrderListener(userService);
	}

 二、Guava EventBus(監聽者模式的優雅實現)

  雖然監聽者模式對源代碼進行了解耦,但是還是有一些不足。

  • 相關模塊需要實現相應接口;
  • 需要主動調用相關的addListener方法設置監聽器。
  • 一個監聽器智能監聽一種操作.

  EventBus是Guava對於監聽者模式的實現,其使用非常簡單。使用EventBus來實現監聽者模式,只需要三步操作。

  1. 通過注解@Subscribe來聲明事件回調方法;
  2. 調用EventBus的register方法來注冊監聽器;
  3. 通過post方法來觸發事件;

 訂單邏輯通過EventBus事件總線來實現,大概是以下這個樣子:

public class OrderPage {

	public static EventBus eventBus = new EventBus();

	public void submitOrder() {
		Integer userId = 1;
		BigDecimal amount = BigDecimal.TEN;

		eventBus.post(new PayEvent(userId, amount));
	}

}

class PaymentService {

	private MailService mailService;

	@Subscribe
	public void doPayment(PayEvent  payEvent) {
		// Do payment...
		mailService.sendPaymentEmail(payEvent.getUserId(), payEvent.getAmount());
	}

}

class UserService {

	public String getEmailAddress(Integer userId) {
		return "foo@bar.com";
	}

	@Subscribe
	public void registerPayment(PayEvent payEvent) {
		// Register payment in database...
	}
}

class PayEvent {

	private Integer userId;
	private BigDecimal amount;
	
	public PayEvent(Integer userId, BigDecimal amount) {
	}
	
	public Integer getUserId() {
		return userId;
	}
	public BigDecimal getAmount() {
		return amount;
	}
}

 public static void main(String[] args) {
    	PaymentService paymentService=new PaymentService();
    	UserService userService=new UserService();
    	
    	OrderPage orderPage=new OrderPage();
    	
    	orderPage.eventBus.register(paymentService);
    	orderPage.eventBus.register(userService);
	}

  要實現監聽者模式,時需要調用eventBus的register方法進行注冊,在需要處理事件的方法上使用@Subscribe注解。最后通過eventBus發布事件即可。使用事件總線,不需要定義特定的接口,不需要主動添加監聽器;

三、事件訂閱

  EventBus通過register方法來注冊處理相應事件的類

public void register(Object object) {
    Multimap<Class<?>, EventSubscriber> methodsInListener =
        finder.findAllSubscribers(object);
    subscribersByTypeLock.writeLock().lock();
    try {
      subscribersByType.putAll(methodsInListener);
    } finally {
      subscribersByTypeLock.writeLock().unlock();
    }
  }

  其核心是findAllSubscribers,找到實例中所有有Subscribe注解的方法並保存。返回的是一個Multimap < Class<?>,EventSubscriber>類型,其中Class是事件類型,EventSubsciber包含了類實例和具體處理事件的方法。Multimap保證了一種事件可以有多個監聽者來處理。

public Multimap<Class<?>, EventSubscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, EventSubscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      EventSubscriber subscriber = makeSubscriber(listener, method);
      methodsInListener.put(eventType, subscriber);
    }
    return methodsInListener;
  }

 發布事件

  EventBus通過post方法來發布事件,首先通過事件類型找到需要處理的事件:事件本身以及其父類。根據事件類型從事件訂閱的緩存中取出處理該事件的訂閱者,並將其入隊。最后處理該隊列中的數據.

public void post(Object event) {
    Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());

    boolean dispatched = false;
    for (Class<?> eventType : dispatchTypes) {
      subscribersByTypeLock.readLock().lock();
      try {
        Set<EventSubscriber> wrappers = subscribersByType.get(eventType);

        if (!wrappers.isEmpty()) {
          dispatched = true;
          for (EventSubscriber wrapper : wrappers) {
            enqueueEvent(event, wrapper);
          }
        }
      } finally {
        subscribersByTypeLock.readLock().unlock();
      }
    }

    if (!dispatched && !(event instanceof DeadEvent)) {
      post(new DeadEvent(this, event));
    }

    dispatchQueuedEvents();
  }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM