細聊Spring Cloud Bus


細聊Spring Cloud Bus

Spring 事件驅動模型

因為Spring Cloud Bus的運行機制也是Spring事件驅動模型所以需要先了解相關知識點:
Alt text
上面圖中是Spring事件驅動模型的實現示意圖,以下再補充一些圖中未提現的實現細節:抽象類abstract class AbstractApplicationEventMulticaster中根據事件和事件類型獲取對應的觀察者的方法是:

	protected Collection<ApplicationListener<?>> getApplicationListeners(
			ApplicationEvent event, ResolvableType eventType)  

該方法內具體檢索監聽器(觀察者的方法)是:

private Collection<ApplicationListener<?>> retrieveApplicationListeners(
            ResolvableType eventType, @Nullable Class<?> sourceType, @Nullable ListenerRetriever retriever)
            
            .....
        // Add programmatically registered listeners, including ones coming
		// from ApplicationListenerDetector (singleton beans and inner beans).
		for (ApplicationListener<?> listener : listeners) {
			if (supportsEvent(listener, eventType, sourceType)) {
				if (retriever != null) {
					retriever.applicationListeners.add(listener);
				}
				allListeners.add(listener);
			}
		}
            .....

此方法內根據傳入參數事的件對象遍歷所有對應(訂閱)的監聽者,其中有個很重要的方法boolean supportsEvent,此方法用於判斷是否是訂閱的監聽者:

	protected boolean supportsEvent(
			ApplicationListener<?> listener, ResolvableType eventType, @Nullable Class<?> sourceType) {

		GenericApplicationListener smartListener = (listener instanceof GenericApplicationListener ?
				(GenericApplicationListener) listener : new GenericApplicationListenerAdapter(listener));
		return (smartListener.supportsEventType(eventType) && smartListener.supportsSourceType(sourceType));
	}

其中接口GenericApplicationListener和GenericApplicationListenerAdapter類都是為了定義或實現supportsEventType方法和supportsSourceType方法,通過這兩個方法確定是否是事件的監聽器(觀察者、訂閱者)。

public interface GenericApplicationListener extends ApplicationListener<ApplicationEvent>, Ordered {

	/**
	 * Determine whether this listener actually supports the given event type.
	 * @param eventType the event type (never {@code null})
	 */
	boolean supportsEventType(ResolvableType eventType);

	/**
	 * Determine whether this listener actually supports the given source type.
	 * <p>The default implementation always returns {@code true}.
	 * @param sourceType the source type, or {@code null} if no source
	 */
	default boolean supportsSourceType(@Nullable Class<?> sourceType) {
		return true;
	}

	/**
	 * Determine this listener's order in a set of listeners for the same event.
	 * <p>The default implementation returns {@link #LOWEST_PRECEDENCE}.
	 */
	@Override
	default int getOrder() {
		return LOWEST_PRECEDENCE;
	}

}

其中判斷發布事件的來源對象supportsSourceType方法默認就返回true,意味着如果不重寫這個接口方法,是否是訂閱事件的監聽器不以事件來源對象進行判斷,只根據事件類型進行篩選,該方法的具體實現可參考GenericApplicationListenerAdapter類包裝的supportsSourceType方法實現:

public boolean supportsSourceType(@Nullable Class<?> sourceType) {
		return !(this.delegate instanceof SmartApplicationListener) ||
				((SmartApplicationListener) this.delegate).supportsSourceType(sourceType);
	}

Spring Cloud Bus的事件、發布、訂閱

Spring Cloud Bus的事件都繼承於RemoteApplicationEvent類,RemoteApplicationEvent類繼承於Spring事件驅動模型的事件抽象類ApplicationEvent,也就說Spring Cloud Bus的事件、發布、訂閱也是基於Spring的事件驅動模型,例如Spring Cloud Bus的配置刷新事件RefreshRemoteApplicationEvent:

Alt text

同理訂閱事件也是標准的Spring事件驅動模型,例如配置刷新的監聽器源碼繼承了Spring事件驅動模型中的接口ApplicationListener<E extends ApplicationEvent>:

