1、下载rocketmq二进制包 rocketmq-all-4.3.2-bin-release.zip 并解压到指定目录
2、配置rocketmq环境变量
ROCKETMQ_HOME=rocketmq解压目录
3、启动nameserver
在ROCKETMQ_HOME/bin目录下双击运行mqnamesrv.cmd,出现如下信息表示启动成功,保持命令窗口开启(若窗口一闪而过,说明没有配置环境变量,请先配置环境变量)
4、启动broker
方法一:开启另一个windows终端cmd,进入ROCKETMQ_HOME/bin目录,先输入set NAMESRV_ADDR=127.0.0.1:9876
设置环境变量,输入mqbroker回车
启动broker,保持mqbroker运行,不要关闭这个终端。
方法二:开启另一个windows终端cmd,进入ROCKETMQ_HOME/bin目录,也可一步输入mqbroker -n 127.0.0.1:9876
启动broker,保持mqbroker运行,不要关闭这个终端
5、编写producer和consumer代码
pom依赖
<dependencies> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.2</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build>
RocketMQConsumer.java
import java.util.UUID; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListener; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; public class RocketMQConsumer { private DefaultMQPushConsumer consumer; private MessageListener listener; protected String nameServer; protected String groupName; protected String topics; public RocketMQConsumer(MessageListener listener, String nameServer, String groupName, String topics) { this.listener = listener; this.nameServer = nameServer; this.groupName = groupName; this.topics = topics; } public void init() { consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(nameServer); try { consumer.subscribe(topics, "*"); } catch (MQClientException e) { e.printStackTrace(); } consumer.setInstanceName(UUID.randomUUID().toString()); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.registerMessageListener((MessageListenerConcurrently) this.listener); try { consumer.start(); } catch (MQClientException e) { e.printStackTrace(); } System.out.println("RocketMQConsumer Started! group=" + consumer.getConsumerGroup() + " instance=" + consumer.getInstanceName() ); } }
RocketMQListener.java
import java.util.List; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; public class RocketMQListener implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // System.out.println("get data from rocketMQ:" + msgs); for (MessageExt message : msgs) { String msg = new String(message.getBody()); System.out.println("msg data from rocketMQ:" + msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
RocketMQProducer.java
import java.util.UUID; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.message.Message; public class RocketMQProducer { private DefaultMQProducer sender; protected String nameServer; protected String groupName; protected String topics; public void init() { sender = new DefaultMQProducer(groupName); sender.setNamesrvAddr(nameServer); sender.setInstanceName(UUID.randomUUID().toString()); try { sender.start(); } catch (MQClientException e) { e.printStackTrace(); } } public RocketMQProducer(String nameServer, String groupName, String topics) { this.nameServer = nameServer; this.groupName = groupName; this.topics = topics; } public void send(Message message) { message.setTopic(topics); try { SendResult result = sender.send(message); SendStatus status = result.getSendStatus(); System.out.println("messageId=" + result.getMsgId() + ", status=" + status); } catch (Exception e) { e.printStackTrace(); } } }
RocketMQConsumerTest.java
public class RocketMQConsumerTest { public static void main(String[] args) { String mqNameServer = "127.0.0.1:9876"; String mqTopics = "MQ-MSG-TOPICS-TEST"; String consumerMqGroupName = "CONSUMER-MQ-GROUP"; RocketMQListener mqListener = new RocketMQListener(); RocketMQConsumer mqConsumer = new RocketMQConsumer(mqListener, mqNameServer, consumerMqGroupName, mqTopics); mqConsumer.init(); try { Thread.sleep(1000 * 60L); } catch (InterruptedException e) { e.printStackTrace(); } } }
RocketMQProducerTest.java
import org.apache.rocketmq.common.message.Message; public class RocketMQProducerTest { public static void main(String[] args) { String mqNameServer = "127.0.0.1:9876"; String mqTopics = "MQ-MSG-TOPICS-TEST"; String producerMqGroupName = "PRODUCER-MQ-GROUP"; RocketMQProducer mqProducer = new RocketMQProducer(mqNameServer, producerMqGroupName, mqTopics); mqProducer.init(); for (int i = 0; i < 5; i++) { Message message = new Message(); message.setBody(("I send message to RocketMQ " + i).getBytes()); mqProducer.send(message); } } }
运行RocketMQConsumerTest,输出如下
运行RocketMQProducerTest,输出如下
2、Rocketmq事务消息
Half(Prepare) Message
指的是暂不能投递的消息,发送方已经将消息成功发送到了 MQ 服务端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息。
消息回查
由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,MQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
执行流程图
1.发送方向 MQ 服务端发送消息。
2.MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
3.发送方开始执行本地事务逻辑。
4.发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
5.在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
6.发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
7.发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。
使用示例代码
事务消息的生产者为TransactionMQProducer实例,我们需要编写事务监听器TransactionListener的实现类,并实现
executeLocalTransaction和checkLocalTransaction方法,并为事务消息的生产者绑定事务监听器,发送消息时调用
sendMessageInTransaction方法,此时会向rocketmq服务器发送一条消息,
本地事务执行逻辑写在executeLocalTransaction方法里,该方法返回本地事务执行的状态,rocketmq服务器根据返回
的状态值决定该事务消息如何处理:
1、如果return LocalTransactionState.COMMIT_MESSAGE ,rocketmq服务器则会将该事务消息标记为可投递
2、如果return LocalTransactionState.ROLLBACK_MESSAGE ,rocketmq服务器则会将该事务消息丢弃
3、如果return LocalTransactionState.UNKNOW ,rocketmq服务器对该事务消息不做任何操作,该事务消息依然为半消息
事务监听器 TransactionListenerImpl.java
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; //事务监听器实现类 public class TransactionListenerImpl implements TransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); //执行本地事务 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { System.out.println(sdf.format(new Date()) + "============== 执行本地事务 ============"); // int value = transactionIndex.getAndIncrement(); // int status = value % 3; // localTrans.put(msg.getTransactionId(), status); return LocalTransactionState.ROLLBACK_MESSAGE; // return LocalTransactionState.UNKNOW; // return LocalTransactionState.COMMIT_MESSAGE; } //检查本地事务 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { System.out.println(sdf.format(new Date()) + "============== 消息回查,检查本地事务 ============"); Integer status = localTrans.get(msg.getTransactionId()); if(null != status) { switch(status) { case 0: return LocalTransactionState.UNKNOW; case 1: return LocalTransactionState.COMMIT_MESSAGE; case 2: return LocalTransactionState.ROLLBACK_MESSAGE; default: return LocalTransactionState.COMMIT_MESSAGE; } } return LocalTransactionState.COMMIT_MESSAGE; } }
TransactionProducer.java
import java.util.UUID; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.TransactionListener; import org.apache.rocketmq.client.producer.TransactionMQProducer; import org.apache.rocketmq.common.message.Message; public class TransactionProducer { private static final String mqNameServer = "127.0.0.1:9876"; private static final String mqTopics = "MQ-MSG-TOPICS-TEST"; private static final String producerMqGroupName = "PRODUCER-MQ-GROUP"; private static final TransactionListener transactionListener = new TransactionListenerImpl(); public static void main(String[] args) throws MQClientException { //创建消息生产者实例 TransactionMQProducer mqProducer = new TransactionMQProducer(producerMqGroupName); mqProducer.setNamesrvAddr(mqNameServer); mqProducer.setInstanceName(UUID.randomUUID().toString()); mqProducer.setTransactionListener(transactionListener); //启动生产者 mqProducer.start(); //发送消息 Message msg = new Message(mqTopics, "Msg456".getBytes()); SendResult result = mqProducer.sendMessageInTransaction(msg, null); System.out.printf("%s%n", result); } }
拉模式消息消费
PullConsumer.java
import java.util.List; import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; public class PullConsumer { private static final String consumerMqGroupName = "CONSUMER-MQ-GROUP"; private static final String mqTopics = "MQ-MSG-TOPICS-TEST"; private static final String mqNameServer = "127.0.0.1:9876"; public static void main(String[] args) throws MQClientException { //创建拉模式的消息消费者 DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(consumerMqGroupName); consumer.setNamesrvAddr(mqNameServer); //启动消息消费者 consumer.start(); //获取指定topic下的所有消息队列(发送消息到指定topic时消息会被随机发送到不同的消息队列) Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(mqTopics); for (MessageQueue mq : mqs) { if(mq.getQueueId() == 0) { System.out.printf("Consume from the queue: %s%n", mq); try { /* * pullBlockIfNotFound方法从指定消息队列中拉取消息 * 第一个参数 : 指定消息队列 * 第二个参数 : * 第三个参数 : 指定从消息队列中读取消息的起始位置offset * 第四个参数 : 指定从消息队列中读取消息的个数 */ PullResult result = consumer.pullBlockIfNotFound(mq, null, 0, 10); //从消息拉取结果中获取找到的消息集 List<MessageExt> msgFoundList = result.getMsgFoundList(); for(MessageExt msg : msgFoundList) { System.out.println("收到消息 " + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); } } } } }