RocketMQ學習筆記(6)----RocketMQ的Client的使用 Producer/Consumer


1.  添加依賴

  pom.xml如下:

 <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.3.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-srvutil</artifactId>
      <version>4.3.1</version>
    </dependency>
    <dependency>
      <groupId>ch.qos.logback</groupId>
      <artifactId>logback-classic</artifactId>
      <version>1.2.3</version>
    </dependency>
    <dependency>
      <groupId>org.javassist</groupId>
      <artifactId>javassist</artifactId>
        <version>3.23.1-GA</version>
    </dependency>
    <dependency>
      <groupId>io.openmessaging</groupId>
      <artifactId>openmessaging-api</artifactId>
      <version>0.3.0-alpha</version>
    </dependency>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-openmessaging</artifactId>
      <version>4.3.1</version>
    </dependency>

2. Producer 的開發步驟

  1. 實例化Producer Group,如下:

  DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");

  2. 設置namesrvAddr,集群環境多個nameserver用;分割,如下:

producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");

  3. 調用start()方法啟動:

 producer.start();

  4. 發送消息

 for (int i = 0; i < 10; i++) {
            //構建實例,第一個參數為topic,第二個參數為tabs,第三個參數為消息體
            Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult result = producer.send(message);
            System.out.println(result);
        }

  5. 關閉生產者(根據自己需求確定是夠需要關閉)

 producer.shutdown();

  完整示例如下:

package com.wangx.rocketmq.quickstart;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

/**
 * 創建一個消費者
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException, UnsupportedEncodingException {
        //1. 實例化一個producer group
        DefaultMQProducer producer = new DefaultMQProducer("my-producer-group");

        //2. 設置namesrvAddr,集群環境多個nameserver用;分割

        producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");
        //3. 啟動
        producer.start();
        // 4. 發送消息
        for (int i = 0; i < 10; i++) {
            //構建實例,第一個參數為topic,第二個參數為tabs,第三個參數為消息體
            Message message = new Message("MyQuickStartTopic","tabA",("Hello World:" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult result = producer.send(message);
            System.out.println(result);
        }
        //關閉生產者
        producer.shutdown();

    }
}

  使用方式可以說非常簡單了。

3. Consumer開發步驟

  1. 實例化Consumer Group,如下:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");

  2. 設置namesrvAddr,集群環境多個nameserver用;分割,如下:

producer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");

  3. 設置從什么位置開始都

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

  4. 訂閱topic.

consumer.subscribe("MyQuickStartTopic", "*");

  5. 注冊消息監聽器

 consumer.registerMessageListener();

  6. 重寫MessageListenerConcurrently接口的consumeMessage()方法

  完整代碼如下:

package com.wangx.rocketmq.quickstart;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 創建一個消費者
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //實例化一個consumer組
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group");
        //設置setNamesrvAddr,同生產者
        consumer.setNamesrvAddr("47.105.145.123:9876;47.105.149.61:9876");

        //設置消息讀取方式,這里設置的是隊尾開始讀取
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        //設置訂閱主題,第二個參數為過濾tabs的條件,可以寫為tabA|tabB過濾Tab,*表示接受所有
        consumer.subscribe("MyQuickStartTopic", "*");

        //注冊消息監聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    //的到MessageExt
                    MessageExt messageExt = list.get(0);
                    String topic = messageExt.getTopic();
                    String message = new String(messageExt.getBody(),"UTF-8");
                    int queueId = messageExt.getQueueId();
                    System.out.println("收到來自topic:" + topic + ", queueId:" + queueId + "的消息:" + message);

                } catch (Exception e) {
                    //失敗,請求稍后重發
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                //成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();

    }
}

  consumeMessage返回一個枚舉的兩種狀態,成功表示接受成功,否則返回稍后重發的狀態。這里注意,啟動的時候需要consumer先啟動,因為它需要在生產者之前先訂閱,否則將會收不到生產在consumer生產的消息,造成消息丟失。

  啟動consumer,在啟動producer

  producer控制台

  consumer控制台:

  rocketmq-console信息:

可以看到,我們前面部署的集群環境也是能夠實現消息的負載均衡的,會使兩個broker上都創建topic,且都能夠接收生產者生產的消息。

  進入topic,可以看到新增了兩個我們自定義的topic

可能會出現的問題:

  RemotingTooMuchRequestException: sendDefaultImpl call timeout

  在客戶端運行Producer時,可能會出現如上異常,這是因為從 Windows 上開發連接 虛擬機中的 nameServer 時要經過 Linux 系統的防火牆,而防火牆一般都會有超時的機制,在網絡連接長時間不傳輸數據時,會關閉這個 TCP 的會話,關閉后再讀寫,就有可能導致這個異常。

  解決辦法就是關閉防火牆,ubuntu下命令如下:

  

  contOS下命令如下:

  systemctl stop firewalld.service #停止firewall
  systemctl disable firewalld.service #禁止firewall開機啟動
  firewall-cmd --state #查看默認防火牆狀態(關閉后顯示notrunning,開啟后顯示running)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM