在對MQTT的學習過程中 一下的內容對我提供了幫助
https://www.runoob.com/w3cnote/mqtt-intro.html 對MQTT的入門級介紹 很基礎講解了什么是MQTT
https://mosquitto.org/api/files/mosquitto-h.html 這個網站記載了幾乎所有的mosquitto的接口 你想知道的函數接口他都有 函數簡介可能看不出來什么 點進去看一下詳細解釋還是很清晰的
以下的圖片能夠更加直觀地說明MQTT協議的通信方式
根據這兩張圖片談一下我對MQTT的了解:
在MQTT協議中 運行了broker的才算是真正的服務器 它掌控着所有消息的publish和subscribe 而他的客戶端程序可以有一下三重情況
1.運行了publisher的程序 做着publish的工作
2.運行了subscriber的程序 做着subscribe的工作
3.運行了subscriber和publisher的程序 既可以publish也可以subscribe 這並不意味着這樣的程序就是broker
這三種情況你可以根據自己實際的需要來確定自己的客戶端可以是哪一種
例如你的客戶端僅僅是需要收到來自broker的topic為"ycy"的消息然后解析處理 並不需要做任何的publish工作 那么你就可以選擇第2種客戶端
我的程序的功能是從雲端得到固定topic的消息之后就開始處理這個消息 處理完成后publish出去 由於我在Linux機上 我必須做一個仿broker來實現從雲端得到消息這一步
首先我的預想是這樣的
但是這樣就會導致我每次publish出去數據 我自己又會收到一份 這樣的話我又會再次掃描這個數據 以確定是不是需要解析處理 萬一的情況恰好是約定的需要解析的字符(這就誤會大了)
所以 在我這里沒有使用同一個topic 其實還有解決方法二(下文再說)
解決方法二:
就是在my_message_callback這個函數中過濾一下 將自己publish的topic過濾一下(在我下面的代碼中並沒有實現 只是提供這樣的思路 strcmp函數可以幫到很多 這個方法我另外試了一下 一定要注意多了一個'\n'字符 這是因為你輸入之后會按下回車發送 所以發送的字符中會帶有這個字符)
mqtt_server.c
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE 512
#define TOPIC_NUM 3
bool session = true;
const static char* topic[TOPIC_NUM] =
{
"Gai爺:",
"ycy ",
"CCYY "
};
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
if(message->payloadlen){
printf("%s %s", message->topic, (char *)message->payload);
}else{
printf("%s (null)\n", message->topic);
}
fflush(stdout);
}
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
int i;
if(!result){
/* Subscribe to broker information topics on successful connect. */
mosquitto_subscribe(mosq, NULL, "CCYY ", 2);
}else{
fprintf(stderr, "Connect failed\n");
}
}
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
int i;
printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
for(i=1; i<qos_count; i++){
printf(", %d", granted_qos[i]);
}
printf("\n");
}
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
/* Pring all log messages regardless of level. */
printf("%s\n", str);
}
int main()
{
struct mosquitto *mosq = NULL;
char buff[MSG_MAX_SIZE];
//libmosquitto 庫初始化
mosquitto_lib_init();
//創建mosquitto客戶端
mosq = mosquitto_new(NULL,session,NULL);
if(!mosq){
printf("create client failed..\n");
mosquitto_lib_cleanup();
return 1;
}
//設置回調函數,需要時可使用
mosquitto_log_callback_set(mosq, my_log_callback);
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
//連接服務器
if(mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)){
fprintf(stderr, "Unable to connect.\n");
return 1;
}
//開啟一個線程,在線程里不停的調用 mosquitto_loop() 來處理網絡信息
int loop = mosquitto_loop_start(mosq);
if(loop != MOSQ_ERR_SUCCESS)
{
printf("mosquitto loop error\n");
return 1;
}
while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL)
{
/*發布消息*/
mosquitto_publish(mosq,NULL,"ycy ",strlen(buff)+1,buff,0,0);
memset(buff,0,sizeof(buff));
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
mqtt_client.c
#include <stdio.h>
#include <stdlib.h>
#include <mosquitto.h>
#include <string.h>
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE 512
#define TOPIC_NUM 3
bool session = true;
const static char* topic[TOPIC_NUM] =
{
"Gai爺:",
"ycy ",
"CCYY "
};
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
if(message->payloadlen){
printf("%s %s", message->topic, (char *)message->payload);
}else{
printf("%s (null)\n", message->topic);
}
fflush(stdout);
}
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
int i;
if(!result){
/* Subscribe to broker information topics on successful connect. */
mosquitto_subscribe(mosq, NULL, "ycy ", 2);
}else{
fprintf(stderr, "Connect failed\n");
}
}
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
int i;
printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
for(i=1; i<qos_count; i++){
printf(", %d", granted_qos[i]);
}
printf("\n");
}
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
/* Pring all log messages regardless of level. */
printf("%s\n", str);
}
int main()
{
struct mosquitto *mosq = NULL;
char buff[MSG_MAX_SIZE];
//libmosquitto 庫初始化
mosquitto_lib_init();
//創建mosquitto客戶端
mosq = mosquitto_new(NULL,session,NULL);
if(!mosq){
printf("create client failed..\n");
mosquitto_lib_cleanup();
return 1;
}
//設置回調函數,需要時可使用
mosquitto_log_callback_set(mosq, my_log_callback);
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
//連接服務器
if(mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE)){
fprintf(stderr, "Unable to connect.\n");
return 1;
}
//開啟一個線程,在線程里不停的調用 mosquitto_loop() 來處理網絡信息
int loop = mosquitto_loop_start(mosq);
if(loop != MOSQ_ERR_SUCCESS)
{
printf("mosquitto loop error\n");
return 1;
}
while(fgets(buff, MSG_MAX_SIZE, stdin) != NULL)
{
/*發布消息*/
mosquitto_publish(mosq,NULL,"CCYY ",strlen(buff)+1,buff,0,0);
memset(buff,0,sizeof(buff));
}
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
makefile
#不是系統默認庫 要記得添加連接選項
all:Client
@echo ""
@echo "Start compiling......"
@echo ""
Client:Server
gcc -o Client mqtt_client.c -lmosquitto -lpthread
Server:
gcc -o Server mqtt_server.c -lmosquitto -lpthread
clean:
-rm Server Client
程序運行截圖如下:
Server
Client
以上