前幾天寫了一個消息中間件(kafka)的封裝,業務方發現消費者需要配置的東西太多(每增加一個topic和實現類都需要在配置文件中加,會顯得很繁瑣)。於是我為了盡量減少這個XML配置,采用注解的方式來獲取topic和實現類。
第一步:先自定義一個注解,有一個topic的方法用於表明需要監聽的topic
@Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) @Documented public @interface TopicSub { TopicType topic() ; }
第二步:獲取所有使用該注解的類以及注解值
List<MessageContainer> messageContainerList= Lists.newArrayList(); Map<String, Object> maps= SpringBeanUtils.getAnnotation(TopicSub.class); for (Map.Entry<String, Object> entry : maps.entrySet()) { TopicSub topicAnn = entry.getValue().getClass().getAnnotation(TopicSub.class); MessageContainer messageContainer=new MessageContainer(topicAnn.topic().getName()); messageContainer.setMessageHandle((MessageListener)entry.getValue()); messageContainerList.add(messageContainer); }
特別說明:SpringBeanUtils.getAnnotation(TopicSub.class)是調用ApplicationContext容器的 getBeansWithAnnotation(Class<? extends Annotation> var1)方法,返回的是所有使用該注解的bean
然后遍歷這個map,拿到每個bean上的注解值(topic方法的值);這里拿bean的注解值的方式是反射.getClass().getAnnotation(),如果bean是放在被代理的包下(service包下一般都是被事務代理了,AOP的實現就是JDK(CJLIB)動態代理),那么.getClass().getAnnotation()這個方法是拿不到注解值的,因為.getClass()拿到的是代理類而不是真正的類,建議這些實現類放在一個不被代理的包下(就是建立一個普通的package即可),如果放在代理包下,那就需要先獲取真正的bean,再反射。
三、監聽到消息后,根據topic的值調用實現類
for (MessageContainer messageContainer : PropertyFactory.consumerProperty.getMessageContainers()) { if (consumerRecord.topic().equals(messageContainer.getTopic())) { try { messageContainer.getMessageHandle().onMessage(consumerMessageBO); }catch (KafkaComsumException e){ } }
四、業務方實現類
@Component @TopicSub(topic = TopicType.SMS_ASYNC_SEND) public class SmsMessageHandler implements MessageListener{ public static final Logger log= LoggerFactory.getLogger(SmsMessageHandler.class); @Autowired private SmsSendLogDao smsSendLogDao; public void onMessage(ConsumerMessageBO consumerMessageBO) throws KafkaComsumException{ log.info(JSONObject.toJSONString(consumerMessageBO)); try { SmsMessageEO message =JSONObject.parseObject(consumerMessageBO.getMessage().toString(),SmsMessageEO.class); for(String phone:message.getPhone() ){ SmsResult result= SmsUtils.sendSms(phone,message.getContent(),message.getProduct()); SmsSendLogDO smsSendLogDO=new SmsSendLogDO(); smsSendLogDO.setContent(message.getContent()); smsSendLogDO.setPhone(phone); smsSendLogDO.setProduct(message.getProduct()); smsSendLogDO.setCreateTime(new Date()); smsSendLogDO.setStatus(result.getCode()==1?1:0); smsSendLogDO.setSender(message.getSender().getName()); smsSendLogDao.insert(smsSendLogDO); } }catch (Exception e){ throw new KafkaComsumException(); } } }
前三步都是在jar包中完成(對業務方是透明的),業務方只需要完成第四步消息實現類的邏輯即可(不需要任何xml的配置),有沒有感覺很清爽啊!
這樣我們就進一步提升了中間件解耦的程度,其實也滿簡單的,唯一的坑就是AOP中反射獲取到的是代理類這個問題,也是小伙伴們需要注意的!