mosquitto简单应用


1. 简述

一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。一个典型的应用案例就是 Andy Stanford-ClarkMosquitto(MQTT协议创始人之一)在家中实现的远程监控和自动化,并在 OggCamp 的演讲上,对MQTT协议进行详细阐述。

官网:

http://www.mosquitto.org/  

http://www.mosquitto.org/man/  

http://www.mosquitto.org/api/

https://mosquitto.org/man/mosquitto-8.html IBM的一个server(broker)示例用法说明

2. 函数

mosquitto结构体:

struct mosquitto;
struct mosquitto_message{
    int mid;
    char *topic;
    void *payload;
    int payloadlen;
    int qos;
    bool retain;
};

 mosquitto支持推送和订阅消息模式:

/* 
 * Function: mosquitto_publish
 *
 * Publish a message on a given topic.
 * 
 * Parameters:
 *  mosq -       a valid mosquitto instance.
 *  mid -        pointer to an int. If not NULL, the function will set this
 *               to the message id of this particular message. This can be then
 *               used with the publish callback to determine when the message
 *               has been sent.
 *               Note that although the MQTT protocol doesn't use message ids
 *               for messages with QoS=0, libmosquitto assigns them message ids
 *               so they can be tracked with this parameter.
 *  topic -      null terminated string of the topic to publish to.
 *  payloadlen - the size of the payload (bytes). Valid values are between 0 and
 *               268,435,455.
 *  payload -    pointer to the data to send. If payloadlen > 0 this must be a
 *               valid memory location.
 *  qos -        integer value 0, 1 or 2 indicating the Quality of Service to be
 *               used for the message.
 *  retain -     set to true to make the message retained.
 *
 * Returns:
 *  MOSQ_ERR_SUCCESS -      on success.
 *  MOSQ_ERR_INVAL -        if the input parameters were invalid.
 *  MOSQ_ERR_NOMEM -        if an out of memory condition occurred.
 *  MOSQ_ERR_NO_CONN -      if the client isn't connected to a broker.
 *  MOSQ_ERR_PROTOCOL -     if there is a protocol error communicating with the
 *                          broker.
 *  MOSQ_ERR_PAYLOAD_SIZE - if payloadlen is too large.
 *
 * See Also: 
 *  <mosquitto_max_inflight_messages_set>
 */
libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain);
/*
 * Function: mosquitto_subscribe
 *
 * Subscribe to a topic.
 *
 * Parameters:
 *  mosq - a valid mosquitto instance.
 *  mid -  a pointer to an int. If not NULL, the function will set this to
 *         the message id of this particular message. This can be then used
 *         with the subscribe callback to determine when the message has been
 *         sent.
 *  sub -  the subscription pattern.
 *  qos -  the requested Quality of Service for this subscription.
 *
 * Returns:
 *  MOSQ_ERR_SUCCESS - on success.
 *  MOSQ_ERR_INVAL -   if the input parameters were invalid.
 *  MOSQ_ERR_NOMEM -   if an out of memory condition occurred.
 *  MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
 */
libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos);

/*
 * Function: mosquitto_unsubscribe
 *
 * Unsubscribe from a topic.
 *
 * Parameters:
 *  mosq - a valid mosquitto instance.
 *  mid -  a pointer to an int. If not NULL, the function will set this to
 *         the message id of this particular message. This can be then used
 *         with the unsubscribe callback to determine when the message has been
 *         sent.
 *  sub -  the unsubscription pattern.
 *
 * Returns:
 *  MOSQ_ERR_SUCCESS - on success.
 *  MOSQ_ERR_INVAL -   if the input parameters were invalid.
 *  MOSQ_ERR_NOMEM -   if an out of memory condition occurred.
 *  MOSQ_ERR_NO_CONN - if the client isn't connected to a broker.
 */
libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub);

一般使用流程如下:

mosquitto_lib_init();
mosq=mosquitto_new();

mosquitto_connect_callback_set();  ----mosquitto_subscribe();
mosquitto_disconnect_callback_set();
mosquitto_message_callback_set();  ----接收解析消息 并推送mosquitto_publish()

mosquitto_username_pw_set(mosq, "user", "pw");
mosquitto_connect();
mosquitto_loop(mosq, timeout, 1);  // 需要不断循环判断, 也可根据需要永远运行:mosquitto_loop_forever(mosq, timeout, 1)

mosquitto_publish();

mosquitto_destroy(mosq);
mosquitto_lib_cleanup();

3. C应用

一般使用流程:

0. 依赖库安装

apt-get install openssl libssl-dev  uuid-dev

1. 编译安装
make 
make install
交叉编译:
CC=arm-linux-gnueabihf-gcc CXX=arm-linux-gnueabihf-g++ make WITH_SRV=no  WITH_UUID=no WITH_TLS=no WITH_DOCS=no WITH_WEBSOCKETS=no
2. 创建mosquitto用户 mosquitto默认以mosquitto用户启动 groupadd mosquitto useradd -g mosquitto mosquitto 3. 配置文件修改 根据需求修改配置文件/etc/mosquitto/mosquitto.conf
一般不修改直接可用,本机所有IP都可达,外部访问本机IP可达。
4. 启动 mosquitto -c /etc/mosquitto/mosquitto.conf -d 5. 测试 mosquitto_sub -t wang/ming mosquitto_pub -m "hello"
mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang"
mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang"  -u admin -P 48e848df75c08d4c0ba75bee
服务器需要配置密码,客户端不用修改可直接采用密码上报

