前言
本文章為單體項目,將消費者和生產者寫在同一個項目中,介意者不用向下看了。
本文介紹三種應用方式:
1:普通整合RabbitMQ
2:消息分區
3:按條件消費(多個消費者只消費同一隊列中滿足自己條件的消息)
1:核心依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>${spring.cloud.stream}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
<version>${spring.cloud.stream}</version>
</dependency>
全部依賴:
項目目錄圖:
2:基礎版整合RabbitMQ
①:application.properties
spring.rabbitmq.host=192.168.1.218
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.cloud.stream.bindings.dev-exchange.destination=dev-exchange
spring.cloud.stream.bindings.dev-exchange.group=dev-queue
spring.cloud.stream.bindings.dev-exchange.content-type=application/json
spring.cloud.stream.bindings.dev-exchange.consumer.concurrency=1
spring.cloud.stream.bindings.dev-exchange.consumer.max-attempts=1
②:定義生產者和消費者接口
import com.boot.rabbitmq.constance.MQConstants;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface RabbitStream {
/**
* 消息流入(消費)
**/
@Input(MQConstants.DEV_EXCHANGE)
SubscribableChannel devConsumer();
/**
* 消息流出(生產)
**/
@Output(MQConstants.DEV_EXCHANGE)
MessageChannel devProducer();
}
③:生產者代碼:
@Component
@EnableBinding(RabbitStream.class)
public class DevProducer {
private static final Logger logger = LoggerFactory.getLogger(DevProducer.class);
private final RabbitStream rabbitStream;
public DevProducer(RabbitStream rabbitStream) {
this.rabbitStream = rabbitStream;
}
public void sendMsg(MQModel model) {
logger.info("producer:{}", JSON.toJSONString(model));
rabbitStream.devProducer()
.send(MessageBuilder.withPayload(model).build());
}
}
④:費者代碼:
@Component
@EnableBinding(RabbitStream.class)
public class DevListener {
private static final Logger logger = LoggerFactory.getLogger(DevListener.class);
@StreamListener(MQConstants.DEV_EXCHANGE)
public void receiveMsgAutoCommit(@Payload String payload) {
logger.info("consumer:{}", payload);
}
}
⑥:controller代碼:
@PostMapping(value = "/dev")
public void dev(@RequestBody MQModel model) {
devProducer.sendMsg(model);
}
⑦:測試
發送請求:
控制台日志:
3:消息分區
3.1:概念
RabbitMQ 本身是不支持消息分區的,只是由於業務演變+代碼控制的一種方案而已(參考spring官方開頭文檔理解)。
個人理解:所謂消息分區就是將一個大隊列拆分 0、1...n 個小隊列,
然后分解成 producer-A -> queue-A -> Consumer-A 的一種場景。

3.2:如何在項目中使用
①:配置
不需要改很多東西,只需要添加少部分配置即可
## RabbitMQ 消息分區配置
spring.cloud.stream.bindings.partition-exchange.destination=partition-exchange
spring.cloud.stream.bindings.partition-exchange.group=partition-queue
spring.cloud.stream.bindings.partition-exchange.content-type=application/json
spring.cloud.stream.bindings.partition-exchange.consumer.concurrency=1
spring.cloud.stream.bindings.partition-exchange.consumer.max-attempts=1
## 消息分區
spring.cloud.stream.bindings.partition-exchange.consumer.partitioned=true
## 分區數量
spring.cloud.stream.bindings.partition-exchange.producer.partition-count=2
## 機器下標,最大值=partition-count-1
spring.cloud.stream.instance-index=0
## 分區策略表達式
spring.cloud.stream.bindings.partition-exchange.producer.partition-key-expression=payload.mid
②:路由規則
然后消息的路由的時候會從payload拿到mid進行條件運算:
mid/2=1則放在應用隊列下標為1的隊列,mid/2=0則放在隊列下標為0的隊列。

③:源碼截圖
消息的入隊前會計算出該消息應該進入哪個隊列↓↓↓↓↓↓↓↓↓↓

可以看到開啟分區之后,payload 的類型不是String,而是具備鍵值對的實體對象。
4:條件消費
4.1:概念
前面說過,Message 是由消息頭和消息體組成的。因此可以在發送消息的時候自定義一個key存放在消息頭,消費者可以根據自己的消費條件進行消費。
對同一個隊列中的消息按條件進行划分再派發給不同的消費者。我的示例就是在header 中設置了一個key。

4.2:匹配條件講解
除了可以用 MessageHeader 中的數據進行匹配條件外,payload(消息體)中的數據也可以作為條件。
消息實體結構:

4.3:測試
代碼截圖↓↓↓↓↓↓↓↓↓↓↓
效果↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
本文GitHub地址
個人理解,不精之處望指出。