Guava EventBus集成spring


EventBus 不是通用的消息系統,也不是用來做進程間的通信的,而是在進程內,用於解耦兩段直接調用的業務邏輯;

1、代碼結構

  • event:eventbus中流轉的事件(消息),包結構按照業務模塊在細分(比如應用部署模塊就是deployment);
  • subscriber:消費者,和event 是一一對應的,一個event 對應一個消費者,包結構按照業務模塊在細分(比如應用部署模塊就是deployment);
  • poster:生產者,這邊把生產者單獨出來是為了收斂入口,這樣可以方便的知道有哪些地方在生產消息,按照業務模塊分為不同的類(因為生產消息的功能比較單薄);

2、代碼實現

在applicationContext.xml 中定義好EventBus

asyncEventBus
< bean  id = "taskExecutor"  class = "org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"  lazy-init = "true" >
     < property  name = "corePoolSize"  value = "10" />
     < property  name = "maxPoolSize"  value = "50" />
     < property  name = "queueCapacity"  value = "10000" />
     < property  name = "keepAliveSeconds"  value = "300" />
     < property  name = "rejectedExecutionHandler" >
         < bean  class = "java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
     </ property >
</ bean >
< bean  id = "asyncEventBus"  class = "com.google.common.eventbus.AsyncEventBus" >
     < constructor-arg  name = "executor"  ref = "taskExecutor" />
</ bean >

2.1、標准化subscriber

所有的subscriber都要實現 BaseSubscriber這個 interface

BaseSubscriber
public  interface  BaseSubscriber<E> {
 
     /**
      * event 處理邏輯入口
      **/
     void  subscribe(E event);
}

所有的subscriber在類上加上EventBusRegister 這個annotation

EventBusRegister
@Target ({ElementType.TYPE})
@Retention (RetentionPolicy.RUNTIME)
@Documented
public  @interface  EventBusRegister {
}

實現EventBusAdapter用於自動注冊subscriber

EventBusAdapter
@Component
public  class  EventBusAdapter  implements  ApplicationContextAware, InitializingBean {
     @Autowired
     private  AsyncEventBus asyncEventBus;
 
     private  ApplicationContext applicationContext;
 
     @Override
     public  void  afterPropertiesSet()  throws  Exception {
         this .applicationContext.getBeansWithAnnotation(EventBusRegister. class ).forEach((name, bean) -> {
             asyncEventBus.register(bean);
         });
     }
 
     @Override
     public  void  setApplicationContext(ApplicationContext applicationContext)  throws  BeansException {
         this .applicationContext = applicationContext;
     }
}

舉個例子

BuildUpdateSubscriber
@Component
@EventBusRegister
public  class  BuildUpdateSubscriber  implements  BaseSubscriber<BuildUpdateEvent> {
     @Autowired
     private  BuildService buildService;
 
     @Subscribe
     @Override
     public  void  subscribe(BuildUpdateEvent event) {
         switch  (event.getEventType()) {
             case  BUILD_CONNECTED:
                 List<BuildVo> buildVos = (List<BuildVo>) event.getData();
                 buildService.addBuildVosAndTriggerConnectEvent(buildVos);
                 break ;
             case  BUILD_ADD:
                 BuildVo addedBuildVo = (BuildVo) event.getData();
                 buildService.addBuildVoAndTriggerClientEvent(addedBuildVo);
                 break ;
             case  BUILD_MODIFY:
                 BuildVo modifiedBuildVo = (BuildVo) event.getData();
                 buildService.modifyBuildVoAndTriggerEvent(modifiedBuildVo);
                 break ;
             case  BUILD_DELETE:
                 BuildVo deletedBuildVo = (BuildVo) event.getData();
                 buildService.deleteBuildVoAndTriggerClientEvent(deletedBuildVo);
                 break ;
             default :
                 // ignore
                 break ;
         }
     }
}

3、代碼實現改進

前面通過規范代碼的包結構、加了一些trick使得我們可以方便的使用eventbus解耦我們的業務邏輯,但是有時候我們需要的bean被注冊 的前后做一些業務邏輯,所以我們在bean 被注冊到eventbus前后加了兩個hook:AfterRegisterProcessor、BeforeRegisterProcessor;實現這兩個interface並且實現對於的方法,會在bean 被注冊前后被調用

bean 注冊到eventbus前的hook

BeforeRegisterProcessor
public  interface  BeforeRegisterProcessor {
     void  beforeRegister();
}

bean 注冊到eventbus后的hook

AfterRegisterProcessor
public  interface  AfterRegisterProcessor {
     void  afterRegister();
}

實現:保證在 client.watch 之前,注冊已經完成,這樣watch產生的消息就能夠保證被成功消費

GlueService
@Service
public  class  GlueService  implements  AfterRegisterProcessor {
     @Autowired
     private  PodListener podListener;
 
     @Autowired
     private  RouteListener routerListener;
 
     @Autowired
     private  BuildListener buildListener;
 
     @Autowired
     private  DeploymentListener deploymentListener;
 
     @Autowired
     private  OpenShiftClient openShiftClient;
 
     @Override
     public  void  afterRegister() {
         IClient client = openShiftClient.getClient();
         podWatch = client.watch(podListener, ResourceKind.POD);
         routeWatch = client.watch(routerListener, ResourceKind.ROUTE);
         buildWatch = client.watch(buildListener, ResourceKind.BUILD);
         deploymentWatch = client.watch(deploymentListener, ResourceKind.REPLICATION_CONTROLLER);
     }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM