SpringCloud stream連接RabbitMQ收發信息


百度上查的大部分都是一些很簡單的單消費者或者單生產者的例子,並且多是同一個服務器的配置,本文的例子為多服務器配置下的消費生產和消費者配置

參考資料:https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_binder_implementations

 

1、POM引入spring-cloud-starter-stream-rabbit

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2、application.properties

通用配置:

#rabbit的配置信息
spring.rabbitmq.addresses=amqp://10.18.75.231:5672
spring.rabbitmq.username=user_admin
spring.rabbitmq.password=12345678
#下面這個配置優先級太高,在配置中心分模塊(分文件)的場景下后面的binder屬性無法被覆蓋,如果有存在多個vhost的情況下建議將該屬性注釋掉
spring.rabbitmq.virtual-host=boss

當存在多個binder時必須指定一個默認的binder:

# 設置一個默認的binder,如果不配置將報錯
spring.cloud.stream.defaultBinder=boss

消費者配置: 1 # 配置ecm消費者的服務器配置信息

 2 spring.cloud.stream.binders.ecm.type=rabbit  3 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.addresses=${spring.rabbitmq.addresses}  4 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.username=${spring.rabbitmq.username}  5 #spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.password=${spring.rabbitmq.password}  6 spring.cloud.stream.binders.ecm.environment.spring.rabbitmq.virtual-host=ecm  7 
 8 #交易系統ECM的貨櫃模板變更消費者
 9 spring.cloud.stream.bindings.ecm_shop_template.binder=ecm 10 spring.cloud.stream.bindings.ecm_shop_template.destination=這里填exchange的名字 11 #默認情況下同一個隊列的只能被同一個group的消費者消費
12 spring.cloud.stream.bindings.ecm_shop_template.group=這里是消費者的名稱 13 spring.cloud.stream.bindings.ecm_shop_template.contentType=text/plain 14 #指定該主題的類型為廣播模式
15 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.exchangeType=fanout 16 #消費失敗的消息放入dlq隊列
17 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.autoBindDlq=true 18 spring.cloud.stream.rabbit.bindings.ecm_shop_template.consumer.republishToDlq=true

配置死信隊列會在消費者出現異常的時候重試3(默認為3,可以配置)次后將消息放入死信隊列中,效果如下:

 

生產者配置:

 1 # BOSS消息生產者服務器配置
 2 spring.cloud.stream.binders.boss.type=rabbit
 3 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.addresses=${spring.rabbitmq.addresses}
 4 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.username=${spring.rabbitmq.username}
 5 #spring.cloud.stream.binders.boss.environment.spring.rabbitmq.password=${spring.rabbitmq.password}
 6 spring.cloud.stream.binders.boss.environment.spring.rabbitmq.virtual-host=boss
 7 
 8 #BOSS基礎信息生產者
 9 spring.cloud.stream.bindings.message_output.destination=exchange的名稱
10 #exchange的類型為廣播模式
11 spring.cloud.stream.rabbit.bindings.message_output.producer.exchangeType=fanout

下面是java代碼

1、定義消息的Input和Output配置信息

 1 import org.springframework.cloud.stream.annotation.Input;
 2 import org.springframework.cloud.stream.annotation.Output;
 3 import org.springframework.messaging.MessageChannel;
 4 
 5 /**
 6  * mq連接源定義
 7  * 
 8  * 其中類中的2個屬性的值和properties里的配置需要一致
 9  **/
10 public interface MqMessageSource {
11     // BOSS生產者
12     String MESSAGE_OUTPUT = "message_output";
13     // ECM消費者
14     String ECM_SHOP_TEMPLATE_INPUT = "ecm_shop_template";
15 
16     @Output(MESSAGE_OUTPUT)
17     MessageChannel messageOutput();
18     
19     @Input(ECM_SHOP_TEMPLATE_INPUT)
20     MessageChannel messageInput();
21 
22 }

2、消息消費

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.cloud.stream.annotation.EnableBinding;
 3 import org.springframework.cloud.stream.annotation.StreamListener;
 4 import org.springframework.messaging.Message;
 5 
 6 import com.alibaba.fastjson.JSONObject;
 7 
 8 import lombok.extern.slf4j.Slf4j;
 9 
10 /**
11  * MQ消費者
12  * @author yangzhilong
13  *
14  */
15 @Slf4j
16 @EnableBinding(MqMessageSource.class)
17 public class MqMessageConsumer {
18 
19     @Autowired
20     private XXService xxService;
21     
22     /**
23      * 消費ECM的貨櫃模板變更
24      * @param message
25      */
26     @StreamListener(MqMessageSource.ECM_SHOP_TEMPLATE_INPUT)
27     public void receive(Message<String> message) {
28         log.info("接收貨櫃模板開始,參數={}", JSONObject.toJSONString(message));
29         if (null == message) {
30             return;
31         }
32         try {
33             String payload = message.getPayload();
34             log.info("具體消息內容= {}", JSONObject.toJSONString(payload));
35             JSONObject jsonObject = JSONObject.parseObject(payload);
36             ShopReqDto shopReqDto = new ShopReqDto();
37             shopReqDto.setCode(jsonObject.getString("shopNo"));
38             shopReqDto.setGoodsMarketTemplateId(jsonObject.getLong("goodsMarketTemplateId"));
39             shopReqDto.setGoodsMarketTemplateName(jsonObject.getString("goodsMarketTemplateName"));
40             ResponseResult<String> responseResult = xxService.updateTemplateIdAndName(shopReqDto);
41             if(responseResult.isSuccess()){
42                 log.info("【MQ消費貨櫃模板更新信息成功】");
43             }else{
44                 log.error("【MQ消費貨櫃模板更新信息失敗】,返回結果信息:" + JSONObject.toJSONString(responseResult));
45             }
46         } catch (Exception e) {
47             log.error("接收處理貨櫃模板MQ時出現異常:{}", e);
48             throw new RuntimeException(e);
49         }
50     }
51 }

3、消息生產者代碼

 1 import org.springframework.beans.factory.annotation.Autowired;
 2 import org.springframework.cloud.stream.annotation.EnableBinding;
 3 import org.springframework.cloud.stream.annotation.Output;
 4 import org.springframework.messaging.MessageChannel;
 5 import org.springframework.messaging.support.MessageBuilder;
 6 import com.alibaba.fastjson.JSON;
 7 import lombok.extern.slf4j.Slf4j;
 8 
 9 /**
10  * 消息生產者
11  *
12  **/
13 @EnableBinding(MqMessageSource.class)
14 @Slf4j
15 public class MqMessageProducer {
16     @Autowired
17     @Output(MqMessageSource.MESSAGE_OUTPUT)
18     private MessageChannel channel;
19 
20 
21     //品牌
22     public void sendBrandAdd(Brand brand) {
23         BossMessage<Brand> message = new BossMessage<>();
24         message.setData(brand);
25         message.setOpType(MqMessageProducer.ADD);
26         message.setDataType(MqMessageProducer.BRAND);
27         channel.send(MessageBuilder.withPayload(JSON.toJSONString(message)).build());
28         log.info("【MQ發送內容】" + JSON.toJSONString(message));
29     }
30 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM