阿里雲 rocketMq 延時消息


初始化消費者和生產者

  • 生產者 設置rocketmq的accesskey 和secretkey 以及rocketmq的 binder server。
    首先 編輯一個配置類,將關於配置rocketmq的東西寫在配置類中

`
@Component
@Getter
@Setter
@Slf4j
public class RocketMqConfig {

@Value("${spring.cloud.stream.rocketmq.binder.secret-key}")
private String secretKey;
@Value("${spring.cloud.stream.rocketmq.binder.access-key}")
private String accessKey;
@Value("${spring.cloud.stream.rocketmq.binder.name-server}")
private String nameServe;

private static final String TOPIC = "delay";
private static final String GROUP_ID = "GID_live_service_update_status";
private static final String TAG = "mq_delay_tag";
private Properties properties = null;

public String getTopic() {
    return TOPIC;
}

public String getTag() {
    return TAG;
}

public String getGroupId() {
    return GROUP_ID;
}

public Properties getProperties() {
    log.info("accessKey:" + getAccessKey());
    log.info("secretKey:" + getSecretKey());
    log.info("naemServer:" + getNameServe());
    Properties properties = new Properties();
    properties.setProperty(PropertyKeyConst.GROUP_ID, GROUP_ID);
    properties.setProperty(PropertyKeyConst.AccessKey, getAccessKey());
    properties.setProperty(PropertyKeyConst.SecretKey, getSecretKey());
    properties.put(PropertyKeyConst.NAMESRV_ADDR, getNameServe());
    return properties;
}

}

  • 生產者初始化
    `
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.Producer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;

/**

  • @author

  • @Date 2021/11/3.
    */
    @Component
    @Slf4j
    public class RocketMqProducerInit {

    @Autowired
    private RocketMqConfig mqConfig;

    private static Producer producer;

    @PostConstruct
    public void init(){
    log.info("啟動RocketMq生產者!");
    producer = ONSFactory.createProducer(mqConfig.getProperties());
    // 在發送消息前,初始化調用start方法來啟動Producer,只需調用一次即可,當項目關閉時,自動shutdown
    producer.start();
    }
    /**

    • 初始化生產者
    • @return
      */
      public Producer getProducer(){
      return producer;
      }

}
`

`

  • 編寫生產者
    `
    import cn.hutool.core.date.DateUtil;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.exception.ONSClientException;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

import static cn.hutool.core.convert.Convert.longToBytes;

@Component
@Slf4j
public class ProducerSendMessageHandler {
@Autowired
private RocketMqConfig config;

@Autowired
private RocketMqProducerInit producer;

public void sendGetLiveStatus(long periodId) {

    Message message = new Message(config.getTopic(), config.getTag(),  longToBytes(periodId));
    message.setStartDeliverTime(System.currentTimeMillis() + 100000);
    try {
        SendResult send = this.producer.getProducer().send(message);
        log.info("發送生產消息時間:"+DateUtil.now() + "發送延時消息成功! Topic is:" + config.getTopic() + "msgId is: " +
                send.getMessageId());
    } catch (ONSClientException e) {
        e.printStackTrace();
        log.error(e.getMessage());
    }
}

}
`

  • 監聽者

`
import cn.hutool.core.date.DateUtil;
import com.aliyun.openservices.ons.api.*;
import com.dapeng.cloud.service.live.application.period.PeriodManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import static cn.hutool.core.convert.Convert.bytesToLong;

@Component
@Slf4j
public class LiveMessageStreamListener {

@Autowired
private RocketMqConfig mqConfig;

private static Consumer consumer;
@Autowired
private PeriodManager periodManager;

@PostConstruct
public void init(){
    log.info("消費者啟動!");
    consumer = ONSFactory.createConsumer(mqConfig.getProperties());
    //監聽第一個topic,new對應的監聽器
    consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {

            log.info("消費者接收到MQ消息時間: "+ DateUtil.now()+" -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                    message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), bytesToLong(message.getBody()));
            try {
                //調用場次接口
                long periodId = bytesToLong(message.getBody());
                log.info("獲取到 場次 id =" +periodId);
                log.info("開始執行調用異常回調處理");
               // 調用自己的業務代碼進行操作
                log.info("執行調用異常回調處理結果:"+b);
                //消費成功,繼續消費下一條消息
                return Action.CommitMessage;
            } catch (Exception e) {
                log.error("消費MQ消息失敗! msgId:" + message.getMsgID() + "----ExceptionMsg:" + e.getMessage());
                //消費失敗,告知服務器稍后再投遞這條消息,繼續消費其他消息
                return Action.ReconsumeLater;
            }
        }
    });
    // 在發送消息前,必須調用start方法來啟動consumer,只需調用一次即可,當項目關閉時,自動shutdown
    consumer.start();
}
/**
 * 初始化消費者
 * @return consumer
 */
public Consumer getConsumer(){
    return consumer;
}

}
`


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM