物聯網網關開發:基於MQTT消息總線的設計過程(下)



道哥的第 022 篇原創

一、前言

在上一篇文章中物聯網網關開發:基於MQTT消息總線的設計過程(上),我們聊了在一個物聯網系統的網關中,如何利用 MQTT 消息總線,在嵌入式系統內部實現多個進程之間的相互通信問題。

這個通信模型的最大幾個優點是:

  1. 模塊之間解耦合;
  2. 各模塊之間可以並行開發;
  3. 把 TCP 鏈接和粘包問題交給消息總線處理,我們只需要處理業務層的東西;
  4. 調試方便;

以上只是描述了在一個嵌入式系統內部,進程之間的通信方式,那么網關如何與雲平台進行交互呢?

在上一篇文章中已經提到過:網關與雲平台之間的通信方式一般都是客戶指定的,就那么幾種(阿里雲、華為雲、騰訊雲、亞馬遜AWS平台)。一般都要求網關與雲平台之間處於長連接的狀態,這樣雲端的各種指令就可以隨時發送到網關。

這一篇文章,我們就來聊一聊這部分內容。

公眾號回復:mqtt,獲取示例代碼的網盤地址。

二、與雲平台之間的 MQTT 連接

目前的幾大物聯網雲平台,都提供了不同的接入方式。對於網關來說,應用最多的就是 MQTT 接入。

我們知道,MQTT 只是一個協議而已,不同的編程語言中都有實現,在 C 語言中也有好幾個實現。

在網關內部,運行着一個后台 deamon: MQTT Broker,其實就是 mosquitto 這個可執行程序,它充當着消息總線的功能。這里請大家注意:因為這個消息總線是運行在嵌入式系統的內部,接入總線的客戶端就是需要相互通信的那些進程。這些進程的數量是有限的,即使是一個比較復雜的系統,最多十幾個進程也就差不多了。因此,mosquitto 這個實現是完全可以支撐系統負載的

那么,如果在雲端部署一個 MQTT Broker,理論上是可以直接使用 mosquitto 這個實現來作為消息總線的,但是你要評估接入的客戶端(也就是網關)在一個什么樣的數量級,考慮到並發的問題,一定要做壓力測試。

對於后台開發,我的經驗不多,不敢(也不能)多言,誤導大家就罪過了。不過,對於一般的學習和測試來說,在雲端直接部署 mosquitto 作為消息總線,是沒有問題的。

三、Proc_Bridge 進程:外部和內部消息總線之間的橋接器

下面這張圖,說明了 Proc_Bridge 進程在這個模型中的作用:

  1. 從雲平台消息總線接收到的消息,需要轉發到內部的消息總線;
  2. 從內部消息總線接收到的消息,需要轉發到雲平台的消息總線;

如果用 mosquitto 來實現,應該如何來實現呢?

1. mosquitto 的 API 接口

mosquitto 這個實現是基於回調函數的機制來運行的,例如:

// 連接成功時的回調函數
void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    // ...
}

// 連接失敗時的回調函數
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result)
{
    // ...
}

// 接收到消息時的回調函數
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
	// ..
}

int main()
{
    // 其他代碼
    // ...
    
    // 創建一個 mosquitto 對象
    struct mosquitto g_mosq = mosquitto_new("client_name", true, NULL);
    
    // 注冊回調函數
    mosquitto_connect_callback_set(g_mosq, my_connect_callback);
	mosquitto_disconnect_callback_set(g_mosq, my_disconnect_callback);
	mosquitto_message_callback_set(g_mosq, my_message_callback);
	// 這里還有其他的回調函數設置
	
	// 開始連接到消息總線
	mosquitto_connect(g_mosq, "127.0.0.1", 1883, 60);
	
	while(1)
	{
		int rc = mosquitto_loop(g_mosq, -1, 1);
		if (rc) {
			printf("mqtt_portal: mosquitto_loop rc = %d \n", rc);
			sleep(1);
			mosquitto_reconnect(g_mosq);
		}
	}

	mosquitto_destroy(g_mosq);
	mosquitto_lib_cleanup();
	return 0;
}

以上代碼就是一個 mosquitto 客戶端的最簡代碼了,使用回調函數的機制,讓程序的開發非常簡單。

mosquitto 把底層的細節問題都幫助我們處理了,只要我們注冊的函數被調用了,就說明發生了我們感興趣的事件

