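RocketMQ and RabbitMQ: Basic Comparison and Interface Wrappers


RabbitMQ Notes and a Basic Interface Wrapper

The engineering-efficiency team already runs a message broker in production, so to keep the technical stack consistent we adopted RabbitMQ as our message middleware. RocketMQ and RabbitMQ differ in a few respects: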

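| RocketMQ | RabbitMQ |
| --- | --- |
| Written in Java, which makes custom extensions easy | Written in Erlang |
| Admin console: cluster, topic, producer, consumer, nameserver. No login account or password | Admin console: connection, channel, exchange, queues, admin. Login account and password required |
| Brokers register topics with the NameServer; a producer fetches the topic routes from the NameServer and then sends messages to the associated broker | Messages are dispatched through exchanges that are bound to different queues |
| Consumers support clustering and broadcasting consumption; in broadcasting mode every consumer instance in the same consumer group receives every message | All consumer instances on the same queue share its messages between them, and each message goes to one instance |
| Supports transactional messages | No equivalent of RocketMQ transactional messages (only AMQP channel transactions and publisher confirms) |
| The broker can filter by tag | An exchange is bound to specific queues and each queue serves a specific service; if two different services share one queue, consumers of that queue can only tell messages apart by their content |
| Ordered messaging is achieved by sending the messages that must stay in order to the same queue | Ordered messaging is achieved by binding the exchange to a fixed queue |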

 

Steps for wrapping the interface:

Add dependencies to the pom

Common dependencies:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <!-- no explicit version: it is managed by spring-boot-starter-parent -->
</dependency>

Add the RabbitMQ server configuration

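# RabbitMQ configuration
# host and port are the keys that RabbitConfig reads through @Value; 5672 is the default AMQP port
spring:
  rabbitmq:
    host: 172.16.4.201
    port: 5672
    username: admin
    password: Pass@1234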

 

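Three commonly used RabbitMQ exchange types:

a) Direct Exchange

(Direct): the message must carry a routing key, and the exchange delivers it to the queues whose binding key equals that routing key exactly. Only the built-in default exchange needs no explicit binding, because every queue is bound to it automatically under its own name; a named direct exchange still has to be bound to its queues.

Example:

BindingBuilder.bind(queueA()).to(directExchange()).with(RabbitConfig.Direct_RoutingKey_A);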

 

b) Fanout Exchange

(Broadcast): every message sent to a fanout exchange is forwarded to all queues bound to that exchange; the routing key is ignored in this mode.

Example: BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
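As a rough illustration of how direct and fanout routing differ, the sketch below models an exchange in memory. ExchangeModel, its methods and the keys KEY_A/KEY_B are hypothetical names used only for this example; nothing here is part of the RabbitMQ or Spring AMQP API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of exchange routing; not the RabbitMQ client API.
class ExchangeModel {
    // Binding key -> queue names, kept in insertion order.
    private final Map<String, List<String>> bindings = new LinkedHashMap<>();
    private final boolean fanout;

    ExchangeModel(boolean fanout) {
        this.fanout = fanout;
    }

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that would receive a message sent with this routing key.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : bindings.entrySet()) {
            // Fanout ignores the routing key; direct requires an exact match.
            if (fanout || e.getKey().equals(routingKey)) {
                targets.addAll(e.getValue());
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        ExchangeModel direct = new ExchangeModel(false);
        direct.bind("Direct_QUEUE_A", "KEY_A");
        direct.bind("Direct_QUEUE_B", "KEY_B");
        System.out.println(direct.route("KEY_A")); // [Direct_QUEUE_A]

        ExchangeModel fan = new ExchangeModel(true);
        fan.bind("FANOUT_QUEUE_A", "");
        fan.bind("FANOUT_QUEUE_B", "");
        System.out.println(fan.route("anything")); // [FANOUT_QUEUE_A, FANOUT_QUEUE_B]
    }
}
```

In the direct case only the queue bound with the same key receives the message, while the fanout case delivers to every bound queue whatever routing key the producer passes.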

       

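c) Topic Exchange

Can be thought of as direct exchange + fanout exchange: a message sent to the exchange is forwarded to every queue whose binding pattern matches the routing key of the message.

A routing key must be specified, and the exchange must be bound to its queues.

Wildcard matching: routing keys are words separated by dots; "#" matches zero or more words and "*" matches exactly one word. For example, "log.*" matches "log.warn" but not "log.warn.timeout", whereas "log.#" matches both.

Example: BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(RabbitConfig.Topic_Routing_KeyA);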
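The wildcard rules can be checked with a small matcher. This is only a sketch of the matching logic, assuming dot-separated words as in AMQP; TopicMatcher is a hypothetical helper, and in a real deployment the broker performs this matching itself.

```java
// Toy matcher for AMQP topic patterns; RabbitMQ does this inside the broker.
class TopicMatcher {
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // "#" may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // key used up but the pattern still expects a word
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("log.*", "log.warn"));         // true
        System.out.println(matches("log.*", "log.warn.timeout")); // false
        System.out.println(matches("log.#", "log.warn.timeout")); // true
    }
}
```

Note that the wildcards only count when they stand alone as a whole word: a pattern such as "TOPIC_#" contains no dot, so it is treated as one literal word.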

 

The concrete RabbitMQ configuration class for the different exchange types is as follows:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

@Configuration
public class RabbitConfig {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    // RabbitMQ connection factory (caches connections and channels)
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }

    // RabbitTemplate simplifies sending and receiving RabbitMQ messages
    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    // Must be prototype-scoped so that each injection point gets its own template
    // (for example, to register its own ConfirmCallback)
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    /*
     * 1. Direct Exchange: the message carries a routing key, which is used to find the matching queue
     */
    // exchange
    public static final String Direct_Exchange = "DIRECT_EXCHANGE";
    // queues
    public static final String Direct_Queue_A = "Direct_QUEUE_A";
    public static final String Direct_Queue_B = "Direct_QUEUE_B";
    // routing keys
    public static final String Direct_RoutingKey_A = "DIRECT_ROUTINGKEY_A";
    public static final String Direct_RoutingKey_B = "DIRECT_ROUTINGKEY_B";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(Direct_Exchange);
    }

    // queues
    @Bean
    public Queue queueA() {
        return new Queue(Direct_Queue_A);
    }

    @Bean
    public Queue queueB() {
        return new Queue(Direct_Queue_B);
    }

    // Bind each queue to the exchange with a routing key; several queues can be bound to the same exchange.
    // The producer must pass the routing key when sending: RabbitTemplate.convertAndSend(EXCHANGE, ROUTINGKEY, content);
    @Bean
    public Binding directBindingA() {
        return BindingBuilder.bind(queueA()).to(directExchange()).with(Direct_RoutingKey_A);
    }

    @Bean
    public Binding directBindingB() {
        return BindingBuilder.bind(queueB()).to(directExchange()).with(Direct_RoutingKey_B);
    }

    /*
     * 2. Fanout Exchange (broadcast): every message sent to the exchange is forwarded to all queues
     * bound to it; the routing key is ignored
     */
    static final String Fanout_Exchange = "FANOUT_EXCHANGE";
    static final String Fanout_Queue_A = "FANOUT_QUEUE_A";
    static final String Fanout_Queue_B = "FANOUT_QUEUE_B";

    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(Fanout_Exchange);
    }

    @Bean
    public Queue fanoutQueueA() {
        return new Queue(Fanout_Queue_A);
    }

    @Bean
    public Queue fanoutQueueB() {
        return new Queue(Fanout_Queue_B);
    }

    // A fanout binding needs no routing key.
    // The producer sends with an empty routing key: rabbitTemplate.convertAndSend(Exchange, "", content);
    // the message is then broadcast to every queue bound in this class.
    @Bean
    public Binding fanoutBindingA() {
        return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
    }

    @Bean
    public Binding fanoutBingB() {
        return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
    }

    /*
     * 3. Topic Exchange: a message is forwarded to every queue whose binding pattern matches its routing key
     */
    static final String Topic_Exchange = "TOPIC_EXCHANGE";
    static final String Topic_Queue_A = "TOPIC_QUEUE_A";
    static final String Topic_Queue_B = "TOPIC_QUEUE_B";
    // Wildcards only work as whole dot-separated words, hence "TOPIC.#" rather than "TOPIC_#"
    static final String Topic_Routing_KeyA = "TOPIC.#";
    static final String Topic_Routing_KeyB = "TOPIC.*";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(Topic_Exchange);
    }

    @Bean
    public Queue topicQueueA() {
        return new Queue(Topic_Queue_A);
    }

    @Bean
    public Queue topicQueueB() {
        return new Queue(Topic_Queue_B);
    }

    @Bean
    public Binding topicBingA() {
        return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(Topic_Routing_KeyA);
    }

    @Bean
    public Binding topicBingB() {
        return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(Topic_Routing_KeyB);
    }
}

 

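Message sending interface: the arguments differ slightly depending on the exchange type

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitSender {
    @Autowired
    private AmqpTemplate rabbitTemplate;

    /*
     * Direct exchange: a routing key is required
     */
    public void directSend(String sendMessage) {
        // Arguments: exchange name, routing key (required for a direct exchange), message
        this.rabbitTemplate.convertAndSend(RabbitConfig.Direct_Exchange, RabbitConfig.Direct_RoutingKey_A, sendMessage);
    }

    /*
     * Fanout exchange: no routing key is needed
     */
    public void fanoutSend(String sendMessage) {
        // Sends a single message; raise the loop bound to send several
        for (int i = 0; i < 1; i++) {
            rabbitTemplate.convertAndSend(RabbitConfig.Fanout_Exchange, "", sendMessage + i);
        }
    }

    /*
     * Topic exchange: a routing key is required
     */
    public void topicSend(String sendMessage) {
        // The binding pattern constant is reused as the routing key here;
        // a real producer would normally send a concrete key that the pattern matches
        rabbitTemplate.convertAndSend(RabbitConfig.Topic_Exchange, RabbitConfig.Topic_Routing_KeyA, sendMessage);
    }
}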

 

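Listener: can listen on one or several queues

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON; // assumption: JSON here is fastjson's com.alibaba.fastjson.JSON

@Component
@RabbitListener(queues = {RabbitConfig.Direct_Queue_A})
public class Consumer1 {
    /**
     * Message consumption.
     * @RabbitHandler marks this method as the handler invoked when a message arrives
     */
    @RabbitHandler
    public void recieved(String message) {
        System.out.println("--------:" + JSON.toJSONString(message));
    }
}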

 

 

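Installing RocketMQ with Docker and a Basic RocketMQ Interface Wrapper

A single-node RocketMQ instance is now deployed in the QA environment: http://30.16.80.9:8080/#/

The RocketMQ wrapper code has been committed to the cap/backed/develop-v2.0 branch

The RocketMQ deployment steps and the interface summary follow: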

 

I. Installing RocketMQ with Docker:

1. Pull the RocketMQ image

docker pull rocketmqinc/rocketmq

2. Pull the rocketmq-console image

docker pull styletang/rocketmq-console-ng

3. Start the nameserver

docker run -d -p 9876:9876 -v `pwd`/data/namesrv/logs:/root/logs -v `pwd`/data/namesrv/store:/root/store --name rmqnamesrv rocketmqinc/rocketmq sh mqnamesrv

4. Start the broker

docker run -d -p 10911:10911 -p 10909:10909 -v `pwd`/data/broker/logs:/root/logs -v `pwd`/data/broker/store:/root/store --name rmqbroker --link rmqnamesrv:namesrv -e "NAMESRV_ADDR=namesrv:9876" rocketmqinc/rocketmq sh mqbroker -c ../conf/broker.conf

When the broker starts, RocketMQ registers its internal network address by default, so clients outside that network cannot reach the broker and report an error such as: connect to xxx.xx.xx.xx:10911 failed

Fix:

① docker exec -it xxxxx bash    -- xxxxx is the container ID of the broker

② cd ../conf

③ vi broker.conf

④ add brokerIP1 = xxx.xxx.xxx.xxx    -- set this to the externally reachable address

⑤ restart the broker container

5. Start rocketmq-console

docker run -e "JAVA_OPTS=-Drocketmq.namesrv.addr=xxx.xxx.xxx.xxx:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 -t styletang/rocketmq-console-ng

-- xxx.xxx.xxx.xxx is the server address

 

 

 

II. RocketMQ interface wrapper

1. RocketMQ structure

2. RocketMQ examples

a) Producer side

i. Sending a normal message

public SendResult send(String topic, String tag, String sendMsg) {
    log.info("SendMessage_topic:" + topic + ",SendMessage_tag:" + tag + ",sendMsg:" + sendMsg);
    if (StringUtils.isBlank(topic)) {
        throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
    }
    SendResult sendResult = null;
    try {
        Message msgMessage;
        if (StringUtils.isBlank(tag)) {
            msgMessage = new Message(topic, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
        } else {
            msgMessage = new Message(topic, tag, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
        }
        sendResult = producer.send(msgMessage);
    } catch (Exception e) {
        // Failures are only logged, so callers must be prepared for a null result
        log.error("send failed, topic:" + topic + ",tag:" + tag, e);
    }
    return sendResult;
}

ii. Sending ordered messages

In RocketMQ a single queue is not consumed in parallel by an orderly listener, while different queues can be consumed in parallel. So if all messages that must stay in order are sent to the same queue, they are consumed in the order in which they were sent.

public SendResult sendOrderly(String orderId, String topic, String tag, String sendMsg) {
    log.info("Orderly SendMessage_topic:" + topic + ",SendMessage_tag:" + tag + ",sendMsg:" + sendMsg);
    if (StringUtils.isBlank(topic)) {
        throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
    }
    SendResult sendResult = null;
    try {
        Message msgMessage;
        if (StringUtils.isBlank(tag)) {
            msgMessage = new Message(topic, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
        } else {
            msgMessage = new Message(topic, tag, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
        }
        sendResult = producer.send(msgMessage, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                // arg is the orderId passed to send() below. It is a String, so casting it to
                // Integer would throw ClassCastException; hash it to pick a stable queue instead.
                int index = Math.floorMod(arg.hashCode(), mqs.size());
                return mqs.get(index);
            }
        }, orderId);
    } catch (Exception e) {
        // Failures are only logged, so callers must be prepared for a null result
        log.error("orderly send failed, topic:" + topic + ",orderId:" + orderId, e);
    }
    return sendResult;
}
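The queue-selection idea can be tried without a broker. In this minimal sketch plain strings stand in for MessageQueue objects; OrderlyQueueSelector and the sample order IDs are hypothetical and not part of the RocketMQ client.

```java
import java.util.Arrays;
import java.util.List;

// Toy version of the queue-selection rule; strings stand in for MessageQueue objects.
class OrderlyQueueSelector {
    // Maps an order key to one queue; the same key always yields the same queue.
    static <T> T select(List<T> queues, Object orderKey) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        int index = Math.floorMod(orderKey.hashCode(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        for (String step : new String[] {"created", "paid", "shipped"}) {
            System.out.println("order-1001/" + step + " -> " + select(queues, "order-1001"));
        }
        System.out.println("order-1002/created -> " + select(queues, "order-1002"));
    }
}
```

All three steps of order-1001 land on one queue, which is what lets an orderly consumer see them in sequence. The mapping changes if the number of queues changes, so ordering across a resize is not covered by this sketch.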

       

b) Message listeners

i. Orderly consumption: implement the MessageListenerOrderly interface

public class MQConsumeMsgOrderlyListener implements MessageListenerOrderly {
    private static final Logger log = LoggerFactory.getLogger(MQConsumeMsgOrderlyListener.class);

    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.tag}")
    private String tag;

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        if (StringUtils.isBlank(topic)) {
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
        }
        for (MessageExt msg : msgs) {
            try {
                if (StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topic)) { // TODO: is this check needed? The consumer subscription already specifies the topic
                    String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    log.info("MQConsumeMsgOrderlyListener consumeMessage body:" + body);
                    // TODO: act on the received MQ message content
                }
            } catch (Exception e) {
                log.error("orderly consume failed, msgId:" + msg.getMsgId(), e);
                // Stop at the first failure so that later messages are not processed out of order
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        // Report success only after every message in the batch has been handled
        return ConsumeOrderlyStatus.SUCCESS;
    }
}

ii. Concurrent consumption: implement the MessageListenerConcurrently interface

public class MQConsumeMsgConcurrentListener implements MessageListenerConcurrently {
    private static final Logger log = LoggerFactory.getLogger(MQConsumeMsgConcurrentListener.class);

    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.tag}")
    private String tag;

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        ConsumeConcurrentlyStatus result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        if (StringUtils.isBlank(topic)) {
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL, "topics is null !!!", false);
        }
        for (MessageExt msg : msgs) {
            try {
                if (StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topic)) { // TODO: is this check needed? The consumer subscription already specifies the topic
                    String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    log.info("MQConsumeMsgConcurrentListener consumeMessage body:" + body);
                    // TODO: act on the received MQ message content
                }
            } catch (Exception e) {
                log.error("concurrent consume failed, msgId:" + msg.getMsgId(), e);
                // Ask the broker to redeliver the batch later
                result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return result;
    }
}

 

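c) Consumer side

i) Clustering consumption

The Consumer instances in one ConsumerGroup share the messages sent by producers roughly evenly, and each message is consumed by only one instance.

Set the consumer to clustering mode when initializing it: consumer.setMessageModel(MessageModel.CLUSTERING);

ii) Broadcasting consumption

Each message is consumed by multiple Consumers: even if these Consumers belong to the same ConsumerGroup, the message is consumed once by every Consumer in the group.

Set the consumer to broadcasting mode when initializing it:

consumer.setMessageModel(MessageModel.BROADCASTING);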
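The difference between the two models can be illustrated with a small simulation. This is only a sketch: MessageModelDemo and the consumer and message names are hypothetical, and the per-message round-robin is a stand-in for RocketMQ's real load balancing, which assigns whole queues to consumers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy simulation of the two message models; it does not use the RocketMQ client.
class MessageModelDemo {
    // Returns consumer name -> messages that consumer receives.
    static Map<String, List<String>> deliver(List<String> consumers, List<String> messages, boolean broadcasting) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String c : consumers) {
            received.put(c, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            if (broadcasting) {
                // Broadcasting: every consumer in the group gets every message.
                for (String c : consumers) {
                    received.get(c).add(messages.get(i));
                }
            } else {
                // Clustering: each message goes to exactly one consumer (round-robin stand-in).
                received.get(consumers.get(i % consumers.size())).add(messages.get(i));
            }
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("consumer-1", "consumer-2");
        List<String> messages = Arrays.asList("m0", "m1", "m2", "m3");
        System.out.println("CLUSTERING:   " + deliver(consumers, messages, false));
        System.out.println("BROADCASTING: " + deliver(consumers, messages, true));
    }
}
```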

 
 
Basic code
Application:
package com.leolztang.sb.aop.rocketmq2;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.jpa.repository.config.EnableJpaAuditing;

@SpringBootApplication
@EnableJpaAuditing 
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

InitialProducer 

package com.leolztang.sb.aop.rocketmq2;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
public class InitialProducer {
    private static final Logger log = LoggerFactory.getLogger(InitialProducer.class);
    @Value("${rocketmq.producer.groupName}")
    private String groupName;
    @Value("${rocketmq.producer.namesrvAddr}")
    private String nameserAddr;
    @Value("${rocketmq.producer.instanceName}")
    private String instanceName;
    @Value("${rocketmq.producer.maxMessageSize}")
    private int maxMessageSize;
    @Value("${rocketmq.producer.sendMsgTimeout}")
    private int sendMsgTimeout;

    private DefaultMQProducer producer;
    @Bean
    public DefaultMQProducer getRocketMQProducer() {
        if (StringUtils.isEmpty(groupName)) {
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"groupName is blank",false);
        }
        if (StringUtils.isEmpty(nameserAddr)) {
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"nameServerAddr is blank",false);
        }
        producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(nameserAddr);
        producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(maxMessageSize);
        producer.setSendMsgTimeout(sendMsgTimeout);
        producer.setVipChannelEnabled(false);

        try {
            producer.start();
            log.info("rocketMQ is start !!groupName : {},nameserAddr:{}",groupName,nameserAddr);
        } catch (MQClientException e) {
            log.error("rocketMQ start error: {}", e.getMessage(), e);
        }
        return producer;
    }

}

 ProducerDemo : 

package com.leolztang.sb.aop.rocketmq2;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/dmservice/rocket")
public class ProducerDemo {
    @Autowired
    private SendMessageService sendMessageService;
    
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    
    @RequestMapping(value="/sendmq2")
    public void sendMQDemo() {
        String body="rocketMQ test body";
        for(int i=0;i<10;i++) {
            sendMessageService.send(topic, tag,  body+i+"");
        }
    }
    
}
SendMessageService 
package com.leolztang.sb.aop.rocketmq2;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class SendMessageService {
    private static final Logger log = LoggerFactory.getLogger(SendMessageService.class);
    @Autowired
    private DefaultMQProducer producer;

    // 1. Send with topic only
    public SendResult send(String topic, String sendMsg) {
        log.info("SendMessage_topic:" + topic + ",sendMsg:" + sendMsg);
        SendResult sendResult = null;
        try {
            Message msgMessage = new Message(topic, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = producer.send(msgMessage);
            log.info("SendResult topic:" + topic + ",sendResult.getSendStatus:" + sendResult.getSendStatus());
        } catch (Exception e) {
            log.error(e.toString());
        }
        return sendResult;
    }

    // 2. Send with topic + tag
    public SendResult send(String topic, String tag, String sendMsg) {
        log.info("SendMessage_topic:" + topic + ",tag:" + tag + ",sendMsg:" + sendMsg);
        SendResult sendResult = null;
        try {
            Message msgMessage = new Message(topic, tag, sendMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));
            sendResult = producer.send(msgMessage);
            log.info("SendResult topic:" + topic + ",tag:" + tag + ",sendResult.getSendStatus:" + sendResult.getSendStatus());
        } catch (Exception e) {
            log.error(e.toString());
        }
        return sendResult;
    }
}

InitialConsumer1 :

package com.leolztang.sb.aop.rocketmq2;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

@Service
public class InitialConsumer1 {
    private static final Logger log = LoggerFactory.getLogger(InitialConsumer1.class);
    
    @Value("${rocketmq.consumer.namesrvAddr}")
    private String namesrvAddr;
    @Value("${rocketmq.consumer.groupName}")
    private String groupName;
    @Value("${rocketmq.consumer.consumeThreadMin}")
    private int consumeThreadMin;
    @Value("${rocketmq.consumer.consumeThreadMax}")
    private int consumeThreadMax;
    @Value("${rocketmq.consumer.topic}")
    private String topic;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    
    @Autowired
    private MQConsumeMsgListenerProcessor2 mqMessageListenerProcessor;  // TODO: turn MQConsumeMsgListenerProcessor into an interface so that different consumers can use different listeners
    
    @Bean
    public DefaultMQPushConsumer getRocketMQConsumer() throws RocketMQException{
        if (StringUtils.isEmpty(groupName)){
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"groupName is null !!!",false);
        }
        if (StringUtils.isEmpty(namesrvAddr)){
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"namesrvAddr is null !!!",false);
        }
        if(StringUtils.isEmpty(topic)){
            throw new RocketMQException(RocketMQErrorEnum.PARAMM_NULL,"topics is null !!!",false);
        }
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeThreadMin(consumeThreadMin);
        consumer.setConsumeThreadMax(consumeThreadMax);
        consumer.registerMessageListener(mqMessageListenerProcessor);
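        /**
         * Whether the consumer starts from the head or the tail of the queue on its first start.
         * On later starts it resumes from the last consumed offset.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.setMessageModel(MessageModel.CLUSTERING); // clustering consumption
//        consumer.setMessageModel(MessageModel.BROADCASTING); // broadcasting consumption, used with the MessageListenerConcurrently listener
        try {
            consumer.subscribe(topic, tag); // TODO: confirm whether both topic and tag can be registered here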
            consumer.start();
        }catch(Exception e) {
            log.error("groupName:{},topic:{},tag:{}, getRocketMQConsumer error",groupName,topic,tag,e);
        }
        /**
         * The number of messages handed to the listener per call is left at its default of 1
         */
        return consumer;
    }
    
}

MQConsumeMsgListenerProcessor 

package com.leolztang.sb.aop.rocketmq2;

import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;


@Service
public class MQConsumeMsgListenerProcessor implements MessageListenerOrderly {
    private static final Logger log = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor.class);
    @Value("${rocketmq.consumer.topic}")
    private String topics;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        ConsumeOrderlyStatus result = ConsumeOrderlyStatus.SUCCESS;
        for(MessageExt msg:msgs) {
            try {
                if(StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topics)) {   // TODO: is this check needed? The consumer subscription already specifies the topic
                    if(StringUtils.isNotBlank(msg.getTags()) && msg.getTags().equals(tag)) {
                        String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        log.info("======================:" + body);
                        // TODO: act on the received MQ message content
                    }
                }
            }catch(Exception e) {
                log.error("orderly consume failed, msgId:" + msg.getMsgId(), e);
                // Stop at the first failure so that later messages are not processed out of order
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        return result;
    }

}

MQConsumeMsgListenerProcessor2

package com.leolztang.sb.aop.rocketmq2;

import java.util.HashMap;
import java.util.List;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

@Service
public class MQConsumeMsgListenerProcessor2 implements MessageListenerConcurrently {
    private static final Logger log = LoggerFactory.getLogger(MQConsumeMsgListenerProcessor2.class);
    @Value("${rocketmq.consumer.topic}")
    private String topics;
    @Value("${rocketmq.consumer.tag}")
    private String tag;
    
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        ConsumeConcurrentlyStatus result = ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        for(MessageExt msg:msgs) {
            try {
                if(StringUtils.isNotBlank(msg.getTopic()) && msg.getTopic().equals(topics)) {   // TODO: is this check needed? The consumer subscription already specifies the topic
                    if(StringUtils.isNotBlank(msg.getTags()) && msg.getTags().equals(tag)) {
                        String body = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        log.info("consumeMessage body:" + body);
                        // TODO: act on the received MQ message content
                    }
                }
            }catch(Exception e) {
                log.error("concurrent consume failed, msgId:" + msg.getMsgId(), e);
                result = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return result;
    }

}

RocketMQErrorEnum 

package com.leolztang.sb.aop.rocketmq2;

public enum RocketMQErrorEnum implements ErrorCode {
    /******** Common ********/
    PARAMM_NULL("MQ_001","Parameter is null"),
    
    /******** Producer *******/
    
    
    
    /******** Consumer *******/
    NOT_FOUND_CONSUMESERVICE("MQ_100","No consume service found for the given topic and tag"),
    HANDLE_RESULT_NULL("MQ_101","Consume method returned null"),
    CONSUME_FAIL("MQ_102","Consume failed")
    
    ;

    private String code;
    private String msg;

    private RocketMQErrorEnum(String code, String msg) {
        this.code = code;
        this.msg = msg;
    }
    public String getCode() {
        return this.code;
    }

    public String getMsg() {
        return this.msg;
    }
}

RocketMQException 

package com.leolztang.sb.aop.rocketmq2;

public class RocketMQException extends AppException {

    /**
     * No-arg constructor
     */
    public RocketMQException() {
        super();
    }

    public RocketMQException(Throwable e) {
        super(e);
    }

    public RocketMQException(ErrorCode errorType) {
        super(errorType);
    }

    public RocketMQException(ErrorCode errorCode, String... errMsg) {
        super(errorCode, errMsg);
    }

    /**
     * Wraps an exception
     * 
     * @param errorCode
     * @param errMsg
     * @param isTransfer
     *            whether to format the message through the error code's template; if false, errMsg is used as is
     */
    public RocketMQException(ErrorCode errorCode, String errMsg, Boolean isTransfer) {
        super(errorCode, errMsg, isTransfer);
    }

    public RocketMQException(ErrorCode errCode, Throwable cause, String... errMsg) {
        super(errCode, cause, errMsg);
    }
}

ErrorCode 

package com.leolztang.sb.aop.rocketmq2;

import java.io.Serializable;

public interface ErrorCode extends Serializable {
    /**
     * Error code
     * @return
     */
    String getCode();
    /**
     * Error message
     * @return
     */
     */
    String getMsg();
}

AppException 

package com.leolztang.sb.aop.rocketmq2;

public class AppException extends RuntimeException{
     private static final long serialVersionUID = 1L;
        /**
         * Error code
         */
        protected ErrorCode errCode;

        /**
         * Error message
         */
        protected String errMsg;

        /**
         * No-arg constructor
         */
        public AppException() {
            super();
        }
        public AppException(Throwable e) {
            super(e);
        }
        
        public AppException(ErrorCode errCode, String... errMsg) {
            super(errCode.getMsg());
            this.errCode = errCode;
            setErrMsg(errMsg,true);
        }
        
        public AppException(ErrorCode errCode, String errMsg,Boolean isTransfer) {
            super(errMsg);
            this.errCode = errCode;
            setErrMsg(new String[]{errMsg},isTransfer);
        }
        
        /**
         * Constructor
         *
         * @param cause the underlying exception
         */
        public AppException(ErrorCode errCode, Throwable cause, String... errMsg) {
            super(errCode.getCode() + errCode.getMsg(), cause);
            this.errCode = errCode;
            setErrMsg(errMsg,true);
        }

        public ErrorCode getErrCode() {
            return errCode;
        }

        public void setErrCode(ErrorCode errCode) {
            this.errCode = errCode;
        }

        public String getErrMsg() {
            return this.errMsg;
        }

        public void setErrMsg(String[] errMsg,Boolean isTransfer) {

            if (null != errMsg &&errMsg.length>0) {
                if(errCode.getMsg().contains("%s") && isTransfer){
                    this.errMsg = String.format(errCode.getMsg(), errMsg);
                }else{
                    StringBuffer sf = new StringBuffer();
                    for (String msg : errMsg) {
                        sf.append(msg+";");
                    }
                    this.errMsg = sf.toString();
                }
            }else{
                this.errMsg = errCode.getMsg();
            }

        }

        public static void main(String[] args) {
            String str = "ERRCode:1004--object does not exist:[%s]";
            if (str.contains("%s")){
             System.out.println("contains");
            }
        }
}
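The message-building rule in setErrMsg can be restated as a standalone function. This is a minimal sketch under the assumption that it mirrors the branches above; ErrMsgFormatDemo, build and the template strings are hypothetical and not part of the project code.

```java
// Toy restatement of the message-building rule; the template strings are hypothetical.
class ErrMsgFormatDemo {
    static String build(String template, boolean isTransfer, String... args) {
        if (args == null || args.length == 0) {
            return template; // no details supplied: fall back to the template itself
        }
        if (isTransfer && template.contains("%s")) {
            // Fill the placeholders of the template with the supplied details.
            return String.format(template, (Object[]) args);
        }
        // Otherwise ignore the template and join the details with ';'.
        StringBuilder sb = new StringBuilder();
        for (String a : args) {
            sb.append(a).append(';');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(build("object not found: [%s]", true, "user-42"));   // object not found: [user-42]
        System.out.println(build("parameter is null", false, "groupName is blank")); // groupName is blank;
        System.out.println(build("parameter is null", true));                    // parameter is null
    }
}
```

This is why the callers above pass false for isTransfer: the supplied text replaces the enum's message instead of being formatted into it.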