RocketMQ的生產者和消費者


生產者:

/**
                 * 生產者
                 */
                public class Provider {
                    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException, MQBrokerException {
                        //創建一個生產者
                        DefaultMQProducer producer=new DefaultMQProducer("rmq-group");
                        //設置NameServer地址
                        producer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
                        //設置生產者實例名稱
                        producer.setInstanceName("provider");
                        //啟動生產者
                        producer.start();
                        //發送消息
                        for (int i = 1; i <=10 ; i++) {
                            Thread.sleep(1000); //模擬網絡延遲
                            //創建消息  topic代表主題名稱     tags代表小分類     body代表消息體
                            Message message=new Message("weksoft_topic","TagA",("wdksoft-"+i).getBytes());
                            //發送消息
                            SendResult send = producer.send(message);
                            System.out.println(send.toString());
                        }
                    }
                }

消費者

/**
                 * 消費者:監聽消費
                 */
                public class Consumer {
                    public static void main(String[] args) throws MQClientException {
                        //創建消費者
                        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("rmq-group");
                        //設置NameServer地址
                        consumer.setNamesrvAddr("192.168.42.22:9876;192.168.42.33:9876");
                        //設置實例名稱
                        consumer.setInstanceName("consumer");
                        //訂閱Topic
                        consumer.subscribe("weksoft_topic","TagA");
                        //監聽消息
                        consumer.registerMessageListener(new MessageListenerConcurrently() {
                            @Override
                            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                                //獲取消息
                                for(MessageExt ext:msgs){
                                    //RocketMQ由於是集群環境,所以產生的消息ID可能會重復
                                    System.out.println(ext.getMsgId()+"----------"+new String(ext.getBody()));
                                }
                                //接受消息狀態 1.消費成功    2.消費失敗   隊列還有
                                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                            }
                        });
                        //啟動消費者
                        consumer.start();
                    }
                }

生產者生產消息

 

 

消費者消費消息

 

 控制台多了入隊和出隊的記錄

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM