Windows下Rocketmq使用


1、下載rocketmq二進制包 rocketmq-all-4.3.2-bin-release.zip 並解壓到指定目錄

2、配置rocketmq環境變量

ROCKETMQ_HOME=rocketmq解壓目錄

3、啟動nameserver

在ROCKETMQ_HOME/bin目錄下雙擊運行mqnamesrv.cmd,出現如下信息表示啟動成功,保持命令窗口開啟(若窗口一閃而過,說明沒有配置環境變量,請先配置環境變量)

 

4、啟動broker

方法一:開啟另一個windows終端cmd,進入ROCKETMQ_HOME/bin目錄,先輸入set NAMESRV_ADDR=127.0.0.1:9876設置環境變量,輸入mqbroker回車啟動broker,保持mqbroker運行,不要關閉這個終端。 

方法二:開啟另一個windows終端cmd,進入ROCKETMQ_HOME/bin目錄,也可一步輸入mqbroker -n 127.0.0.1:9876啟動broker,保持mqbroker運行,不要關閉這個終端

5、編寫producer和consumer代碼

pom依賴

<dependencies>
      <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.2</version>
    </dependency>
  </dependencies>
  
  <build>
      <plugins>
          <plugin>  
            <artifactId>maven-compiler-plugin</artifactId>  
            <configuration>  
                <source>1.7</source>  
                <target>1.7</target>  
            </configuration>  
        </plugin>
    </plugins>
  </build>

RocketMQConsumer.java

import java.util.UUID;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 
public class RocketMQConsumer {
 
    private DefaultMQPushConsumer consumer;
 
    private MessageListener listener;
 
    protected String nameServer;
 
    protected String groupName;
 
    protected String topics;
 
    public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) {
        this.listener = listener;
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }
 
    public void init() {
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(nameServer);
        try {
            consumer.subscribe(topics, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener((MessageListenerConcurrently) this.listener);
 
        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup() + " instance=" + consumer.getInstanceName()
        );
    }
}

RocketMQListener.java

import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
 
public class RocketMQListener  implements MessageListenerConcurrently {
 
 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//        System.out.println("get data from rocketMQ:" + msgs);
        for (MessageExt message : msgs) {
 
            String msg = new String(message.getBody());
            System.out.println("msg data from rocketMQ:" + msg);
        }
 
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

RocketMQProducer.java

import java.util.UUID;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
 
public class RocketMQProducer {
 
    private DefaultMQProducer sender;
 
    protected String nameServer;
 
    protected String groupName;
 
    protected String topics;
 
