基於ubuntu16的mqtt服務器(apache-apollo1.7.1)


感謝博客https://www.cnblogs.com/chenrunlin/p/5109028.html

需要環境:

java1.8

 

把文件通過finalshell扔到/usr/local目錄下

使用命令

# sudo tar -zxvf apache-apollo-1.7.1-unix-distro.tar.gz

然后

我配置了一下apollo_home  更改 /etc/profile

按exc鍵后輸入:wq保存

然后

# sudo -s

# source /etc/profile 

刷新配置

 

再進入/var/lib

# cd /var/lib/

新建Mybroker

# /usr/local/apache-apollo-1.7.1/bin/apollo create mybroker

再更改mybroker中/etc/apollo.xml

#vim /var/lib/mybroker/etc/apollo.xml

紅框中改為0.0.0.0

最后運行

#  nohup /var/lib/mybroker/bin/apollo-broker run

nohup是為了關閉ssh之后也能在后台運行

 

注意點:

1、可能有時候需要sh命令 比如說 sh apollo create mybroker

2、阿里雲必須開放apollo.xml中的端口,如61680.61681等。

3、是否成功通過ip+61680即可知道。

4、初始賬號密碼是admin,password,存放在etc目錄的users.properties下

5、注意nohup命令運行,不然可能斷開連接后自動關停服務

 

成功截圖:

 

 

附送一下測試Demo,需要的jar包是

org.eclipse.paho.client.mqttv3-1.2.0.jar

 

服務器端server:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
 * 
 * Title:Server
 * Description: 服務器向多個客戶端推送主題,即不同客戶端可向服務器訂閱相同主題*/
public class Server {
    public static final String HOST="tcp://IP地址:61613";
    public static final String TOPIC1="toclient/1";
    public static final String TOPIC2="toclient/2";
    public static final String clientid="server";
    
    public MqttClient client;
    public MqttTopic topic1;
    public MqttTopic topic2;
    public String userName = "admin";
    public String passWord = "password";
    
    public MqttMessage message;
    
    public void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(10);//設置超時時間
        options.setKeepAliveInterval(20);//設置
        try {
            client.setCallback(new PushCallback());
            client.connect(options);
            topic1 = client.getTopic(TOPIC1);
            topic2 = client.getTopic(TOPIC2);
        }catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
    public void publish(MqttTopic topic,MqttMessage message) throws MqttPersistenceException, MqttException {
        MqttDeliveryToken token = topic.publish(message);
        token.waitForCompletion();
        System.out.println("消息推送成功 !"+ token.isComplete());
    }
    public Server() throws MqttException {
        client = new MqttClient(HOST, clientid,new MemoryPersistence());
        connect();
    }
    
    public static void main(String[] args) throws MqttException {
       Server server = new Server();
       
       server.message= new MqttMessage();
       server.message.setQos(2);
       /*
        QoS 0,最多一次送達。也就是發出去就fire掉。
        QoS 1,至少一次送達。發出去之后必須等待ack,沒有ack,就要找時機重發
        QoS 2,准確一次送達。消息id將擁有一個簡單的生命周期。
        * */
       server.message.setRetained(true);
       server.message.setPayload("發送消息到Topic1".getBytes());
       server.publish(server.topic1, server.message);
       System.out.println(server.message.isRetained() + "----ratained狀態");
    }
}

監聽端Client:

import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class Client {
    public static final String HOST="tcp://IP地址:61613";
    public static final String TOPIC="toclient/1";
    private static final String clientid="clientid1";
    
    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "admin";
    private String passWord = "password";
    
    private ScheduledExecutorService scheduler;
    
    private void start() {
        try {
            client = new MqttClient(HOST, clientid);
            options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
             // 設置超時時間 單位為秒  
            options.setConnectionTimeout(10);
            // 設置會話心跳時間 單位為秒 服務器會每隔1.5*20秒的時間向客戶端發送個消息判斷客戶端是否在線,但這個方法並沒有重連的機制  
            options.setKeepAliveInterval(20);
            
            client.setCallback(new PushCallback());
            
            MqttTopic topic = client.getTopic(TOPIC);
            options.setWill(topic, "close".getBytes(), 2, true);
            client.connect(options);
            int []Qos = {1};
            String [] topic1 = {TOPIC};
            client.subscribe(topic1,Qos);
        }catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
    }
    
    
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Client client = new Client();
        client.start();
    }

}

 

回調函數 PushCallback:

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;  
import org.eclipse.paho.client.mqttv3.MqttMessage;  
  
/**  
 * 發布消息的回調類  
 *   
 * 必須實現MqttCallback的接口並實現對應的相關接口方法CallBack 類將實現 MqttCallBack。  
 * 每個客戶機標識都需要一個回調實例。在此示例中,構造函數傳遞客戶機標識以另存為實例數據。
 * 在回調中,將它用來標識已經啟動了該回調的哪個實例。  
 * 必須在回調類中實現三個方法:  
 *   
 *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已經預訂的發布。  
 *   
 *  public void connectionLost(Throwable cause)在斷開連接時調用。  
 *   
 *  public void deliveryComplete(MqttDeliveryToken token))  
 *  接收到已經發布的 QoS 1 或 QoS 2 消息的傳遞令牌時調用。  
 *  由 MqttClient.connect 激活此回調。  
 *   
 */    
public class PushCallback implements MqttCallback {  
  
    public void connectionLost(Throwable cause) {
        System.out.println("連接斷開,可以重連!");
    }
 
    public void deliveryComplete(IMqttDeliveryToken token) {
        // TODO Auto-generated method stub
        System.out.println("deliveryComplete -- "+token.isComplete());
        
    }
 
    public void messageArrived(String topic, MqttMessage msg) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("接收消息主題 : "+topic);
        System.out.println("接收消息Qos: "+msg.getQos());
        System.out.println("接收消息 : "+ new String(msg.getPayload()));
        
    }
}

 

效果:

先運行client端。

再運行server端發送給訂閱者:

再次查看監聽端:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM