一、添加依賴
<!-- RocketMQ --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.6.0</version> </dependency>
2個核心接口,3個默認實現。
interface MQProducer
--- DefaultMQProducer
interface MQConsumer
--- DefaultMQPushConsumer
--- DefaultMQPullConsumer
DefaultMQProducer是MQProducer的唯一默認實現,其實現 MQProducer 接口的時候 還繼承了 ClientConfig類 (客戶端配置類),可以配置如 sendMsgTimeout超時時間,producerGroup 生產者組 最大消息容量和是否啟用壓縮等。
關鍵方法是 send(Message) 發送一個消息到MQ。
DefaultMQPushConsumer 包含很多可以配置的信息,最主要的有:
messageModel 消息模型 支持以下兩種 1、集群消費 2、廣播消費
messageListener 消息監聽器
consumeThreadMin 消費線程池數量 默認10
consumeThreadMax 消費線程池數量 默認20
Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立刻回調 Listener 接口方法, MessageListenerOrderly有序的,MessageListenerConcurrently 無序的。
關鍵方法有registerMessageListener 注冊監聽器等。
二、生產者
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; /** * 生產者 */ public class RocketProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { //設置生產者組名 DefaultMQProducer producer = new DefaultMQProducer("my-group-name-A"); //指定nameServer的地址, 多個地址用分號分隔 producer.setNamesrvAddr("localhost:9876"); //啟動實例 producer.start(); Message message = new Message("topic-name-A","tag-name-A","Message : My blog address".getBytes()); producer.send(message); System.out.println("Message sent"); //關閉生產者,釋放資源 producer.shutdown(); } }
三、消費者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 消費者 */ public class RocketConsumer { public static void main(String[] args) throws MQClientException{ //設置消費者組名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group-name-A"); //設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //NameServer地址, 多個地址用分號(;)分隔 consumer.setNamesrvAddr("localhost:9876"); //參數1:topic名字 參數2:tag名字 consumer.subscribe("topic-name-A", "tag-name-A"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //啟動,會一直監聽消息 consumer.start(); System.out.println("Consumer Started!"); //consumer.shutdown(); } }