1. 简述
一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。一个典型的应用案例就是 Andy Stanford-ClarkMosquitto(MQTT协议创始人之一)在家中实现的远程监控和自动化,并在 OggCamp 的演讲上,对MQTT协议进行详细阐述。
官网:
https://mosquitto.org/man/mosquitto-8.html IBM的一个server(broker)示例用法说明
2. 函数
mosquitto结构体:
struct mosquitto;
struct mosquitto_message{ int mid; char *topic; void *payload; int payloadlen; int qos; bool retain; };
mosquitto支持推送和订阅消息模式:
/* * Function: mosquitto_publish * * Publish a message on a given topic. * * Parameters: * mosq - a valid mosquitto instance. * mid - pointer to an int. If not NULL, the function will set this * to the message id of this particular message. This can be then * used with the publish callback to determine when the message * has been sent. * Note that although the MQTT protocol doesn't use message ids * for messages with QoS=0, libmosquitto assigns them message ids * so they can be tracked with this parameter. * topic - null terminated string of the topic to publish to. * payloadlen - the size of the payload (bytes). Valid values are between 0 and * 268,435,455. * payload - pointer to the data to send. If payloadlen > 0 this must be a * valid memory location. * qos - integer value 0, 1 or 2 indicating the Quality of Service to be * used for the message. * retain - set to true to make the message retained. * * Returns: * MOSQ_ERR_SUCCESS - on success. * MOSQ_ERR_INVAL - if the input parameters were invalid. * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. * MOSQ_ERR_PROTOCOL - if there is a protocol error communicating with the * broker. * MOSQ_ERR_PAYLOAD_SIZE - if payloadlen is too large. * * See Also: * <mosquitto_max_inflight_messages_set> */ libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain);
/* * Function: mosquitto_subscribe * * Subscribe to a topic. * * Parameters: * mosq - a valid mosquitto instance. * mid - a pointer to an int. If not NULL, the function will set this to * the message id of this particular message. This can be then used * with the subscribe callback to determine when the message has been * sent. * sub - the subscription pattern. * qos - the requested Quality of Service for this subscription. * * Returns: * MOSQ_ERR_SUCCESS - on success. * MOSQ_ERR_INVAL - if the input parameters were invalid. * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. */ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos); /* * Function: mosquitto_unsubscribe * * Unsubscribe from a topic. * * Parameters: * mosq - a valid mosquitto instance. * mid - a pointer to an int. If not NULL, the function will set this to * the message id of this particular message. This can be then used * with the unsubscribe callback to determine when the message has been * sent. * sub - the unsubscription pattern. * * Returns: * MOSQ_ERR_SUCCESS - on success. * MOSQ_ERR_INVAL - if the input parameters were invalid. * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. */ libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub);
一般使用流程如下:
mosquitto_lib_init(); mosq=mosquitto_new(); mosquitto_connect_callback_set(); ----mosquitto_subscribe(); mosquitto_disconnect_callback_set(); mosquitto_message_callback_set(); ----接收解析消息 并推送mosquitto_publish() mosquitto_username_pw_set(mosq, "user", "pw"); mosquitto_connect(); mosquitto_loop(mosq, timeout, 1); // 需要不断循环判断, 也可根据需要永远运行:mosquitto_loop_forever(mosq, timeout, 1) mosquitto_publish(); mosquitto_destroy(mosq); mosquitto_lib_cleanup();
3. C应用
一般使用流程:
0. 依赖库安装
apt-get install openssl libssl-dev uuid-dev
1. 编译安装 make make install
交叉编译:
CC=arm-linux-gnueabihf-gcc CXX=arm-linux-gnueabihf-g++ make WITH_SRV=no WITH_UUID=no WITH_TLS=no WITH_DOCS=no WITH_WEBSOCKETS=no 2. 创建mosquitto用户 mosquitto默认以mosquitto用户启动 groupadd mosquitto useradd -g mosquitto mosquitto 3. 配置文件修改 根据需求修改配置文件/etc/mosquitto/mosquitto.conf
一般不修改直接可用,本机所有IP都可达,外部访问本机IP可达。 4. 启动 mosquitto -c /etc/mosquitto/mosquitto.conf -d 5. 测试 mosquitto_sub -t wang/ming mosquitto_pub -m "hello"
mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang"
mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang" -u admin -P 48e848df75c08d4c0ba75bee
服务器需要配置密码,客户端不用修改可直接采用密码上报
一个可参考配置文件如下:
# Port to use for the default listener. #port 1883 port 1883 # Websockets support is currently disabled by default at compile time. # Certificate based TLS may be used with websockets, except that # only the cafile, certfile, keyfile and ciphers options are supported. #protocol mqtt listener 9001 protocol websockets listener 8883 protocol websockets #cafile #capath certfile /etc/cert/server.crt keyfile /etc/cert/server.key # For example, setting "secure-" here would mean a client "secure- # client" could connect but another with clientid "mqtt" couldn't. #clientid_prefixes clientid_prefixes ABC # Boolean value that determines whether clients that connect # without providing a username are allowed to connect. If set to # false then a password file should be created (see the # password_file option) to control authenticated client access. # Defaults to true. #allow_anonymous true allow_anonymous false # See the TLS client require_certificate and use_identity_as_username options # for alternative authentication options. #password_file password_file /etc/mosquitto/pwfile
mosquitto是一个server broker,可直接运行测试客户端。
manpage上一个示例:
#include <stdio.h>
#include <mosquitto.h>
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { if(message->payloadlen){ printf("%s %s\n", message->topic, message->payload); }else{ printf("%s (null)\n", message->topic); } fflush(stdout); } void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) { int i; if(!result){ /* Subscribe to broker information topics on successful connect. */ mosquitto_subscribe(mosq, NULL, "$SYS/#", 2); }else{ fprintf(stderr, "Connect failed\n"); } } void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i<qos_count; i++){ printf(", %d", granted_qos[i]); } printf("\n"); } void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str) { /* Pring all log messages regardless of level. */ printf("%s\n", str); } int main(int argc, char *argv[]) { int i; char *host = "localhost"; int port = 1883; int keepalive = 60; bool clean_session = true; struct mosquitto *mosq = NULL; mosquitto_lib_init(); mosq = mosquitto_new(NULL, clean_session, NULL); if(!mosq){ fprintf(stderr, "Error: Out of memory.\n"); return 1; } mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); if(mosquitto_connect(mosq, host, port, keepalive)){ fprintf(stderr, "Unable to connect.\n"); return 1; } mosquitto_loop_forever(mosq, -1, 1); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); return 0; }
自测程序
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #include <mosquitto.h> char peerid[] = "wangq"; char host[] = "127.0.0.1"; int port = 1883; int keepalive = 60; bool clean_session = true; struct mosquitto *mosq = NULL; pthread_t pmosid = 0; void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { if(message->payloadlen){ printf("====>recv:%s %s\n", message->topic, message->payload); }else{ printf("%s (null)\n", message->topic); } mosquitto_publish(mosq, NULL, "wang/result", sizeof("loveresult"), "loveresult", 2, false); sleep(2); fflush(stdout); } void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) { if(!result){ /* Subscribe to broker information topics on successful connect. */ //mosquitto_subscribe(mosq, NULL, "$SYS/broker/uptime", 2); mosquitto_subscribe(mosq, NULL, "wang/hua", 1); }else{ fprintf(stderr, "Connect failed\n"); } } void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i<qos_count; i++){ printf(", %d", granted_qos[i]); } printf("\n"); } void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str) { /* Pring all log messages regardless of level. */ printf("====>log:%s\n", str); } void mos_init() { mosquitto_lib_init(); mosq = mosquitto_new(peerid, clean_session, NULL); if(!mosq){ fprintf(stderr, "Error: Out of memory.\n"); exit(-1); } mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); mosquitto_will_set(mosq,"xiao/ming", sizeof("livewill"), "livewill", 2, false); mosquitto_threaded_set(mosq, 1); } void * pthread_mos(void *arg) { int toserver = -1; int timeout = 0; while(toserver){ toserver = mosquitto_connect(mosq, host, port, keepalive); if(toserver){ timeout++; fprintf(stderr, "Unable to connect server [%d] times.\n", timeout); if(timeout > 3){ fprintf(stderr, "Unable to connect server, exit.\n" ); pthread_exit(NULL); } sleep(10); } } mosquitto_loop_forever(mosq, -1, 1); mosquitto_disconnect(mosq); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); pthread_exit(NULL); } int main(int argc, char *argv[]) { int ret = 0; mos_init(); ret = pthread_create(&pmosid, NULL, pthread_mos, NULL); if(ret != 0){ printf("create pthread mos error.\n"); exit(-1); } pthread_detach(pmosid); while(1){ mosquitto_publish(mosq, NULL, "wang/ming", sizeof("love"), "love", 2, false); sleep(2); } /* mosquitto_loop_forever(mosq, -1, 1); while(1){ mosquitto_publish(mosq, NULL, "wang/qin/hua", sizeof("love"), "love", 2, false); sleep(2); } */ return 0; }
运行结果:
#./a.out ====>log:Client wangq sending CONNECT ====>log:Client wangq received CONNACK (0) ====>log:Client wangq sending SUBSCRIBE (Mid: 2, Topic: wang/hua, QoS: 1) ====>log:Client wangq sending PUBLISH (d1, q2, r0, m1, 'wang/ming', ... (5 bytes)) ====>log:Client wangq received SUBACK Subscribed (mid: 2): 1 ====>log:Client wangq received PUBREC (Mid: 1) ====>log:Client wangq sending PUBREL (Mid: 1) ====>log:Client wangq received PUBCOMP (Mid: 1) ====>log:Client wangq sending PUBLISH (d0, q2, r0, m3, 'wang/ming', ... (5 bytes)) ====>log:Client wangq received PUBREC (Mid: 3) ====>log:Client wangq sending PUBREL (Mid: 3) ====>log:Client wangq received PUBCOMP (Mid: 3)
订阅:
#mosquitto_sub -t wang/ming
love
love
will订阅:
#mosquitto_sub -t xiao/ming
livewill
4. Golang应用
golang中采用库:github.com/eclipse/paho.mqtt.golang
阿里的MQTT服务需要tls认证,可参考:Paho-MQTT Go接入示例,有完整的参考示例:
// set the login broker url var raw_broker bytes.Buffer raw_broker.WriteString("tls://") raw_broker.WriteString(productKey) raw_broker.WriteString(".iot-as-mqtt.cn-shanghai.aliyuncs.com:1883") opts := MQTT.NewClientOptions().AddBroker(raw_broker.String()); // calculate the login auth info, and set it into the connection options auth := calculate_sign(clientId, productKey, deviceName, deviceSecret, timeStamp) opts.SetClientID(auth.mqttClientId) opts.SetUsername(auth.username) opts.SetPassword(auth.password) opts.SetKeepAlive(60 * 2 * time.Second) opts.SetDefaultPublishHandler(f) // set the tls configuration tlsconfig := NewTLSConfig() opts.SetTLSConfig(tlsconfig) // create and start a client using the above ClientOptions c := MQTT.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { panic(token.Error()) } fmt.Print("Connect aliyun IoT Cloud Sucess\n"); // subscribe to subTopic("/a1Zd7n5yTt8/deng/user/get") and request messages to be delivered if token := c.Subscribe(subTopic, 0, nil); token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } fmt.Print("Subscribe topic " + subTopic + " success\n"); // publish 5 messages to pubTopic("/a1Zd7n5yTt8/deng/user/update") for i := 0; i < 5; i++ { fmt.Println("publish msg:", i) text := fmt.Sprintf("ABC #%d", i) token := c.Publish(pubTopic, 0, false, text) fmt.Println("publish msg: ", text) token.Wait() time.Sleep(2 * time.Second) } // unsubscribe from subTopic("/a1Zd7n5yTt8/deng/user/get") if token := c.Unsubscribe(subTopic);token.Wait() && token.Error() != nil { fmt.Println(token.Error()) os.Exit(1) } c.Disconnect(250) // define a function for the default message handler var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %s\n", msg.Payload()) }
简单的通过用户名和密码使用:
addr := fmt.Sprintf("tcp://%s:%d", m.MqttIp, m.MqttPort) opts := MQTT.NewClientOptions().AddBroker(addr) ////设置用户名 opts.SetUsername(m.UserName) opts.SetPassword(m.PassWord) opts.SetKeepAlive(180 * time.Second) opts.SetDefaultPublishHandler(f) opts.SetPingTimeout(180 * time.Second) opts.SetAutoReconnect(true) //生成客户端对象 o := MQTT.NewClient(opts) if token := o.Connect(); token.Wait() && token.Error() != nil { log.Println(token.Error()) } //连接mqtt后,发布消息体 if token := o.Publish("DEVICE_INFO_REPORT", 2, false, data); token.Wait() && token.Error() != nil { log.Println(token.Error()) } log.Println("TOPIC: DEVICE_INFO_REPORT") log.Printf("MSG: %s\n", string(data)) o.Disconnect(250)
参考:
3. MQTT协议及EMQ应用