在微服務架構的系統中, 我們通常會使用輕量級的消息代理來構建 一 個共用的消息主題讓系統中所有微服務實例都連接上來, 由於該主題中產生的消息會被所有實例監聽和消費, 所以我們稱它為消息總線。 在總線上的各個實例都可以方便地廣播 一 些需要讓其他連接在該主題上的實例都知道的消息, 例如配置信息的變更或者其他 一 些管理操作等。
由於消息總線在微服務架構系統中被廣泛使用, 所以它同配置中心 一 樣, 幾乎是微服務架構中的必備組件。 Spring Cloud 作為微服務架構綜合性的解決方案,對此自然也有自己的實現。 通過使用 Spring Cloud Bus,可以非常容易地搭建起消息總線, 同時實現了 一 些消息總線中的常用功能, 比如, 配合Spring Cloud Config 實現微服務應用配置信息的動態更新等。
消息代理:
消息代理 (Message Broker) 是 一 種消息驗證、 傳輸、 路由的架構模式。 它在應用程序之間起到通信調度並最小化應用之間的依賴的作用, 使得應用程序可以高效地解耦通信過程。 消息代理是 一 個中間件產品, 它的核心是 一 個消息的路由程序, 用來實現接收和分發消息, 並根據設定好的消息處理流來轉發給正確的應用。 它包括獨立的通信和消息傳遞協議, 能夠實現組織內部和組織間的網絡通信。 設計代理的目的就是為了能夠從應用程序中傳入消息, 並執行 一 些特別的操作, 下面這些是在企業應用中, 我們經常需要使用消息代理的場景:
- 將消息路由到 一 個或多個目的地。
- 消息轉化為其他的表現方式。
- 執行消息的聚集、 消息的分解, 並將結果發送到它們的目的地, 然后重新組合響應返回給消息用戶。
- 調用Web服務來檢索數據。
- 響應事件或錯誤。
- 使用發布-訂閱模式來提供內容或基於主題的消息路由。
RabbitMQ實現消息總線整合Spring Cloud Bus:
關於 RabbitMQ 的安裝及使用等請查閱 https://www.cnblogs.com/wuzhenzhao/category/1528254.html。
接下去我們就基於 spring-cloud-config-server分布式配置中心 的服務端及客戶端的Demo工程來整合消息總線。
1.將服務端及客戶端都注冊到Eureka集群中。
2.修改客戶端的依賴,添加以下依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
3.在配置文件中增加關於RabbitMQ的連接和用戶信息。
spring.rabbitmq.host = 192.168.1.101 spring.rabbitmq.username = guest spring.rabbitmq.password = guest spring.rabbitmq.port = 5672 eureka.client.service-url.defaultZone = http://localhost:7001/eureka/,http://localhost:7002/eureka/
# actuator management.endpoints.web.exposure.include = *
4.在配置的需要刷新的屬性對應的類上添加注解@RefreshScope
啟動 config-server, 再啟動兩個config客戶端。我們可以在 config客戶端中的控制台中看到如下內容, 在啟動時, 客戶端程序多了一個 /actuator/bus-refresh 請求。
- 先訪問兩個 config 客戶端的/hello請求, 會返回當前config的 foo屬性。
- 接着,修改GitHub上面config-test.properties中的foo屬性值,並發送POST請求到 localhost:8889/actuator/bus-refresh 。
- 最后, 再訪問兩個config 客戶端的/hello請求,此時均會返回最新的 foo屬性。
到這里, 我們已經能夠通過SpringCloud Bus來實時更新總線上的屬性配置了 。
原理分析:
我們通過使用Spring Cloud Bus與Spring Cloud Config的整合, 並以RabbitMQ作為消息代理, 實現了應用配置的動態更新 。假設當前的服務架構如下圖所示:
當我們將系統啟動起來之后, 圖中"Service A"的三個實例會請求ConfigServer以 獲取配置信息, ConfigServer根據應用配置的規則從Git倉庫中獲取配置信息並返回。此時,通過Git管理工具去倉庫中修改對應的屬性值,但是這個修改並不會觸發"Service A"實例的屬性更新。我們向"Service A"的實例3發送POST請求, 訪問 /actuator/bus-refresh接口。 此時,"Service A"的實例3就會將刷新請求發送到消息總線中, 該消息事件會被"Service A"的實例1和實例2從總線中獲取到, 並重新從 ConfigServer中獲取它們的配置信息, 從而實現配置信息的動態更新。而從Git倉庫中配置的修改到發起/actuator/bus-refresh的POST請求這 一 步可以通過Git倉庫的Web Hook來自動觸發。 由於所有連接到消息總線上的應用都會接收到更新請求,所以在 Web Hook中就不需要維護所有節點內容來進行更新, 從而解決了僅通過Web Hook來逐個進行刷新的問題。
指定刷新范圍:
在上面的例子中,我們通過向服務實例 請求SpringCloud Bus的/actuator/bus-refresh接口,從而觸發 總線上其他服務實例的/refresh。 但是在一 些特殊場景下, 我們希望可以刷新微服務中某個具體實例的配置。Spring Cloud Bus 對這種場景也有很好的支待,/actuator/bus-refresh 接口提供了一 個destinatin參數, 用來定位具體要 刷新的應用程序。在啟動信息中庫看到:
比如, 我們可以請 求/actuator/bus-refresh?destination = customers:9000, 此時總線上的各應用實例會根據destination屬性的值來判斷是否為自己的實例名, 若符合才進行配置刷新, 若不符合就忽略該消息。關於應用的實例名,它的默認命名按此規則生成: ${spring.cloud.client.hostname}:${spring.application.name}:${spring.application.instance-id:${server.port}}。destination參數除了可以定位具體的實例之外, 還可以用來定位具體的服務。 定位服務的原理是通過使用 Spring的PathMatecher (路徑匹配)來實現的, 比如/actuator/bus-refresh?destination = customers:**, 該請求會觸發 customers服務的所有實例進行刷新。
既然SpringCloud Bus的 /actuator/bus-refresh接口提供了針對服務和實例進行配置更新的參數, 那么我們的架構也可以相應做出一 些調整。 在之前的架構中, 服務的配置更新需要通過向具體服務中的某個實例發送請求, 再觸發對整個服務集群的配置更新。 雖然能實現功能, 但是這樣的結果是, 我們指定的應用實例會不同千集群中的其他應用實例, 這樣會增加集群內部的復雜度, 不利於將來的運維工作。為解決這類問題我們主要做了以下這些改動:
- 在ConfigServer中也引入SpringCloud Bus, 將配置服務端也加入到消息總線中來。
- /actuator/bus-refresh請求不再發送到具體服務實例上, 而是發送給Config Server, 並通過destination參數來指定需要更新配置的服務或實例。通過上面的改動,我們的服務實例不需要再承擔觸發配置更新的職責。 同時,對於Git的觸發等配置都只需要針對ConfigServer即可, 從而簡化了集群上的 一 些維護工作。
Kafka實現消息總線:
Spring Cloud Bus除了支持RabbitMQ的自動化配置之外, 還支持現在被廣泛應用的Kafka。關於kafka的安裝入門請參考:https://www.cnblogs.com/wuzhenzhao/category/1528269.html
1.若我們要使用Kafka來實現消息總線 時, 只需把spring-cloud-starter-bus-amqp替換成spring-cloud-starter-bus- kafka模塊,客戶端及服務端都需要添加此依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>
2.客戶換及服務端都增加以下配置:
#Kafka的服務端列表 localhost spring.cloud.stream.kafka.binder.brokers=192.168.1.101 #Kafka服務端的默認端口,當brokers屬性中沒有配置端口信息時,就會使用這個默認端口 9092 spring.cloud.stream.kafka.binder.defaultBrokerPort = 9092 #Kafka服務端連接的ZooKeeper節點列表 localhost spring.cloud.stream.kafka.binder.zkNodes = 192.168.1.101 #ZooKeeper節點的默認端口,當zkNodes屬性中沒有配置端口信息時,就會使用這個默認端口2181 spring.cloud.stream.kafka.binder.defaultZkPort =2181
3.啟動服務端/客戶端會看到如下信息,那么說明我們的配置沒問題:
4.查看 kafka的 topic:可以在 Kafka 中看到已經多了一 個名為springCloudBus 的 Topico
5.可以查看kafka消息存儲日志,只要我們向/actuator/bus-refresh請求一次,這里消息就會增加:
由於服務端跟客戶端啟動的時候我們可以獲取到兩者的 groupId,所以我們可以通過這個命令查看對應的用戶的消費偏移量。具體怎么查看可以參考 https://www.cnblogs.com/wuzhenzhao/p/10137490.html
sh /mysoft/kafka/bin/kafka-console-consumer.sh --topic __consumer_offsets --partition 40 --bootstrap-server 192.168.1.101:9092 --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
sh kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092 --topic springCloudBus --from-beginning
啟動對springCloudBus的 消費者控制台來進行觀察。啟動消費者控制台之后,我們向config-server發送 POST請求: /bus-refresh,此時在控制台中可以看到類似如下的內容:
這個就是消息的具體內容。下面, 我們來詳細理解消息中的信息內容。
- type: 消息的事件類型。在上面的例子中,包含了RefreshRemoteApplicationEvent和AckRemoteApplicatuonEvent。其中,RefreshRemoteApplicationEvent事件就是我們用來刷新配置的事件,而AckRemoteApplicationEvent是響應消息已經正確接收的告知消息事件。
- timestamp: 消息的時間戳。
- originService: 消息的來源服務實例。
- destinationService: 消息的目標服務實例。 上面示例中的 "**"代表了總線上的所有服務實例。 如果想要指定服務或是實例, 在之前介紹RabbitMQ實現消息總線時已經提過, 只需要通過使用destination參數來定位具體要刷新的應用實例即可, 比如發起/actuator/bus-refresh?destination = c9onfig-client請求。
- id: 消息的唯 一 標識。
上面的消息內容是RefreshRemoteApplicationEvent和AckRemoteApplicationEvent類型共有的, 下面幾個屬性是AckRemoteApplicationEvent所特有的,分別表示如下含義。
- ackId:Ack消息對應的消息來源。我們可以看到第 一 條AckRemoteApplicationEvent的 ackid對應了 RefreshRemoteApplicationEvent的id, 說明這條Ack是告知該 RefreshRemoteApplicationEvent事件的消息已經被收到。
- ackDestinationService: Ack 消息的目標服務實例。 可以看到這里使用的是
- destinationService:** , 所以消息總線上所有的實例都會收到該Ack消息。
- event: Ack 消息的來源事件。 可以看到上例中的兩個 Ack均來源於刷新 配置的
- RefreshRemoteApplicationEvent 事件, 我們在測試的時候由於啟動了兩個config-client, 所以有兩個實例接收到了配置刷新事件, 同時它們都會返回 一 個Ack消息 。 由於ackDestinationService為 **, 所以兩個 config-client都會收到對RefreshRemoteApplicationEvent事件的Ack消息。
源碼分析:
通過上面的分析, 我們已經得到了兩個非常重要的線索RefreshRemoteApplicationEvent和AckRemoteApplicationEvent。 我們不妨順着這兩個事件類來詳細看看Spring Cloud Bus的源碼, 以幫助我們理解它的運行機制。順着RefreshRemoteApplicationEvent和AckRemoteApplicationEvent,我們可以整理出如下的事件關系類圖。
可以看到其中RefreshRemoteApplicationEvent和AckRemoteApplicationEvent 這些 我們已經接觸過的事件都繼承了 RemoteApplicationEvent抽象類, 而RemoteApplicationEvent繼承自SpringFramework的ApplicationEvent, 可以斷定, SpringCloud Bus也采用了 Spring的事件驅動模型。
Spring 的事件驅動模型中包含了三個基本概念: 事件、 事件監聽者和 事件發布者, 如下圖所示。
事件: Spring中定義了事件的抽象類ApplicationEvent, 它繼承自JDK的EventObject類。從圖中我們可以看到,事件包含了兩個成員變量:tirnestamp,該字段用千存儲事件發生的時間戳, 以及父類中的source , 該字段表示源事件對象。 當我們需要 自定義事件的 時候, 只需要繼承ApplicationEvent,比如RernoteApplicationEvent 、RefreshRernoteApplicationEvent等, 可以在自定義 的Event中增加 一 些事件的屬性來給事件監聽者處理。
事件監聽者: Spring 中定義了事件監聽者的接口 ApplicationListener, 它繼承自 JDK 的EventListener接口, 同時ApplicationListener接口限定了ApplicationEvent子類作為該接口中onApplicationEvent(E event); 函數 的參數。 所以, 每一 個 ApplicationListener 都是針對某個ApplicationEvent子類的監聽和處理者。
那么, 事件與監聽者是如何關聯起來 的呢?我們看下圖:
事件發布者: Spring中定義了ApplicationEventPublisher和ApplicationEventMulticaster兩個接口用來發布事件。其中ApplicationEventPublisher接口定義了發布事件的函數publishEvent(ApplicationEvent event)和publishEvent(Object event); 而ApplicationEventMulticaster接口中定義了對ApplicationListener的維護操作(比如新增、 移除等)以及將ApplicationEvent多播給可用ApplicationListener的操作。ApplicationEventPublisher的publishEvent實現在AbstractApplicationContext 中, 具體如下:
protected void publishEvent(Object event, @Nullable ResolvableType eventType) { Assert.notNull(event, "Event must not be null");// .........
if (this.earlyApplicationEvents != null) { this.earlyApplicationEvents.add(applicationEvent); } else { getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); } // ........ }
可以看到, 它最終會調用 ApplicationEventMulticaster的multicastEvent來具體實現發布事件給監聽者的操作。 而ApplicationEventMulticaster在Spring的默認實現位於SimpleApplicationEventMulticaster中, 具體如下:
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) { ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
// for (final ApplicationListener<?> listener : getApplicationListeners(event, type)) { Executor executor = getTaskExecutor(); if (executor != null) { executor.execute(() -> invokeListener(listener, event)); } else { invokeListener(listener, event); } } } private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) { try { listener.onApplicationEvent(event); } catch (ClassCastException ex) { String msg = ex.getMessage(); if (msg == null || matchesClassCastMessage(msg, event.getClass().getName())) { // Possibly a lambda-defined listener which we could not resolve the generic event type for // -> let's suppress the exception and just log a debug message.
Log logger = LogFactory.getLog(getClass()); if (logger.isDebugEnabled()) { logger.debug("Non-matching event type for listener: " + listener, ex); } } else { throw ex; } } }
SimpleApplicationEventMulticaster 通過遍歷維護的 ApplicationListener集合來找到對應 ApplicationEvent 的監聽器,然后調用監聽器的 onApplicationEvent函數來對具體事件做出處理操作。
Spring Cloud Bus中的事件定義:
在對 Spring 的事件模型有了一 定的理解之后, 下面我們來詳細介紹Spring Cloud Bus中的事件定義。 首先, 從 RemoteApplicationEvent抽象類開始:
//Jackson 對多態類型的處理注解, 當進行序列化時,會使用子類的名稱作為 type 屬性的值, 比如之前示例中的type": "RefreshRernoteApplicationEvent" 。
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") //序列化的時候忽略 source 屬性,source 是 ApplicationEvent 的父類 EventObject 的屬性, 用來定義事件的發生源。
@JsonIgnoreProperties("source") public abstract class RemoteApplicationEvent extends ApplicationEvent { private static final Object TRANSIENT_SOURCE = new Object(); private final String originService; private final String destinationService; private final String id; ...... }
來看看它的屬性: originService、 destinationService、 id, 這些內容都可以在 RemoteApplicationEvent 的子類事件消息中找到。再來分別看看 RemoteApplicationEvent 的幾個具體實現的事件類。
RefreshRemoteApplicationEvent 事件類, 該事件用於遠程刷新應用的配置信息。 它的實現非常簡單, 只是繼承了 RemoteApplicationEvent, 並沒有增加其他內容。從之前的示例中我們也能看到,消息中的內容與 RemoteApplicationEvent 中包含的屬性完全 一 致。
AckRemoteApplicationEvent 事件類,該事件用於告知某個事件消息已經被接收, 通過該消息我們可以監控各個事件消息的響應。 從其成員屬性中, 我們可以找到之前示例中所總結的, 比 RefreshRemoteApplicationEvent 事件的消息多出的幾個屬性: ackid、 ackDestinationService 以及 event 。 其中 event成員變量通過泛型限定了必須為 RemoteApplicationEvent 的子類對象, 該定義符合這樣的邏輯: Ack 消息肯定有一 個事件源頭, 而每 一 個事件都必須繼承RemoteApplicationEvent 抽象類,所以 AckRemoteApplicationEvent 的事件源頭肯定是 一 個 RemoteApplicationEvent 的子類, 比如示例中的 Ack 消息源頭就是 RemoteApplicationEvent 的子類事件: RefreshRemoteApplicationEvent 。
public class AckRemoteApplicationEvent extends RemoteApplicationEvent { private final String ackId; private final String ackDestinationService; private Class<? extends RemoteApplicationEvent> event; //......
}
EnvironmentChangeRemoteApplicationEvent事件類, 該事件用於動態更新消息總線上每個節點的 Spring 環境屬性。可以看到,該類中定義了一 個Map 類型的成員變量,而接收消息的節點就是根據該 Map 對象中的屬性來覆蓋本地的 Spring環境屬性。
public class EnvironmentChangeRemoteApplicationEvent extends RemoteApplicationEvent { private final Map<String, String> values; //......
}
SentApplicationEvent事件類,該類的結構和內容與 RemoteApplicationEvent 非常相似, 不同的是: 該類不是抽象 類, 並且多一個成員 Class<? extendsRemoteApplicationEvent> type 。 SentApplicatonEvent 事件較為特殊, 它主要用於發送信號來表示 一 個遠程的事件已經在系統中被發送到某些地方了, 從它的繼承關系中, 我們可以知道它本身並不是 一 個遠程的事件(不是繼承自 RemoteApplicationEvent), 所以它不會被發送到消息總線上去, 而是在本地產生(通常是由於響應了某個遠程的事件)。由於該事件的遠屬性能夠匹配消費者 AckRemoteApplicationEvent 消息中的ackid, 所以應用程序可以通過監聽這個事件來監控遠程事件消息的消費情況。
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") @JsonIgnoreProperties("source") public class SentApplicationEvent extends ApplicationEvent { private static final Object TRANSIENT_SOURCE = new Object(); private final String originService; private final String destinationService; private final String id; private Class<? extends RemoteApplicationEvent> type; //.......
}
Spring Cloud Bus中的事件監聽器:
了解了Spring Cloud Bus 中的事件類之后, 我們來看看另外 一 個重要元素: 事件監聽器。
其中, RefreshListener 和 EnvironmentChangeListener 都繼承了 Spring 事件模型中的監聽器接口 ApplicationListener
- RefreshListener :從泛型中我們可以看到該監聽器就是針對我們之前所介紹的 RefreshRemoteApplicationEvent 事件的, 其中 onApplicationEvent函 數中調用了ContextRefresher 中的 refresh ()函數進行配置屬性的刷新。
- EnvironmentChangeListener :針對 EnvironmentChangeRemoteApplicationEvent 事件的監聽類, 在處理類中, 可以看到它從 EnvironmentChangeRemoteApplicationEvent 中獲取了之前提到的事件中定義的 Map 對象,然后通過遍歷來更新 EnvironmentManager 中的屬性內容。
事件跟蹤:
還有 一 個與它們都有點不同的 TraceListener 監聽器。我們都可以看到該監聽器並沒有實現 ApplicationListener 接口, 但可以看到這里使用了 @EventListener 注解。 該注解是從 Spring 4.2 開始提供的新功能,通過它可以自動地將函數注冊為 一 個 ApplicationListener 的實現。所以在該類中, 實際上等價於實現了兩個監聽器,一 個監聽AckRemoteApplicationEvent事件, 一個監聽 SentApplicationEvent 事件。
public class TraceListener { private static Log log = LogFactory.getLog(TraceListener.class); private HttpTraceRepository repository; public TraceListener(HttpTraceRepository repository) { this.repository = repository; } @EventListener public void onAck(AckRemoteApplicationEvent event) { Map<String, Object> trace = getReceivedTrace(event); // FIXME boot 2 this.repository.add(trace);
} @EventListener public void onSend(SentApplicationEvent event) { Map<String, Object> trace = getSentTrace(event); // FIXME boot 2 this.repository.add(trace);
} protected Map<String, Object> getSentTrace(SentApplicationEvent event) { Map<String, Object> map = new LinkedHashMap<String, Object>(); map.put("signal", "spring.cloud.bus.sent"); map.put("type", event.getType().getSimpleName()); map.put("id", event.getId()); map.put("origin", event.getOriginService()); map.put("destination", event.getDestinationService()); if (log.isDebugEnabled()) { log.debug(map); } return map; } protected Map<String, Object> getReceivedTrace(AckRemoteApplicationEvent event) { Map<String, Object> map = new LinkedHashMap<String, Object>(); map.put("signal", "spring.cloud.bus.ack"); map.put("event", event.getEvent().getSimpleName()); map.put("id", event.getAckId()); map.put("origin", event.getOriginService()); map.put("destination", event.getAckDestinationService()); if (log.isDebugEnabled()) { log.debug(map); } return map; } }
這兩個函數會收集關於發送和接收到的 Ack 事件信息, 並且兩個函數獲得的內容就是事件定義相關的 一 些屬性, 看到這里大家是否感覺似曾相識?是的, 這些信息與之前我們通過 Kafka 的控制台工具獲取的消息內容非常類似。既然Spring Cloud Bus已經提供了 Trace 跟蹤信息的監聽和記錄, 我們不妨嘗試使用一 下。 要開啟該功能非常簡單,只需在配置文件中將下面的屬性設置為 true 即可:
spring.cloud.bus.trace.enabled=true
然后請求對應的 /actuator/trace 接口就會獲取到kafka上消息的類似信息。
Spring Cloud Bus中的事件發布:
通過上面的分析, 我們已經了解了 Spring Cloud Bus 中事件以及監聽器的定義, 下面來看看這些事件是如何發布給監聽器進行處理的。我們可以從 Spring Cloud Bus 的自動化配置類中看看它在啟動的時候都加載了什么內容:
BusAutoConfiguration 類是關鍵:
@Configuration @ConditionalOnBusEnabled @EnableBinding(SpringCloudBusClient.class) @EnableConfigurationProperties(BusProperties.class) public class BusAutoConfiguration implements ApplicationEventPublisherAware { public static final String BUS_PATH_MATCHER_NAME = "busPathMatcher"; @Autowired @Output(SpringCloudBusClient.OUTPUT) //定義了發送消息的抽象方法。
private MessageChannel cloudBusOutboundChannel; //該對象中提供了下面兩個重要函數, 用來判斷事件的來源服務是否為自己, 以及判斷目標是否為自己, 以此作為依據是否要響應消息進行事件的處理。
@Autowired private ServiceMatcher serviceMatcher; //定義了消息服務的綁定屬性。
@Autowired private BindingServiceProperties bindings;
//該對象定義了Spring Cloud Bus的屬性
@Autowired private BusProperties bus;
//Spring事件模型中用來發布事件的接口
private ApplicationEventPublisher applicationEventPublisher; //.....
}
除了定義的這些成員變量之外, 還能看到這里定義了兩個監聽方法 acceptLocal 和 acceptRemote。
其中, acceptLocal方法如下所示。在其實現中並非所有的RemoteApplicationEvent事件都會處理。 根據 if 中的條件, 可以 看到在該監聽 處理中,只對事件來源是自己並且事件 類型不是AckRemoteApplicationEvent的內容進行后續的處理, 而后續的處理就是通過消息管道 將該事件 發送出去。 所以, 該 監聽器的功能就是監聽本地事件來進行消息的發送。
//可以將該函數理解為對 RemoteApplicationEvent事件的監聽器
@EventListener(classes = RemoteApplicationEvent.class) public void acceptLocal(RemoteApplicationEvent event) { if (this.serviceMatcher.isFromSelf(event) && !(event instanceof AckRemoteApplicationEvent)) { this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build()); } }
再來看看acceptRemote方法。 該方法中使用了@StreamListener注解修飾, 該注解的作用是將該函數注冊為消息代理上數據流的事件 監聽器, 注解中的屬性值SpringCloudBusClient.INPUT指定了監聽的通道名。 同時, 回頭看該函數所在類的定義,使用 了@EnableBinding注解, 該注解用來實現與消息代理的連接,注解中的 屬性值SpringCloudBusClient.class聲明了輸入和輸出通道的定義(這部分內容源自Spring Cloud Stream這里我們只需理解它用來綁定消息代理的輸入與輸出, 以 實現向消息總線上發送和接收消息即可)。
@StreamListener(SpringCloudBusClient.INPUT) public void acceptRemote(RemoteApplicationEvent event) { if (event instanceof AckRemoteApplicationEvent) { if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event) && this.applicationEventPublisher != null) { this.applicationEventPublisher.publishEvent(event); } // If it's an ACK we are finished processing at this point
return; } if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) { if (!this.serviceMatcher.isFromSelf(event)) { this.applicationEventPublisher.publishEvent(event); } if (this.bus.getAck().isEnabled()) { AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this, this.serviceMatcher.getServiceId(), this.bus.getAck().getDestinationService(), event.getDestinationService(), event.getId(), event.getClass()); this.cloudBusOutboundChannel .send(MessageBuilder.withPayload(ack).build()); this.applicationEventPublisher.publishEvent(ack); } } if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) { // We are set to register sent events so publish it for local consumption, // irrespective of the origin
this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(), event.getId(), event.getClass())); } }
通過上面的分析,我們已經可以知道 Spring Cloud Bus 通過 acceptRemote 方法來監聽消息代理的輸入通道, 並根據事件類型和配置內容來確定是否要發布事件給我們之前分析的幾個事件監聽器來對事件做具體的處理;而 acceptLocal 方法用來監聽本地的事件,針對事件來源是自己,並且事件類型不是 AckRemoteApplicationEvent的內容通過消息代理的輸出通道發送到總線上去。
控制端點:
在介紹了 Spring Cloud Bus 中實現的事件模型之后, 我們已經知道每個節點是如何響應消息總線上的事件了。 那么這些發送到消息總線上用來觸發各個節點的事件處理的動作是如何實現的呢?回想 一 下之前在實現配置屬性刷新時, 我們在修改了 Git 倉庫上的配置信息之后,往總線上的某個節點發送了 一 個請求 /actuator/bus-refresh 來觸發總線上的所有節點進行配置刷新。在 org.springframework.cloud.bus. endpoint包下的 RefreshBusEndpoint 和 EnvironmentBusEndpoint 分別創建了兩個控制端點: /actuator/bus-refresh 和 /actuator/bus-env。 通過整理 org.springframework.cloud.bus.endpoint 包下的內容, 我們可以得到如下類圖:
springCloud Bus中的Endpoint也是通過spring-boot-actuator模塊來實現的 。我們需要添加以下配置開啟端點
# actuator
management.endpoints.web.exposure.include = *