    public void init() {
        sender = new DefaultMQProducer(groupName);
        sender.setNamesrvAddr(nameServer);
        sender.setInstanceName(UUID.randomUUID().toString());
        try {
            sender.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
 
    public RocketMQProducer(String nameServer, String groupName, String topics) {
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }
 
    public void send(Message message) {
 
        message.setTopic(topics);
 
        try {
            SendResult result = sender.send(message);
            SendStatus status = result.getSendStatus();
            System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

RocketMQConsumerTest.java

public class RocketMQConsumerTest {
    public static void main(String[] args) {
         
        String mqNameServer = "127.0.0.1:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";
 
        String consumerMqGroupName = "CONSUMER-MQ-GROUP";
        RocketMQListener mqListener = new RocketMQListener();
        RocketMQConsumer mqConsumer = new RocketMQConsumer(mqListener, mqNameServer, consumerMqGroupName, mqTopics);
        mqConsumer.init();
 
 
        try {
            Thread.sleep(1000 * 60L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
    }
}

RocketMQProducerTest.java

import org.apache.rocketmq.common.message.Message;

public class RocketMQProducerTest {
 
    public static void main(String[] args) {
 
        String mqNameServer = "127.0.0.1:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";
 
        String producerMqGroupName = "PRODUCER-MQ-GROUP";
        RocketMQProducer mqProducer = new RocketMQProducer(mqNameServer, producerMqGroupName, mqTopics);
        mqProducer.init();
 
 
        for (int i = 0; i < 5; i++) {
 
            Message message = new Message();
            message.setBody(("I send message to RocketMQ " + i).getBytes());
            mqProducer.send(message);
        }
    }
}

 

運行RocketMQConsumerTest,輸出如下

 

運行RocketMQProducerTest,輸出如下

 

2、Rocketmq事務消息

Half(Prepare) Message

指的是暫不能投遞的消息,發送方已經將消息成功發送到了 MQ 服務端,但是服務端未收到生產者對該消息的二次確認,此時該消息被標記成“暫不能投遞”狀態,處於該種狀態下的消息即半消息。

消息回查

由於網絡閃斷、生產者應用重啟等原因,導致某條事務消息的二次確認丟失,MQ 服務端通過掃描發現某條消息長期處於“半消息”時,需要主動向消息生產者詢問該消息的最終狀態(Commit 或是 Rollback),該過程即消息回查。

執行流程圖

1.發送方向 MQ 服務端發送消息。
2.MQ Server 將消息持久化成功之后,向發送方 ACK 確認消息已經發送成功,此時消息為半消息。
3.發送方開始執行本地事務邏輯。
4.發送方根據本地事務執行結果向 MQ Server 提交二次確認(Commit 或是 Rollback),MQ Server 收到 Commit 狀態則將半消息標記為可投遞,訂閱方最終將收到該消息;MQ      Server 收到 Rollback 狀態則刪除半消息,訂閱方將不會接受該消息。
5.在斷網或者是應用重啟的特殊情況下,上述步驟4提交的二次確認最終未到達 MQ Server,經過固定時間后 MQ Server 將對該消息發起消息回查。
6.發送方收到消息回查后,需要檢查對應消息的本地事務執行的最終結果。
7.發送方根據檢查得到的本地事務的最終狀態再次提交二次確認,MQ Server 仍按照步驟4對半消息進行操作。


事務消息發送對應步驟1、2、3、4,事務消息回查對應步驟5、6、7。

使用示例代碼


事務消息的生產者為TransactionMQProducer實例,我們需要編寫事務監聽器TransactionListener的實現類,並實現
executeLocalTransaction和checkLocalTransaction方法,並為事務消息的生產者綁定事務監聽器,發送消息時調用
sendMessageInTransaction方法,此時會向rocketmq服務器發送一條消息,

本地事務執行邏輯寫在executeLocalTransaction方法里,該方法返回本地事務執行的狀態,rocketmq服務器根據返回
的狀態值決定該事務消息如何處理:
1、如果return LocalTransactionState.COMMIT_MESSAGE ,rocketmq服務器則會將該事務消息標記為可投遞
2、如果return LocalTransactionState.ROLLBACK_MESSAGE ,rocketmq服務器則會將該事務消息丟棄
3、如果return LocalTransactionState.UNKNOW ,rocketmq服務器對該事務消息不做任何操作,該事務消息依然為半消息

事務監聽器   TransactionListenerImpl.java

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

//事務監聽器實現類
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    //執行本地事務
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println(sdf.format(new Date()) + "==============    執行本地事務    ============");
//        int value = transactionIndex.getAndIncrement();
//        int status = value % 3;
//        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.ROLLBACK_MESSAGE;
//        return LocalTransactionState.UNKNOW;
//        return LocalTransactionState.COMMIT_MESSAGE;
    }

    //檢查本地事務
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println(sdf.format(new Date()) + "==============    消息回查,檢查本地事務    ============");
        Integer status = localTrans.get(msg.getTransactionId());
        if(null != status) {
            switch(status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

TransactionProducer.java

import java.util.UUID;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TransactionProducer {
    private static final String mqNameServer = "127.0.0.1:9876";
    private static final String mqTopics = "MQ-MSG-TOPICS-TEST";
    private static final String producerMqGroupName = "PRODUCER-MQ-GROUP";
    private static final TransactionListener transactionListener = new TransactionListenerImpl();
    
    public static void main(String[] args) throws MQClientException {
        //創建消息生產者實例
        TransactionMQProducer mqProducer = new TransactionMQProducer(producerMqGroupName);
        mqProducer.setNamesrvAddr(mqNameServer);
        mqProducer.setInstanceName(UUID.randomUUID().toString());
        mqProducer.setTransactionListener(transactionListener);
        //啟動生產者
        mqProducer.start();
        //發送消息
        Message msg = new Message(mqTopics, "Msg456".getBytes());
        SendResult result = mqProducer.sendMessageInTransaction(msg, null);
        System.out.printf("%s%n", result);
    }
}

 

拉模式消息消費

PullConsumer.java

import java.util.List;
import java.util.Set;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class PullConsumer {
    private static final String consumerMqGroupName = "CONSUMER-MQ-GROUP";
    private static final String mqTopics = "MQ-MSG-TOPICS-TEST";
    private static final String mqNameServer = "127.0.0.1:9876";
    
    public static void main(String[] args) throws MQClientException {
        //創建拉模式的消息消費者
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerMqGroupName);
        consumer.setNamesrvAddr(mqNameServer);
        //啟動消息消費者
        consumer.start();
        //獲取指定topic下的所有消息隊列(發送消息到指定topic時消息會被隨機發送到不同的消息隊列)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(mqTopics);
        for (MessageQueue mq : mqs) {
            if(mq.getQueueId() == 0) {
                System.out.printf("Consume from the queue: %s%n", mq);
                try {
                    /*
                     * pullBlockIfNotFound方法從指定消息隊列中拉取消息
                     * 第一個參數 : 指定消息隊列
                     * 第二個參數 :
                     * 第三個參數 : 指定從消息隊列中讀取消息的起始位置offset
                     * 第四個參數 : 指定從消息隊列中讀取消息的個數
                     */
                    PullResult result = consumer.pullBlockIfNotFound(mq, null, 0, 10);
                    //從消息拉取結果中獲取找到的消息集
                    List<MessageExt> msgFoundList = result.getMsgFoundList();
                    for(MessageExt msg : msgFoundList) {
                        System.out.println("收到消息 " + new String(msg.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM