一、關於Spring-Cloud-Stream
Spring Cloud Stream本質上就是整合了Spring Boot和Spring Integration,實現了一套輕量級的消息驅動的微服務框架。通過使用Spring Cloud Stream,可以有效地簡化開發人員對消息中間件的使用復雜度,讓系統開發人員可以有更多的精力關注於核心業務邏輯的處理。
在這里我先放一張官網的圖:
應用程序通過Spring Cloud Stream注入到輸入和輸出通道與外界進行通信。根據此規則我們很容易的實現消息傳遞,訂閱消息與消息中轉。並且當需要切換消息中間件時,幾乎不需要修改代碼,只需要變更配置就行了。
在用例圖中 Inputs代表了應用程序監聽消息 、outputs代表發送消息、binder的話大家可以理解為將應用程序與消息中間件隔離的抽象,類似於三層架構下利用dao屏蔽service與數據庫的實現的原理。
springcloud默認提供了rabbitmq與kafka的實現。
二、springcloud集成kafka
1、添加gradle依賴:

dependencies{ compile('org.springframework.cloud:spring-cloud-stream') compile('org.springframework.cloud:spring-cloud-stream-binder-kafka') compile('org.springframework.kafka:spring-kafka') }
2、定義一個接口:
spring-cloud-stream已經給我們定義了最基本的輸入與輸出接口,他們分別是 Source,Sink, Processor
Sink接口:

package org.springframework.cloud.stream.messaging; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); }
Source接口:

package org.springframework.cloud.stream.messaging; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); }
Processor接口:

package org.springframework.cloud.stream.messaging; public interface Processor extends Source, Sink { }
這里面Processor這個接口既定義輸入通道又定義了輸出通道。同時我們也可以自己定義通道接口,代碼如下:

package com.bdqn.lyrk.shop.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface ShopChannel { /** * 發消息的通道名稱 */ String SHOP_OUTPUT = "shop_output"; /** * 消息的訂閱通道名稱 */ String SHOP_INPUT = "shop_input"; /** * 發消息的通道 * * @return */ @Output(SHOP_OUTPUT) MessageChannel sendShopMessage(); /** * 收消息的通道 * * @return */ @Input(SHOP_INPUT) SubscribableChannel recieveShopMessage(); }
3、定義服務類

package com.bdqn.lyrk.shop.server; import com.bdqn.lyrk.shop.channel.ShopChannel; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class ShopService { @Resource(name = ShopChannel.SHOP_OUTPUT) private MessageChannel sendShopMessageChannel; @GetMapping("/sendMsg") public String sendShopMessage(String content) { boolean isSendSuccess = sendShopMessageChannel. send(MessageBuilder.withPayload(content).build()); return isSendSuccess ? "發送成功" : "發送失敗"; } @StreamListener(ShopChannel.SHOP_INPUT) public void receive(Message<String> message) { System.out.println(message.getPayload()); } }
這里面大家注意 @StreamListener。這個注解可以監聽輸入通道里的消息內容,注解里面的屬性指定我們剛才定義的輸入通道名稱,而MessageChannel則可以通過
輸出通道發送消息。使用@Resource注入時需要指定我們剛才定義的輸出通道名稱
4、定義啟動類

package com.bdqn.lyrk.shop; import com.bdqn.lyrk.shop.channel.ShopChannel; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; @SpringBootApplication @EnableBinding(ShopChannel.class) public class ShopServerApplication { public static void main(String[] args) { SpringApplication.run(ShopServerApplication.class, args); } }
注意@EnableBinding注解,這個注解指定剛才我們定義消息通道的接口名稱,當然這里也可以傳多個相關的接口
5、定義application.yml文件

spring: application: name: shop-server cloud: stream: bindings: #配置自己定義的通道與哪個中間件交互 shop_input: #ShopChannel里Input和Output的值 destination: zhibo #目標主題 shop_output: destination: zhibo default-binder: kafka #默認的binder是kafka kafka: bootstrap-servers: localhost:9092 #kafka服務地址 consumer: group-id: consumer1 producer: key-serializer: org.apache.kafka.common.serialization.ByteArraySerializer value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer client-id: producer1 server: port: 8100
這里是重頭戲,我們必須指定所有通道對應的消息主題,同時指定默認的binder為kafka,緊接着定義Spring-kafka的外部化配置,在這里指定producer的序列化類為ByteArraySerializer
啟動程序成功后,我們訪問 http://localhost:8100/sendMsg?content=2 即可得到如下結果