springcloud 總集:https://www.tapme.top/blog/detail/2019-02-28-11-33
代碼見文章結尾
想想平常生活中做飯的場景,在用電飯鍋做飯的同時,我們可以洗菜、切菜,等待電飯鍋發出飯做好的提示我們回去拔下電飯鍋電源(或者什么也不知讓它處於保溫狀態),反正這個時候我們知道飯做好了,接下來可以炒菜了。從這里可以看出我們在日常生活中與世界的互動並不是同步的、線性的,不是簡單的請求--響應模型。它是事件驅動的,我們不斷的發送消息、接受消息、處理消息。
同樣在軟件世界中也不全是請求--響應模型,也會需要進行異步的消息通信。使用消息實現事件通信的概念被稱為消息驅動架構(Event Driven Architecture,EDA),也被稱為消息驅動架構(Message Driven Architecture,MDA)。使用這類架構可以構建高度解耦的系統,該系統能夠對變化做出響應,且不需要與特定的庫或者服務緊密耦合。
在 Spring Cloud 項目中可以使用Spirng Cloud Stream輕而易舉的構建基於消息傳遞的解決方案。
為什么使用消息傳遞
要解答這個問題,讓我們從一個例子開始,之前一直使用的兩個服務:許可證服務和組織服務。每次對許可證服務進行請求,許可證服務都要通過 http 請求到組織服務上查詢組織信息。顯而易見這次額外的 http 請求會花費較長的時間。如果能夠將緩存組織數據的讀操作,將會大幅提高許可證服務的響應時間。但是緩存數據有如下 2 個要求:
- 緩存的數據需要在許可證服務的所有實例之間保存一致——這意味着不能將數據緩存到服務實例的內存中。
- 在更新或者刪除一個組織數據時,許可證服務緩存的數據需要失效——避免讀取到過期數據,需要盡早讓過時數據失效並刪除。
要實現上面的要求,現在有兩種辦法。
-
使用同步請求--響應模型來實現。組織服務在組織數據變化時調用許可證服務的接口通知組織服務已經變化,或者直接操作許可證服務的緩存。
-
使用事件驅動。組織服務發出一個異步消息。許可證服務收到該消息后清除對應的緩存。
同步請求-響應方式
許可證服務在 redis 中緩存從組織服務中查詢到的服務信息,當組織數據更新時,組織服務同步 http 請求通知許可證服務數據過期。這種方式有以下幾個問題:
- 組織服務和許可證服務緊密耦合
- 這種方式不夠靈活,如果要為組織服務添加新的消費者,必須修改組織服務代碼,以讓其通知新的服務數據變動。
使用消息傳遞方式
同樣的許可證服務在 redis 中緩存從組織服務中查詢到的服務信息,當組織數據更新時,組織服務將更新信息寫入到隊列中。許可證服務監聽消息隊列。使用消息傳遞有一下 4 個好處:
- 松耦合性:將服務間的依賴,變成了服務對隊列的依賴,依賴關系變弱了。
- 耐久性:即使服務消費者已經關閉了,也可以繼續往里發送消息,等消費者開啟后處理
- 可伸縮性: 消息發送者不用等待消息消費者的響應,它們可以繼續做各自的工作
- 靈活性:消息發送者不用知道誰會消費這個消息,因此在有新的消息消費者時無需修改消息發送代碼
spring cloud 中使用消息傳遞
spring cloud 項目中可以通過 spring cloud stream 框架來輕松集成消息傳遞。該框架最大的特點是抽象了消息傳遞平台的細節,因此可以在支持的消息隊列中隨意切換(包括 Apache Kafka 和 RabbitMQ)。
spring cloud stream 架構
spring cloud stream 中有 4 個組件涉及到消息發布和消息消費,分別為:
-
發射器
當一個服務准備發送消息時,它將使用發射器發布消息。發射器是一個 Spring 注解接口,它接收一個普通 Java 對象,表示要發布的消息。發射器接收消息,然后序列化(默認序列化為 JSON)后發布到通道中。 -
通道
通道是對隊列的一個抽象。通道名稱是與目標隊列名稱相關聯的。但是隊列名稱並不會直接公開在代碼中,代碼永遠只會使用通道名。 -
綁定器
綁定器是 spring cloud stream 框架的一部分,它是與特定消息平台對話的 Spring 代碼。通過綁定器,使得開發人員不必依賴於特定平台的庫和 API 來發布和消費消息。 -
接收器
服務通過接收器來從隊列中接收消息,並將消息反序列化。
處理邏輯如下:
實戰
繼續使用之前的項目,在許可證服務中緩存組織數據到 redis 中。
建立 redis 服務
為方便起見,使用 docker 創建 redis,建立腳本如下:
docker run -itd --name redis --net host redis:
建立 kafka 服務
在組織服務中編寫消息生產者
首先在 organization 服務中引入 spring cloud stream 和 kafka 的依賴。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
然后在 events 類中編寫SimpleSouce
類,用於組織數據修改,產生一條消息到隊列中。代碼如下:
@EnableBinding(Source.class)
public class SimpleSource {
private Logger logger = LoggerFactory.getLogger(SimpleSource.class);
private Source source;
@Autowired
public SimpleSource(Source source) {
this.source = source;
}
public void publishOrChange(String action, String orgId) {
logger.info("在請求:{}中,發送kafka消息:{} for Organization Id:{}", UserContextHolder.getContext().id, action, orgId);
OrganizationChange change = new OrganizationChange(action, orgId, UserContextHolder.getContext().id);
source.output().send(MessageBuilder.withPayload(change).build());
}
}
這里使用的是默認通道,Source 類定義的 output 通道發消息。后面通過 Sink 定義的 input 通道收消息。
然后在OrganizationController
類中定義一個 delete 方法,並注入 SimpleSouce 類,代碼如下:
@Autowired
private SimpleSource simpleSource;
@DeleteMapping(value = "/organization/{orgId}")
public void deleteOne(@PathVariable("orgId") String id) {
logger.debug("刪除了組織:{}", id);
simpleSource.publishOrChange("delete", id);
}
最后在配置文件中加入消息隊列的配置:
# 省略了其他配置
spring:
cloud:
stream:
bindings:
output:
destination: orgChangeTopic
content-type: application/json
kafka:
binder:
# 替換為部署kafka的ip和端口
zk-nodes: 192.168.226.5:2181
brokers: 192.168.226.5:9092
現在我們可以測試下訪問localhost:5555/apis/org/organization/12,可以看到控制台打印消息生成的日志。
在許可證服務中編寫消息消費者
首先引入依賴,依賴項同上面組織服務。
然后在 event 包下創建OrgChange
的類,代碼如下:
@EnableBinding(Sink.class) //使用Sink接口中定義的通道來監聽傳入消息
public class OrgChange {
private Logger logger = LoggerFactory.getLogger(OrgChange.class);
@StreamListener(Sink.INPUT)
public void loggerSink(OrganizationChange change){
logger.info("收到一個消息,組織id為:{},關聯id為:{}",change.getOrgId(),change.getId());
//刪除失效緩存
RedisUtils.del(RedisKeyUtils.getOrgCacheKey(change.getOrgId()));
}
}
//下面兩個都在util包下
//RedisKeyUtils.java代碼如下
public class RedisKeyUtils {
private static final String ORG_CACHE_PREFIX = "orgCache_";
public static String getOrgCacheKey(String orgId){
return ORG_CACHE_PREFIX+orgId;
}
}
//RedisUtils.java代碼如下
@Component
@SuppressWarnings("all")
public class RedisUtils {
public static RedisTemplate redisTemplate;
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
RedisUtils.redisTemplate = redisTemplate;
}
public static boolean setObj(String key,Object value){
return setObj(key,value,0);
}
/**
* Description:
*
* @author fanxb
* @date 2019/2/21 15:21
* @param key 鍵
* @param value 值
* @param time 過期時間,單位ms
* @return boolean 是否成功
*/
public static boolean setObj(String key,Object value,long time){
try{
if(time<=0){
redisTemplate.opsForValue().set(key,value);
}else{
redisTemplate.opsForValue().set(key,value,time,TimeUnit.MILLISECONDS);
}
return true;
}catch (Exception e){
e.printStackTrace();
return false;
}
}
public static Object get(String key){
if(key==null){
return null;
}
try{
Object obj = redisTemplate.opsForValue().get(key);
return obj;
}catch (Exception e){
e.printStackTrace();
return null;
}
}
public static void del(String... key){
if(key!=null && key.length>0){
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
上面用到的是 Sink.INPUT 通道,這個和之前的 Source.OUTPUT 通道剛好一隊,一個負責收,一個負責發。
然后修改OrganizationByRibbonService.java
文件中的getOrganizationWithRibbon
方法:
public Organization getOrganizationWithRibbon(String id) {
String key = RedisKeyUtils.getOrgCacheKey(id);
//先從redis緩存取數據
Object res = RedisUtils.get(key);
if (res == null) {
logger.info("當前數據無緩存:{}", id);
try{
ResponseEntity<Organization> responseEntity = restTemplate.exchange("http://organizationservice/organization/{id}",
HttpMethod.GET, null, Organization.class, id);
res = responseEntity.getBody();
RedisUtils.setObj(key, res);
}catch (Exception e){
e.printStackTrace();
}
} else {
logger.info("當前數據為緩存數據:{}", id);
}
return (Organization) res;
}
最后修改配置文件,為 input 通道指定 topic,配置如下:
spring:
cloud:
stream:
bindings:
input:
destination: orgChangeTopic
content-type: application/json
# 定義將要消費消息的消費者組的名稱
# 可能多個服務監聽同一個消息隊列。如果定義了消費者組,那么同組中只要有一個消費了消息,剩余的不會再次消費該消息,保證只有消息的
# 一個副本會被該組的某個實例所消費
group: licensingGroup
kafka:
binder:
zk-nodes: 192.168.226.5:2181
brokers: 192.168.226.5:9092
基本和發送的配置相同,只是這里是為input
通道映射隊列,然后還定義了一個組名,避免一個消息被重復消費。
現在來多次訪問localhost:5555/apis/licensingservice/licensingByRibbon/12,可以看到 licensingservice 控制台打印數據從緩存中讀取,如下所示:
然后再以 delete 訪問localhost:5555/apis/org/organization/12清除緩存,再次訪問 licensingservice 服務,結果如下:
自定義通道
上面用的是Spring Cloud Stream
自帶的 input/output 通道,那么要如何自定義通道呢?下面以自定義customInput/customOutput
通道為例。
自定義發數據通道
public interface CustomOutput {
@Output("customOutput")
MessageChannel out();
}
對於每個自定義的發數據通道,需使用@OutPut 注解標記的返回 MessageChannel 類的方法。
自定義收數據通道
public interface CustomInput {
@Input("customInput")
SubscribableChannel in();
}
同上,對應自定義的收數據通道,需要使用@Input 注解標記的返回 SubscribableChannel 類的方法。
結束
看完本篇你應該已經能夠在 Spring Cloud 中集成 Spring Cloud Stream 消息隊列了,貌似這個也能用到普通的 spring boot 項目中,比直接集成 mq 更加的優雅。
2019,Fighting!
本篇原創發布於:FleyX 的個人博客
本篇所用全部代碼:FleyX 的 github