一、目錄結構
首先我們還是來看一下 mosquitto-1.4.14 的源碼目錄結構
我們主要關注 client、lib、src 這三個目錄。其中 src 和 lib 目錄下主要放置 mosquitto 的實現代碼以及部分底層與網絡相關的操作,client 目錄主要為兩個客戶端程序的實現源碼。
我們主要就是來看看,這兩個客戶端的實現源碼。
二、SUB 客戶端源碼

首先我們先看 sub_client.c
我們從 main 函數開始。

查看結構體:
結構體 struct mosq_config 主要為 MQTT 的配置信息
- struct mosq_config {
- char *id;
- char *id_prefix;
- int protocol_version;
- int keepalive;
- char *host;
- int port;
- int qos;
- bool retain;
- int pub_mode; /* pub */
- char *file_input; /* pub */
- char *message; /* pub */
- long msglen; /* pub */
- char *topic; /* pub */
- char *bind_address;
- #ifdef WITH_SRV
- bool use_srv;
- #endif
- bool debug;
- bool quiet;
- unsigned int max_inflight;
- char *username;
- char *password;
- char *will_topic;
- char *will_payload;
- long will_payloadlen;
- int will_qos;
- bool will_retain;
- #ifdef WITH_TLS
- char *cafile;
- char *capath;
- char *certfile;
- char *keyfile;
- char *ciphers;
- bool insecure;
- char *tls_version;
- # ifdef WITH_TLS_PSK
- char *psk;
- char *psk_identity;
- # endif
- #endif
- bool clean_session; /* sub */
- char **topics; /* sub */
- int topic_count; /* sub */
- bool no_retain; /* sub */
- char **filter_outs; /* sub */
- int filter_out_count; /* sub */
- bool verbose; /* sub */
- bool eol; /* sub */
- int msg_count; /* sub */
- #ifdef WITH_SOCKS
- char *socks5_host;
- int socks5_port;
- char *socks5_username;
- char *socks5_password;
- #endif
- };
結構體 struct mosquito 主要用於保存一個客戶端連接的所有信息,例如用戶名、密碼、用戶ID、向該客戶端發送的消息等
- struct mosquitto {
- mosq_sock_t sock;
- #ifndef WITH_BROKER
- mosq_sock_t sockpairR, sockpairW;
- #endif
- #if defined(__GLIBC__) && defined(WITH_ADNS)
- struct gaicb *adns; /* For getaddrinfo_a */
- #endif
- enum _mosquitto_protocol protocol;
- char *address;
- char *id;
- char *username;
- char *password;
- uint16_t keepalive;
- uint16_t last_mid;
- enum mosquitto_client_state state;
- time_t last_msg_in;
- time_t next_msg_out;
- time_t ping_t;
- struct _mosquitto_packet in_packet;
- struct _mosquitto_packet *current_out_packet;
- struct _mosquitto_packet *out_packet;
- struct mosquitto_message *will;
- #ifdef WITH_TLS
- SSL *ssl;
- SSL_CTX *ssl_ctx;
- char *tls_cafile;
- char *tls_capath;
- char *tls_certfile;
- char *tls_keyfile;
- int (*tls_pw_callback)(char *buf, int size, int rwflag, void *userdata);
- char *tls_version;
- char *tls_ciphers;
- char *tls_psk;
- char *tls_psk_identity;
- int tls_cert_reqs;
- bool tls_insecure;
- #endif
- bool want_write;
- bool want_connect;
- #if defined(WITH_THREADING) && !defined(WITH_BROKER)
- pthread_mutex_t callback_mutex;
- pthread_mutex_t log_callback_mutex;
- pthread_mutex_t msgtime_mutex;
- pthread_mutex_t out_packet_mutex;
- pthread_mutex_t current_out_packet_mutex;
- pthread_mutex_t state_mutex;
- pthread_mutex_t in_message_mutex;
- pthread_mutex_t out_message_mutex;
- pthread_mutex_t mid_mutex;
- pthread_t thread_id;
- #endif
- bool clean_session;
- #ifdef WITH_BROKER
- bool is_dropping;
- bool is_bridge;
- struct _mqtt3_bridge *bridge;
- struct mosquitto_client_msg *msgs;
- struct mosquitto_client_msg *last_msg;
- int msg_count;
- int msg_count12;
- struct _mosquitto_acl_user *acl_list;
- struct _mqtt3_listener *listener;
- time_t disconnect_t;
- struct _mosquitto_packet *out_packet_last;
- struct _mosquitto_subhier **subs;
- int sub_count;
- int pollfd_index;
- # ifdef WITH_WEBSOCKETS
- # if defined(LWS_LIBRARY_VERSION_NUMBER)
- struct lws *wsi;
- # else
- struct libwebsocket_context *ws_context;
- struct libwebsocket *wsi;
- # endif
- # endif
- bool ws_want_write;
- #else
- # ifdef WITH_SOCKS
- char *socks5_host;
- int socks5_port;
- char *socks5_username;
- char *socks5_password;
- # endif
- void *userdata;
- bool in_callback;
- unsigned int message_retry;
- time_t last_retry_check;
- struct mosquitto_message_all *in_messages;
- struct mosquitto_message_all *in_messages_last;
- struct mosquitto_message_all *out_messages;
- struct mosquitto_message_all *out_messages_last;
- void (*on_connect)(struct mosquitto *, void *userdata, int rc);
- void (*on_disconnect)(struct mosquitto *, void *userdata, int rc);
- void (*on_publish)(struct mosquitto *, void *userdata, int mid);
- void (*on_message)(struct mosquitto *, void *userdata, const struct mosquitto_message *message);
- void (*on_subscribe)(struct mosquitto *, void *userdata, int mid, int qos_count, const int *granted_qos);
- void (*on_unsubscribe)(struct mosquitto *, void *userdata, int mid);
- void (*on_log)(struct mosquitto *, void *userdata, int level, const char *str);
- //void (*on_error)();
- char *host;
- int port;
- int in_queue_len;
- int out_queue_len;
- char *bind_address;
- unsigned int reconnect_delay;
- unsigned int reconnect_delay_max;
- bool reconnect_exponential_backoff;
- char threaded;
- struct _mosquitto_packet *out_packet_last;
- int inflight_messages;
- int max_inflight_messages;
- # ifdef WITH_SRV
- ares_channel achan;
- # endif
- #endif
- #ifdef WITH_BROKER
- UT_hash_handle hh_id;
- UT_hash_handle hh_sock;
- struct mosquitto *for_free_next;
- #endif
- };

client_config_load 客戶端配置負載
第二個參數,可選擇選擇是 PUB 還是 SUB
然后看到 init_config 函數

可以看到一些初始化配置
- void init_config(struct mosq_config *cfg)
- {
- memset(cfg, 0, sizeof(*cfg));
- cfg->port = 1883;
- cfg->max_inflight = 20;
- cfg->keepalive = 60;
- cfg->clean_session = true;
- cfg->eol = true;
- cfg->protocol_version = MQTT_PROTOCOL_V31;
- }
mosquitto_lib_init 初始化 (重點)

- int mosquitto_lib_init(void)
- {
- #ifdef WIN32
- srand(GetTickCount());
- #else
- struct timeval tv;
- gettimeofday(&tv, NULL);
- srand(tv.tv_sec*1000 + tv.tv_usec/1000);
- #endif
- _mosquitto_net_init();
- return MOSQ_ERR_SUCCESS;
- }
所在文件 mosquitto-1.4.14/lib/mosquitto.c 所以說需要鏈接動態庫 libmosquitto.so.1

client_id_generate 生成客戶端 ID
其實就是我們講MQTT服務器的時候,訂閱主題然后在服務器上多出的那一行信息。
里面的 mosqsub|2431-ubuntu 就是客戶端 ID。這個函數就是干這個。
- 1502159601: New client connected from 127.0.0.1 as mosqsub|2431-ubuntu (c1, k60)
mosquitto_new 新建一個 mosq。
(重點)