一个可参考配置文件如下:

 # Port to use for the default listener.
#port 1883
port 1883

 # Websockets support is currently disabled by default at compile time.
 # Certificate based TLS may be used with websockets, except that
 # only the cafile, certfile, keyfile and ciphers options are supported.
#protocol mqtt
listener 9001
protocol websockets
listener 8883
protocol websockets

 #cafile
 #capath
 
certfile /etc/cert/server.crt
keyfile /etc/cert/server.key

 # For example, setting "secure-" here would mean a client "secure-
 # client" could connect but another with clientid "mqtt" couldn't.
#clientid_prefixes
clientid_prefixes ABC
 
 # Boolean value that determines whether clients that connect 
 # without providing a username are allowed to connect. If set to 
 # false then a password file should be created (see the 
 # password_file option) to control authenticated client access. 
 # Defaults to true.
#allow_anonymous true
allow_anonymous false
 
 # See the TLS client require_certificate and use_identity_as_username options
 # for alternative authentication options.
#password_file
password_file /etc/mosquitto/pwfile

mosquitto是一个server broker,可直接运行测试客户端。

manpage上一个示例:

#include <stdio.h>
#include <mosquitto.h>

void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { if(message->payloadlen){ printf("%s %s\n", message->topic, message->payload); }else{ printf("%s (null)\n", message->topic); } fflush(stdout); } void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) { int i; if(!result){ /* Subscribe to broker information topics on successful connect. */ mosquitto_subscribe(mosq, NULL, "$SYS/#", 2); }else{ fprintf(stderr, "Connect failed\n"); } } void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i<qos_count; i++){ printf(", %d", granted_qos[i]); } printf("\n"); } void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str) { /* Pring all log messages regardless of level. */ printf("%s\n", str); } int main(int argc, char *argv[]) { int i; char *host = "localhost"; int port = 1883; int keepalive = 60; bool clean_session = true; struct mosquitto *mosq = NULL; mosquitto_lib_init(); mosq = mosquitto_new(NULL, clean_session, NULL); if(!mosq){ fprintf(stderr, "Error: Out of memory.\n"); return 1; } mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); if(mosquitto_connect(mosq, host, port, keepalive)){ fprintf(stderr, "Unable to connect.\n"); return 1; } mosquitto_loop_forever(mosq, -1, 1); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; }

 自测程序

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <mosquitto.h>

char peerid[] = "wangq";
char host[] = "127.0.0.1";
int port = 1883;
int keepalive = 60;
bool clean_session = true;
struct mosquitto *mosq = NULL;
pthread_t pmosid = 0;


void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    if(message->payloadlen){
        printf("====>recv:%s %s\n", message->topic, message->payload);
    }else{
        printf("%s (null)\n", message->topic);
    }

    mosquitto_publish(mosq, NULL, "wang/result", sizeof("loveresult"), "loveresult", 2, false);
    sleep(2);

    fflush(stdout);
}

void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    if(!result){
        /* Subscribe to broker information topics on successful connect. */
        //mosquitto_subscribe(mosq, NULL, "$SYS/broker/uptime", 2);
        mosquitto_subscribe(mosq, NULL, "wang/hua", 1);
    }else{
        fprintf(stderr, "Connect failed\n");
    }
}

void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    int i;

    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for(i=1; i<qos_count; i++){
        printf(", %d", granted_qos[i]);
    }
    printf("\n");
}

void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    /* Pring all log messages regardless of level. */
    printf("====>log:%s\n", str);
}

void mos_init()
{
    mosquitto_lib_init();
    mosq = mosquitto_new(peerid, clean_session, NULL);
    if(!mosq){
        fprintf(stderr, "Error: Out of memory.\n");
        exit(-1);
    }
    mosquitto_log_callback_set(mosq, my_log_callback);
    mosquitto_connect_callback_set(mosq, my_connect_callback);
    mosquitto_message_callback_set(mosq, my_message_callback);
    mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
    mosquitto_will_set(mosq,"xiao/ming", sizeof("livewill"), "livewill", 2, false);
    mosquitto_threaded_set(mosq, 1);
}

void * pthread_mos(void *arg)
{
    int toserver = -1;
    int timeout = 0;
    
    while(toserver){
        toserver = mosquitto_connect(mosq, host, port, keepalive);
        if(toserver){
            timeout++;
            fprintf(stderr, "Unable to connect server [%d] times.\n", timeout);
            if(timeout > 3){
                fprintf(stderr, "Unable to connect server, exit.\n" );
                pthread_exit(NULL);
            }
            sleep(10);
        }
    }

    mosquitto_loop_forever(mosq, -1, 1);

    mosquitto_disconnect(mosq);
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();

    pthread_exit(NULL);
}

int main(int argc, char *argv[])
{
    int ret = 0;

    mos_init();

    ret = pthread_create(&pmosid, NULL, pthread_mos, NULL);
    if(ret != 0){
        printf("create pthread mos error.\n");
        exit(-1);
    }
    pthread_detach(pmosid);

    while(1){
        mosquitto_publish(mosq, NULL, "wang/ming", sizeof("love"), "love", 2, false);
        sleep(2);
    }

/*    
    mosquitto_loop_forever(mosq, -1, 1);

    while(1){
        mosquitto_publish(mosq, NULL, "wang/qin/hua", sizeof("love"), "love", 2, false);
        sleep(2);
    }
*/
    return 0;
}

 运行结果:

#./a.out
====>log:Client wangq sending CONNECT
====>log:Client wangq received CONNACK (0)
====>log:Client wangq sending SUBSCRIBE (Mid: 2, Topic: wang/hua, QoS: 1)
====>log:Client wangq sending PUBLISH (d1, q2, r0, m1, 'wang/ming', ... (5 bytes))
====>log:Client wangq received SUBACK
Subscribed (mid: 2): 1
====>log:Client wangq received PUBREC (Mid: 1)
====>log:Client wangq sending PUBREL (Mid: 1)
====>log:Client wangq received PUBCOMP (Mid: 1)
====>log:Client wangq sending PUBLISH (d0, q2, r0, m3, 'wang/ming', ... (5 bytes))
====>log:Client wangq received PUBREC (Mid: 3)
====>log:Client wangq sending PUBREL (Mid: 3)
====>log:Client wangq received PUBCOMP (Mid: 3)

订阅:

#mosquitto_sub -t wang/ming
love
love

will订阅:

#mosquitto_sub -t xiao/ming 
livewill

4. Golang应用

golang中采用库:github.com/eclipse/paho.mqtt.golang 

阿里的MQTT服务需要tls认证,可参考:Paho-MQTT Go接入示例,有完整的参考示例:

    // set the login broker url
    var raw_broker bytes.Buffer
    raw_broker.WriteString("tls://")
    raw_broker.WriteString(productKey)
    raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883")
    opts := MQTT.NewClientOptions().AddBroker(raw_broker.String());

    // calculate the login auth info, and set it into the connection options
    auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp)
    opts.SetClientID(auth.mqttClientId)
    opts.SetUsername(auth.username)
    opts.SetPassword(auth.password)
    opts.SetKeepAlive(60 * 2 * time.Second)
    opts.SetDefaultPublishHandler(f)

    // set the tls configuration
    tlsconfig := NewTLSConfig()
    opts.SetTLSConfig(tlsconfig)

    // create and start a client using the above ClientOptions
    c := MQTT.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    fmt.Print("Connect aliyun IoT Cloud Sucess\n");

    // subscribe to subTopic("/a1Zd7n5yTt8/deng/user/get") and request messages to be delivered
    if token := c.Subscribe(subTopic, 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }
    fmt.Print("Subscribe topic " + subTopic + " success\n");

    // publish 5 messages to pubTopic("/a1Zd7n5yTt8/deng/user/update")
    for i := 0; i < 5; i++ {
        fmt.Println("publish msg:", i)
        text := fmt.Sprintf("ABC #%d", i)
        token := c.Publish(pubTopic, 0, false, text)
        fmt.Println("publish msg: ", text)
        token.Wait()
        time.Sleep(2 * time.Second)
    }

    // unsubscribe from subTopic("/a1Zd7n5yTt8/deng/user/get")
    if token := c.Unsubscribe(subTopic);token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }

    c.Disconnect(250)

// define a function for the default message handler
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    fmt.Printf("TOPIC: %s\n", msg.Topic())
    fmt.Printf("MSG: %s\n", msg.Payload())
}

简单的通过用户名和密码使用:

     addr := fmt.Sprintf("tcp://%s:%d", m.MqttIp, m.MqttPort)   
     opts := MQTT.NewClientOptions().AddBroker(addr)
        ////设置用户名
        opts.SetUsername(m.UserName)
        opts.SetPassword(m.PassWord)
        opts.SetKeepAlive(180 * time.Second)
        opts.SetDefaultPublishHandler(f)
        opts.SetPingTimeout(180 * time.Second)
        opts.SetAutoReconnect(true)

        //生成客户端对象
        o := MQTT.NewClient(opts)
        if token := o.Connect(); token.Wait() && token.Error() != nil {
                log.Println(token.Error())
        }

        //连接mqtt后,发布消息体
        if token := o.Publish("DEVICE_INFO_REPORT", 2, false, data); token.Wait() && token.Error() != nil {
                log.Println(token.Error())
        }
        log.Println("TOPIC: DEVICE_INFO_REPORT")
        log.Printf("MSG: %s\n", string(data))
        o.Disconnect(250)

 

参考:

1. MQTT协议规范 阿里云物理网

2. Mosquitto安装及使用

3. MQTT协议及EMQ应用

4. GO之MQTT使用中文文档


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM