申明
本文章首發自本人公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫
22.jpg
前言
之前出過一個設計模式的系列文章,這些文章和其他講設計模式的文章 有些不同
文章沒有拘泥於講解設計模式的原理,更多的是梳理工作中實際用到的一些設計模式,並提取出對應業務模型進行總結,回顧下之前的一些文章:
【一起學設計模式】策略模式實戰一:基於消息發送的策略模式實戰
【一起學習設計模式】策略模式實戰二:配合注解 干掉業務代碼中冗余的if else…
【一起學設計模式】命令模式+模板方法+工廠方法實戰: 如何優雅的更新商品庫存…
【一起學設計模式】狀態模式+裝飾器模式+簡單工廠模式實戰:(一)提交個訂單我到底經歷了什么鬼?
【一起學設計模式】中介者模式+觀察者模式+備忘錄模式實戰:(二)提交個訂單我到底經歷了什么鬼?
所以:任何脫離實際業務的設計模式都是耍流氓
image.png
業務梳理
最近項目在對接神策埋點相關需求。
有一個場景是:產品自定義了很多埋點事件,有些事件需要后端進行一定的業務處理,然后進行埋點。
業務其實很簡單,就是前端請求到后端,后端進行一定業務處理組裝后將數據發送到神策后台。
說到這里是不是還有小伙伴沒聽懂??那么就畫張圖吧:
image.png
這里只是簡單的舉個栗子,說明下業務場景。
針對這個業務場景,最開始的想法是盡量少的侵入原有業務方法,所以這里選擇使用觀察者模式。
原有業務場景中加入發布事件的能力,然后訂閱者自己消費進行埋點數據邏輯。做到盡可能的業務解耦。
觀察者模式
這里還是要多啰嗦幾句,說下觀察者模式原理:
所謂的觀察者模式也稱為發布訂閱模式,這里肯定至少存在兩種角色:發布者/訂閱者
接着看下UML圖:
image.png
所涉及到的角色如下:
- 抽象主題(Subject):提供接口,可以增加和剔除觀察者對象。一般用抽象類或者接口實現。
- 抽象觀察者(Observer):提供接口,在得到主題的通知時更新自己。一般用抽象類或者接口實現。
- 具體主題(ConcreteSubject):將有關狀態存入具體觀察者,在具體主題的內部狀態發生變化時,給所有注冊過的觀察者發出通知。一般是具體子類實現。
- 具體觀察者(ConcreteObserver):存儲與主題的狀態自恰的狀態。具體觀察者角色實現抽象觀察者角色所要求的更新接口,以便使本身的狀態與主題的狀態 像協調。如果需要,具體觀察者角色可以保持一個指向具體主題對象的引用
在上述類圖中,ConcreteSubject中有一個存儲Observer的列表,這意味着ConcreteSubject並不需要知道引用了哪些ConcreteObserver,只要實現(繼承)了Observer的對象都可以存到該列表中。在需要的時候調用Observer的update方法。
話不多說,我們自己動手來模擬一個簡單的觀察者模式:
/**
* 觀察者模式測試代碼
*
* @author wangmeng
* @date 2020/4/25 19:38
*/
public class ObserverTest {
public static void main(String[] args) {
Subject subject = new Subject();
Task1 task1 = new Task1();
subject.addObserver(task1);
Task2 task2 = new Task2();
subject.addObserver(task2);
subject.notifyObserver("xxxx");
}
}
class Subject {
// observer集合
private List<Observer> observerList = Lists.newArrayList();
// add
public void addObserver(Observer observer) {
observerList.add(observer);
}
// remove
public void removeObserver(Observer observer) {
observerList.remove(observer);
}
// 通知觀察者
public void notifyObserver(Object object) {
for (Observer item : observerList) {
item.update(object);
}
}
}
interface Observer {
void update(Object object);
}
class Task1 implements Observer {
@Override
public void update(Object object) {
System.out.println("task1 received: " + object);
}
}
class Task2 implements Observer {
@Override
public void update(Object object) {
System.out.println("task2 received: " + object);
}
}
針對於觀察者模式,JDK和Spring也有一些內置實現,具體可以參見:JDK中Observable
,Spring中ApplicationListener
這里就不再贅述了,想深入了解的小伙伴可執行谷歌,畢竟我們這次文章的重點還是Guava
中觀察者模式的使用實現原理。
業務代碼示例
這里使用的是Guava中自帶的EventBus組件,我們繼續用取消訂單業務場景做示例,這里抽離了部分代碼,只展示核心的一些代碼:
1. 事件總線服務
/**
* 事件總線服務
*
* @author wangmeng
* @date 2020/4/14
*/
@Service
public class EventBusService {
/**
* 訂閱者異步執行器,如果同步可以使用EventBus
**/
@Autowired
private AsyncEventBus asyncEventBus;
/**
* 訂閱者集合,里面方法通過@Subscribe進行事件訂閱
**/
@Autowired
private EventListener eventListener;
/**
* 注冊方法,啟動的時候將所有的訂閱者進行注冊
**/
@PostConstruct
public void register() {
asyncEventBus.register(eventListener);
}
/**
* 消息投遞,根據入參自動投遞到對應的方法中去消費。
*/
public void post(Object object) {
asyncEventBus.post(object);
}
}
這里使用了異步的實現方式,如果使用同步的方式可以將AsyncEventBus
改為EventBus
2. 異步AsyncEventBus配置:
/**
* AsyncEventBus 線程池配置
*
* @author wangmeng
* @date 2020/04/14
*/
@Configuration
public class EventBusConfiguration {
/** Set the ThreadPoolExecutor's core pool size. */
private int corePoolSize = 10;
/** Set the ThreadPoolExecutor's maximum pool size. */
private int maxPoolSize = 30;
/** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */
private int queueCapacity = 500;
@Bean
public AsyncEventBus asyncEventBus() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setThreadNamePrefix("jv-mall-user-sensorsData:");
executor.initialize();
return new AsyncEventBus(executor);
}
}
線程池數據大家可以隨意配置,這里只是參考。
3. 觀察者實現
/**
* 觀察者代碼
*
* @author wangmeng
* @date 2020/4/14
*/
@Service
@Slf4j
public class EventListener {
@Autowired
private SensorsDataManager sensorsDataManager;
/**
* 觀察者處理數據埋點方法
*/
@Subscribe
@AllowConcurrentEvents
public void handleCancelOrderEvent(TrackCancelOrderDTO cancelOrderDTO) {
Map<String, Object> propertyMap = this.buildBasicProperties(cancelOrderDTO);
propertyMap.put(SensorsDataConstants.ORDER_ID, registerDTO.getOrderId());
// 各種屬性賦值,這里只截取一點
propertyMap.put(SensorsDataConstants.PROPERTY_IS_SUCCESS, registerDTO.getIsSuccess());
propertyMap.put(SensorsDataConstants.PROPERTY_FAIL_REASON, registerDTO.getFailReason());
sensorsDataManager.send(registerDTO.getUserId(), SensorsEventConstants.EVENT_CANCEL_ORDER, propertyMap);
}
}
這個EventLister
是我們在上面EventBusService
中注冊的類,觀察者方法上面添加@Subscribe
即可對發布者的數據進行訂閱。
@AllowConcurrentEvents
注解字面意思是允許事件並發執行,這個原理后面會講。
PS:這里sensorsDataManager
是封裝生成埋點相關的類。
發布者實現
在業務邏輯中加入埋點數據發布的方法:
@Autowired
private EventBusService eventBusService;
public void cancelOrder(Long orderId) {
// 業務邏輯執行
// 埋點數據
TrackCancelOrderDTO trackCancelOrderDTO = trackBaseOrderInfoManager.buildTrackBaseOrderDTO(orderInfoDO, context.getOrderParentInfoDO(), TrackCancelOrderDTO.class);
trackCancelOrderDTO.setCancelReason(orderInfoDO.getCancelReason());
trackCancelOrderDTO.setCancelTime(orderInfoDO.getCancelTime());
trackCancelOrderDTO.setPlatformName(SensorsDataConstants.PLATFORM_APP);
trackCancelOrderDTO.setUserId(orderInfoDO.getUserId().toString());
eventBusService.post(trackCancelOrderDTO);
}
到了這里所有的如何使用EventBus
的代碼都已經貼出來了,下面就看看具體的源碼實現吧
源碼剖析
事件總線訂閱源碼實現
com.google.common.eventbus.SubscriberRegistry#register:
void register(Object listener) {
//查找所有訂閱者,維護了一個key是事件類型,value是定訂閱這個事件類型的訂閱者集合的一個map
Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
//獲取事件類型
Class<?> eventType = entry.getKey();
//獲取這個事件類型的訂閱者集合
Collection<Subscriber> eventMethodsInListener = entry.getValue();
//從緩存中按事件類型查找訂閱者集合
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
//從緩存中取不到,更新緩存
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers =
MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
事件和訂閱事件的訂閱者集合是在com.google.common.eventbus.SubscriberRegistry這里維護的:
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
到這里,訂閱者已經准備好了,准備接受事件了。通過debug 看下subscribers
中數據:
image.png
發布事件源碼實現
com.google.common.eventbus.EventBus#post
public void post(Object event) {
//獲取事件的訂閱者集合
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
//轉發事件
dispatcher.dispatch(event, eventSubscribers);
//如果不是死亡事件,重新包裝成死亡事件重新發布
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
Iterator<Subscriber> getSubscribers(Object event) {
//獲取事件類型類的超類集合
ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
List<Iterator<Subscriber>> subscriberIterators = Lists.newArrayListWithCapacity(eventTypes.size());
for (Class<?> eventType : eventTypes) {
//獲取事件類型的訂閱者集合
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers != null) {
// eager no-copy snapshot
subscriberIterators.add(eventSubscribers.iterator());
}
}
return Iterators.concat(subscriberIterators.iterator());
}
事件轉發器有三種實現:
image.png
第一種是立即轉發,實時性比較高,其他兩種都是隊列實現。
我們使用的是AsyncEventBus
,其中指定的事件轉發器是:LegacyAsyncDispatcher
,接着看看其中的dispatch()
方法的實現:
com.google.common.eventbus.Dispatcher.LegacyAsyncDispatcher
private static final class LegacyAsyncDispatcher extends Dispatcher {
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
checkNotNull(event);
while (subscribers.hasNext()) {
// 先將所有發布的事件放入隊列中
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
// 消費隊列中的消息
e.subscriber.dispatchEvent(e.event);
}
}
}
接着看subscriber.dispatchEvent()
方法實現:
final void dispatchEvent(final Object event) {
executor.execute(
new Runnable() {
@Override
public void run() {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
執行訂閱方法都是異步實現,我們在上面初始化AsyncEventBus
的時候有為其構造線程池,就是在這里使用的。
在看invokeSubscriberMethod()
具體代碼之前,我們先來看看@AllowConcurrentEvents
,我們在訂閱方法上有加這個注解,來看看這個注解的作用吧:
image.png
在我們執行register()
方法的時候,會為每一個訂閱者構造一個Subscriber
對象,如果配置了@AllowConcurrentEvents
注解,就會為它配置一個允許並發的Subscriber
對象。
class Subscriber {
/**
* Creates a {@code Subscriber} for {@code method} on {@code listener}.
*/
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
private static boolean isDeclaredThreadSafe(Method method) {
// 如果有AllowConcurrentEvents注解,則返回true
return method.getAnnotation(AllowConcurrentEvents.class) != null;
}
@VisibleForTesting
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
// 通過反射直接執行訂閱者中方法
method.invoke(target, checkNotNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
@VisibleForTesting
static final class SynchronizedSubscriber extends Subscriber {
private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
super(bus, target, method);
}
@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
// SynchronizedSubscriber不支持並發,這里用synchronized修飾,所有執行都串行化執行
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}
}
}
這里面包含了invokeSubscriberMethod()
方法的實現原理,其實就是通過反射去執行訂閱者中的方法。
還有就是如果沒有添加注解,就會走SynchronizedSubscriber
中invokeSubscriberMethod()
邏輯,添加了synchronized
關鍵字,不支持並發執行。
總結
這里主要是整理了guava 中實現觀察者模式的使用及原理。
大家如果有類似的業務場景也可以使用到自己項目中。