public class RefreshListener
		implements ApplicationListener<RefreshRemoteApplicationEvent> {

	private static Log log = LogFactory.getLog(RefreshListener.class);

	private ContextRefresher contextRefresher;

	public RefreshListener(ContextRefresher contextRefresher) {
		this.contextRefresher = contextRefresher;
	}

	@Override
	public void onApplicationEvent(RefreshRemoteApplicationEvent event) {
		Set<String> keys = this.contextRefresher.refresh();
		log.info("Received remote refresh request. Keys refreshed " + keys);
	}

}

在BusRefreshAutoConfiguration類中會將RefreshListener對象注冊到Spring的BeanFactory中(不把監聽器類注冊到Spring的BeanFactory中就無法利用Spring的事件驅動模型對刷新事件進行處理)。

	@Bean
	@ConditionalOnProperty(value = "spring.cloud.bus.refresh.enabled",matchIfMissing = true)
	@ConditionalOnBean(ContextRefresher.class)
	public RefreshListener refreshListener(ContextRefresher contextRefresher) {
		return new RefreshListener(contextRefresher);
	}

也可以使用@EventListener創建監聽器,例如TraceListener類:

	@EventListener
	public void onAck(AckRemoteApplicationEvent event) {
		Map<String, Object> trace = getReceivedTrace(event);
		// FIXME boot 2 this.repository.add(trace);
	}

	@EventListener
	public void onSend(SentApplicationEvent event) {
		Map<String, Object> trace = getSentTrace(event);
		// FIXME boot 2 this.repository.add(trace);
	}

發布事件也是利用應用程序上下文進行事件發布,比如配置刷新的實現代碼:

@Endpoint(id = "bus-refresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {

	public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
		super(context, id);
	}

	@WriteOperation
	public void busRefreshWithDestination(@Selector String destination) { // TODO:
																			// document
																			// destination
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
	}

	@WriteOperation
	public void busRefresh() {
		publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
	}

}

注解@WriteOperation實現POST操作,@Endpoint結合management.endpoints.web.exposure.include=* 配置項可實現一個接入點,接入點的URL是:/actuator/bus-refresh
父類AbstractBusEndpoint內用應用程序上下文實現事件的發布:

public class AbstractBusEndpoint {

	private ApplicationEventPublisher context;

	private String appId;

	public AbstractBusEndpoint(ApplicationEventPublisher context, String appId) {
		this.context = context;
		this.appId = appId;
	}

	protected String getInstanceId() {
		return this.appId;
	}

	protected void publish(ApplicationEvent event) {
		this.context.publishEvent(event);
	}

}

Spring Cloud Bus的底層通訊實現(對使用者透明)

Spring Cloud Bus的底層通訊基礎是Spring Cloud Stream,定義發送總線事件和接收總線事件監聽器的類是BusAutoConfiguration(在網絡上發送和接收其他節點的事件消息),因為繼承了ApplicationEventPublisherAware所以該類也具備發布本地事件的功能(可以查詢Aware接口作用),發布網絡事件消息的方法是:

@EventListener(classes = RemoteApplicationEvent.class)
	public void acceptLocal(RemoteApplicationEvent event) {
		if (this.serviceMatcher.isFromSelf(event)
				&& !(event instanceof AckRemoteApplicationEvent)) {
			this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
		}
	}

如果監聽到RemoteApplicationEvent類事件,首先檢查是否是自己發布並且不是ACK事件,如果是自己發布的非ACK事件就在總線上發送這個事件消息。發送AckRemoteApplicationEvent(ACK事件)已經在接收其他節點發的事件消息時觸發了,所以這里不用管發送ACK事件的工作了。

接收事件消息:

@StreamListener(SpringCloudBusClient.INPUT)
	public void acceptRemote(RemoteApplicationEvent event) {
		if (event instanceof AckRemoteApplicationEvent) {
			if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event)
					&& this.applicationEventPublisher != null) {
				this.applicationEventPublisher.publishEvent(event);
			}
			// If it's an ACK we are finished processing at this point
			return;
		}
		if (this.serviceMatcher.isForSelf(event)
				&& this.applicationEventPublisher != null) {
			if (!this.serviceMatcher.isFromSelf(event)) {
				this.applicationEventPublisher.publishEvent(event);
			}
			if (this.bus.getAck().isEnabled()) {
				AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
						this.serviceMatcher.getServiceId(),
						this.bus.getAck().getDestinationService(),
						event.getDestinationService(), event.getId(), event.getClass());
				this.cloudBusOutboundChannel
						.send(MessageBuilder.withPayload(ack).build());
				this.applicationEventPublisher.publishEvent(ack);
			}
		}
		if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
			// We are set to register sent events so publish it for local consumption,
			// irrespective of the origin
			this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this,
					event.getOriginService(), event.getDestinationService(),
					event.getId(), event.getClass()));
		}
	}

接收到其他節點發來的事件消息后會將此事件發布到本地的應用程序上下文中(this.applicationEventPublisher),監聽此事件類型的訂閱者就會相應的進行處理。

兩個跟蹤事件AckRemoteApplicationEvent和SentApplicationEvent

從他們的繼承關系可以看出,AckRemoteApplicationEvent可以發送到其他網絡節點(繼承於RemoteApplicationEvent),SentApplicationEvent只是本地事件(繼承於ApplicationEvent),SentApplicationEvent事件可以顯示收到事件消息的類型,AckRemoteApplicationEvent事件只顯示收到事件消息的ID,TraceListener類負責監聽和記錄他們的內容(配置項要打開spring.cloud.bus.trace.enabled=true):

public class TraceListener {

	@EventListener
	public void onAck(AckRemoteApplicationEvent event) {
		Map<String, Object> trace = getReceivedTrace(event);
		// FIXME boot 2 this.repository.add(trace);
	}

	@EventListener
	public void onSend(SentApplicationEvent event) {
		Map<String, Object> trace = getSentTrace(event);
		// FIXME boot 2 this.repository.add(trace);
	}

	protected Map<String, Object> getSentTrace(SentApplicationEvent event) {
		.....
	}

	protected Map<String, Object> getReceivedTrace(AckRemoteApplicationEvent event) {
		.....
	}

}

在總線事件發送端和總線事件接收端日志的記錄流程如下:
Alt text

測試A應用和B應用進行“聊天”

首先准備環境:
創建3個項目:spring-cloud-bus-shared-library、spring-cloud-bus-a、spring-cloud-bus-b

  • spring-cloud-bus-shared-library:負責定義事件和監聽器還有配置類
  • spring-cloud-bus-a:扮演A應用負責引用shared-library並利用BUS發送消息給B應用(此消息實際為廣播消息)
  • spring-cloud-bus-b:扮演B應用負責引用shared-library並利用BUS回復A應用發來的消息(此消息非廣播消息)

spring-cloud-bus-shared-library的POM的依賴項:

<properties>
		<java.version>1.8</java.version>
		<spring-cloud.version>Greenwich.SR3</spring-cloud.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-bus-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>

刪除構建的Maven插件節點否則構建后其他項目引用不了(格式不對):

<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

啟動一個rabbitmq:

docker pull rabbitmq:3-management

docker run -d --hostname my-rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

application.properties配置定義:

spring.application.name=spring-cloud-bus-shared-library
server.port=9007
# 開啟消息跟蹤
spring.cloud.bus.trace.enabled=true
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

#顯示的暴露接入點
management.endpoints.web.exposure.include=*

spring-cloud-bus-a、spring-cloud-bus-b的配置信息除了spring.application.name和server.port不一樣,其他都是一樣的。

自定義一個聊天事件類:

/**
 * 聊天事件
 */
public class ChatRemoteApplicationEvent extends RemoteApplicationEvent {

	private String message;

	//for serializers
	private ChatRemoteApplicationEvent(){}

	public ChatRemoteApplicationEvent(Object source, String originService,
			String destinationService,String message){
		super(source, originService, destinationService);

		this.message = message;
	}

	public void setMessage(String message){
		this.message = message;
	}

	public String getMessage(){
		return this.message;
	}
}

自定義聊天事件監聽器:

/**
 * 聊天事件監聽
 */
public class ChatListener implements ApplicationListener<ChatRemoteApplicationEvent> {

	private static Log log = LogFactory.getLog(ChatListener.class);

	public ChatListener(){}

	@Override
	public void onApplicationEvent(ChatRemoteApplicationEvent event){
		log.info(String.format("應用%s對應用%s悄悄的說:\"%s\"",
				event.getOriginService(),
				event.getDestinationService(),
				event.getMessage()));
	}
}

配置類將監聽器注冊到BeanFactory中,並需要顯示的告訴Spring Cloud Bus我們有一個自定義事件:@RemoteApplicationEventScan(basePackageClasses=ChatRemoteApplicationEvent.class),否則BUS收到消息后無法識別事件類型。

@Configuration
@ConditionalOnClass(ChatListener.class)
@RemoteApplicationEventScan(basePackageClasses=ChatRemoteApplicationEvent.class)
public class BusChatConfiguration {

	@Bean
	public ChatListener ChatListener(){
		return new ChatListener();
	}
}

發布到本地Maven倉庫:

mvn install

spring-cloud-bus-a、spring-cloud-bus-b的POM依賴:

<properties>
		<java.version>1.8</java.version>
		<spring-cloud.version>Greenwich.SR3</spring-cloud.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-bus</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>com.bluersw</groupId>
			<artifactId>spring-cloud-bus-shared-library</artifactId>
			<version>0.0.1-SNAPSHOT</version>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

在spring-cloud-bus-a、spring-cloud-bus-b的啟動Main函數上增加@ComponentScan(value = "com.bluersw")注解,否則不會掃描引用spring-cloud-bus-shared-library項目的配置類(也就加載不了自定義的事件和監聽器類型)。

spring-cloud-bus-a:

@SpringBootApplication
@ComponentScan(value = "com.bluersw")
public class SpringCloudBusAApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringCloudBusAApplication.class, args);
	}

}

spring-cloud-bus-b:

@SpringBootApplication
@ComponentScan(value = "com.bluersw")
public class SpringCloudBusBApplication {

	public static void main(String[] args) {
		SpringApplication.run(SpringCloudBusBApplication.class, args);
	}

}

spring-cloud-bus-a發送消息給spring-cloud-bus-b(啟動spring-cloud-bus-a程序和spring-cloud-bus-b程序):

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringCloudBusAApplicationTests {

	@Autowired
	private ApplicationEventPublisher context;

	@Autowired
	private BusProperties bp;

	@Test
	public void AChat() {
		context.publishEvent(new ChatRemoteApplicationEvent(this,bp.getId(),null,"hi!B應用,我是A應用,。"));
	}

}

執行AChat()之后,spring-cloud-bus-b的控制台會輸出:
”應用spring-cloud-bus-a👎33b6374cba32e6a3e7e2c8e7631de8c0對應用**悄悄的說:"hi!B應用,我是A應用,。”,說明spring-cloud-bus-b收到了消息並正確解析和執行了事件處理函數,但這條消息是群發的,因為destinationService參數我們給的是個null,所有引用spring-cloud-bus-shared-library項目注冊監聽器的項目都可以收到此信息。

spring-cloud-bus-b回復消息給spring-cloud-bus-a:

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringCloudBusBApplicationTests {

	@Autowired
	private ApplicationEventPublisher context;

	@Autowired
	private BusProperties bp;

	@Test
	public void BChat() {
		context.publishEvent(new ChatRemoteApplicationEvent(this,bp.getId(),"spring-cloud-bus-a:9008","hi!我是B應用,這樣才能不被其他應用接收到。"));
	}
}

spring-cloud-bus-a是項目名稱,9008是spring-cloud-bus-a項目的端口號,指定了目標服務參數destinationService后,其他應用就接收不到這條消息了。執行BChat()之后,spring-cloud-bus-a控制台會顯示:
“應用spring-cloud-bus-b👎d577ac1ab28f0fc465a1e4700e7f538a對應用spring-cloud-bus-a:9008:**悄悄的說:"hi!我是B應用,這樣才能不被其他應用接收到。”
此消息現在只有spring-cloud-bus-a項目會接收到。

源碼

Github倉庫:https://github.com/sunweisheng/spring-cloud-example


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM