SpringCloud Stream生產者配置RabbitMq的動態路由鍵


在寫這個文章前不得不吐槽目前國內一些blog的文章,盡是些復制粘貼的文章,提到點上但沒任何的深入和例子。.........

經過測試下來總結一下RabbitMQ的Exchange的特性:

1、direct

生產者可以指定路由鍵,消費者可以指定路由鍵,但不能講路由鍵設置為#(全部)。

2、topic

生產者可以指定路由鍵,消費者可以指定路由鍵,也可以不指定(或者是#)。

3、fanout

生產者和消費都忽略路由鍵。

 

在現實的場景里,通常是生產者會生產多個路由鍵的消費,然后多個消費來消費指定路由鍵的消息,但通常生產者的生產代碼是同一份,如何在發消息的時候動態指定當前消息的路由鍵呢?

例子:門店平台系統集中處理多個門店的數據,然后分別將不同門店的數據發送到不同的門店(即:A門店只消費屬於A門店的消息,B門店只消費屬於B的消息)

看例子:

application.yml

 1 spring:    
 2     cloud:
 3         stream:
 4             # 設置默認的binder
 5             default-binder: pos
 6             binders:
 7                 scm:
 8                     type: rabbit
 9                     environment:
10                         spring:
11                           rabbitmq:
12                             # 連接到scm的host和exchange
13                             virtual-host: scm
14             pos:
15                 type: rabbit
16                 environment:
17                     spring:
18                         rabbitmq:
19                             # 連接到pos的host和exchange
20                             virtual-host: pos
21                     
22             shop:
23                 type: rabbit
24                   environment:
25                     spring:
26                       rabbitmq:
27                         # 連接到shop的host和exchange
28                         virtual-host: shop
29             
30           bindings:
31             # ---------消息消費------------
32 
33             # 集單開始生產消費
34             order_set_start_produce_input:
35               binder: pos
36               destination: POS_ORDER_SET_STRAT_PRODUCE
37               group: pos_group
38               
39             # 門店ID為1的消費者    
40             shop_consumer_input_1:
41                 binder:    shop
42                 destination: POS_ORDER_SET_STRAT_PRODUCE
43                 group: shop_1_group
44      
45               
46             #-----------消息生產-----------
47             # 集單開始生產通知生產
48             order_set_start_produce_output:
49               binder: pos
50               destination: POS_ORDER_SET_STRAT_PRODUCE
51             
52           rabbit:
53             bindings:
54               # 集單開始生產消費者
55               order_set_start_produce_input:
56                 consumer:
57                   exchangeType: topic
58                   autoBindDlq: true
59                   republishToDlq: true
60                   deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_POS_DLX
61                   #bindingRoutingKey: '#'
62               # 門店1的消費者
63               shop_consumer_input_1:
64                 consumer:
65                   exchangeType: topic
66                   autoBindDlq: true
67                   republishToDlq: true
68                   deadLetterExchange: POS_ORDER_SET_STRAT_PRODUCE_SHOP_1_DLX
69                   bindingRoutingKey: 1
70                   deadLetterRoutingKey: 1
71 
72               # 生產者配置              
73               order_set_start_produce_output:
74                 producer:
75                   exchangeType: topic
76                   routingKeyExpression: headers.shopId
77                   # routingKeyExpression: headers['shopId']

 

上面的配置文件配置了一個動態的基於shopId做路由的生產者配置,一個消費全部路由鍵的消費者,如果要配置指定路由鍵的可以在配置文件里設置bindingRoutingKey屬性的值。

 

生產者java代碼:

import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.longge.pos.production.mq.dto.OrderSetProductionMsg;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MqSendUtil {
    private static MessageChannel orderSetStartProduceChannel;

    public static void setSfOrderCreateChannel(MessageChannel channel) {
        sfOrderCreateProduceChannel = channel;
    }
    
    public static void sendOrderSetPrintMsg(OrderSetProductionMsg msg) {
        // add kv pair - routingkeyexpression (which matches 'type') will then evaluate
        // and add the value as routing key
        log.info("發送開始生產的MQ:{}", JSONObject.toJSONString(msg));
        orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).setHeader("shopId", msg.getOrderSet().getShopId()).build());
        //orderSetStartProduceChannel.send(MessageBuilder.withPayload(JSON.toJSONString(msg)).build());
    }
}

動態路由的核心在於上面那個紅色的字體的地方,這個是和配置文件里的  routingKeyExpression 的配置是匹配的。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM