Writing a custom starter for Alibaba Cloud RocketMQ


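Purpose: In the official Alibaba Cloud RocketMQ examples, you have to specify the subscribed listener yourself when defining the consumer, which is inconvenient. The idea here is an annotation-based approach: annotate the MessageListener class with its topic and tag, which cuts down on project configuration and makes the client easier to use.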

Usage:

Step 1: Add the pom dependency

<dependency>
  <groupId>com.demo</groupId>
  <artifactId>mq-spring-boot-starter</artifactId>
  <version>1.0-SNAPSHOT</version>
</dependency>

Step 2: Configure the yml file

aliyun:
  mq:
    nameSrvAddr: ****
    accessKey: ***
    secretKey: ***
    producer:
      enabled: true  # true: start the producer; false: do not load the producer
    consumer:
      enabled: true
      groupId: GID_zl01

Step 3: Define a listener

@Component
@MqTopicListener(topic="Tcc", tag="*")
public class TestMessageListener extends AbstractMessageListener {
    @Override
    public void handle(String s) {
        System.out.println("Message body: --------------------------");
        System.out.println(s);
    }
}

Step 4: Send a message

@Resource
private MqTimerProducer mqTimerProducer;

public String sendMessage() {
    mqTimerProducer.send("Tcc", "1", "hello", 0);
    return "ok";
}

---------------------------------------- Divider ----------------------------------------

Now for mq-spring-boot-starter itself.

Spring concepts you need:

  1. Custom annotations

  2. ConditionalOnMissingBean

  3. ConditionalOnProperty

  4. General Spring fundamentals

Step 1: Use ons-client version 1.8.4.Final

<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.8.4.Final</version>
</dependency>

Step 2: Define the custom annotation

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqTopicListener {
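    /**
     * The topic to subscribe to.
     *
     * @return the subscribed topic
     */
    String topic() default "";

    /**
     * The tag(s) to subscribe to.
     *
     * @return the subscribed tag expression; defaults to every tag under the topic. Join multiple tags with '||'; use '*' for all.
     */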
    String tag() default "*";
}
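
The tag convention described in the annotation's Javadoc ('||' joins alternatives, '*' selects everything) can be illustrated with a small sketch. This is only a model of the matching rule: in a real deployment the filtering is done by the messaging service, not by application code, and the class name `TagExpressionDemo` and the tag names `TagA`, `TagB`, `TagC` are made up for illustration. Treating a null or empty expression like '*' is an assumption of this sketch.

```java
import java.util.Arrays;

/** Illustrative model of the tag expression rule; not part of the starter. */
final class TagExpressionDemo {

    /** Returns true when a message tag is selected by a subscription expression. */
    static boolean matches(String expression, String messageTag) {
        // Assumption: null or blank is treated the same as "*".
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true; // "*" (the annotation default) selects every tag under the topic
        }
        if (messageTag == null) {
            return false;
        }
        // Split on the literal "||" separator and compare each alternative exactly.
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "1"));             // true
        System.out.println(matches("TagA||TagB", "TagB")); // true
        System.out.println(matches("TagA||TagB", "TagC")); // false
    }
}
```

With the default `tag = "*"`, a listener on topic "Tcc" would receive a message sent with tag "1", which is the combination used in the usage example at the top.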

Step 3: Read the MQ settings defined in the yml file

@ConfigurationProperties(prefix = "aliyun.mq")
@Data
public class MqPropertiesConfig {
    private String nameSrvAddr;
    private String accessKey;
    private String secretKey;
    private Properties producer;
    private Properties consumer;
}
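
The `producer` and `consumer` fields are plain `Properties` objects, so every key nested under `aliyun.mq.consumer` (such as `groupId`) ends up as an entry you can read with `getProperty`. The sketch below imitates that idea with the JDK only. It is a simplified stand-in for what Spring Boot's binder does, not its actual implementation, and `PrefixBindingDemo` and `subProperties` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** Simplified stand-in for prefix-based binding; not Spring's real binder. */
final class PrefixBindingDemo {

    /** Collects every entry under "prefix." into a Properties object, stripping the prefix. */
    static Properties subProperties(Map<String, String> flat, String prefix) {
        Properties result = new Properties();
        String dotted = prefix + ".";
        for (Map.Entry<String, String> entry : flat.entrySet()) {
            if (entry.getKey().startsWith(dotted)) {
                result.setProperty(entry.getKey().substring(dotted.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Flattened view of the yml file; the access key value is a placeholder.
        Map<String, String> flat = new LinkedHashMap<>();
        flat.put("aliyun.mq.accessKey", "placeholder");
        flat.put("aliyun.mq.consumer.enabled", "true");
        flat.put("aliyun.mq.consumer.groupId", "GID_zl01");

        Properties consumer = subProperties(flat, "aliyun.mq.consumer");
        System.out.println(consumer.getProperty("groupId")); // GID_zl01
    }
}
```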

Step 4: Define the producer. This class initializes the message Producer and provides the related methods.

public class MqTimerProducer {
    private final static Logger LOG = LoggerFactory.getLogger(MqTimerProducer.class);
    private Properties properties;
    private Producer producer;

    public MqTimerProducer(Properties properties) {
        if (properties == null
                || properties.get(PropertyKeyConst.AccessKey) == null
                || properties.get(PropertyKeyConst.SecretKey) == null
                || properties.get(PropertyKeyConst.NAMESRV_ADDR) == null) {
            throw new ONSClientException("producer properties not set properly.");
        }
        this.properties = properties;
    }

    public void start() {
        this.producer = ONSFactory.createProducer(this.properties);
        this.producer.start();
    }

    public void shutdown() {
        if (this.producer != null) {
            this.producer.shutdown();
        }
    }

    public void send(String topic, String tag, String body, long delay) {
        LOG.info("start to send message. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
        if (topic == null || tag == null || body == null) {
            throw new RuntimeException("topic, tag, or body is null.");
        }
        Message message = new Message(topic, tag, body.getBytes());
        message.setStartDeliverTime(System.currentTimeMillis() + delay);
        SendResult result = this.producer.send(message);
        LOG.info("send message success. {}", result);
    }


    public void sendAsync(String topic, String tag, String body, long delay) {
        this.sendAsync(topic, tag, body, delay, new DefaultSendCallback());
    }

    public void sendAsync(String topic, String tag, String body, long delay, SendCallback sendCallback) {
        LOG.info("start to send message async. [topic: {}, tag: {}, body: {}, delay: {}]", topic, tag, body, delay);
        if (topic == null || tag == null || body == null) {
            throw new RuntimeException("topic, tag, or body is null.");
        }
        Message message = new Message(topic, tag, body.getBytes());
        message.setStartDeliverTime(System.currentTimeMillis() + delay);
        this.producer.sendAsync(message, sendCallback);
    }


    public Properties getProperties() {
        return properties;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    public boolean isStarted() {
        return this.producer.isStarted();
    }

    public boolean isClosed() {
        return this.producer.isClosed();
    }
}
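
Two details of `send` are worth spelling out. The `delay` argument is a relative offset in milliseconds that gets added to the current time to form an absolute delivery timestamp, and `body.getBytes()` relies on the JVM default charset. The sketch below shows both steps in isolation, using a fixed clock value and an explicit UTF-8 charset. `DelayedSendDemo` and its helper methods are hypothetical and not part of the starter; choosing UTF-8 is a suggestion, not something the original code does.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;

/** Sketch of the two conversions performed before a message is handed to the producer. */
final class DelayedSendDemo {

    /** Converts a relative delay into the absolute timestamp a delayed message carries. */
    static long startDeliverTime(long nowMillis, Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        return nowMillis + delay.toMillis();
    }

    /** Encodes the body with an explicit charset instead of the platform default. */
    static byte[] encodeBody(String body) {
        return body.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        long now = 1_000_000L; // fixed "current time" so the result is reproducible
        System.out.println(startDeliverTime(now, Duration.ofSeconds(30))); // 1030000
        System.out.println(new String(encodeBody("hello"), StandardCharsets.UTF_8)); // hello
    }
}
```

A delay of 0, as in the usage example, yields a delivery time equal to the send time, so the message is not held back.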

Step 5: Define the consumer. It initializes the Consumer and binds subscriptions by topic and tag.

public class MqConsumer implements ApplicationContextAware {

    private ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    private final static Logger LOG = LoggerFactory.getLogger(MqConsumer.class);
    private Properties properties;
    private Consumer consumer;

    public MqConsumer(Properties properties) {
        if (properties == null
                || properties.get(PropertyKeyConst.AccessKey) == null
                || properties.get(PropertyKeyConst.SecretKey) == null
                || properties.get(PropertyKeyConst.NAMESRV_ADDR) == null
                ) {
            throw new ONSClientException("consumer properties not set properly.");
        }
        this.properties = properties;
    }

    public void start() {
        this.consumer = ONSFactory.createConsumer(properties);
        this.consumer.start();
        this.subscribe();
    }

    public void shutdown() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
    }

    public void subscribe() {
        Map<String, Object> beanMap = applicationContext.getBeansWithAnnotation(MqTopicListener.class);
        if (beanMap == null || beanMap.size() == 0) {
            return;
        }
        for (Object bean : beanMap.values()) {
            createConsumer(bean);
        }
    }

    private void createConsumer(Object bean) {
        if(!(bean instanceof AbstractMessageListener)) {
            return;
        }
        // Read the annotation
        MqTopicListener mqMsgListener = bean.getClass().getAnnotation(MqTopicListener.class);
        String topic = mqMsgListener.topic();
        String tag = mqMsgListener.tag();
        LOG.info("subscribe [topic: {}, tags: {}, messageListener: {}]", topic, tag, bean.getClass().getCanonicalName());
        consumer.subscribe(topic, tag, (AbstractMessageListener)bean);
    }
}
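
The core of `createConsumer` is a reflective lookup of the class-level annotation. A minimal sketch of that lookup without Spring or the ONS SDK might look like the following. The nested `MqTopicListener` is a local copy so the example compiles on its own, and `Listener`, `TccListener`, `PlainListener`, and `subscriptions` are hypothetical stand-ins. Unlike the original, the sketch also skips beans whose class carries no annotation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of annotation-driven subscription discovery using plain reflection. */
final class AnnotationScanDemo {

    /** Local copy of the annotation so the sketch is self-contained. */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface MqTopicListener {
        String topic() default "";
        String tag() default "*";
    }

    /** Hypothetical stand-in for AbstractMessageListener. */
    abstract static class Listener {
        abstract void handle(String body);
    }

    @MqTopicListener(topic = "Tcc")
    static class TccListener extends Listener {
        @Override
        void handle(String body) {
            System.out.println("got " + body);
        }
    }

    /** Not annotated, so the scan ignores it. */
    static class PlainListener extends Listener {
        @Override
        void handle(String body) {
            // intentionally empty
        }
    }

    /** Returns "topic:tag" for every bean that is a Listener and carries the annotation. */
    static List<String> subscriptions(List<?> beans) {
        List<String> result = new ArrayList<>();
        for (Object bean : beans) {
            MqTopicListener meta = bean.getClass().getAnnotation(MqTopicListener.class);
            if (meta == null || !(bean instanceof Listener)) {
                continue; // guard against a missing annotation instead of failing later
            }
            result.add(meta.topic() + ":" + meta.tag());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(subscriptions(Arrays.asList(new TccListener(), new PlainListener())));
        // prints [Tcc:*]
    }
}
```

The null check matters because the annotation is not marked `@Inherited`: if a listener bean were wrapped in a subclass-based proxy, `getClass().getAnnotation(...)` would return null for it.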

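Step 6: Define an abstract MessageListener. User-defined listeners extend this class and override the handle method; exception handling and the retry mechanism are encapsulated here.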

public abstract class AbstractMessageListener implements MessageListener {

    private final static Logger LOG = LoggerFactory.getLogger(AbstractMessageListener.class);

    public abstract void handle(String body);

    @Override
    public Action consume(Message message, ConsumeContext context) {
        LOG.info("receive message. [topic: {}, tag: {}, body: {}, msgId: {}, startDeliverTime: {}]", message.getTopic(), message.getTag(), new String(message.getBody()), message.getMsgID(), message.getStartDeliverTime());
        try {
            handle(new String(message.getBody()));
            LOG.info("handle message success.");
            return Action.CommitMessage;
        } catch (Exception e) {
            // Consumption failed
            LOG.warn("handle message fail, requeue it.", e);
            return Action.ReconsumeLater;
        }
    }
}
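
The class above is a template method: subclasses only implement `handle`, and the base class turns "returned normally" into a commit and "threw an exception" into a retry request. The sketch below reproduces that control flow with JDK types only. The `Action` enum and `BaseListener` here are hypothetical stand-ins for the SDK's `Action` and for `AbstractMessageListener`, and the explicit UTF-8 decoding is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;

/** Sketch of the commit-or-retry template; stand-in types replace the SDK classes. */
final class ConsumeTemplateDemo {

    /** Hypothetical stand-in for the SDK's Action enum. */
    enum Action { COMMIT, RECONSUME_LATER }

    /** Subclasses supply handle(); the base class maps the outcome to an Action. */
    abstract static class BaseListener {
        abstract void handle(String body) throws Exception;

        final Action consume(byte[] rawBody) {
            try {
                handle(new String(rawBody, StandardCharsets.UTF_8));
                return Action.COMMIT;
            } catch (Exception e) {
                // Any exception from business code asks for redelivery.
                return Action.RECONSUME_LATER;
            }
        }
    }

    /** Example handler that fails on an empty body. */
    static class RejectEmpty extends BaseListener {
        @Override
        void handle(String body) {
            if (body.isEmpty()) {
                throw new IllegalArgumentException("empty body");
            }
        }
    }

    public static void main(String[] args) {
        BaseListener listener = new RejectEmpty();
        System.out.println(listener.consume("hello".getBytes(StandardCharsets.UTF_8))); // COMMIT
        System.out.println(listener.consume(new byte[0]));                              // RECONSUME_LATER
    }
}
```

One consequence of this design is that a handler which always throws for a given message will keep having it redelivered, so handlers should be idempotent and should catch errors that a retry cannot fix.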

Step 7: Register MqConsumer and the producer as beans in the container

@Configuration
@EnableConfigurationProperties(MqPropertiesConfig.class)
public class MqAutoConfig {

    @Autowired
    private MqPropertiesConfig propConfig;

    @Bean(initMethod="start", destroyMethod = "shutdown")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "aliyun.mq.producer",value = "enabled",havingValue = "true")
    public MqTimerProducer mqTimerProducer(){
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
        properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, propConfig.getNameSrvAddr());
        return new MqTimerProducer(properties);
    }

    @Bean(initMethod="start", destroyMethod = "shutdown")
    @ConditionalOnMissingBean
    @ConditionalOnProperty(prefix = "aliyun.mq.consumer",value = "enabled",havingValue = "true")
    public MqConsumer mqConsumer(){
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.GROUP_ID, propConfig.getConsumer().getProperty("groupId"));
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        properties.setProperty(PropertyKeyConst.AccessKey, propConfig.getAccessKey());
        properties.setProperty(PropertyKeyConst.SecretKey, propConfig.getSecretKey());
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, propConfig.getNameSrvAddr());
        return new MqConsumer(properties);
    }
}
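
Both bean methods are guarded by an `enabled` property with `havingValue = "true"`, so a bean is only created when the corresponding flag is present and set to true. The following is a simplified model of that rule using a plain map as the environment. It is not Spring Boot's implementation; `PropertyConditionDemo` and `matches` are hypothetical names, and the case-insensitive comparison reflects my understanding of the condition rather than something stated in the original.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of a "property must equal havingValue" condition. */
final class PropertyConditionDemo {

    /** A missing property does not match; a present one must equal havingValue (ignoring case). */
    static boolean matches(Map<String, String> env, String prefix, String name, String havingValue) {
        String actual = env.get(prefix + "." + name);
        return actual != null && havingValue.equalsIgnoreCase(actual);
    }

    public static void main(String[] args) {
        Map<String, String> env = new HashMap<>();
        env.put("aliyun.mq.producer.enabled", "true");

        // Producer flag is set, consumer flag is absent.
        System.out.println(matches(env, "aliyun.mq.producer", "enabled", "true")); // true
        System.out.println(matches(env, "aliyun.mq.consumer", "enabled", "true")); // false
    }
}
```

Under this model, an application that only sends messages can leave the whole `consumer` section out of its yml file and no consumer is started.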

Project files: https://files.cnblogs.com/files/code-sayhi/mq-spring-boot-starter.7z

