開源地址
https://github.com/jiejieTop/mqttclient
mqttclient
一個高性能、高穩定性的跨平台MQTT客戶端
一個高性能、高穩定性的跨平台MQTT客戶端,基於socket API之上開發,可以在嵌入式設備(FreeRTOS/LiteOS/RT-Thread/TencentOS tiny)、Linux、Windows、Mac上使用,擁有非常簡潔的API接口,以極少的資源實現QOS2的服務質量,並且無縫銜接了mbedtls加密庫。
優勢:
-
基於標准BSD socket之上開發,只要是兼容BSD socket的系統均可使用。
-
穩定:無論是
掉線重連
,丟包重發
,都是嚴格遵循MQTT協議標准
執行,除此之外對大數據量的測試無論是收是發,都是非常穩定(一次發送135K
數據,3秒一次),高頻測試也是非常穩定(7個主題同時收發,每秒一次,也就是1秒14個mqtt報文,服務質量QoS0、QoS1、QoS2都有)。因為作者以極少的資源設計了記錄機制
,對采用QoS1服務質量的報文必須保證到達一次,當發布的主題(qos1、qos2都適用)沒有被服務器收到時會自動重發,而對QoS2服務質量的報文保證有且只有處理一次(如果不相信它穩定性的同學可以自己去修改源碼,專門為QoS2服務質量去做測試,故意不回復PUBREC
包,讓服務器重發QoS2報文,且看看客戶端是否有且只有處理一次),而對於掉線重連的穩定性,這種則是基本操作了,沒啥好說的,在自動重連后還會自動重新訂閱主題,保證主題不會丟失,因此在測試中穩定性極好。 -
輕量級:整個代碼工程極其簡單,不使用mbedtls情況下,占用資源極少,作者曾使用esp8266模組與雲端通信,整個工程代碼消耗的RAM不足15k(包括系統占用的開銷,對數據的處理開銷,而此次還是未優化的情況下,還依舊完美保留了掉線重連的穩定性,但是對應qos1、qos2服務質量的報文則未做測試,因為STM32F103C8T6芯片資源實在是太少了,折騰不起)。
-
無縫銜接mbedtls加密傳輸,讓網絡傳輸更加安全,而且接口層完全不需要用戶理會,無論是否加密,mqttclient對用戶提供的API接口是沒有變化的,這就很好的兼容了一套代應用層的碼可以加密傳輸也可以不加密傳輸。
-
擁有極簡的API接口,總的來說,mqttclient的配置都有默認值,基本無需配置都能使用的,也可以隨意配置,對配置都有健壯性檢測,這樣子設計的API接口也是非常簡單。
-
有非常好的代碼風格與思想:整個代碼采用分層式設計,代碼實現采用異步處理的思想,降低耦合,提高性能,具體體現在什么地方呢?很簡單,目前市面上很多MQTT客戶端發布主題都是要阻塞等待ack,這是非常暴力的行為,阻塞當前線程等待服務器的應答,那如果我想要發送數據怎么辦,或者我要重復檢測數據怎么辦,你可能會說,指定阻塞時間等待,那如果網絡延遲,ack遲遲不來,我就白等了嗎,對於qos1、qos2的服務質量怎么辦,所以說這種還是要異步處理的思想,我發布主題,那我發布出去就好了,不需要等待,對於qos1、qos2服務質量的MQTT報文,如果服務器沒收到,那我重發就可以,這種重發也是異步的處理,完全不會阻塞當前線程。
-
MQTT協議支持主題通配符
“#”、“+”
。 -
訂閱的主題與消息處理完全分離,讓編程邏輯更加簡單易用,用戶無需理會錯綜復雜的邏輯關系。
-
mqttclient內部已實現保活處理機制,無需用戶過多關心理會,用戶只需專心處理應用功能即可。
-
無縫銜接salof:它是一個同步異步日志輸出框架,在空閑時候輸出對應的日志信息,也可以將信息寫入flash中保存,方便調試。
-
不對外產生依賴。
-
使用 paho mqtt 庫
整體框架
擁有非常明確的分層框架。
目前已實現了Linux、TencentOS tiny、FreeRTOS、RT-Thread平台(已做成軟件包,名字為kawaii-mqtt
),除此之外TencentOS tiny的AT框架亦可以使用(RAM消耗不足15K),並且穩定性極好!
平台 | 代碼位置 |
---|---|
Linux | https://github.com/jiejieTop/mqttclient |
TencentOS tiny | https://github.com/Tencent/TencentOS-tiny/tree/master/board/Fire_STM32F429 |
TencentOS tiny AT 框架 | https://github.com/jiejieTop/gokit3-board-mqttclient |
RT-Thread | https://github.com/jiejieTop/kawaii-mqtt |
FreeRTOS | https://github.com/jiejieTop/freertos-mqttclient |
版本
發布版本 | 描述 |
---|---|
[v1.0.0] | 初次發布,完成基本框架及其穩定性驗證 |
[v1.0.1] | 修復主動與服務器斷開連接時的邏輯處理 |
[v1.0.2] | 添加新特性——攔截器,修復一些小bug |
[v1.0.3] | 避免造成全局污染修改了log、list相關函數的命名 |
問題
歡迎以 GitHub Issues 的形式提交問題和bug報告
版權和許可
mqttclient 遵循 Apache License v2.0 開源協議。鼓勵代碼共享和尊重原作者的著作權,可以自由的使用、修改源代碼,也可以將修改后的代碼作為開源或閉源軟件發布,但必須保留原作者版權聲明。
linux平台下測試使用
安裝cmake:
sudo apt-get install cmake
配置
在mqttclient/test/test.c
文件中修改以下內容:
init_params.connect_params.network_params.network_ssl_params.ca_crt = test_ca_get(); /* CA證書 */
init_params.connect_params.network_params.addr = "xxxxxxx"; /* 服務器域名 */
init_params.connect_params.network_params.port = "8883"; /* 服務器端口號 */
init_params.connect_params.user_name = "xxxxxxx"; /* 用戶名 */
init_params.connect_params.password = "xxxxxxx"; /* 密碼 */
init_params.connect_params.client_id = "xxxxxxx"; /* 客戶端id */
mbedtls
默認打開mbedtls。
salof 全稱是:Synchronous Asynchronous Log Output Framework
(同步異步日志輸出框架),它是一個同步異步日志輸出框架,在空閑時候輸出對應的日志信息,並且該庫與mqttclient無縫銜接。
配置對應的日志輸出級別:
#define BASE_LEVEL (0)
#define ASSERT_LEVEL (BASE_LEVEL + 1) /* 日志輸出級別:斷言級別(非常高優先級) */
#define ERR_LEVEL (ASSERT_LEVEL + 1) /* 日志輸出級別:錯誤級別(高優先級) */
#define WARN_LEVEL (ERR_LEVEL + 1) /* 日志輸出級別:警告級別(中優先級) */
#define INFO_LEVEL (WARN_LEVEL + 1) /* 日志輸出級別:信息級別(低優先級) */
#define DEBUG_LEVEL (INFO_LEVEL + 1) /* 日志輸出級別:調試級別(更低優先級) */
#define LOG_LEVEL WARN_LEVEL /* 日志輸出級別 */
日志其他選項:
- 終端帶顏色
- 時間戳
- 標簽
mqttclient的配置
配置mqtt等待應答列表的最大值,對於qos1 qos2服務質量有要求的可以將其設置大一點,當然也必須資源跟得上,它主要是保證qos1 qos2的mqtt報文能准確到達服務器。
#define MQTT_ACK_HANDLER_NUM_MAX 64
選擇MQTT協議的版本,默認為4,表示使用MQTT 3.1.1版本,而3則表示為MQTT 3.1版本。
#define MQTT_VERSION 4 // 4 is mqtt 3.1.1
設置默認的保活時間,它主要是保證MQTT客戶端與服務器的保持活性連接,單位為 秒 ,比如MQTT客戶端與服務器100S沒有發送數據了,有沒有接收到數據,此時MQTT客戶端會發送一個ping包,確認一下這個會話是否存在,如果收到服務器的應答,那么說明這個會話還是存在的,可以隨時收發數據,而如果不存在了,就清除會話。
#define MQTT_KEEP_ALIVE_INTERVAL 100 // unit: second
默認的命令超時,它主要是用於socket讀寫超時,在MQTT初始化時可以指定:
#define MQTT_DEFAULT_CMD_TIMEOUT 4000
默認主題的長度,主題是支持通配符的,如果主題太長則會被截斷:
#define MQTT_TOPIC_LEN_MAX 64
默認的算法數據緩沖區的大小,如果要發送大量數據則修改大一些,在MQTT初始化時可以指定:
#define MQTT_DEFAULT_BUF_SIZE 1024
線程相關的配置,如線程棧,線程優先級,線程時間片等:
在linux環境下可以是不需要理會這些參數的,而在RTOS平台則需要配置,如果不使用mbedtls,線程棧2048字節已足夠,而使用mbedtls加密后,需要配置4096字節以上。
#define MQTT_THREAD_STACK_SIZE 2048 // 線程棧
#define MQTT_THREAD_PRIO 5 // 線程優先級
#define MQTT_THREAD_TICK 50 // 線程時間片
默認的重連時間間隔,當發生掉線時,會以這個時間間隔嘗試重連:
#define MQTT_RECONNECT_DEFAULT_DURATION 1000
其他不需要怎么配置的東西:
#define MQTT_MAX_PACKET_ID (0xFFFF - 1) // mqtt報文id
#define MQTT_MAX_CMD_TIMEOUT 20000 //最大的命令超時參數
#define MQTT_MIN_CMD_TIMEOUT 1000 //最小的命令超時參數
ps:以上參數基本不需要怎么配置的,直接用即可~
編譯 & 運行
./build.sh
運行build.sh
腳本后會在 ./build/bin/
目錄下生成可執行文件mqtt-client
,直接運行即可。
編譯成動態庫libmqttclient.so
./make-libmqttclient.sh
運行make-libmqttclient.sh
腳本后會在 ./libmqttclient/lib
目錄下生成一個動態庫文件libmqttclient.so
,並安裝到系統的/usr/lib
目錄下,相關頭文件已經拷貝到./libmqttclient/include
目錄下,編譯應用程序的時候只需要鏈接動態庫即可-lmqttclient
,動態庫的配置文件根據./test/mqtt_config.h
配置的。
設計思想
- 整體采用分層式設計,代碼實現采用異步設計方式,降低耦合。
- 消息的處理使用回調的方式處理:用戶指定
[訂閱的主題]
與指定[消息的處理函數]
- 不對外產生依賴
API
mqttclient
擁有非常簡潔的api
接口
int mqtt_keep_alive(mqtt_client_t* c);
int mqtt_init(mqtt_client_t* c, client_init_params_t* init);
int mqtt_release(mqtt_client_t* c);
int mqtt_connect(mqtt_client_t* c);
int mqtt_disconnect(mqtt_client_t* c);
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t msg_handler);
int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter);
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg);
int mqtt_list_subscribe_topic(mqtt_client_t* c);
int mqtt_set_interceptor_handler(mqtt_client_t* c, interceptor_handler_t handler);
核心
mqtt_client_t 結構
typedef struct mqtt_client {
unsigned short packet_id;
unsigned char ping_outstanding;
unsigned char ack_handler_number;
unsigned char *read_buf;
unsigned char *write_buf;
unsigned int cmd_timeout;
unsigned int read_buf_size;
unsigned int write_buf_size;
unsigned int reconnect_try_duration;
void *reconnect_date;
reconnect_handler_t reconnect_handler;
client_state_t client_state;
platform_mutex_t write_lock;
platform_mutex_t global_lock;
mqtt_list_t msg_handler_list;
mqtt_list_t ack_handler_list;
network_t *network;
platform_thread_t *thread;
platform_timer_t reconnect_timer;
platform_timer_t last_sent;
platform_timer_t last_received;
connect_params_t *connect_params;
interceptor_handler_t interceptor_handler;
} mqtt_client_t;
該結構主要維護以下內容:
- 讀寫數據緩沖區
read_buf、write_buf
- 命令超時時間
cmd_timeout
(主要是讀寫阻塞時間、等待響應的時間、重連等待時間) - 維護
ack
鏈表ack_handler_list
,這是異步實現的核心,所有等待響應的報文都會被掛載到這個鏈表上 - 維護消息處理列表
msg_handler_list
,這是mqtt
協議必須實現的內容,所有來自服務器的publish
報文都會被處理(前提是訂閱了對應的消息) - 維護一個網卡接口
network
- 維護一個內部線程
thread
,所有來自服務器的mqtt包都會在這里被處理! - 兩個定時器,分別是掉線重連定時器與保活定時器
reconnect_timer、last_sent、last_received
- 一些連接的參數
connect_params
mqttclient實現
以下是整個框架的實現方式,方便大家更容易理解mqttclient的代碼與設計思想,讓大家能夠修改源碼與使用,還可以提交pr或者issues,開源的世界期待各位大神的參與,感謝!
除此之外以下代碼的記錄機制
與其超時處理機制
是非常好的編程思想,大家有興趣一定要看源代碼!
初始化
int mqtt_init(mqtt_client_t* c, client_init_params_t* init)
主要是配置mqtt_client_t
結構的相關信息,如果沒有指定初始化參數,則系統會提供默認的參數。
但連接部分的參數則必須指定:
init_params.connect_params.network_params.addr = "[你的mqtt服務器IP地址或者是域名]";
init_params.connect_params.network_params.port = 1883; //端口號
init_params.connect_params.user_name = "jiejietop";
init_params.connect_params.password = "123456";
init_params.connect_params.client_id = "clientid";
mqtt_init(&client, &init_params);
連接服務器
int mqtt_connect(mqtt_client_t* c);
參數只有 mqtt_client_t
類型的指針,字符串類型的主題
(支持通配符"#" "+"),主題的服務質量
,以及收到報文的處理函數
,如不指定則有默認處理函數。連接服務器則是使用非異步的方式設計,因為必須等待連接上服務器才能進行下一步操作。
過程如下:
- 調用底層的連接函數連接上服務器:
c->network->connect(c->network);
- 序列化
mqtt
的CONNECT
報文並且發送
MQTTSerialize_connect(c->write_buf, c->write_buf_size, &connect_data)
mqtt_send_packet(c, len, &connect_timer)
- 等待來自服務器的
CONNACK
報文
mqtt_wait_packet(c, CONNACK, &connect_timer)
- 連接成功后創建一個內部線程
mqtt_yield_thread
,並在合適的時候啟動它:
platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK)
if (NULL != c->thread) {
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
platform_thread_startup(c->thread);
platform_thread_start(c->thread); /* start run mqtt thread */
}
- 而對於重連來說則不會重新創建線程,直接改變客戶端狀態為連接狀態即可:
mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
訂閱報文
int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
訂閱報文使用異步設計來實現的:
過程如下:
- 序列化訂閱報文並且發送給服務器
MQTTSerialize_subscribe(c->write_buf, c->write_buf_size, 0, mqtt_get_next_packet_id(c), 1, &topic, (int*)&qos)
mqtt_send_packet(c, len, &timer)
- 創建對應的消息處理節點,這個消息節點在收到服務器的
SUBACK
訂閱應答報文后會掛載到消息處理列表msg_handler_list
上
mqtt_msg_handler_create(topic_filter, qos, handler)
- 在發送了報文給服務器那就要等待服務器的響應了,先記錄這個等待
SUBACK
mqtt_ack_list_record(c, SUBACK, mqtt_get_next_packet_id(c), len, msg_handler)
取消訂閱
與訂閱報文的邏輯基本差不多的~
- 序列化訂閱報文並且發送給服務器
MQTTSerialize_unsubscribe(c->write_buf, c->write_buf_size, 0, packet_id, 1, &topic)
mqtt_send_packet(c, len, &timer)
- 創建對應的消息處理節點,這個消息節點在收到服務器的
UNSUBACK
取消訂閱應答報文后將消息處理列表msg_handler_list
上的已經訂閱的主題消息節點銷毀
mqtt_msg_handler_create((const char*)topic_filter, QOS0, NULL)
- 在發送了報文給服務器那就要等待服務器的響應了,先記錄這個等待
UNSUBACK
mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler)
發布報文
int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg)
參數只有 mqtt_client_t
類型的指針,字符串類型的主題
(支持通配符),要發布的消息(包括服務質量
、消息主體
)。
mqtt_message_t msg;
msg.qos = 2;
msg.payload = (void *) buf;
mqtt_publish(&client, "testtopic1", &msg);
核心思想都差不多,過程如下:
- 先序列化發布報文,然后發送到服務器
MQTTSerialize_publish(c->write_buf, c->write_buf_size, 0, msg->qos, msg->retained, msg->id,
topic, (unsigned char*)msg->payload, msg->payloadlen);
mqtt_send_packet(c, len, &timer)
- 對於QOS0的邏輯,不做任何處理,對於QOS1和QOS2的報文則需要記錄下來,在沒收到服務器應答的時候進行重發
if (QOS1 == msg->qos) {
rc = mqtt_ack_list_record(c, PUBACK, mqtt_get_next_packet_id(c), len, NULL);
} else if (QOS2 == msg->qos) {
rc = mqtt_ack_list_record(c, PUBREC, mqtt_get_next_packet_id(c), len, NULL);
}
- 還有非常重要的一點,重發報文的MQTT報文頭部需要設置DUP標志位,這是MQTT協議的標准,因此,在重發的時候作者直接操作了報文的DUP標志位,因為修改DUP標志位的函數我沒有從MQTT庫中找到,所以我封裝了一個函數,這與LwIP中的交叉存取思想是一個道理,它假設我知道MQTT報文的所有操作,所以我可以操作它,這樣子可以提高很多效率:
mqtt_set_publish_dup(c,1); /* may resend this data, set the udp flag in advance */
內部線程
static void mqtt_yield_thread(void *arg)
主要是對mqtt_yield
函數的返回值做處理,比如在disconnect
的時候銷毀這個線程。
核心的處理函數
- 數據包的處理
mqtt_packet_handle
static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
對不同的包使用不一樣的處理:
switch (packet_type) {
case 0: /* timed out reading packet */
break;
case CONNACK:
break;
case PUBACK:
case PUBCOMP:
rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
break;
case SUBACK:
rc = mqtt_suback_packet_handle(c, timer);
break;
case UNSUBACK:
rc = mqtt_unsuback_packet_handle(c, timer);
break;
case PUBLISH:
rc = mqtt_publish_packet_handle(c, timer);
break;
case PUBREC:
case PUBREL:
rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
break;
case PINGRESP:
c->ping_outstanding = 0;
break;
default:
goto exit;
}
並且做保活的處理:
mqtt_keep_alive(c)
當發生超時后
if (platform_timer_is_expired(&c->last_sent) || platform_timer_is_expired(&c->last_received))
序列號一個心跳包並且發送給服務器
MQTTSerialize_pingreq(c->write_buf, c->write_buf_size);
mqtt_send_packet(c, len, &timer);
當再次發生超時后,表示與服務器的連接已斷開,需要重連的操作,設置客戶端狀態為斷開連接
mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
ack
鏈表的掃描,當收到服務器的報文時,對ack列表進行掃描操作
mqtt_ack_list_scan(c);
當超時后就銷毀ack鏈表節點:
mqtt_ack_handler_destroy(ack_handler);
當然下面這幾種報文則需要重發操作:(PUBACK 、PUBREC、 PUBREL 、PUBCOMP
,保證QOS1 QOS2的服務質量)
if ((ack_handler->type == PUBACK) || (ack_handler->type == PUBREC) || (ack_handler->type == PUBREL) || (ack_handler->type == PUBCOMP))
mqtt_ack_handler_resend(c, ack_handler);
- 保持活性的時間過去了,可能掉線了,需要重連操作
mqtt_try_reconnect(c);
重連成功后嘗試重新訂閱報文,保證恢復原始狀態~
mqtt_try_resubscribe(c)
發布應答與發布完成報文的處理
static int mqtt_puback_and_pubcomp_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化報文
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
- 取消對應的ack記錄
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
訂閱應答報文的處理
static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化報文
MQTTDeserialize_suback(&packet_id, 1, &count, (int*)&granted_qos, c->read_buf, c->read_buf_size)
- 取消對應的ack記錄
mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
- 安裝對應的訂閱消息處理函數,如果是已存在的則不會安裝
mqtt_msg_handlers_install(c, msg_handler);
取消訂閱應答報文的處理
static int mqtt_unsuback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化報文
MQTTDeserialize_unsuback(&packet_id, c->read_buf, c->read_buf_size)
- 取消對應的ack記錄,並且獲取到已經訂閱的消息處理節點
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)
- 銷毀對應的訂閱消息處理函數
mqtt_msg_handler_destory(msg_handler);
來自服務器的發布報文的處理
static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化報文
MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
(unsigned char**)&msg.payload, (int*)&msg.payloadlen, c->read_buf, c->read_buf_size)
- 對於QOS0、QOS1的報文,直接去處理消息
mqtt_deliver_message(c, &topic_name, &msg);
- 對於QOS1的報文,還需要發送一個
PUBACK
應答報文給服務器
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBACK, 0, msg.id);
- 而對於QOS2的報文則需要發送
PUBREC
報文給服務器,除此之外還需要記錄PUBREL
到ack鏈表上,等待服務器的發布釋放報文,最后再去處理這個消息
MQTTSerialize_ack(c->write_buf, c->write_buf_size, PUBREC, 0, msg.id);
mqtt_ack_list_record(c, PUBREL, msg.id + 1, len, NULL)
mqtt_deliver_message(c, &topic_name, &msg);
說明:一旦注冊到ack列表上的報文,當具有重復的報文是不會重新被注冊的,它會通過
mqtt_ack_list_node_is_exist
函數判斷這個節點是否存在,主要是依賴等待響應的消息類型與msgid。
發布收到與發布釋放報文的處理
static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
- 反序列化報文
MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->read_buf, c->read_buf_size)
- 產生一個對應的應答報文
mqtt_publish_ack_packet(c, packet_id, packet_type);
- 取消對應的ack記錄
mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler)