Windows下Rocketmq使用


1、下载rocketmq二进制包 rocketmq-all-4.3.2-bin-release.zip 并解压到指定目录

2、配置rocketmq环境变量

ROCKETMQ_HOME=rocketmq解压目录

3、启动nameserver

在ROCKETMQ_HOME/bin目录下双击运行mqnamesrv.cmd,出现如下信息表示启动成功,保持命令窗口开启(若窗口一闪而过,说明没有配置环境变量,请先配置环境变量)

 

4、启动broker

方法一:开启另一个windows终端cmd,进入ROCKETMQ_HOME/bin目录,先输入set NAMESRV_ADDR=127.0.0.1:9876设置环境变量,输入mqbroker回车启动broker,保持mqbroker运行,不要关闭这个终端。 

方法二:开启另一个windows终端cmd,进入ROCKETMQ_HOME/bin目录,也可一步输入mqbroker -n 127.0.0.1:9876启动broker,保持mqbroker运行,不要关闭这个终端

5、编写producer和consumer代码

pom依赖

<dependencies>
      <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.2</version>
    </dependency>
  </dependencies>
  
  <build>
      <plugins>
          <plugin>  
            <artifactId>maven-compiler-plugin</artifactId>  
            <configuration>  
                <source>1.7</source>  
                <target>1.7</target>  
            </configuration>  
        </plugin>
    </plugins>
  </build>

RocketMQConsumer.java

import java.util.UUID;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 
public class RocketMQConsumer {
 
    private DefaultMQPushConsumer consumer;
 
    private MessageListener listener;
 
    protected String nameServer;
 
    protected String groupName;
 
    protected String topics;
 
    public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) {
        this.listener = listener;
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }
 
    public void init() {
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(nameServer);
        try {
            consumer.subscribe(topics, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.registerMessageListener((MessageListenerConcurrently) this.listener);
 
        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup() + " instance=" + consumer.getInstanceName()
        );
    }
}

RocketMQListener.java

import java.util.List;

import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
 
public class RocketMQListener  implements MessageListenerConcurrently {
 
 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
//        System.out.println("get data from rocketMQ:" + msgs);
        for (MessageExt message : msgs) {
 
            String msg = new String(message.getBody());
            System.out.println("msg data from rocketMQ:" + msg);
        }
 
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

RocketMQProducer.java

import java.util.UUID;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
 
public class RocketMQProducer {
 
    private DefaultMQProducer sender;
 
    protected String nameServer;
 
    protected String groupName;
 
    protected String topics;
 
    public void init() {
        sender = new DefaultMQProducer(groupName);
        sender.setNamesrvAddr(nameServer);
        sender.setInstanceName(UUID.randomUUID().toString());
        try {
            sender.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }
 
    public RocketMQProducer(String nameServer, String groupName, String topics) {
        this.nameServer = nameServer;
        this.groupName = groupName;
        this.topics = topics;
    }
 
    public void send(Message message) {
 
        message.setTopic(topics);
 
        try {
            SendResult result = sender.send(message);
            SendStatus status = result.getSendStatus();
            System.out.println("messageId=" + result.getMsgId() + ", status=" + status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

RocketMQConsumerTest.java

public class RocketMQConsumerTest {
    public static void main(String[] args) {
         
        String mqNameServer = "127.0.0.1:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";
 
        String consumerMqGroupName = "CONSUMER-MQ-GROUP";
        RocketMQListener mqListener = new RocketMQListener();
        RocketMQConsumer mqConsumer = new RocketMQConsumer(mqListener, mqNameServer, consumerMqGroupName, mqTopics);
        mqConsumer.init();
 
 
        try {
            Thread.sleep(1000 * 60L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
 
    }
}

RocketMQProducerTest.java

import org.apache.rocketmq.common.message.Message;

public class RocketMQProducerTest {
 
    public static void main(String[] args) {
 
        String mqNameServer = "127.0.0.1:9876";
        String mqTopics = "MQ-MSG-TOPICS-TEST";
 
        String producerMqGroupName = "PRODUCER-MQ-GROUP";
        RocketMQProducer mqProducer = new RocketMQProducer(mqNameServer, producerMqGroupName, mqTopics);
        mqProducer.init();
 
 
        for (int i = 0; i < 5; i++) {
 
            Message message = new Message();
            message.setBody(("I send message to RocketMQ " + i).getBytes());
            mqProducer.send(message);
        }
    }
}

 

运行RocketMQConsumerTest,输出如下

 

运行RocketMQProducerTest,输出如下

 

2、Rocketmq事务消息

Half(Prepare) Message

指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。

消息回查

由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。

执行流程图

1.发送方向 MQ 服务端发送消息。
2.MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
3.发送方开始执行本地事务逻辑。
4.发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ      Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
5.在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
6.发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
7.发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。


事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。

使用示例代码


事务消息的生产者为TransactionMQProducer实例,我们需要编写事务监听器TransactionListener的实现类,并实现
executeLocalTransaction和checkLocalTransaction方法,并为事务消息的生产者绑定事务监听器,发送消息时调用
sendMessageInTransaction方法,此时会向rocketmq服务器发送一条消息,

本地事务执行逻辑写在executeLocalTransaction方法里,该方法返回本地事务执行的状态,rocketmq服务器根据返回
的状态值决定该事务消息如何处理:
1、如果return LocalTransactionState.COMMIT_MESSAGE ,rocketmq服务器则会将该事务消息标记为可投递
2、如果return LocalTransactionState.ROLLBACK_MESSAGE ,rocketmq服务器则会将该事务消息丢弃
3、如果return LocalTransactionState.UNKNOW ,rocketmq服务器对该事务消息不做任何操作,该事务消息依然为半消息

事务监听器   TransactionListenerImpl.java

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

//事务监听器实现类
public class TransactionListenerImpl implements TransactionListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    //执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        System.out.println(sdf.format(new Date()) + "==============    执行本地事务    ============");
//        int value = transactionIndex.getAndIncrement();
//        int status = value % 3;
//        localTrans.put(msg.getTransactionId(), status);
        return LocalTransactionState.ROLLBACK_MESSAGE;
//        return LocalTransactionState.UNKNOW;
//        return LocalTransactionState.COMMIT_MESSAGE;
    }

    //检查本地事务
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        System.out.println(sdf.format(new Date()) + "==============    消息回查,检查本地事务    ============");
        Integer status = localTrans.get(msg.getTransactionId());
        if(null != status) {
            switch(status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                default:
                    return LocalTransactionState.COMMIT_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }

}

TransactionProducer.java

import java.util.UUID;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TransactionProducer {
    private static final String mqNameServer = "127.0.0.1:9876";
    private static final String mqTopics = "MQ-MSG-TOPICS-TEST";
    private static final String producerMqGroupName = "PRODUCER-MQ-GROUP";
    private static final TransactionListener transactionListener = new TransactionListenerImpl();
    
    public static void main(String[] args) throws MQClientException {
        //创建消息生产者实例
        TransactionMQProducer mqProducer = new TransactionMQProducer(producerMqGroupName);
        mqProducer.setNamesrvAddr(mqNameServer);
        mqProducer.setInstanceName(UUID.randomUUID().toString());
        mqProducer.setTransactionListener(transactionListener);
        //启动生产者
        mqProducer.start();
        //发送消息
        Message msg = new Message(mqTopics, "Msg456".getBytes());
        SendResult result = mqProducer.sendMessageInTransaction(msg, null);
        System.out.printf("%s%n", result);
    }
}

 

拉模式消息消费

PullConsumer.java

import java.util.List;
import java.util.Set;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class PullConsumer {
    private static final String consumerMqGroupName = "CONSUMER-MQ-GROUP";
    private static final String mqTopics = "MQ-MSG-TOPICS-TEST";
    private static final String mqNameServer = "127.0.0.1:9876";
    
    public static void main(String[] args) throws MQClientException {
        //创建拉模式的消息消费者
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerMqGroupName);
        consumer.setNamesrvAddr(mqNameServer);
        //启动消息消费者
        consumer.start();
        //获取指定topic下的所有消息队列(发送消息到指定topic时消息会被随机发送到不同的消息队列)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(mqTopics);
        for (MessageQueue mq : mqs) {
            if(mq.getQueueId() == 0) {
                System.out.printf("Consume from the queue: %s%n", mq);
                try {
                    /*
                     * pullBlockIfNotFound方法从指定消息队列中拉取消息
                     * 第一个参数 : 指定消息队列
                     * 第二个参数 :
                     * 第三个参数 : 指定从消息队列中读取消息的起始位置offset
                     * 第四个参数 : 指定从消息队列中读取消息的个数
                     */
                    PullResult result = consumer.pullBlockIfNotFound(mq, null, 0, 10);
                    //从消息拉取结果中获取找到的消息集
                    List<MessageExt> msgFoundList = result.getMsgFoundList();
                    for(MessageExt msg : msgFoundList) {
                        System.out.println("收到消息 " + new String(msg.getBody()));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } 
            }
        }
    }
}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM