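Goal: In the official Alibaba Cloud RocketMQ examples, you have to specify the subscribed listener yourself where the consumer is defined, which is inconvenient. This starter takes an annotation-based approach instead: you annotate the MessageListener class with its topic and tag, which reduces project configuration and makes the client easier to use.
Usage:
Step 1: Add the pom dependency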
    <dependency>
        <groupId>com.demo</groupId>
        <artifactId>mq-spring-boot-starter</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
Step 2: Configure the yml file
    aliyun:
      mq:
        nameSrvAddr: ****   # key must match the nameSrvAddr field of MqPropertiesConfig
        accessKey: ***
        secretKey: ***
        producer:
          enabled: true     # true: enable the producer; false: do not load the producer
        consumer:
          enabled: true
          groupId: GID_zl01
Step 3: Define a listener
    @Component
    @MqTopicListener(topic = "Tcc", tag = "*")
    public class TestMessageListener extends AbstractMessageListener {

        @Override
        public void handle(String s) {
            System.out.println("Message content: --------------------------");
            System.out.println(s);
        }
    }
Step 4: Send a message
    @Resource
    private MqTimerProducer mqTimerProducer;

    public String sendMessage() {
        // Arguments: topic, tag, body, delay in milliseconds
        mqTimerProducer.send("Tcc", "1", "hello", 0);
        return "ok";
    }
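The message in step 4 carries the tag "1", while the listener in step 3 subscribes with tag="*". A minimal sketch of why that still matches is below. It is only an illustration of the '||' and '*' convention described in this post; the real filtering is done by the MQ client and broker, and the class and method names here are hypothetical.

```java
import java.util.Arrays;

class TagExpressionSketch {

    /**
     * Returns true if messageTag is accepted by the subscription expression.
     * "*" accepts every tag; otherwise the expression is split on "||"
     * and the message tag must equal one of the parts.
     */
    static boolean matches(String expression, String messageTag) {
        if ("*".equals(expression.trim())) {
            return true;
        }
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(part -> part.equals(messageTag));
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "1"));             // true
        System.out.println(matches("TagA||TagB", "TagB")); // true
        System.out.println(matches("TagA||TagB", "1"));    // false
    }
}
```

So a listener that should only see some messages would declare something like tag="TagA||TagB" instead of "*".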
---------------------------------------- Divider ----------------------------------------
Now let's look at mq-spring-boot-starter itself.
Spring knowledge required:
1. Custom annotations
2. ConditionalOnMissingBean
3. ConditionalOnProperty
4. General Spring fundamentals
Step 1: Use ons-client version 1.8.4.Final
    <dependency>
        <groupId>com.aliyun.openservices</groupId>
        <artifactId>ons-client</artifactId>
        <version>1.8.4.Final</version>
    </dependency>
Step 2: Define the custom annotation
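    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface MqTopicListener {

        /**
         * The topic to subscribe to.
         *
         * @return the subscribed topic
         */
        String topic() default "";

        /**
         * The tag to subscribe to.
         *
         * @return the subscribed tag; defaults to everything under the topic.
         *         Join multiple tags with '||'; use '*' for all tags.
         */
        String tag() default "*";
    }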
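Because the annotation is retained at runtime, its values can be read back through reflection, which is what the consumer does later on. The sketch below shows this with a local stand-in copy of the annotation so that it compiles on its own; DemoListener, PlainClass and describe are hypothetical names used only for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

class AnnotationReadSketch {

    // Local stand-in for the starter's annotation, so the sketch is self-contained.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface MqTopicListener {
        String topic() default "";
        String tag() default "*";
    }

    // Only the topic is given; the tag falls back to its default "*".
    @MqTopicListener(topic = "Tcc")
    static class DemoListener { }

    static class PlainClass { }

    /** Returns "topic/tag" for an annotated class, or null when the annotation is absent. */
    static String describe(Class<?> type) {
        MqTopicListener meta = type.getAnnotation(MqTopicListener.class);
        return meta == null ? null : meta.topic() + "/" + meta.tag();
    }

    public static void main(String[] args) {
        System.out.println(describe(DemoListener.class)); // Tcc/*
        System.out.println(describe(PlainClass.class));   // null
    }
}
```

Without RetentionPolicy.RUNTIME the annotation would not be visible to getAnnotation at all, so that setting is the one the whole approach depends on.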
Step 3: Read the MQ settings defined in the yml file
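    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;

    import java.util.Properties;

    @ConfigurationProperties(prefix = "aliyun.mq")
    @Data
    public class MqPropertiesConfig {
        private String nameSrvAddr;
        private String accessKey;
        private String secretKey;
        // Everything under aliyun.mq.producer, e.g. "enabled"
        private Properties producer;
        // Everything under aliyun.mq.consumer, e.g. "enabled" and "groupId"
        private Properties consumer;
    }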
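The producer and consumer fields are plain Properties objects, so each one collects whatever keys sit under its own prefix. The following is a plain-Java sketch of that idea, assuming flat keys such as aliyun.mq.consumer.groupId; it is not how Spring Boot's binder is implemented, and subProperties is a hypothetical helper.

```java
import java.util.Properties;

class PrefixBindingSketch {

    /** Copies every key that starts with prefix + "." into a new Properties, with the prefix removed. */
    static Properties subProperties(Properties source, String prefix) {
        Properties result = new Properties();
        String head = prefix + ".";
        for (String key : source.stringPropertyNames()) {
            if (key.startsWith(head)) {
                result.setProperty(key.substring(head.length()), source.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Properties flat = new Properties();
        flat.setProperty("aliyun.mq.nameSrvAddr", "placeholder-address");
        flat.setProperty("aliyun.mq.consumer.enabled", "true");
        flat.setProperty("aliyun.mq.consumer.groupId", "GID_zl01");

        Properties consumer = subProperties(flat, "aliyun.mq.consumer");
        System.out.println(consumer.getProperty("groupId")); // GID_zl01
        System.out.println(consumer.getProperty("enabled")); // true
        System.out.println(consumer.size());                 // 2
    }
}
```

This is why the group id can later be looked up with a simple getProperty("groupId") on the consumer field.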
Step 4: Define the producer. This class initializes the message Producer and exposes the related send methods.
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.Producer;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.SendCallback;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.exception.ONSClientException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.Properties;

    public class MqTimerProducer {

        private final static Logger LOG = LoggerFactory.getLogger(MqTimerProducer.class);

        private Properties properties;
        private Producer producer;

        public MqTimerProducer(Properties properties) {
            // Fail fast when the credentials or the name server address are missing.
            if (properties == null
                    || properties.get(PropertyKeyConst.AccessKey) == null
                    || properties.get(PropertyKeyConst.SecretKey) == null
                    || properties.get(PropertyKeyConst.NAMESRV_ADDR) == null) {
                throw new ONSClientException("producer properties not set properly.");
            }
            this.properties = properties;
        }

        public void start() {
            this.producer = ONSFactory.createProducer(this.properties);
            this.producer.start();
        }

        public void shutdown() {
            if (this.producer != null) {
                this.producer.shutdown();
            }
        }

        /** Sends synchronously; delay is relative, in milliseconds. */
        public void send(String topic, String tag, String body, long delay) {
            LOG.info("start to send message. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
            if (topic == null || tag == null || body == null) {
                throw new RuntimeException("topic, tag, or body is null.");
            }
            Message message = new Message(topic, tag, body.getBytes());
            // Convert the relative delay into an absolute delivery timestamp.
            message.setStartDeliverTime(System.currentTimeMillis() + delay);
            SendResult result = this.producer.send(message);
            LOG.info("send message success. {}", result);
        }

        // DefaultSendCallback belongs to the starter project and is not shown in this post.
        public void sendAsync(String topic, String tag, String body, long delay) {
            this.sendAsync(topic, tag, body, delay, new DefaultSendCallback());
        }

        public void sendAsync(String topic, String tag, String body, long delay, SendCallback sendCallback) {
            LOG.info("start to send message async. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
            if (topic == null || tag == null || body == null) {
                throw new RuntimeException("topic, tag, or body is null.");
            }
            Message message = new Message(topic, tag, body.getBytes());
            message.setStartDeliverTime(System.currentTimeMillis() + delay);
            this.producer.sendAsync(message, sendCallback);
        }

        public Properties getProperties() {
            return properties;
        }

        public void setProperties(Properties properties) {
            this.properties = properties;
        }

        public boolean isStarted() {
            return this.producer.isStarted();
        }

        public boolean isClosed() {
            return this.producer.isClosed();
        }
    }
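Both send methods take a relative delay in milliseconds and turn it into an absolute delivery timestamp. A small sketch of that conversion follows, with the clock passed in so the arithmetic is easy to check. The rejection of negative delays and the overflow check are my own assumptions for the sketch; the producer above does neither.

```java
class DeliverTimeSketch {

    /**
     * Converts a relative delay into an absolute timestamp in epoch milliseconds.
     * Assumption for this sketch only: negative delays are rejected.
     */
    static long startDeliverTime(long nowMillis, long delayMillis) {
        if (delayMillis < 0) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        // addExact throws ArithmeticException instead of silently overflowing.
        return Math.addExact(nowMillis, delayMillis);
    }

    public static void main(String[] args) {
        long now = 1_000L;
        System.out.println(startDeliverTime(now, 0));      // 1000
        System.out.println(startDeliverTime(now, 60_000)); // 61000
    }
}
```

With a delay of 0, as in the usage example, the requested delivery time is simply the moment of sending, which in effect asks for delivery without any extra wait.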
Step 5: Define the consumer. It initializes the Consumer and binds the subscriptions according to each listener's topic and tag.
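    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.exception.ONSClientException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.BeansException;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.ApplicationContextAware;
    import org.springframework.core.annotation.AnnotationUtils;

    import java.util.Map;
    import java.util.Properties;

    public class MqConsumer implements ApplicationContextAware {

        private final static Logger LOG = LoggerFactory.getLogger(MqConsumer.class);

        private ApplicationContext applicationContext;
        private Properties properties;
        private Consumer consumer;

        public MqConsumer(Properties properties) {
            if (properties == null
                    || properties.get(PropertyKeyConst.AccessKey) == null
                    || properties.get(PropertyKeyConst.SecretKey) == null
                    || properties.get(PropertyKeyConst.NAMESRV_ADDR) == null) {
                throw new ONSClientException("consumer properties not set properly.");
            }
            this.properties = properties;
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }

        public void start() {
            this.consumer = ONSFactory.createConsumer(properties);
            // Register the subscriptions before starting, the same order the official examples use.
            this.subscribe();
            this.consumer.start();
        }

        public void shutdown() {
            if (this.consumer != null) {
                this.consumer.shutdown();
            }
        }

        /** Finds every bean annotated with @MqTopicListener and subscribes it. */
        public void subscribe() {
            Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(MqTopicListener.class);
            if (beanMap == null || beanMap.size() == 0) {
                return;
            }
            for (Object bean : beanMap.values()) {
                createConsumer(bean);
            }
        }

        private void createConsumer(Object bean) {
            // Only listeners that extend the starter's base class are supported.
            if (!(bean instanceof AbstractMessageListener)) {
                return;
            }
            // Read the annotation; findAnnotation also checks superclasses, so proxied beans still resolve.
            MqTopicListener mqMsgListener = AnnotationUtils.findAnnotation(bean.getClass(), MqTopicListener.class);
            if (mqMsgListener == null) {
                return;
            }
            String topic = mqMsgListener.topic();
            String tag = mqMsgListener.tag();
            LOG.info("subscribe [topic: {}, tags: {}, messageListener: {}]", topic, tag, bean.getClass().getCanonicalName());
            consumer.subscribe(topic, tag, (AbstractMessageListener) bean);
        }
    }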
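The scanning logic in the consumer boils down to three checks per candidate object: is it a listener, does it carry the annotation, and which topic and tag does it declare. Here is a self-contained sketch of that loop, without Spring or the MQ client. The nested annotation and base class are local stand-ins, and scan, OrderListener and NotAListener are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

class SubscriptionScanSketch {

    // Local stand-ins for the starter's annotation and base listener class.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface MqTopicListener {
        String topic() default "";
        String tag() default "*";
    }

    abstract static class AbstractMessageListener {
        abstract void handle(String body);
    }

    @MqTopicListener(topic = "Tcc")
    static class OrderListener extends AbstractMessageListener {
        @Override
        void handle(String body) { }
    }

    // Annotated, but not a listener: it must be skipped.
    @MqTopicListener(topic = "Other", tag = "TagA")
    static class NotAListener { }

    /** Builds a topic-to-tag map from all candidates that are annotated listeners. */
    static Map<String, String> scan(Collection<?> candidates) {
        Map<String, String> subscriptions = new LinkedHashMap<>();
        for (Object bean : candidates) {
            if (!(bean instanceof AbstractMessageListener)) {
                continue;
            }
            MqTopicListener meta = bean.getClass().getAnnotation(MqTopicListener.class);
            if (meta == null) {
                continue;
            }
            subscriptions.put(meta.topic(), meta.tag());
        }
        return subscriptions;
    }

    public static void main(String[] args) {
        Collection<?> beans = Arrays.asList(new OrderListener(), new NotAListener(), "not a bean");
        System.out.println(scan(beans)); // {Tcc=*}
    }
}
```

In the real consumer the map entry is replaced by a call to consumer.subscribe(topic, tag, listener), and the candidates come from the Spring application context.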
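Step 6: Define an abstract MessageListener. User-defined listeners must extend this class and override the handle method; exception handling and the retry mechanism are encapsulated here.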
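    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public abstract class AbstractMessageListener implements MessageListener {

        private final static Logger LOG = LoggerFactory.getLogger(AbstractMessageListener.class);

        /** Business logic supplied by the subclass; throw an exception to request a retry. */
        public abstract void handle(String body);

        @Override
        public Action consume(Message message, ConsumeContext context) {
            LOG.info("receive message. [topic: {}, tag: {}, body: {}, msgId: {}, startDeliverTime: {}]",
                    message.getTopic(), message.getTag(), new String(message.getBody()),
                    message.getMsgID(), message.getStartDeliverTime());
            try {
                handle(new String(message.getBody()));
                LOG.info("handle message success.");
                return Action.CommitMessage;
            } catch (Exception e) {
                // Consumption failed: ask for the message to be delivered again later.
                LOG.warn("handle message fail, requeue it.", e);
                return Action.ReconsumeLater;
            }
        }
    }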
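The retry behaviour comes from the return value: a handler that throws makes consume answer "reconsume later", and the message is delivered again. The sketch below imitates that with a plain loop standing in for redelivery. The enum, the Handler interface, failFirst and the attempt limit are all hypothetical; the real redelivery schedule is decided by the MQ service, not by this code.

```java
class RetrySketch {

    enum Action { COMMIT, RECONSUME_LATER }

    interface Handler {
        void handle(String body) throws Exception;
    }

    /** Mirrors the try/catch in the abstract listener: success commits, any exception asks for a retry. */
    static Action consume(String body, Handler handler) {
        try {
            handler.handle(body);
            return Action.COMMIT;
        } catch (Exception e) {
            return Action.RECONSUME_LATER;
        }
    }

    /** Stand-in for redelivery: returns the attempt that committed, or -1 after maxAttempts failures. */
    static int deliver(String body, Handler handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (consume(body, handler) == Action.COMMIT) {
                return attempt;
            }
        }
        return -1;
    }

    /** Test helper: a handler that throws on its first `failures` calls and succeeds afterwards. */
    static Handler failFirst(int failures) {
        int[] calls = {0};
        return body -> {
            if (calls[0]++ < failures) {
                throw new IllegalStateException("temporary failure");
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(deliver("hello", failFirst(2), 5)); // 3
        System.out.println(deliver("hello", failFirst(9), 2)); // -1
    }
}
```

One consequence worth keeping in mind: because a failed message comes back, the handle method should be safe to run more than once for the same message.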
Step 7: Register MqConsumer and the producer as beans in the container
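    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.boot.context.properties.EnableConfigurationProperties;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import java.util.Properties;

    @Configuration
    @EnableConfigurationProperties(MqPropertiesConfig.class)
    public class MqAutoConfig {

        @Autowired
        private MqPropertiesConfig propConfig;

        // Created only when aliyun.mq.producer.enabled=true and no other MqTimerProducer bean exists.
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        @ConditionalOnMissingBean
        @ConditionalOnProperty(prefix = "aliyun.mq.producer", value = "enabled", havingValue = "true")
        public MqTimerProducer mqTimerProducer() {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
            properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
            properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, propConfig.getNameSrvAddr());
            return new MqTimerProducer(properties);
        }

        // Created only when aliyun.mq.consumer.enabled=true and no other MqConsumer bean exists.
        @Bean(initMethod = "start", destroyMethod = "shutdown")
        @ConditionalOnMissingBean
        @ConditionalOnProperty(prefix = "aliyun.mq.consumer", value = "enabled", havingValue = "true")
        public MqConsumer mqConsumer() {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.GROUP_ID, propConfig.getConsumer().getProperty("groupId"));
            properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
            properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
            properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
            properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, propConfig.getNameSrvAddr());
            return new MqConsumer(properties);
        }
    }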
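The two conditions on each bean method combine into one decision: create the bean only if the "enabled" property is set to true and nothing has been registered already. The plain-Java sketch below imitates that decision with a map as the container. It is a simplification and not Spring's implementation: in particular it checks for an existing bean by name, whereas @ConditionalOnMissingBean on a bean method checks by type. registerIfEnabled is a hypothetical helper.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;

class ConditionalRegistrationSketch {

    /**
     * Registers a bean only when prefix + ".enabled" equals "true" and the container
     * has no bean under beanName yet. Returns true if the bean was created.
     */
    static boolean registerIfEnabled(Map<String, Object> container, Properties config,
                                     String prefix, String beanName, Supplier<Object> factory) {
        // A missing property counts as "not enabled".
        boolean enabled = "true".equals(config.getProperty(prefix + ".enabled"));
        boolean missing = !container.containsKey(beanName);
        if (enabled && missing) {
            container.put(beanName, factory.get());
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, Object> container = new HashMap<>();
        Properties config = new Properties();
        config.setProperty("aliyun.mq.producer.enabled", "true");
        config.setProperty("aliyun.mq.consumer.enabled", "false");

        System.out.println(registerIfEnabled(container, config, "aliyun.mq.producer", "mqTimerProducer", Object::new)); // true
        System.out.println(registerIfEnabled(container, config, "aliyun.mq.producer", "mqTimerProducer", Object::new)); // false
        System.out.println(registerIfEnabled(container, config, "aliyun.mq.consumer", "mqConsumer", Object::new));      // false
    }
}
```

The "missing bean" half is what lets an application define its own MqTimerProducer or MqConsumer and have the starter step aside.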
Project files: https://files.cnblogs.com/files/code-sayhi/mq-spring-boot-starter.7z