阿里云 RocketMQ 延时消息


初始化消费者和生产者

  • 生产者:设置 RocketMQ 的 AccessKey、SecretKey 以及 name server 地址(对应 spring.cloud.stream.rocketmq.binder 下的配置项)。
    首先编写一个配置类,将 RocketMQ 相关的配置集中写在该配置类中。

`
/**
 * Holds the RocketMQ connection settings injected from
 * {@code spring.cloud.stream.rocketmq.binder.*} together with the fixed
 * topic, tag and group id used for the delayed message, and turns them into
 * the {@link Properties} object handed to the ONS client factory.
 */
@Component
@Getter
@Setter
@Slf4j
public class RocketMqConfig {

    private static final String TOPIC = "delay";
    private static final String GROUP_ID = "GID_live_service_update_status";
    private static final String TAG = "mq_delay_tag";

    @Value("${spring.cloud.stream.rocketmq.binder.secret-key}")
    private String secretKey;

    @Value("${spring.cloud.stream.rocketmq.binder.access-key}")
    private String accessKey;

    @Value("${spring.cloud.stream.rocketmq.binder.name-server}")
    private String nameServe;

    // Not read by getProperties(), which builds a fresh object on every call.
    // NOTE(review): kept only so the class-level @Setter presumably still
    // exposes setProperties(...) to existing callers - confirm before removing.
    private Properties properties = null;

    /**
     * @return the topic the delayed messages are published to
     */
    public String getTopic() {
        return TOPIC;
    }

    /**
     * @return the tag attached to the delayed messages
     */
    public String getTag() {
        return TAG;
    }

    /**
     * @return the group id shared by the producer and the consumer
     */
    public String getGroupId() {
        return GROUP_ID;
    }

    /**
     * Builds a new client configuration from the injected credentials and
     * name server address.
     *
     * <p>Credentials are deliberately not logged: the access key and secret
     * key must never end up in log files.
     *
     * @return a fresh {@link Properties} instance containing group id,
     *         access key, secret key and name server address
     */
    public Properties getProperties() {
        log.info("Building RocketMQ client properties, nameServer:{}, groupId:{}", getNameServe(), GROUP_ID);
        Properties clientProperties = new Properties();
        clientProperties.setProperty(PropertyKeyConst.GROUP_ID, GROUP_ID);
        clientProperties.setProperty(PropertyKeyConst.AccessKey, getAccessKey());
        clientProperties.setProperty(PropertyKeyConst.SecretKey, getSecretKey());
        clientProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, getNameServe());
        return clientProperties;
    }

}

  • 生产者初始化
    `
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.Producer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;

/**

  • @author

  • @Date 2021/11/3.
    */
    @Component
    @Slf4j
    public class RocketMqProducerInit {

    @Autowired
    private RocketMqConfig mqConfig;

    private static Producer producer;

    @PostConstruct
    public void init(){
    log.info("启动RocketMq生产者!");
    producer = ONSFactory.createProducer(mqConfig.getProperties());
    // 在发送消息前,初始化调用start方法来启动Producer,只需调用一次即可,当项目关闭时,自动shutdown
    producer.start();
    }
    /**

    • 初始化生产者
    • @return
      */
      public Producer getProducer(){
      return producer;
      }

}
`

`

  • 编写生产者
    `
    import cn.hutool.core.date.DateUtil;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.SendResult;
    import com.aliyun.openservices.ons.api.exception.ONSClientException;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

import static cn.hutool.core.convert.Convert.longToBytes;

@Component
@Slf4j
public class ProducerSendMessageHandler {
@Autowired
private RocketMqConfig config;

@Autowired
private RocketMqProducerInit producer;

public void sendGetLiveStatus(long periodId) {

    Message message = new Message(config.getTopic(), config.getTag(),  longToBytes(periodId));
    message.setStartDeliverTime(System.currentTimeMillis() + 100000);
    try {
        SendResult send = this.producer.getProducer().send(message);
        log.info("发送生产消息时间:"+DateUtil.now() + "发送延时消息成功! Topic is:" + config.getTopic() + "msgId is: " +
                send.getMessageId());
    } catch (ONSClientException e) {
        e.printStackTrace();
        log.error(e.getMessage());
    }
}

}
`

  • 监听者

`
import cn.hutool.core.date.DateUtil;
import com.aliyun.openservices.ons.api.*;
import com.dapeng.cloud.service.live.application.period.PeriodManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import static cn.hutool.core.convert.Convert.bytesToLong;

@Component
@Slf4j
public class LiveMessageStreamListener {

@Autowired
private RocketMqConfig mqConfig;

private static Consumer consumer;
@Autowired
private PeriodManager periodManager;

@PostConstruct
public void init(){
    log.info("消费者启动!");
    consumer = ONSFactory.createConsumer(mqConfig.getProperties());
    //监听第一个topic,new对应的监听器
    consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), new MessageListener() {
        @Override
        public Action consume(Message message, ConsumeContext context) {

            log.info("消费者接收到MQ消息时间: "+ DateUtil.now()+" -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
                    message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), bytesToLong(message.getBody()));
            try {
                //调用场次接口
                long periodId = bytesToLong(message.getBody());
                log.info("获取到 场次 id =" +periodId);
                log.info("开始执行调用异常回调处理");
               // 调用自己的业务代码进行操作
                log.info("执行调用异常回调处理结果:"+b);
                //消费成功,继续消费下一条消息
                return Action.CommitMessage;
            } catch (Exception e) {
                log.error("消费MQ消息失败! msgId:" + message.getMsgID() + "----ExceptionMsg:" + e.getMessage());
                //消费失败,告知服务器稍后再投递这条消息,继续消费其他消息
                return Action.ReconsumeLater;
            }
        }
    });
    // 在发送消息前,必须调用start方法来启动consumer,只需调用一次即可,当项目关闭时,自动shutdown
    consumer.start();
}
/**
 * 初始化消费者
 * @return consumer
 */
public Consumer getConsumer(){
    return consumer;
}

}
`


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM