MQTT 客戶端源碼分析


參看:逍遙子_mosquitto源碼分析系列

參看:MQTT libmosquitto源碼分析

參看:Mosquitto學習筆記

一、目錄結構

首先我們還是來看一下 mosquitto-1.4.14 的源碼目錄結構

我們主要關注 client、lib、src 這三個目錄。其中 src 和 lib 目錄下主要放置 mosquitto 的實現代碼以及部分底層與網絡相關的操作,client 目錄主要為兩個客戶端程序的實現源碼。

我們主要就是來看看,這兩個客戶端的實現源碼。

二、SUB 客戶端源碼

首先我們先看 sub_client.c 

我們從 main 函數開始。
查看結構體:
結構體 struct mosq_config 主要為 MQTT 的配置信息
[cpp]  view plain  copy
 
  1. struct mosq_config {  
  2.     char *id;  
  3.     char *id_prefix;  
  4.     int protocol_version;  
  5.     int keepalive;  
  6.     char *host;  
  7.     int port;  
  8.     int qos;  
  9.     bool retain;  
  10.     int pub_mode; /* pub */  
  11.     char *file_input; /* pub */  
  12.     char *message; /* pub */  
  13.     long msglen; /* pub */  
  14.     char *topic; /* pub */  
  15.     char *bind_address;  
  16. #ifdef WITH_SRV  
  17.     bool use_srv;  
  18. #endif  
  19.     bool debug;  
  20.     bool quiet;  
  21.     unsigned int max_inflight;  
  22.     char *username;  
  23.     char *password;  
  24. char *will_topic;  
  25.     char *will_payload;  
  26.     long will_payloadlen;  
  27.     int will_qos;  
  28.     bool will_retain;  
  29. #ifdef WITH_TLS  
  30.     char *cafile;  
  31.     char *capath;  
  32.     char *certfile;  
  33.     char *keyfile;  
  34.     char *ciphers;  
  35.     bool insecure;  
  36.     char *tls_version;  
  37. #  ifdef WITH_TLS_PSK  
  38.     char *psk;  
  39.     char *psk_identity;  
  40. #  endif  
  41. #endif  
  42.     bool clean_session; /* sub */  
  43.     char **topics; /* sub */  
  44.     int topic_count; /* sub */  
  45.     bool no_retain; /* sub */  
  46.     char **filter_outs; /* sub */  
  47.     int filter_out_count; /* sub */  
  48. bool verbose; /* sub */  
  49.     bool eol; /* sub */  
  50.     int msg_count; /* sub */  
  51. #ifdef WITH_SOCKS  
  52.     char *socks5_host;  
  53.     int socks5_port;  
  54.     char *socks5_username;  
  55.     char *socks5_password;  
  56. #endif  
  57. };  
 
