EventBus 不是通用的消息系統,也不是用來做進程間的通信的,而是在進程內,用於解耦兩段直接調用的業務邏輯;
1、代碼結構
- event:eventbus中流轉的事件(消息),包結構按照業務模塊在細分(比如應用部署模塊就是deployment);
- subscriber:消費者,和event 是一一對應的,一個event 對應一個消費者,包結構按照業務模塊在細分(比如應用部署模塊就是deployment);
- poster:生產者,這邊把生產者單獨出來是為了收斂入口,這樣可以方便的知道有哪些地方在生產消息,按照業務模塊分為不同的類(因為生產消息的功能比較單薄);
2、代碼實現
在applicationContext.xml 中定義好EventBus
asyncEventBus
<
bean
id
=
"taskExecutor"
class
=
"org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
lazy-init
=
"true"
>
<
property
name
=
"corePoolSize"
value
=
"10"
/>
<
property
name
=
"maxPoolSize"
value
=
"50"
/>
<
property
name
=
"queueCapacity"
value
=
"10000"
/>
<
property
name
=
"keepAliveSeconds"
value
=
"300"
/>
<
property
name
=
"rejectedExecutionHandler"
>
<
bean
class
=
"java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"
/>
</
property
>
</
bean
>
<
bean
id
=
"asyncEventBus"
class
=
"com.google.common.eventbus.AsyncEventBus"
>
<
constructor-arg
name
=
"executor"
ref
=
"taskExecutor"
/>
</
bean
>
|
2.1、標准化subscriber
所有的subscriber都要實現 BaseSubscriber這個 interface
BaseSubscriber
public
interface
BaseSubscriber<E> {
/**
* event 處理邏輯入口
**/
void
subscribe(E event);
}
|
所有的subscriber在類上加上EventBusRegister 這個annotation
EventBusRegister
@Target
({ElementType.TYPE})
@Retention
(RetentionPolicy.RUNTIME)
@Documented
public
@interface
EventBusRegister {
}
|
實現EventBusAdapter用於自動注冊subscriber
EventBusAdapter
@Component
public
class
EventBusAdapter
implements
ApplicationContextAware, InitializingBean {
@Autowired
private
AsyncEventBus asyncEventBus;
private
ApplicationContext applicationContext;
@Override
public
void
afterPropertiesSet()
throws
Exception {
this
.applicationContext.getBeansWithAnnotation(EventBusRegister.
class
).forEach((name, bean) -> {
asyncEventBus.register(bean);
});
}
@Override
public
void
setApplicationContext(ApplicationContext applicationContext)
throws
BeansException {
this
.applicationContext = applicationContext;
}
}
|
舉個例子
BuildUpdateSubscriber
@Component
@EventBusRegister
public
class
BuildUpdateSubscriber
implements
BaseSubscriber<BuildUpdateEvent> {
@Autowired
private
BuildService buildService;
@Subscribe
@Override
public
void
subscribe(BuildUpdateEvent event) {
switch
(event.getEventType()) {
case
BUILD_CONNECTED:
List<BuildVo> buildVos = (List<BuildVo>) event.getData();
buildService.addBuildVosAndTriggerConnectEvent(buildVos);
break
;
case
BUILD_ADD:
BuildVo addedBuildVo = (BuildVo) event.getData();
buildService.addBuildVoAndTriggerClientEvent(addedBuildVo);
break
;
case
BUILD_MODIFY:
BuildVo modifiedBuildVo = (BuildVo) event.getData();
buildService.modifyBuildVoAndTriggerEvent(modifiedBuildVo);
break
;
case
BUILD_DELETE:
BuildVo deletedBuildVo = (BuildVo) event.getData();
buildService.deleteBuildVoAndTriggerClientEvent(deletedBuildVo);
break
;
default
:
// ignore
break
;
}
}
}
|
3、代碼實現改進
前面通過規范代碼的包結構、加了一些trick使得我們可以方便的使用eventbus解耦我們的業務邏輯,但是有時候我們需要的bean被注冊 的前后做一些業務邏輯,所以我們在bean 被注冊到eventbus前后加了兩個hook:AfterRegisterProcessor、BeforeRegisterProcessor;實現這兩個interface並且實現對於的方法,會在bean 被注冊前后被調用
bean 注冊到eventbus前的hook
BeforeRegisterProcessor
public
interface
BeforeRegisterProcessor {
void
beforeRegister();
}
|
bean 注冊到eventbus后的hook
AfterRegisterProcessor
public
interface
AfterRegisterProcessor {
void
afterRegister();
}
|
實現:保證在 client.watch 之前,注冊已經完成,這樣watch產生的消息就能夠保證被成功消費
GlueService
@Service
public
class
GlueService
implements
AfterRegisterProcessor {
@Autowired
private
PodListener podListener;
@Autowired
private
RouteListener routerListener;
@Autowired
private
BuildListener buildListener;
@Autowired
private
DeploymentListener deploymentListener;
@Autowired
private
OpenShiftClient openShiftClient;
@Override
public
void
afterRegister() {
IClient client = openShiftClient.getClient();
podWatch = client.watch(podListener, ResourceKind.POD);
routeWatch = client.watch(routerListener, ResourceKind.ROUTE);
buildWatch = client.watch(buildListener, ResourceKind.BUILD);
deploymentWatch = client.watch(deploymentListener, ResourceKind.REPLICATION_CONTROLLER);
}
}
|