Rabbitmq學習以及基本接口封裝
基於工程效率團隊線上已有消息中間件在使用,為了統一技術方案,所以采用rabbitmq作為消息中間件。rocketmq和rabbitmq這兩種消息中間件略有區別:
| Rocketmq |
Rabbitmq |
| Java開發,便於二次開發 |
Erlang語言開發 |
| 管理后台:cluster、topic、producer、consumer、nameserver。無登錄賬號密碼 |
管理后台:connection、channel、exchange、queues、admin。有登錄賬號密碼 |
| Broker向Nameserver注冊topic,producer從nameserver獲取topic,進而向關聯broker發送消息 |
通過exchange綁定不同queues進行消息派送 |
| 消費端支持集群消費和廣播消費;廣播消費下,同一個consumergroup下所有消費實例都會共享消費消息內容 |
同一個queue下所有消費實例會均分消息內容 |
| 支持事務消息 |
不支持事務消息 |
| Broker支持通過tag過濾 |
Exchange綁定特定queues,每個queue為特定服務使用,兩個不同服務如果采用同一個queue,那么在從這個隊列進行消息消費時只能通過消息具體內容進行區分 |
| 通過將需要順序的消息發送到同一個queue里保證順序消息的功能 |
Exchange綁定固定的queue,實現順序消息 |
接口封裝步驟:
pom添加依賴
通用依賴:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>
添加rabbitmq服務配置信息
# rabbitmq配置
spring:
  rabbitmq:
    addresses: 172.16.4.201
    username: admin
    password: Pass@1234
