Liferay7 BPM門戶開發之21: 理解消息總線(Message Bus)體系


Liferay Message Bus提供了松耦合的消息發送接收機制(生產和消費的設計模式),用於本地服務,不支持遠程服務,支持集群。

主要用途:

  • 兩個或多個插件之間的通訊。
  • 在事件中發送搜索索引,比如傳遞工作流的實例索引。
  • 發送訂閱郵件或系統消息,比如在工作流中的待辦到達時,給用戶發送提醒消息。
  • 定時任務消息發送,比如在工作流中的定時任務啟動時,給用戶發送提醒消息。
  • 運行異步過程


主要分幾個組件

  • Message Bus: 管理消息發送;
  • Destinations: 目標終端地址,用於注冊監聽者接收消息的地址;可以把它想象為郵件地址,或XMPP服務里的To JID;
  • Listeners: 消息接收者,即Receivers;
  • Senders: 發送者;

一個服務,即可以是接收者,也可以同時為發送者。

消息分為兩種:

  • 1、同步消息,線程阻塞的消息,要求在一定時間內必須處理消息相應,否則拋出異常;
  • 2、異步消息,非阻塞,發送者可以指定兩種消息,要求call-back和單向消息(Send-and-Forget,字面意思也很直觀,就是發送然后忘了它)

 

配置文件:

  • WEB-INF/src/META-INF/messaging-spring.xml: 指定destinations、listeners、mappings關系;
  • WEB-INF/web.xml: 添加 messaging-spring.xml 到這個文件進行注冊;

 

同步消息

同步消息很簡單,就是發送-->接收一條流程。
首先要確定Destination (在messaging-spring.xml中配置)


現在以一個直觀例子解釋,比如要開一個演唱會,需要做一個Tasks portlet project,處理開唱前的各項准備工作,比如燈光、音響、升降機等設備的安裝。
這個project有2類角色,一類是負責接收(發自Task的任務)的跑腿安裝者(Setup),一類是負責管理任務的人(Task),用來給Setup下安裝指令;

這兩類人都具有Receivers、Senders功能;


比如Task通知Setup:去裝座椅(這時Task是Sender,Setup是Receivers)
Setup馬上收到消息開始干活,過了一段時間,座椅安裝好了,Setup回消息給Task:座椅安裝完畢!(這時Setup是Sender,Task是Receivers)
相對應的,需要先確定2個Destination Key:

不同角色對應的Destination

Destination Key Sender Receivers
tour/roadie/setup Tasks  Setup
tour/manager/task Setup  Tasks

 

消息發送處理


在我們的應用portlet中有TasksPortlet.java,在_updateTask 方法處理任務新建,(由Task新建任務)同時, 在添加新任務的時候發送消息(給Setup)

首先添加引用:
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBusException;
import com.liferay.portal.kernel.messaging.MessageBusUtil;

過程:

//1、New 一個 Message
Message message = new Message();

//2、通過key/value pairs寫入消息
message.put("name", name);
message.put("description", description);
message.put("status", status);

//3、設置response ID、response destination
//這是發送者給接收者指定的回發ID和回發響應地址
//可以想象一下,在發送電子郵件的時候,你的@地址是from發送者,對方接收后回復郵件給你,你的@地址就是收件人地址(to地址),
message.setResponseId("1111");
message.setResponseDestinationName("tour/manager/task");

//4、發送消息,10秒超時
//一旦超時,即拋出MessageBusException
try {
String roadieResponse = (String) MessageBusUtil.sendSynchronousMessage("tour/roadie/setup", message, 10000);
} catch (MessageBusException e) {
e.printStackTrace();
}

 

消息接收

需要實現MessageListener接口,在SetupMessagingImpl.java中處理接收
首先添加引用
import com.liferay.portal.kernel.messaging.Message;
import com.liferay.portal.kernel.messaging.MessageBusUtil;
import com.liferay.portal.kernel.messaging.MessageListener;

過程:

//1、在receive(Message message)方法中獲取message
String name = (String) message.get("name");

//2、創建response Message對象(基於MessageBusUtil.createResponseMessage(message)方法)
// 添加負載,是一個object,用於告訴管理者,我已經收到消息了
//(至於啥時候干完了,然后再通知管理者目前狀態:我已經干完了,就又要另外想辦法了,這種情況就不適合使用同步而是要用異步消息了)。
Message responseMessage = MessageBusUtil.createResponseMessage(message);
responseMessage.setPayload("RECEIVED");

//3、發送響應消息
MessageBusUtil.sendMessage(responseMessage.getDestinationName(), responseMessage);

 

WEB-INF/src/META-INF/messaging-spring.xml設置

<?xml version="1.0"?>
<beans
default-destroy-method="destroy"
default-init-method="afterPropertiesSet"
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"
>
<!-- Listeners -->
<bean id="messageListener.setup_listener" class="com.tour.portlet.tasks.messaging.impl.SetupMessagingImpl" />
<!-- Destinations -->
<bean id="tour.roadie.setup" class="com.liferay.portal.kernel.messaging.SynchronousDestination">
<property name="name" value="tour/roadie/setup" />
</bean>
<bean id="tour.manager.task" class="com.liferay.portal.kernel.messaging.SynchronousDestination">
<property name="name" value="tour/manager/task" />
</bean>
<!-- Configurator -->
<bean id="messagingConfigurator" class="com.liferay.portal.kernel.messaging.config.PluginMessagingConfigurator">
<property name="messageListeners">
<map key-type="java.lang.String" value-type="java.util.List">
<entry key="tour/roadie/setup">
<list value-type="com.liferay.portal.kernel.messaging.MessageListener">
<ref bean="messageListener.setup_listener" />
</list>
</entry>
</map>
</property>
<property name="destinations">
<list>
<ref bean="tour.roadie.setup"/>
<ref bean="tour.manager.task"/>
</list>
</property>
</bean>
</beans>

 

web.xml設置

<listener>
<listener-class>com.liferay.portal.kernel.spring.context.PortletContextLoaderListener</listener-class>
</listener>

<context-param>
<param-name>portalContextConfigLocation</param-name>
<param-value>/WEB-INF/classes/META-INF/messaging-spring.xml</param-value>
</context-param>

 


異步消息


通過JSONObject作為消息體。


分為兩種:
要求call-back的消息:
https://dev.liferay.com/develop/tutorials/-/knowledge_base/6-2/asynchronous-messaging-with-callbacks

單向消息(Send-and-Forget,字面意思也很直觀,就是發送然后忘了它):
https://dev.liferay.com/develop/tutorials/-/knowledge_base/6-2/asynchronous-send-and-forget-messaging


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM