如果項目中我們用的是RabbitMQ進行消息傳輸,隨着后面的業務需求,我們需要向Kafka遷移,如果單純去修改代碼,那是很繁瑣的。
那么怎么解決這種情況呢,既能使用RabbitMQ又可以快速切換KafKa?這時就用到了SpringCloudStream:
其可以屏蔽底層消息中間件的差異,降低切換成本,統一消息的編程模型。不過目前只支持RabbitMQ 和 Kafka。
通過定義綁定器 Binder 作為中間層,實現了應用程序與消息中間件細節之間的隔離。向應用程序暴露統一的 Channel 通道,使得應用程序不需要再考慮各種消息中間件的實現
inputs是消費者,outputs是生產者
Stream中的消息通信方式遵循了發布-訂閱模式,用 Topic 主題進行廣播(在RabbitMQ就是Exchange,在Kafka中就是Topic)
其主要流程如下圖
Binder:很方便的連接中間件,屏蔽差異。
Channel:通道,是隊列Queue的一種抽象,在消息通訊系統中就是實現存儲和轉發的媒介,通過channel對隊列進行配置。
Source和Sink:簡單理解為消息的生產者和消費者。
SpringBoot整合SpringCLoudStream:
1、添加依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- commons-lang3 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.54</version> </dependency> <!-- Swagger2 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.7.0</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.7.0</version> </dependency> <!-- Spring Cloud Stream --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> <version>2.1.3.RELEASE</version> <exclusions> <exclusion> <artifactId>objenesis</artifactId> <groupId>org.objenesis</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> <version>2.1.3.RELEASE</version> </dependency>
2、配置文件application.yml
server:
port: 8088
spring:
application:
name: stream_demo
cloud:
stream:
binders: #需要綁定的rabbitmq的服務信息
defaultRabbit: #定義的名稱,用於binding整合
type: rabbit #消息組件類型
environment: #配置rabbitmq連接環境
spring:
rabbitmq:
host: **.**.**.** #rabbitmq 服務器的地址
port: 5672 #rabbitmq 服務器端口
username: **** #rabbitmq 用戶名
password: **** #rabbitmq 密碼
virtual-host: / #虛擬路徑
bindings: #服務的整合處理
#inputs 對應消費者,outputs 對應生產者
#Stream中的消息通信方式遵循了發布-訂閱模式
#在Stream中,處於同一個組的多個消費者是競爭關系,就可以保證消息只被一個服務消費一次,而不同組是可以重復消費的。現在默認分組就是不同的,組流水號不一樣。
#消費者宕機:如果未配置group,則消費者上線后無法消費之前的消息(消息丟失);如果配置了group,則消費上線后可以消費之前的消息(消息持久化)
testOutput: #生產者消息輸出通道 ---> 消息輸出通道 = 生產者相關的定義:Exchange & Queue
destination: exchange-test #exchange名稱,交換模式默認是topic;把SpringCloud Stream的消息輸出通道綁定到RabbitMQ的exchange-test交換器。
content-type: application/json #設置消息的類型,本次為json
default-binder: defaultRabbit #設置要綁定的消息服務的具體設置,默認綁定RabbitMQ
group: testGroup #分組=Queue名稱,如果不設置會使用默認的組流水號
testInput: #消費者消息輸入通道 ---> 消息輸入通道 = 消費者相關的定義:Exchange & Queue
destination: exchange-test #exchange名稱,交換模式默認是topic;把SpringCloud Stream的消息輸入通道綁定到RabbitMQ的exchange-test交換器。
content-type: application/json
default-binder: defaultRabbit
group: testGroup
3、Channel信道
package com.stream.api; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; import org.springframework.stereotype.Component; /** * @ClassName: TestChannelProcessor * @Description: 定義Channel信道 * @Author: qiaojiacheng * @Date: 2021/3/10 3:20 下午 */ @Component public interface TestChannelProcessor { /** * 生產者消息輸出通道(需要與配置文件中的保持一致) */ String TEST_OUTPUT = "testOutput"; /** * 消息生產 * * @return */ @Output(TEST_OUTPUT) MessageChannel testOutput(); /** * 消費者消息輸入通道(需要與配置文件中的保持一致) */ String TEST_INPUT = "testInput"; /** * 消息消費 * * @return */ @Input(TEST_INPUT) SubscribableChannel testInput(); }
4、生產者
package com.stream.provider; import com.alibaba.fastjson.JSON; import com.stream.api.TestChannelProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.binding.BinderAwareChannelResolver; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import java.util.HashMap; import java.util.Map; /** * @ClassName: TestMessageProducer * @Description: 生產者生產消息 * @Author: qiaojiacheng * @Date: 2021/3/10 3:21 下午 */ @EnableBinding(value = {TestChannelProcessor.class}) public class TestMessageProducer { @Autowired private BinderAwareChannelResolver channelResolver; /** * 生產消息 * * @param msg */ public void testSendMessage(String msg) { Map<String, Object> headers = new HashMap<>(); Map<String, Object> payload = new HashMap<>(); payload.put("msg", msg); System.err.println("生產者發送消息:" + JSON.toJSONString(payload)); channelResolver.resolveDestination(TestChannelProcessor.TEST_OUTPUT).send( MessageBuilder.createMessage(payload, new MessageHeaders(headers)) ); } }
5、發送消息的Controller
package com.stream.controller; import com.stream.provider.TestMessageProducer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @ClassName: TestController * @Description: 測試發送消息 * @Author: qiaojiacheng * @Date: 2021/3/10 3:23 下午 */ @RestController public class TestController { @Autowired private TestMessageProducer testMessageProducer; /** * 發送保存訂單消息 * * @param message */ @GetMapping(value = "sendTestMessage") public void sendTestMessage(@RequestParam("message") String message) { //發送消息 testMessageProducer.testSendMessage(message); } }
6、消費者
package com.stream.provider; import com.stream.api.TestChannelProcessor; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; /** * @ClassName: TestMessageConsumer * @Description: 消費者消費消息 * @Author: qiaojiacheng * @Date: 2021/3/10 4:09 下午 */ @EnableBinding(TestChannelProcessor.class) public class TestMessageConsumer { @StreamListener(TestChannelProcessor.TEST_INPUT) public void testConsumeMessage(Message<String> message) { System.err.println("消費者消費消息:" + message.getPayload()); } }
7、swagger配置,方便測試
package com.stream.swagger; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.request.async.DeferredResult; import springfox.documentation.builders.ApiInfoBuilder; import springfox.documentation.builders.PathSelectors; import springfox.documentation.service.ApiInfo; import springfox.documentation.spi.DocumentationType; import springfox.documentation.spring.web.plugins.Docket; import springfox.documentation.swagger2.annotations.EnableSwagger2; /** * @author qiaojiacheng */ @Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket createRestApi() { return new Docket(DocumentationType.SWAGGER_2) .genericModelSubstitutes(DeferredResult.class) .select() .paths(PathSelectors.any()) .build().apiInfo(apiInfo()); } private ApiInfo apiInfo() { return new ApiInfoBuilder().title("Stream server") .description("測試SpringCloudStream") .termsOfServiceUrl("https://spring.io/projects/spring-cloud-stream") .version("1.0").build(); } }
8、啟動類
package com.stream; import com.stream.api.TestChannelProcessor; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; @SpringBootApplication @EnableBinding(value= {TestChannelProcessor.class}) public class StreamDemoApplication { public static void main(String[] args) { SpringApplication.run(StreamDemoApplication.class, args); } }
訪問swagger進行測試
控制台輸出結果
查看rabbitmq控制台