結構體 struct mosquito 主要用於保存一個客戶端連接的所有信息,例如用戶名、密碼、用戶ID、向該客戶端發送的消息等
[cpp]  view plain  copy
 
  1. struct mosquitto {  
  2.     mosq_sock_t sock;  
  3. #ifndef WITH_BROKER  
  4.     mosq_sock_t sockpairR, sockpairW;  
  5. #endif  
  6. #if defined(__GLIBC__) && defined(WITH_ADNS)  
  7.     struct gaicb *adns; /* For getaddrinfo_a */  
  8. #endif  
  9.     enum _mosquitto_protocol protocol;  
  10.     char *address;  
  11.     char *id;  
  12.     char *username;  
  13.     char *password;  
  14.     uint16_t keepalive;  
  15.     uint16_t last_mid;  
  16.     enum mosquitto_client_state state;  
  17.     time_t last_msg_in;  
  18.     time_t next_msg_out;  
  19.     time_t ping_t;  
  20.     struct _mosquitto_packet in_packet;  
  21.     struct _mosquitto_packet *current_out_packet;  
  22.     struct _mosquitto_packet *out_packet;  
  23.     struct mosquitto_message *will;  
  24. #ifdef WITH_TLS  
  25.     SSL *ssl;  
  26.     SSL_CTX *ssl_ctx;  
  27.     char *tls_cafile;  
  28.     char *tls_capath;  
  29.     char *tls_certfile;  
  30.     char *tls_keyfile;  
  31.     int (*tls_pw_callback)(char *buf, int size, int rwflag, void *userdata);  
  32.     char *tls_version;  
  33.     char *tls_ciphers;  
  34.     char *tls_psk;  
  35.     char *tls_psk_identity;  
  36.     int tls_cert_reqs;  
  37.     bool tls_insecure;  
  38. #endif  
  39.     bool want_write;  
  40.     bool want_connect;  
  41. #if defined(WITH_THREADING) && !defined(WITH_BROKER)  
  42.     pthread_mutex_t callback_mutex;  
  43.     pthread_mutex_t log_callback_mutex;  
  44.     pthread_mutex_t msgtime_mutex;  
  45.     pthread_mutex_t out_packet_mutex;  
  46.     pthread_mutex_t current_out_packet_mutex;  
  47.     pthread_mutex_t state_mutex;  
  48.     pthread_mutex_t in_message_mutex;  
  49.     pthread_mutex_t out_message_mutex;  
  50.     pthread_mutex_t mid_mutex;  
  51.     pthread_t thread_id;  
  52. #endif  
  53.     bool clean_session;  
  54. #ifdef WITH_BROKER  
  55.     bool is_dropping;  
  56.     bool is_bridge;  
  57.     struct _mqtt3_bridge *bridge;  
  58.     struct mosquitto_client_msg *msgs;  
  59.     struct mosquitto_client_msg *last_msg;  
  60.     int msg_count;  
  61.     int msg_count12;  
  62.     struct _mosquitto_acl_user *acl_list;  
  63.     struct _mqtt3_listener *listener;  
  64.     time_t disconnect_t;  
  65.     struct _mosquitto_packet *out_packet_last;  
  66.     struct _mosquitto_subhier **subs;  
  67.     int sub_count;  
  68.     int pollfd_index;  
  69. #  ifdef WITH_WEBSOCKETS  
  70. #    if defined(LWS_LIBRARY_VERSION_NUMBER)  
  71.     struct lws *wsi;  
  72. #    else  
  73.     struct libwebsocket_context *ws_context;  
  74.     struct libwebsocket *wsi;  
  75. #    endif  
  76. #  endif  
  77.     bool ws_want_write;  
  78. #else  
  79. #  ifdef WITH_SOCKS  
  80.     char *socks5_host;  
  81.     int socks5_port;  
  82.     char *socks5_username;  
  83.     char *socks5_password;  
  84. #  endif  
  85.     void *userdata;  
  86.     bool in_callback;  
  87.     unsigned int message_retry;  
  88.     time_t last_retry_check;  
  89.     struct mosquitto_message_all *in_messages;  
  90.     struct mosquitto_message_all *in_messages_last;  
  91.     struct mosquitto_message_all *out_messages;  
  92.     struct mosquitto_message_all *out_messages_last;  
  93.     void (*on_connect)(struct mosquitto *, void *userdata, int rc);  
  94.     void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);  
  95.     void (*on_publish)(struct mosquitto *, void *userdata, int mid);  
  96.     void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);  
  97.     void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);  
  98.     void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);  
  99.     void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);  
  100.     //void (*on_error)();  
  101.     char *host;  
  102.     int port;  
  103.     int in_queue_len;  
  104.     int out_queue_len;  
  105.     char *bind_address;  
  106.     unsigned int reconnect_delay;  
  107.     unsigned int reconnect_delay_max;  
  108.     bool reconnect_exponential_backoff;  
  109.     char threaded;  
  110.     struct _mosquitto_packet *out_packet_last;  
  111.     int inflight_messages;  
  112.     int max_inflight_messages;  
  113. #  ifdef WITH_SRV  
  114.     ares_channel achan;  
  115. #  endif  
  116. #endif  
  117.   
  118. #ifdef WITH_BROKER  
  119.     UT_hash_handle hh_id;  
  120.     UT_hash_handle hh_sock;  
  121.     struct mosquitto *for_free_next;  
  122. #endif  
  123. };  
 

client_config_load 客戶端配置負載
第二個參數,可選擇選擇是 PUB 還是 SUB
 
然后看到 init_config 函數


可以看到一些初始化配置
[cpp]  view plain  copy
 
  1. void init_config(struct mosq_config *cfg)  
  2. {  
  3.     memset(cfg, 0, sizeof(*cfg));  
  4.     cfg->port = 1883;  
  5.     cfg->max_inflight = 20;   
  6.     cfg->keepalive = 60;   
  7.     cfg->clean_session = true;  
  8.     cfg->eol = true;  
  9.     cfg->protocol_version = MQTT_PROTOCOL_V31;  
  10. }  
 
mosquitto_lib_init 初始化  (重點)

[html]  view plain  copy
 
  1. int mosquitto_lib_init(void)  
  2. {  
  3. #ifdef WIN32          
  4.     srand(GetTickCount());  
  5. #else                 
  6.     struct timeval tv;  
  7.   
  8.     gettimeofday(&tv, NULL);  
  9.     srand(tv.tv_sec*1000 + tv.tv_usec/1000);  
  10. #endif    
  11.   
  12.     _mosquitto_net_init();  
  13.   
  14.     return MOSQ_ERR_SUCCESS;  
  15. }  
這里有個時間戳函數 gettimeofday,參看:C語言再學習 -- 時間函數
所在文件 mosquitto-1.4.14/lib/mosquitto.c 所以說需要鏈接動態庫 libmosquitto.so.1
 

client_id_generate 生成客戶端 ID 
其實就是我們講MQTT服務器的時候,訂閱主題然后在服務器上多出的那一行信息。
里面的 mosqsub|2431-ubuntu 就是客戶端 ID。這個函數就是干這個。
[cpp]  view plain  copy
 
  1. 1502159601: New client connected from 127.0.0.1 as mosqsub|2431-ubuntu (c1, k60)    
 
mosquitto_new 新建一個 mosq。 (重點)

看了一下這個函數里面就是一些初始化的東西
然后可以看到它也是在 lib 目錄下定義的。所以說需要鏈接動態庫 libmosquitto.so.1。其他不用改。
 
client_ipts_set 各種設置。懶得看...
 
一些調試信息
以及訂閱回調設置 mosquitto_subscribe_callback_set  (重要)
 
連接回調設置,和信息回調設置 (重點)
這兩個函數都是在lib目錄下定義的。里面都是有互斥鎖的。
 
client_connect 客戶端連接
int client_connect(struct mosquitto *mosq, struct mosq_config *cfg)
[cpp]  view plain  copy
 
  1. {  
  2.     char err[1024];  
  3.     int rc;  
  4.   
  5. #ifdef WITH_SRV  
  6.     if(cfg->use_srv){  
  7.         rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address);  
  8.     }else{  
  9.         rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);  
  10.     }  
  11. #else  
  12.     rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);  
  13. #endif  
  14.     if(rc>0){  
  15.         if(!cfg->quiet){  
  16.             if(rc == MOSQ_ERR_ERRNO){  
  17. #ifndef WIN32  
  18.                 strerror_r(errno, err, 1024);  
  19. #else  
  20.                 FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL);  
  21. #endif  
  22.                 fprintf(stderr, "Error: %s\n", err);  
  23.             }else{  
  24.                 fprintf(stderr, "Unable to connect (%s).\n", mosquitto_strerror(rc));  
  25.             }  
  26.         }  
  27.         mosquitto_lib_cleanup();  
  28.         return rc;  
  29.     }  
  30.     return MOSQ_ERR_SUCCESS;  
  31. }  
可以看到里面又有幾個重要函數
mosquitto_connect_srv
mosquitto_connect_bind --> _mosquitto_connect_init
 
然后不斷回環,里面的參數自己看。
 
最后是 mosq 的銷毀和庫的關閉。 (重點)
 
到此結束!!

三、PUB 客戶端源碼

接下來來看 pub_client.c 有一些相同部分我就不再重復了。
 
client_config_load 客戶端配置負載
第二個參數,可選擇選擇是 PUB 還是 SUB


一些配置信息的比較
 
mosquitto_lib_init 初始化  (重點)

 

client_id_generate 生成客戶端 ID 
 
mosquitto_new 新建一個 mosq。 (重點)

 
調試信息這里面就沒有了訂閱回調設置 mosquitto_subscribe_callback_set
 
然后這里看這里,是有區別的。 (重點)
connect、disconnect、publish.  這些回調設置
 
client_ipts_set 各種設置。懶得看...
 
client_connect 客戶端連接
 
回環開始、結束
 
最重要的來了,do while循環里的發布內容  (重點)
這里的 qos 就是消息發布服務質量級別。
 
然后還有 retain 用於區分新老訂閱者
RETAIN標志位只用於 PUBLISH 消息,當服務器收到某個主題的 PUBLISH 消息時,如果RETAIN標志位為1,則表示服務在將該消息發送給所有的已訂閱該主題的訂閱者后(發送前服務器將RETAIN標志置為0),還需保持這條消息,當有新增的訂閱者時,再將這條消息發給新增的訂閱者;如果RETAIN標志位為0,則不保持消息,也不用發給新增的訂閱者。
目的:
1.將RETAIN標志位置為1,可使新的訂閱者收到之前保持的或上一個確定有效的消息。
2.區分新訂閱者(RETAIN標志為1)和老訂閱者(RETAIN標志為0)
 
源碼中這兩個參數的設置都是 0
 
最后是 mosq 的銷毀和庫的關閉。 (重點)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM