1、導入依賴
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>spring-boot-starter-rocketmq</artifactId> <version>1.0.0-SNAPSHOT</version> </dependency> <!-- rocketmq dependencies --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
2、配置生產者
package com.example.demo.mq; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; @Component public class MyProducer { /** * 生產組,生產者必須在生產組內 */ private String producerGroup = "newTest_group"; /** * nameSrv IP及端口 */ private String nameSrv = "****:9876"; /** * 生產者 */ private DefaultMQProducer producer; /** * 設置producer對應的組及namesrv信息,並啟動producer */ public MyProducer(){ producer = new DefaultMQProducer(producerGroup); producer.setNamesrvAddr(nameSrv); start(); } /** * 獲取producer * @return */ public DefaultMQProducer getProducer(){ return producer; } /** * 對象在使用之前必須調用一次,並且只能初始化一次 */ public void start(){ try{ this.producer.start(); }catch (MQClientException e){ e.printStackTrace(); } } /** * 一般在應用上下文,使用上下文監聽器,進行關閉 */ public void shutdown(){ producer.shutdown();; } }
3、發送消息Controller
package com.example.demo.controller; import com.example.demo.mq.MyProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.HashMap; @RestController public class MQController { @Autowired private MyProducer producer; /** * topic */ private static final String topic = "my_topic_test01"; @GetMapping(value = "/mq/test1") public Object test1(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { /** * message構造函數,三個參數分別為 主題、二級分類、消息內容(需要轉換為字節數組) */ Message message = new Message(topic,"lcl",("hello RocketMQ"+text).getBytes()); SendResult result = producer.getProducer().send(message); System.out.println(result); return new HashMap<>(); } }
4、客戶端
package com.example2.demo2.mq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.springframework.stereotype.Component; @Component public class MyConsumer { /** * 生產組,生產者必須在生產組內 */ private String producerGroup = "newTest_group"; /** * nameSrv IP及端口 */ private String nameSrv = "118.190.215.76:9876"; /** * topic */ private static final String topic = "my_topic_test01"; private DefaultMQPushConsumer consumer; public MyConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(producerGroup); consumer.setNamesrvAddr(nameSrv); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(topic,"*"); consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context)->{ try{ Message message = msgs.get(0); System.out.println("%s Receive New Messages: %s %n" + Thread.currentThread().getName() + new String(msgs.get(0).getBody())); String topic = message.getTopic(); String body = new String(message.getBody(),"utf-8"); String tags = message.getTags(); String keys = message.getKeys(); System.out.println("topic="+ topic +",tags="+tags+",keys="+keys+",body="+body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }catch(Exception e){ e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); } }
5、測試
注意:一定要創建topic,創建topic可以通過控制台創建、可以通過linux命令進行創建,也可以通過程序自動創建,無論以哪種方式創建,都需要創建topic,否則會報錯。