rocketMq消息的發送和消息消費


rocketMq消息的發送和消息消費

###一.消息推送 ```java public void pushMessage() { String message = "推送消息內容!"; try { DefaultMQProducer producer = new DefaultMQProducer(producerGroup); // 設置NameServer地址 producer.setNamesrvAddr("服務器地址+端口號"); producer.setInstanceName("producer"); // 只需要在發送前初始化一次 producer.start(); // 構建消息實體
        Message msg = new Message(topic,// topic
                tag,// tag
                message.getBytes()// body
        );
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);
        producer.shutdown();
    } catch (Exception ex) {
        ex.printStackTrace();
    }

}


###二.消息消費
```java
    @Autowired
    private MessageReceiveService messageReceiveService;
	//====好差評的服務器地址和端口=====
    @Value("${app.message.address}")
    private String address;
	//====好差評的topic=====
    @Value("${app.message.topic}")
    private String topic;
	//====好差評的組名=====
    @Value("${app.message.groupName}")
    private String consumerGroup;

    /**
     * 開始消費rocketMQ消息
     */
    @PostConstruct
    public void init() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
            consumer.setNamesrvAddr(address);
            consumer.subscribe(topic, "*");
            consumer.registerMessageListener(messageReceiveService);
            consumer.start();
            logger.info("rocketMQ consumer start");
        } catch (Exception e) {
            logger.error("reocketMQ consumer start error!", e);
            e.printStackTrace();
        }
    }
 @Service
public class MessageReceiveService implements MessageListenerConcurrently {

    private static Logger logger = LoggerFactory.getLogger(MessageReceiveService.class);

    @Value("${accept_system_interface}")
    private String acceptSystemInterface;

    /**
     * 消費rocketMQ上的消息
     *
     * @param msgs    rocketMQ消息
     * @param context 消息消費上下文
     * @return 消息處理狀態
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        // 判斷消息類型
        return handleHcpMessage(msgs, context);
    }

    /**
     * <p>好差評消息消費</p>
     *
     * @param msgs    當前消息(組)
     * @param context 消息消費上下文
     */
    @Transactional(rollbackFor = {RuntimeException.class})
    private ConsumeConcurrentlyStatus handleHcpMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            // 消息校驗與序列化
            String message = null;
            try {
			//獲得消息的內容,轉utf-8防止出現亂碼
                message = new String(msg.getBody(),"utf-8");
            }catch (Exception e){
                e.printStackTrace();
                errorLogSave(message,"當前消息轉化utf-8出現異常信息");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
			//對消息進行對應的操作
			...
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM