RocketMQ消息的發送和消息消費
###一.消息推送 ```java public void pushMessage() { String message = "推送消息內容!"; try { DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 設置NameServer地址 producer.setNamesrvAddr("服務器地址+端口號"); producer.setInstanceName("producer"); // 只需要在發送前初始化一次 producer.start(); // 構建消息實體
Message msg = new Message(topic,// topic
tag,// tag
message.getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
} catch (Exception ex) {
ex.printStackTrace();
}
}
### 二.消息消費
```java
// Listener that init() registers on the RocketMQ consumer
@Autowired
private MessageReceiveService messageReceiveService;
//==== Server address and port for the rating (good/bad review) messages; passed to the consumer as the NameServer address ====
@Value("${app.message.address}")
private String address;
//==== Topic of the rating (good/bad review) messages ====
@Value("${app.message.topic}")
private String topic;
//==== Consumer group name for the rating (good/bad review) messages ====
@Value("${app.message.groupName}")
private String consumerGroup;
/**
 * Starts consuming RocketMQ messages once this bean has been constructed.
 *
 * <p>Creates a push consumer in {@code consumerGroup}, points it at the
 * NameServer given by {@code address}, subscribes to every tag
 * ({@code "*"}) of {@code topic}, registers {@code messageReceiveService}
 * as the listener and starts the consumer.
 *
 * <p>Any exception raised during setup is logged and not rethrown, so the
 * bean still finishes initialising when the consumer cannot be started.
 */
@PostConstruct
public void init() {
    try {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(address);
        consumer.subscribe(topic, "*");
        consumer.registerMessageListener(messageReceiveService);
        consumer.start();
        logger.info("rocketMQ consumer start");
    } catch (Exception e) {
        // Passing the exception to the logger already records the stack
        // trace, so no separate printStackTrace() call is needed.
        logger.error("rocketMQ consumer start error!", e);
    }
    // NOTE(review): the consumer is only held in a local variable and is never
    // shut down by this code; consider keeping it in a field and calling
    // shutdown() when the bean is destroyed.
}
@Service
public class MessageReceiveService implements MessageListenerConcurrently {
// Logger for this class; final so the shared reference cannot be reassigned
private static final Logger logger = LoggerFactory.getLogger(MessageReceiveService.class);
// Injected from the "accept_system_interface" configuration property
@Value("${accept_system_interface}")
private String acceptSystemInterface;
/**
 * Listener callback that receives messages delivered from RocketMQ.
 *
 * @param msgs    the delivered RocketMQ messages
 * @param context the consumption context supplied with the messages
 * @return the consumption status produced by {@code handleHcpMessage}
 */
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    // Every delivery is handed straight to the rating-message handler;
    // no per-type dispatch is performed here.
    final ConsumeConcurrentlyStatus status = handleHcpMessage(msgs, context);
    return status;
}
/**
 * Consumes rating (good/bad review) messages.
 *
 * <p>Each message body is decoded as UTF-8 and then processed. A message
 * whose body cannot be decoded is recorded through {@code errorLogSave} and
 * skipped, and the remaining messages in the batch are still processed.
 *
 * <p>NOTE(review): with Spring's default proxy-based transaction support,
 * {@code @Transactional} is not applied to private methods or to calls made
 * on {@code this}, so the annotation below is probably inert as written.
 * Confirm, and move the transactional boundary to a public method on a
 * separate bean if a transaction is really required.
 *
 * @param msgs    the current message (batch)
 * @param context the message consumption context
 * @return {@code CONSUME_SUCCESS} in the code shown here, including when a
 *         message was skipped because it could not be decoded
 */
@Transactional(rollbackFor = {RuntimeException.class})
private ConsumeConcurrentlyStatus handleHcpMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt msg : msgs) {
        // Message validation and decoding
        String message = null;
        try {
            // Decode the body as UTF-8 to avoid garbled text. The charset
            // constant removes the by-name lookup and its checked exception.
            message = new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        } catch (Exception e) {
            logger.error("Failed to decode RocketMQ message body as UTF-8", e);
            // NOTE(review): message is still null at this point, so the saved
            // error record carries no message content.
            errorLogSave(message, "當前消息轉化utf-8出現異常信息");
            // Skip only this message. Returning here would acknowledge the
            // whole batch and silently drop the messages after this one.
            continue;
        }
        // Perform the corresponding operation on the message
        ...
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}