Rabbitmq三種交換機模式:
a) Direct Exchange
(直連):發送時需要指定routing key,Exchange會把消息投遞到binding key與該routing key完全一致的隊列;只有使用默認(無名)Exchange時隊列才會自動以隊列名綁定、無需顯式綁定,自定義的Direct Exchange仍需像下面的例子一樣顯式綁定(binding)queue
例子:
BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
b) Fanout Exchange
(廣播):任何發送到Fanout Exchange的消息都會轉發到與該Exchange綁定(binding)的所有queue上;這種模式不需要routing key
例子:BindingBuilder.bind(queueA).to(fanoutExchange);
c) Topic Exchange
可以理解為direct exchange+fanout exchange;任何發送到Exchange的消息都會被轉發所有關心Routekey中所指定的queue上;
需要指定routekey;exchange需要綁定(binding)queue;
模糊匹配:routing key由“.”分隔的若干關鍵字組成;“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。
例子:BindingBuilder.bind(queueA()).to(defaultExchange()).with(RabbitConfig.ROUTINGKEY_A);
針對不同交換機模式具體rabbitmq配置文件如下:
public class RabbitConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
//rabbitmq連接池
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
//rabbitTemplate用以簡化rabbitmq發送和接收消息
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
// 必須是prototype類型
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
/*
* 1、Direct Exchange(直連):傳遞時需要一個Routekey,通過Routkey尋找對應queue
*/
//獲取DirectExchange類型交換機
//交換機-exchange
public static final String Direct_Exchange = "DIERCT_EXCHANGE";
//隊列-queue
public static final String Direct_Queue_A = "Direct_QUEUE_A";
public static final String Direct_Queue_B = "Direct_QUEUE_B";
//路由關鍵字-routingkey
public static final String Direct_RoutingKey_A = "DIERCT_ROUTINGKEY_A";
public static final String Direct_RoutingKey_B = "DIERCT_ROUTINGKEY_B";
@Bean
public DirectExchange directExchange() {
return new DirectExchange(Direct_Exchange);
}
//獲取queue
@Bean
public Queue queueA() {
return new Queue(Direct_Queue_A);
}
@Bean
public Queue queueB() {
return new Queue(Direct_Queue_B);
}
//傳遞routkey給exchange,將queue綁定到exchange上;可以將多個隊列綁定到同一個exchange上;
//在生產者發送時需要routkey,格式:RabbitTemplate.convertAndSend(EXCHANGE, ROUTINGKEY, content);
@Bean
public Binding directBindingA() {
return BindingBuilder.bind(queueA()).to(directExchange()).with(Direct_RoutingKey_A);
}
@Bean
public Binding directBindingB() {
return BindingBuilder.bind(queueB()).to(directExchange()).with(Direct_RoutingKey_B);
}
/*
* 2、Fanout Exchange(廣播):任何發送到Fanout Exchange的消息都會轉發到與該Exchange綁定(bingding)的所有queue上;這種模式不需要routkey
*/
static final String Fanout_Exchange="FANOUT_EXCHANGE";
static final String Fanout_Queue_A="FANOUT_QUEUE_A";
static final String Fanout_Queue_B="FANOUT_QUEUE_B";
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(Fanout_Exchange);
}
@Bean
public Queue fanoutQueueA() {
return new Queue(Fanout_Queue_A);
}
@Bean
public Queue fanoutQueueB() {
return new Queue(Fanout_Queue_B);
}
//廣播方式交換機與queue綁定無需routekey
//生產者發送時routkey為空,格式rabbitTemplate.convertAndSend(Exchange,"", content);這樣可以將消息廣播到在RabbitConfig類中所有綁定的queues上
@Bean
public Binding fanoutBindingA() {
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}
@Bean
public Binding fanoutBingB() {
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}
/*
* 3、Topic Exchange:任何發送到Exchange的消息都會被轉發所有關心Routekey中所指定的queue上
*/
static final String Topic_Exchange="TOPIC_EXCHANGE";
static final String Topic_Queue_A="TOPIC_QUEUE_A";
static final String Topic_Queue_B="TOPIC_QUEUE_B";
static final String Topic_Routing_KeyA="TOPIC_#";
static final String Topic_Routing_KeyB="TOPIC_*";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(Topic_Exchange);
}
@Bean
public Queue topicQueueA() {
return new Queue(Topic_Queue_A);
}
@Bean
public Queue topicQueueB() {
return new Queue(Topic_Queue_B);
}
@Bean
public Binding topicBingA() {
return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(Topic_Routing_KeyA);
}
@Bean
public Binding topicBingB() {
return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(Topic_Routing_KeyB);
}
}
消息發送接口:針對不同交換機類型,發送方法參數略有不同
/**
 * Publishes messages to the exchanges declared in {@code RabbitConfig}; the arguments
 * differ slightly per exchange type.
 */
@Component
public class RabbitSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Publishes to the direct exchange. A routing key is required; here it is
     * {@code RabbitConfig.Direct_RoutingKey_A}.
     *
     * @param sendMessage payload to publish
     */
    public void directSend(String sendMessage) {
        // Arguments: exchange name, routing key, payload.
        rabbitTemplate.convertAndSend(RabbitConfig.Direct_Exchange, RabbitConfig.Direct_RoutingKey_A, sendMessage);
    }

    /**
     * Publishes to the fanout exchange with an empty routing key.
     *
     * <p>The payload is sent unchanged; the previous single-iteration loop appended
     * the loop index ("0") to every message.
     *
     * @param sendMessage payload to publish
     */
    public void fanoutSend(String sendMessage) {
        rabbitTemplate.convertAndSend(RabbitConfig.Fanout_Exchange, "", sendMessage);
    }

    /**
     * Publishes to the topic exchange using {@code RabbitConfig.Topic_Routing_KeyA}
     * as the routing key.
     *
     * <p>NOTE(review): that constant is also used as a binding key in RabbitConfig;
     * confirm that publishing with the binding pattern itself is intended.
     *
     * @param sendMessage payload to publish
     */
    public void topicSend(String sendMessage) {
        rabbitTemplate.convertAndSend(RabbitConfig.Topic_Exchange, RabbitConfig.Topic_Routing_KeyA, sendMessage);
    }
}
監聽器 可以監聽某個或者某些queue
/**
 * Listener bound to {@code RabbitConfig.Direct_Queue_A}.
 */