看了一下這個函數里面就是一些初始化的東西
然后可以看到它也是在 lib 目錄下定義的。所以說需要鏈接動態庫 libmosquitto.so.1。其他不用改。

client_ipts_set 各種設置。懶得看...

一些調試信息
以及訂閱回調設置 mosquitto_subscribe_callback_set
(重要)
連接回調設置,和信息回調設置
(重點)

這兩個函數都是在lib目錄下定義的。里面都是有互斥鎖的。
client_connect 客戶端連接

int client_connect(struct mosquitto *mosq, struct mosq_config *cfg)
- {
- char err[1024];
- int rc;
- #ifdef WITH_SRV
- if(cfg->use_srv){
- rc = mosquitto_connect_srv(mosq, cfg->host, cfg->keepalive, cfg->bind_address);
- }else{
- rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
- }
- #else
- rc = mosquitto_connect_bind(mosq, cfg->host, cfg->port, cfg->keepalive, cfg->bind_address);
- #endif
- if(rc>0){
- if(!cfg->quiet){
- if(rc == MOSQ_ERR_ERRNO){
- #ifndef WIN32
- strerror_r(errno, err, 1024);
- #else
- FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL);
- #endif
- fprintf(stderr, "Error: %s\n", err);
- }else{
- fprintf(stderr, "Unable to connect (%s).\n", mosquitto_strerror(rc));
- }
- }
- mosquitto_lib_cleanup();
- return rc;
- }
- return MOSQ_ERR_SUCCESS;
- }
mosquitto_connect_srv
mosquitto_connect_bind --> _mosquitto_connect_init
mosquitto_connect_bind --> _mosquitto_connect_init

然后不斷回環,里面的參數自己看。
最后是 mosq 的銷毀和庫的關閉。
(重點)

到此結束!!
三、PUB 客戶端源碼
接下來來看 pub_client.c 有一些相同部分我就不再重復了。
client_config_load 客戶端配置負載
第二個參數,可選擇選擇是 PUB 還是 SUB

一些配置信息的比較

mosquitto_lib_init 初始化 (重點)


client_id_generate 生成客戶端 ID
mosquitto_new 新建一個 mosq。
(重點)

調試信息這里面就沒有了訂閱回調設置 mosquitto_subscribe_callback_set

然后這里看這里,是有區別的。
(重點)

connect、disconnect、publish. 這些回調設置

client_ipts_set 各種設置。懶得看...
client_connect 客戶端連接

回環開始、結束

最重要的來了,do while循環里的發布內容
(重點)

這里的 qos 就是消息發布服務質量級別。

然后還有 retain 用於區分新老訂閱者
RETAIN標志位只用於 PUBLISH 消息,當服務器收到某個主題的 PUBLISH 消息時,如果RETAIN標志位為1,則表示服務在將該消息發送給所有的已訂閱該主題的訂閱者后(發送前服務器將RETAIN標志置為0),還需保持這條消息,當有新增的訂閱者時,再將這條消息發給新增的訂閱者;如果RETAIN標志位為0,則不保持消息,也不用發給新增的訂閱者。
目的:
1.將RETAIN標志位置為1,可使新的訂閱者收到之前保持的或上一個確定有效的消息。
2.區分新訂閱者(RETAIN標志為1)和老訂閱者(RETAIN標志為0)
1.將RETAIN標志位置為1,可使新的訂閱者收到之前保持的或上一個確定有效的消息。
2.區分新訂閱者(RETAIN標志為1)和老訂閱者(RETAIN標志為0)
源碼中這兩個參數的設置都是 0

最后是 mosq 的銷毀和庫的關閉。
(重點)
