Spring boot對MQ類如RabbitMQ、kafka支持都很好,但是仍然要寫一些模板代碼。Spring cloud stream進一步掩蓋了這個差異,僅僅使用配置就可以完成。
Spring cloud Stream 用了基於topic-subsriber的模式,雖然不支持全部MQ的特性,但絕大多數應用來說,這樣就足夠用了,畢竟方便很多。具體用法如下:
包含的包
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
MQ接口
public interface MyMQInterface{ @Input("通道名") SubscribableChannel inputChannel(); @Output("通道名") MessageChannel outputChanel(); }
- 通道名發送方和接受方必須是一致的
- 一個應用不能既是同一個通道的發送方和接收方,否則會警告。這實際上也沒有意義。
- 聲明后,啟用綁定
@EnableBinding(MyMQInterface.class)
- 可以綁定多個接口
發送MQ消息
@Autowired MyMQInterface myInterface; ..... myInterface.inputChannel().send(MessageBuilder....);
接受消息
@SystemListener("通道名") public void onReceive(Message content)
應用集群問題
- 如果某個應用起來多個實例,如上面的配置,會導致每條消息每個實例都會收到,如果你不想這么做,請在配置里面加上:
spring.cloud.stream.bindings.testOrders.group=分組名
- 每個應用定義一個唯一的分組名,不好和其他應用重復。
消息處理異常
- 如果收到消息處理有問題,比如寫入數據庫失敗,請拋出RuntimeException異常,MQ會重試,不過重試幾次后會失敗,這個要注意。