@Component
@RabbitListener(queues = {RabbitConfig.Direct_Queue_A})
public class Consumer1 {

    /**
     * Handler invoked for each received message: serializes the payload with
     * {@code JSON.toJSONString} and prints it to standard output.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void recieved(String message) {
        final String rendered = JSON.toJSONString(message);
        System.out.println("--------:" + rendered);
    }
}
基於docker rocketmq安裝&rocketmq基本接口封裝
Rocketmq單機現已部署到qa環境:http://30.16.80.9:8080/#/;
Rocketmq接口封裝代碼已提交到cap/backed/develop-v2.0分支
具體rocketmq部署以及接口總結文檔如下:
一、基於docker rocketmq安裝:
1、 拉取rocketmq鏡像
docker pull rocketmqinc/rocketmq
2、 拉取rocketmq-console鏡像
docker pull styletang/rocketmq-console-ng
3、 啟動nameserver
docker run -d -p 9876:9876 -v `pwd`/data/namesrv/logs:/root/logs -v `pwd`/data/namesrv/store:/root/store --name rmqnamesrv rocketmqinc/rocketmq sh mqnamesrv
4、 啟動broker
docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/data/broker/logs:/root/logs -v `pwd`/data/broker/store:/root/store --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" rocketmqinc/rocketmq sh mqbroker -c ../conf/broker.conf
由於啟動broker時rocketmq默認指定為內網地址,會導致外網無法連接到broker,報出如下錯誤信息: connect to xxx.xx.xx.xx:10911 failed
解決方案:①docker exec -it xxxxx bash --xxx是指broker對應的containerid
②cd ../conf
③vi broker.conf
④增加brokerIP1 = xxx.xxx.xxx.xxx --這里的ip地址指定為外網地址
⑤重啟broker容器
5、 啟動rocketmq-console
docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=xxx.xxx.xxx.xxx:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng
--- xxx.xxx.xxx.xxx 為服務器地址
二、rocketmq接口封裝
1、 rocketmq結構
2、 rocketmq實例
a) 生產端
i. 普通發送消息
/**
 * Sends a message synchronously through {@code producer}.
 *
 * @param topic   destination topic; must not be blank
 * @param tag     optional tag; when blank the message is created without a tag
 * @param sendMsg message text, encoded with {@code RemotingHelper.DEFAULT_CHARSET}
 * @return the send result, or {@code null} when building or sending the message failed
 * @throws RocketMQException if {@code topic} is blank
 */
public SendResult send(String topic, String tag, String sendMsg) {
    log.info("SendMessage_topic:" + topic + ",SendMessage_tag:" + tag + ",sendMsg:" + sendMsg);
    if (StringUtils.isBlank(topic)) {
        throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
    }
    try {
        byte[] body = sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msgMessage = StringUtils.isBlank(tag)
                ? new Message(topic, body)
                : new Message(topic, tag, body);
        return producer.send(msgMessage);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to the caller's thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        // Pass the exception itself so the stack trace is logged, not just toString().
        log.error("Failed to send message, topic:" + topic + ",tag:" + tag, e);
        return null;
    }
}
ii. 順序消息發送
RocketMQ中同一個隊列不能被並行消費,但可以並行消費多個隊列。基於此,只要將需要保證順序的消息發送到同一個隊列,RocketMQ即可保證這些消息按發送順序被消費
/**
 * Sends a message so that all messages sharing the same {@code orderId} go to the same
 * queue, which is what ordered consumption relies on.
 *
 * <p>The queue is chosen from the hash of {@code orderId}. The previous implementation
 * cast the String {@code orderId} to {@code Integer} inside the selector, which always
 * failed with a {@code ClassCastException}.
 *
 * @param orderId ordering key; must not be blank
 * @param topic   destination topic; must not be blank
 * @param tag     optional tag; when blank the message is created without a tag
 * @param sendMsg message text, encoded with {@code RemotingHelper.DEFAULT_CHARSET}
 * @return the send result, or {@code null} when building or sending the message failed
 * @throws RocketMQException if {@code topic} or {@code orderId} is blank
 */
public SendResult sendOrderly(String orderId, String topic, String tag, String sendMsg) {
    log.info("Orderly SendMessage_topic:" + topic + ",SendMessage_tag:" + tag + ",sendMsg:" + sendMsg);
    if (StringUtils.isBlank(topic)) {
        throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
    }
    if (StringUtils.isBlank(orderId)) {
        throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "orderId is null !!!", false);
    }
    try {
        byte[] body = sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET);
        Message msgMessage = StringUtils.isBlank(tag)
                ? new Message(topic, body)
                : new Message(topic, tag, body);
        return producer.send(msgMessage, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // Mask the sign bit so a negative hash cannot yield a negative index.
                int index = (arg.hashCode() & Integer.MAX_VALUE) % mqs.size();
                return mqs.get(index);
            }
        }, orderId);
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Keep the interrupt visible to the caller's thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        log.error("Failed to send orderly message, orderId:" + orderId + ",topic:" + topic + ",tag:" + tag, e);
        return null;
    }
}
b) 消息監聽器
i.順序消費 實現MessageListenerOrderly接口
public class MQConsumeMsgOrderlyListener implements MessageListenerOrderly {
@Value("${rocketmq.consumer.topic}")
private String topic;
@Value("${rocketmq.consumer.tag}")
private String tag;
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
ConsumeOrderlyStatus result=ConsumeOrderlyStatus.SUCCESS;
if (StringUtils.isBlank(topic)) {
throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
}
for(MessageExt msg:msgs) {
try {
if(StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topic)) { //TODO 這個需要么,consumer訂閱會指定topic
String Message = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
log.info("MQConsumeMsgListenerProcessor consumeMessage body:" + Message);
// TODO 根據接收到mq消息內容進行其他操作
return result;
}
}catch(Exception e) {
result = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return null;
}
ii. 並發消費 實現MessageListenerConcurrently接口
public class MQConsumeMsgConcurrentListener implements MessageListenerConcurrently {
@Value("${rocketmq.consumer.topic}")
private String topic;
@Value("${rocketmq.consumer.tag}")
private String tag;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
ConsumeConcurrentlyStatus result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
if (StringUtils.isBlank(topic)) {
throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
}
for (MessageExt msg : msgs) {
try {
if (StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topic)) { // TODO
String Message = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
log.info("MQConsumeMsgListenerProcessor consumeMessage body:" + Message);
// TODO 根據接收到mq消息內容進行其他操作
return result;
}
} catch (Exception e) {
result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return result;
}
}
c) 消費端
i.) 集群消費
一個ConsumerGroup中的Consumer實例平均分攤消費生產者發送的消息;
在初始化消費者時將consumer設置為集群消費模式:consumer.setMessageModel(MessageModel.CLUSTERING);
ii.) 廣播消費
一條消息被多個Consumer消費,即使這些Consumer屬於同一個ConsumerGroup,消息也會被ConsumerGroup中的每個Consumer消費一次;
在初始化消費者時將consumer設置為廣播消費模式:
consumer.setMessageModel(MessageModel.BROADCASTING);
package com.leolztang.sb.aop.rocketmq2;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;

/**
 * Spring Boot entry point for the RocketMQ demo; JPA auditing is enabled.
 */
@SpringBootApplication
@EnableJpaAuditing
public class Application {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
InitialProducer
package com.leolztang.sb.aop.rocketmq2; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; @Service public class InitialProducer { private static final Logger log = LoggerFactory.getLogger(InitialProducer.class); @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String nameserAddr; @Value("${rocketmq.producer.instanceName}") private String instanceName; @Value("${rocketmq.producer.maxMessageSize}") private int maxMessageSize; @Value("${rocketmq.producer.sendMsgTimeout}") private int sendMsgTimeout; private DefaultMQProducer producer; @Bean public DefaultMQProducer getRocketMQProducer() { if (StringUtils.isEmpty(groupName)) { throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"groupName is blank",false); } if (StringUtils.isEmpty(nameserAddr)) { throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"nameServerAddr is blank",false); } producer = new DefaultMQProducer(groupName); producer.setNamesrvAddr(nameserAddr); producer.setInstanceName(instanceName); producer.setMaxMessageSize(maxMessageSize); producer.setSendMsgTimeout(sendMsgTimeout); producer.setVipChannelEnabled(false); try { producer.start(); log.info("rocketMQ is start !!groupName : {},nameserAddr:{}",groupName,nameserAddr); } catch (MQClientException e) { log.error(String.format("rocketMQ start error,{}",e.getMessage())); e.printStackTrace(); } return producer; } }
ProducerDemo :
package com.leolztang.sb.aop.rocketmq2;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/**
 * Demo REST endpoint that publishes a handful of test messages.
 */
@RestController
@RequestMapping("/dmservice/rocket")
public class ProducerDemo {

    @Autowired
    private SendMessageService sendMessageService;

    @Value("${rocketmq.consumer.topic}")
    private String topic;

    @Value("${rocketmq.consumer.tag}")
    private String tag;

    /**
     * Sends ten messages, "rocketMQ test body0" through "rocketMQ test body9",
     * to the configured topic and tag.
     */
    @RequestMapping(value = "/sendmq2")
    public void sendMQDemo() {
        final String prefix = "rocketMQ test body";
        int sequence = 0;
        while (sequence < 10) {
            sendMessageService.send(topic, tag, prefix + sequence);
            sequence++;
        }
    }
}
package com.leolztang.sb.aop.rocketmq2; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class SendMessageService { private static final Logger log = LoggerFactory.getLogger(SendMessageService.class); @Autowired private DefaultMQProducer producer; // 1、topic public SendResult send(String topic, String sendMsg) { log.info("SendMessage_topic:" + topic + ",sendMsg:" + sendMsg); SendResult sendResult = null; try { Message msgMessage = new Message(topic, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET)); sendResult = producer.send(msgMessage); log.info("SendResult topic:" + topic + "sendResult.getSendStatus:" + sendResult.getSendStatus()); } catch (Exception e) { log.error(e.toString()); } return sendResult; } // 2、topci+tags public SendResult send(String topic, String tag, String sendMsg) { log.info("SendMessage_topic:" + topic + ",tag:" + tag + ",sendMsg:" + sendMsg); SendResult sendResult = null; try { Message msgMessage = new Message(topic, tag, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET)); sendResult = producer.send(msgMessage); log.info("SendResult topic:" + topic + ",tag:" + tag +"sendResult.getSendStatus:" + sendResult.getSendStatus()); } catch (Exception e) { log.error(e.toString()); } return sendResult; } }
InitialConsumer1 :
package com.leolztang.sb.aop.rocketmq2; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; @Service public class InitialConsumer1 { private static final Logger log = LoggerFactory.getLogger(InitialConsumer1.class); @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.topic}") private String topic; @Value("${rocketmq.consumer.tag}") private String tag; @Autowired private MQConsumeMsgListenerProcessor2 mqMessageListenerProcessor; //TODO MQConsumeMsgListenerProcessor改為接口,可以不同consuemr不同listener @Bean public DefaultMQPushConsumer getRocketMQConsumer() throws RocketMQException{ if (StringUtils.isEmpty(groupName)){ throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"groupName is null !!!",false); } if (StringUtils.isEmpty(namesrvAddr)){ throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"namesrvAddr is null !!!",false); } if(StringUtils.isEmpty(topic)){ throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"topics is null !!!",false); } DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(mqMessageListenerProcessor); /** * 
設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 * 如果非第一次啟動,那么按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.setMessageModel(MessageModel.CLUSTERING); //集群消費 // consumer.setMessageModel(MessageModel.BROADCASTING);//廣播消費 對應MessageListenerConcurrently監聽器 try { consumer.subscribe(topic, tag); //TODO 這里是否可以注冊topic和tag consumer.start(); }catch(Exception e) { log.error("groupName:{},topic:{},tag:{}, getRocketMQConsumer error",groupName,topic,tag,e); } /** * 設置一次消費消息的條數,默認為1條 */ return consumer; } }
MQConsumeMsgListenerProcessor
package com.leolztang.sb.aop.rocketmq2; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Service public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly { private static final Logger log = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class); @Value("${rocketmq.consumer.topic}") private String topics; @Value("${rocketmq.consumer.tag}") private String tag; public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { ConsumeOrderlyStatus result = ConsumeOrderlyStatus.SUCCESS; for(MessageExt msg:msgs) { try { if(StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topics)) { //TODO 這個需要么,consumer訂閱會指定topic if(StringUtils.isNotBlank(msg.getTags()) && msg.getTags().equals(tag)) { String Message=new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET); log.info("======================:"+Message); //TODO 根據接收到mq消息內容進行其他操作 return result; } } }catch(Exception e) { result = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } } return result; } }
MQConsumeMsgListenerProcessor2
package com.leolztang.sb.aop.rocketmq2; import java.util.HashMap; import java.util.List; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; @Service public class MQConsumeMsgListenerProcessor2 implements MessageListenerConcurrently { private static final Logger log = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class); @Value("${rocketmq.consumer.topic}") private String topics; @Value("${rocketmq.consumer.tag}") private String tag; public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { HashMap<K, V> ConsumeConcurrentlyStatus result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS; for(MessageExt msg:msgs) { try { if(StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topics)) { //TODO 這個需要么,consumer訂閱會指定topic if(StringUtils.isNotBlank(msg.getTags()) && msg.getTags().equals(tag)) { String Message=new String(msg.getBody(),RemotingHelper.DEFAULT_CHARSET); log.info("==================~~~~~~~~jjjjjjjjj============:"+Message); //TODO 根據接收到mq消息內容進行其他操作 return result; } } }catch(Exception e) { result = ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return result; } }
RocketMQErrorEnum
package com.leolztang.sb.aop.rocketmq2; public enum RocketMQErrorEnum implements ErrorCode { /********公共********/ PARAMM_NULL("MQ_001","參數為空"), /********生產者*******/ /********消費者*******/ NOT_FOUND_CONSUMESERVICE("MQ_100","根據topic和tag沒有找到對應的消費服務"), HANDLE_RESULT_NULL("MQ_101","消費方法返回值為空"), CONSUME_FAIL("MQ_102","消費失敗") ; private String code; private String msg; private RocketMQErrorEnum(String code, String msg) { this.code = code; this.msg = msg; } public String getCode() { return this.code; } public String getMsg() { return this.msg; } }
RocketMQException
package com.leolztang.sb.aop.rocketmq2;

/**
 * Exception type of the RocketMQ wrapper. Every constructor delegates to the
 * {@link AppException} constructor with the same arguments.
 */
public class RocketMQException extends AppException {

    /** No-argument constructor. */
    public RocketMQException() {
        super();
    }

    /**
     * @param e the underlying cause
     */
    public RocketMQException(Throwable e) {
        super(e);
    }

    /**
     * @param errorType the error code
     */
    public RocketMQException(ErrorCode errorType) {
        super(errorType);
    }

    /**
     * @param errorCode the error code
     * @param errMsg    one or more message parts
     */
    public RocketMQException(ErrorCode errorCode, String... errMsg) {
        super(errorCode, errMsg);
    }

    /**
     * Wraps an error.
     *
     * @param errorCode  the error code
     * @param errMsg     the message
     * @param isTransfer whether to convert the message; when {@code false} the
     *                   {@code errMsg} text is used directly
     */
    public RocketMQException(ErrorCode errorCode, String errMsg, Boolean isTransfer) {
        super(errorCode, errMsg, isTransfer);
    }

    /**
     * @param errCode the error code
     * @param cause   the underlying cause
     * @param errMsg  one or more message parts
     */
    public RocketMQException(ErrorCode errCode, Throwable cause, String... errMsg) {
        super(errCode, cause, errMsg);
    }
}
ErrorCode
package com.leolztang.sb.aop.rocketmq2;

import java.io.Serializable;

/**
 * Contract for an error descriptor made of a code and a message.
 */
public interface ErrorCode extends Serializable {

    /**
     * Error code.
     *
     * @return the error code
     */
    String getCode();

    /**
     * Error message.
     *
     * @return the error message
     */
    String getMsg();
}
AppException
package com.leolztang.sb.aop.rocketmq2;

/**
 * Base runtime exception carrying an {@link ErrorCode} and a resolved error message.
 */
public class AppException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    /** Error code. */
    protected ErrorCode errCode;

    /** Error message. */
    protected String errMsg;

    /** No-argument constructor. */
    public AppException() {
        super();
    }

    /**
     * @param e the underlying cause
     */
    public AppException(Throwable e) {
        super(e);
    }

    /**
     * @param errCode the error code; must not be null (its message becomes the exception message)
     * @param errMsg  message parts; used as format arguments when the code's message contains "%s"
     */
    public AppException(ErrorCode errCode, String... errMsg) {
        super(errCode.getMsg());
        this.errCode = errCode;
        setErrMsg(errMsg, true);
    }

    /**
     * @param errCode    the error code
     * @param errMsg     the message, also used as the exception message
     * @param isTransfer whether to format {@code errMsg} into the code's message template;
     *                   {@code false} or {@code null} uses {@code errMsg} directly
     */
    public AppException(ErrorCode errCode, String errMsg, Boolean isTransfer) {
        super(errMsg);
        this.errCode = errCode;
        setErrMsg(new String[]{errMsg}, isTransfer);
    }

    /**
     * @param errCode the error code; must not be null
     * @param cause   the underlying cause
     * @param errMsg  message parts; used as format arguments when the code's message contains "%s"
     */
    public AppException(ErrorCode errCode, Throwable cause, String... errMsg) {
        super(errCode.getCode() + errCode.getMsg(), cause);
        this.errCode = errCode;
        setErrMsg(errMsg, true);
    }

    public ErrorCode getErrCode() {
        return errCode;
    }

    public void setErrCode(ErrorCode errCode) {
        this.errCode = errCode;
    }

    public String getErrMsg() {
        return this.errMsg;
    }

    /**
     * Resolves the stored error message.
     *
     * <ul>
     *   <li>No parts given: the error code's own message (or {@code null} without a code).</li>
     *   <li>{@code isTransfer} is true and the code's message contains "%s": the parts are
     *       formatted into that message.</li>
     *   <li>Otherwise: the parts are joined, each followed by ";".</li>
     * </ul>
     *
     * <p>Unlike the previous version this does not throw a NullPointerException when
     * {@code errCode}, its message, or {@code isTransfer} is {@code null}.
     *
     * @param errMsg     message parts, may be null or empty
     * @param isTransfer whether to format the parts into the code's message template
     */
    public void setErrMsg(String[] errMsg, Boolean isTransfer) {
        String template = (errCode == null) ? null : errCode.getMsg();
        if (errMsg == null || errMsg.length == 0) {
            this.errMsg = template;
            return;
        }
        if (template != null && template.contains("%s") && Boolean.TRUE.equals(isTransfer)) {
            this.errMsg = String.format(template, (Object[]) errMsg);
            return;
        }
        StringBuilder joined = new StringBuilder();
        for (String msg : errMsg) {
            joined.append(msg).append(';');
        }
        this.errMsg = joined.toString();
    }

    public static void main(String[] args) {
        String str = "ERRCode:1004--對象不存在:[%s]";
        if (str.contains("%s")) {
            System.out.println("包含");
        }
    }
}
