1 共享訂閱
多個客戶端訂閱了同一個主題,發布者發布主題時,每個客戶端都會同時收到這個主題的消息。在客戶端集群部署的場景下會出現消息重復處理的問題。
EMQ支持共享訂閱,多個客戶端訂閱了同一個主題,發布者發布主題時,只有其中一個客戶端接收到消息。
共享訂閱有兩種方式:
(1)共享訂閱:訂閱前綴$queue/
多個客戶端訂閱了$queue/topic,發布者發布到topic,則只有一個客戶端會接收到消息。
(2)分組訂閱:訂閱前綴$share/<group>/
多組客戶端訂閱了$queue/group1/topic、$queue/group2/topic...,發布者發布到topic,則消息會發布到每個group中,但是每個group中只有一個客戶端會接收到消息。
2 Java客戶端實現共享訂閱
開發時發現,使用eclipse paho java客戶端時,無法處理共享訂閱。訂閱$queue/topic能夠訂閱成功,並且跟蹤代碼能看到emq也把消息轉發到了客戶端,但是客戶端丟棄掉了。
解決方法就是重寫mqtt的回調函數,實現MqttCallback接口。
實現MqttCallback接口的代碼如下:
package com.emqtest.emqtest; import java.util.HashMap; import java.util.Map; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttTopic; public class SharedSubCallbackRouter implements MqttCallback { private Map<String, IMqttMessageListener> topicFilterListeners; public SharedSubCallbackRouter(Map<String, IMqttMessageListener> topicFilterListeners) { this.topicFilterListeners = topicFilterListeners; } public void addSubscriber(String topicFilter, IMqttMessageListener listener) { if (this.topicFilterListeners == null) { this.topicFilterListeners = new HashMap<>(); } this.topicFilterListeners.put(topicFilter, listener); } @Override public void connectionLost(Throwable cause) { } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { for (Map.Entry<String, IMqttMessageListener> listenerEntry : topicFilterListeners.entrySet()) { String topicFilter = listenerEntry.getKey(); if (isMatched(topicFilter, topic)) { listenerEntry.getValue().messageArrived(topic, message); } } } @Override public void deliveryComplete(IMqttDeliveryToken token) { } /** * Paho topic matcher does not work with shared subscription topic filter of emqttd * https://github.com/eclipse/paho.mqtt.java/issues/367#issuecomment-300100385 * <p> * http://emqtt.io/docs/v2/advanced.html#shared-subscription * * @param topicFilter the topicFilter for mqtt * @param topic the topic * @return boolean for matched */ private boolean isMatched(String topicFilter, String topic) { if (topicFilter.startsWith("$queue/")) { topicFilter = topicFilter.replaceFirst("\\$queue/", ""); } else if (topicFilter.startsWith("$share/")) { topicFilter = topicFilter.replaceFirst("\\$share/", ""); topicFilter = topicFilter.substring(topicFilter.indexOf('/')); } return MqttTopic.isMatched(topicFilter, topic); } }
創建emq連接代碼如下:
mqttClient = new MqttClient("tcp://localhost:1883", "MqttClient"); mqttClient.connect(); Map<String, IMqttMessageListener> listeners = new HashMap<>(); IMqttMessageListener emqListener = new EmqListener(); listeners.put("$queue/testmqtt", emqListener); mqttClient.setCallback(new SharedSubCallbackRouter(listeners)); mqttClient.subscribe("$queue/testmqtt", new EmqListener());
還要再寫一個實現IMqttMessageListener接口的Emq消息處理類:
@Component public class EmqListener implements IMqttMessageListener { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { try { System.out.println("topic: " + topic); } catch (Exception e) { e.printStackTrace(); } } }
參考鏈接:
1 emq的github上關於這個問題的討論:
https://github.com/emqx/emqx/issues/921#event-1023359646
2 網上有人給的一個解決方法示例代碼:
https://github.com/yogin16/paho-shared-sub-example
3 eclipse paho的github鏈接:
https://github.com/eclipse/paho.mqtt.java