這樣的回調機制在各種開源軟件中使用的比較多,比如:glib 里的定時器、libevent通訊處理、libmodbus 里的數據處理、linux 內核中的驅動開發和定時器,都是這個套路,一通百通!

在網關中的每個進程,只需要添加上面這部分代碼,就可以掛載到消息總線上,從而可以與其它進程進行收發數據了。

2. 利用 UserData 指針,實現多個 MQTT 連接

上面的實例僅僅是連接到一個消息總線上,對於一個普通的進程來說,達到了通信的目的。

但是對於 Proc_Bridge 進程來說,還沒有達到目的,因為這個進程處於橋接的位置,需要同時連接到遠程和本地這兩個消息總線上。那么應該如何實現呢?

看一下 mosquitto_new 這個函數的簽名:

/*
 * obj - A user pointer that will be passed as an argument to any
 *      callbacks that are specified.
*/
libmosq_EXPORT struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);

最后一個參數的作用是:可以設置一個用戶自己的數據(作為指針傳入),那么 mosquitto 在回調我們的注冊的任何一個函數時,都會把這個指針傳入。因此,我們可以利用這個參數來區分這個連接是遠程連接?還是本地連接。

所以,我們可以定義一個結構體變量,把一個 MQTT 連接的所有信息都記錄在這里,然后注冊給 mosquitto。當 mosquitto 回調函數時,把這個結構體變量的指針回傳給我們,這樣就拿到了這個連接的所有數據,在某種程度上來說,這也是一種面向對象的思想。

// 從來表示一個 MQTT 連接的結構體
typedef struct{
	char *id;
	char *name;
	char *pw;
	char *host;
	int port;
	pthread_t tHandle;
	struct mosquitto *mosq;
	int mqtt_num;
}MQData;

完整的代碼已經放到網盤里了,為了讓你先從原理上看明白,我把關鍵幾個地方的代碼貼在這里:

// 分配結構體變量
MQData userData = (MQData *)malloc(sizeof(MQData));

// 設置屬於這里連接的參數: id, name 等等

// 創建 mosquitto 對象時,傳入 userData。
struct mosquitto *mosq = mosquitto_new(userData->id, true, userData);

// 在回調函數中,把 obj 指針前轉成 MQData 指針
static void messageCB(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
	MQData *userData = (MQData *)obj;
	
	// 此時就可以根據 userData 指針中的內容分辨出這是哪一個鏈接了
}

另外一個問題:不知道你是否注意到示例中的 mosquitto_loop() 這個函數?這個函數需要放在 while 死循環中不停的調用,才能出發 mosuiqtto 內部的事件。(其實在 mosuiqtto 中,還提供了另一個簡化的函數 mosquitto_loop_forever)。

也就是說:在每個連接中,需要持續的觸發 mosquitto 底層的事件,才能讓消息系統順利的收發。因此,在示例代碼中,使用兩個線程分別連接到雲平台的總線和內部的總線。

四、總結

經過這兩篇文章,基本上把一個物聯網系統的網關中,最基本的通信模型聊完了,相當於是一個程序的骨架吧,剩下的事情就是處理業務層的細節問題了。

萬里長征,這才是第一步!

對於一個網關來說,還有其他更多的問題需要處理,比如:MQTT 連接的鑒權(用戶名+密碼,證書)、通信數據的序列化和反序列化、加密和解密等等,以后慢慢聊吧,希望我們一路前行!

公眾號回復:mqtt,獲取示例代碼的網盤地址。


【原創聲明】

轉載:歡迎轉載,但未經作者同意,必須保留此段聲明,必須在文章中給出原文連接。


不吹噓,不炒作,不浮誇,認真寫好每一篇文章!

歡迎 轉發、分享給身邊的技術朋友,道哥在此表示衷心的感謝!

推薦閱讀

我最喜歡的進程之間通信方式-消息總線
C語言指針-從底層原理到花式技巧,用圖文和代碼幫你講解透徹
一步步分析-如何用C實現面向對象編程
提高代碼逼格的利器:宏定義-從入門到放棄
原來gdb的底層調試原理這么簡單
利用C語言中的setjmp和longjmp,來實現異常捕獲和協程
關於加密、證書的那些事
深入LUA腳本語言,讓你徹底明白調試原理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM