maven引用
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.0</version> </dependency> <!--一個好用的工具包,可以不引入--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.3.0</version> </dependency>
說明
不管是生產者還是消費者,都有很多參數可以配置,rocketmq命名比較好,基本可以從參數名上判斷具體作用,還有注釋可以看。
下面例子中只給出了常用的一些參數設置。更多參數可自行探索。
簡單生產者實現
注意:
1、NamesrvAddr參數在多個節點時,用英文分號分隔,例: 192.168.9.58:9876;192.168.9.59:9876
import cn.hutool.core.util.RandomUtil; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); //發送超時時間,默認3000 單位ms producer.setSendMsgTimeout(5000); producer.start(); try { Message msg = new Message("TestTopic",// topic "177", // tag 可以為空,用以簡單的篩選。 RandomUtil.randomString(8), // key 可以為空,可用以查詢。 ("test" + RandomUtil.randomString(8)).getBytes()); // body ,我常將對象轉json再獲取byte[] 進行傳輸。 SendResult send = producer.send(msg); if (send.getSendStatus().equals(SendStatus.SEND_OK)) { //發送成功處理 }else { //發送失敗處理 } } catch (Exception e) { //發送失敗處理 e.printStackTrace(); } //正式環境不要發完就就shutdown,要在應用退出時再shutdown。 producer.shutdown(); } }
多線程加批量生產者模擬實現
注意:
1、批量發送時,topic必須為同一個,否則發送會報異常。
2、批量發送相較於單條發送速度提升很大。
import cn.hutool.core.util.RandomUtil; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; public class Producer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("127.0.0.1:9876"); //發送超時時間,默認3000 單位ms producer.setSendMsgTimeout(5000); producer.start(); int threadCount = 20; int forCount = 100000; CountDownLatch latch = new CountDownLatch(threadCount); long start = System.currentTimeMillis(); for (int i = 0; i < threadCount; i++) { new Thread(() -> { try { List<Message> list = new ArrayList<>(); for (int j = 0; j < forCount; j++) { try { Message msg = new Message("TestTopic",// topic "177", // tag RandomUtil.randomString(8), // key ("test" + RandomUtil.randomString(8)).getBytes()); // body list.add(msg); if (list.size() >= 100) { SendResult send = producer.send(list); if (send.getSendStatus().equals(SendStatus.SEND_OK)) { //發送成功處理 list.clear(); }else { //發送失敗處理 } } } catch (Exception e) { //發送失敗處理 e.printStackTrace(); } } if (list.size() > 0) { SendResult send = producer.send(list); if (!send.getSendStatus().equals(SendStatus.SEND_OK)) { System.out.println(send); } list.clear(); } } catch (Exception e) { e.printStackTrace(); } finally { latch.countDown(); } }).start(); } latch.await(); long hs = System.currentTimeMillis() - start; System.out.println(hs); long speed = (threadCount * forCount) / (hs >= 0 ? 1 : hs / 1000); System.out.println("速度" + speed); //正式環境不要發完就就shutdown,要在應用退出時再shutdown。 producer.shutdown(); } }
push消費者
import cn.hutool.core.lang.Console; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; public class PushConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("PushConsumerGroupName"); consumer.setNamesrvAddr("127.0.0.1:9876"); //一個GroupName第一次消費時的位置 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(20); //要消費的topic,可使用tag進行簡單過濾 consumer.subscribe("TestTopic", "*"); //一次最大消費的條數 consumer.setConsumeMessageBatchMaxSize(100); //消費模式,廣播或者集群,默認集群。 consumer.setMessageModel(MessageModel.CLUSTERING); //在同一jvm中 需要啟動兩個同一GroupName的情況需要這個參數不一樣。 consumer.setInstanceName("InstanceName"); //配置消息監聽 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { try { //業務處理 msgs.forEach(msg -> { Console.log(msg); }); } catch (Exception e) { System.err.println("接收異常" + e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.println("Consumer Started."); } }
pull消費者
從4.6之后,提供了DefaultLitePullConsumer 大大簡化了pull的操作。以下為新實現,4.6之前的版本不支持。
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.lang.Console; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class PullConsumer { private static boolean runFlag = true; public static void main(String[] args) throws Exception { DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("PullConsumerGroupName"); consumer.setNamesrvAddr("127.0.0.1:9876"); //要消費的topic,可使用tag進行簡單過濾 consumer.subscribe("TestTopic", "*"); //一次最大消費的條數 consumer.setPullBatchSize(100); //無消息時,最大阻塞時間。默認5000 單位ms consumer.setPollTimeoutMillis(5000); consumer.start(); while (runFlag){ try { //拉取消息,無消息時會阻塞 List<MessageExt> msgs = consumer.poll(); if (CollUtil.isEmpty(msgs)){ continue; } //業務處理 msgs.forEach(msg-> Console.log(new String(msg.getBody()))); //同步消費位置。不執行該方法,應用重啟會存在重復消費。 consumer.commitSync(); }catch (Exception e){ e.printStackTrace(); } } consumer.shutdown(); } }