初始化消費者和生產者
- 生產者 設置rocketmq的accesskey 和secretkey 以及rocketmq的 binder server。
首先 編輯一個配置類,將關於配置rocketmq的東西寫在配置類中
`
@Component
@Getter
@Setter
@Slf4j
public class RocketMqConfig {
@Value("${spring.cloud.stream.rocketmq.binder.secret-key}")
private String secretKey;
@Value("${spring.cloud.stream.rocketmq.binder.access-key}")
private String accessKey;
@Value("${spring.cloud.stream.rocketmq.binder.name-server}")
private String nameServe;
private static final String TOPIC = "delay";
private static final String GROUP_ID = "GID_live_service_update_status";
private static final String TAG = "mq_delay_tag";
private Properties properties = null;
public String getTopic() {
return TOPIC;
}
public String getTag() {
return TAG;
}
public String getGroupId() {
return GROUP_ID;
}
public Properties getProperties() {
log.info("accessKey:" + getAccessKey());
log.info("secretKey:" + getSecretKey());
log.info("naemServer:" + getNameServe());
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.GROUP_ID, GROUP_ID);
properties.setProperty(PropertyKeyConst.AccessKey, getAccessKey());
properties.setProperty(PropertyKeyConst.SecretKey, getSecretKey());
properties.put(PropertyKeyConst.NAMESRV_ADDR, getNameServe());
return properties;
}
}
- 生產者初始化
`
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
-
@author
-
@Date 2021/11/3.
*/
@Component
@Slf4j
public class RocketMqProducerInit {@Autowired
private RocketMqConfig mqConfig;private static Producer producer;
@PostConstruct
public void init(){
log.info("啟動RocketMq生產者!");
producer = ONSFactory.createProducer(mqConfig.getProperties());
// 在發送消息前,初始化調用start方法來啟動Producer,只需調用一次即可,當項目關閉時,自動shutdown
producer.start();
}
/**- 初始化生產者
- @return
*/
public Producer getProducer(){
return producer;
}
}
`
`
- 編寫生產者
`
import cn.hutool.core.date.DateUtil;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static cn.hutool.core.convert.Convert.longToBytes;
@Component
@Slf4j
public class ProducerSendMessageHandler {
@Autowired
private RocketMqConfig config;
@Autowired
private RocketMqProducerInit producer;
public void sendGetLiveStatus(long periodId) {
Message message = new Message(config.getTopic(), config.getTag(), longToBytes(periodId));
message.setStartDeliverTime(System.currentTimeMillis() + 100000);
try {
SendResult send = this.producer.getProducer().send(message);
log.info("發送生產消息時間:"+DateUtil.now() + "發送延時消息成功! Topic is:" + config.getTopic() + "msgId is: " +
send.getMessageId());
} catch (ONSClientException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
`
- 監聽者
`
import cn.hutool.core.date.DateUtil;
import com.aliyun.openservices.ons.api.*;
import com.dapeng.cloud.service.live.application.period.PeriodManager;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static cn.hutool.core.convert.Convert.bytesToLong;
@Component
@Slf4j
public class LiveMessageStreamListener {
@Autowired
private RocketMqConfig mqConfig;
private static Consumer consumer;
@Autowired
private PeriodManager periodManager;
@PostConstruct
public void init(){
log.info("消費者啟動!");
consumer = ONSFactory.createConsumer(mqConfig.getProperties());
//監聽第一個topic,new對應的監聽器
consumer.subscribe(mqConfig.getTopic(), mqConfig.getTag(), new MessageListener() {
@Override
public Action consume(Message message, ConsumeContext context) {
log.info("消費者接收到MQ消息時間: "+ DateUtil.now()+" -- Topic:{}, tag:{},msgId:{} , Key:{}, body:{}",
message.getTopic(), message.getTag(), message.getMsgID(), message.getKey(), bytesToLong(message.getBody()));
try {
//調用場次接口
long periodId = bytesToLong(message.getBody());
log.info("獲取到 場次 id =" +periodId);
log.info("開始執行調用異常回調處理");
// 調用自己的業務代碼進行操作
log.info("執行調用異常回調處理結果:"+b);
//消費成功,繼續消費下一條消息
return Action.CommitMessage;
} catch (Exception e) {
log.error("消費MQ消息失敗! msgId:" + message.getMsgID() + "----ExceptionMsg:" + e.getMessage());
//消費失敗,告知服務器稍后再投遞這條消息,繼續消費其他消息
return Action.ReconsumeLater;
}
}
});
// 在發送消息前,必須調用start方法來啟動consumer,只需調用一次即可,當項目關閉時,自動shutdown
consumer.start();
}
/**
* 初始化消費者
* @return consumer
*/
public Consumer getConsumer(){
return consumer;
}
}
`