1. Overall Workflow Overview
After downloading the source from GitHub, the broker code lives in src/, and it also uses a number of functions from the lib/ directory (the client library). A rough understanding of how the project works is the foundation for analyzing mosquitto's access control. There are already plenty of Chinese blog posts that walk through it (for example the 逍遙子 series); they are fairly old, but the meanings of the main structures have not changed. Getting a feel for those structures first makes reading the source much easier: struct mosquitto represents a single client, and struct mosquitto_db is the broker-wide "database" that holds everything else (client contexts, the subscription tree, stored messages, configuration, and so on).
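As a quick orientation, the sketch below lists a few of the fields those two structures carry. It is heavily abridged and paraphrased from the 1.5.x-era mosquitto_broker_internal.h, so field names and types should be treated as approximate rather than authoritative.

```c
/* Abridged sketch, not the real definitions -- see mosquitto_broker_internal.h. */
struct mosquitto {                        /* one client connection / session ("context") */
	mosq_sock_t sock;                     /* socket fd, INVALID_SOCKET when offline */
	char *id;                             /* MQTT client id */
	char *username;
	uint16_t keepalive;                   /* used for the keepalive*1.5 timeout check */
	time_t last_msg_in;
	enum mosquitto_client_state state;    /* mosq_cs_connected, mosq_cs_disconnected, ... */
	struct mosquitto__packet *current_out_packet;  /* head of the outgoing packet queue */
	UT_hash_handle hh_id, hh_sock;        /* lets the broker look the client up by id or socket */
};

struct mosquitto_db {                     /* broker-wide state, "the database" */
	struct mosquitto *contexts_by_id;     /* uthash table keyed by client id */
	struct mosquitto *contexts_by_sock;   /* uthash table keyed by socket */
	struct mosquitto__subhier *subs;      /* root of the subscription tree */
	struct mosquitto_msg_store *msg_store;/* stored messages, including retained ones */
	struct mosquitto__config *config;
};
```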
Since the project is written in C, start by locating main(): the broker starts from main() in src/mosquitto.c. Note that the code is full of conditional compilation (WIN32, WITH_EPOLL, WITH_BRIDGE, and so on); pick the platform you are familiar with and fold the other branches away to avoid confusion. After main() has initialized the subscription tree and loaded the security configuration, it enters mosquitto_main_loop(); that function first registers the listening sockets for read events with epoll and then drops into the real core loop, while(run){}, which is where the broker's actual logic begins.
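A minimal skeleton of that startup path is sketched below. It is abridged from memory of the 1.5.x source, with error handling and most of the setup removed, so the exact call sequence should be treated as approximate.

```c
/* Sketch of src/mosquitto.c:main(), heavily abridged. */
int main(int argc, char *argv[])
{
	struct mosquitto_db int_db;
	struct mosquitto__config config;

	config__init(&int_db, &config);
	config__parse_args(&int_db, &config, argc, argv);

	db__open(&config, &int_db);               /* sets up contexts and the subscription tree roots */
	mosquitto_security_module_init(&int_db);  /* load an auth/ACL plugin or the built-in security */
	mosquitto_security_init(&int_db, false);  /* read password/ACL files */

	/* ...open one listening socket per configured listener into listensock[]... */

	run = 1;
	return mosquitto_main_loop(&int_db, listensock, listensock_count, listener_max);
}
```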
From top to bottom, the flow is roughly:
- Free any disused client contexts (context is the structure that represents a client);
- Walk all clients (contexts) via the by-socket hash table, flush each context's queued outgoing packets, and disconnect the ones that have timed out;
- Wait on epoll_wait and handle the resulting socket events: net__socket_accept() accepts a new client connection and creates that client's context, while loop_handle_reads_writes() sends or receives packets according to the read/write events;
- packet__read() reads the complete packet and then calls handle__packet(), which dispatches on the packet type. Note that the broker uses the functions in src/read_handle.c, not the ones the client library uses under lib/; IDE auto-jump will happily take you to the wrong one. The switch/case in handle__packet() (see the sketch after the main-loop listing below) makes it easy to drill into each packet type.

```c
while(run){  /* the main (infinite) loop */
	context__free_disused(db);
#ifdef WITH_SYS_TREE
	if(db->config->sys_interval > 0){
		sys_tree__update(db, db->config->sys_interval, start_time);
	}
#endif

#ifndef WITH_EPOLL
	memset(pollfds, -1, sizeof(struct pollfd)*pollfd_max);

	pollfd_index = 0;
	for(i=0; i<listensock_count; i++){
		pollfds[pollfd_index].fd = listensock[i];
		pollfds[pollfd_index].events = POLLIN;
		pollfds[pollfd_index].revents = 0;
		pollfd_index++;
	}
#endif

	now_time = time(NULL);

	time_count = 0;
	HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){  /* iterate over the clients via the by-socket hash table */
		if(time_count > 0){
			time_count--;
		}else{
			time_count = 1000;
			now = mosquitto_time();
		}
		context->pollfd_index = -1;

		if(context->sock != INVALID_SOCKET){
#ifdef WITH_BRIDGE
			if(context->bridge){
				mosquitto__check_keepalive(db, context);
				if(context->bridge->round_robin == false
						&& context->bridge->cur_address != 0
						&& context->bridge->primary_retry
						&& now > context->bridge->primary_retry){

					if(context->bridge->primary_retry_sock == INVALID_SOCKET){
						rc = net__try_connect(context, context->bridge->addresses[0].address,
								context->bridge->addresses[0].port,
								&context->bridge->primary_retry_sock, NULL, false);

						if(rc == 0){
							COMPAT_CLOSE(context->bridge->primary_retry_sock);
							context->bridge->primary_retry_sock = INVALID_SOCKET;
							context->bridge->primary_retry = 0;
							net__socket_close(db, context);
							context->bridge->cur_address = 0;
						}
					}else{
						len = sizeof(int);
						if(!getsockopt(context->bridge->primary_retry_sock, SOL_SOCKET, SO_ERROR, (char *)&err, &len)){
							if(err == 0){
								COMPAT_CLOSE(context->bridge->primary_retry_sock);
								context->bridge->primary_retry_sock = INVALID_SOCKET;
								context->bridge->primary_retry = 0;
								net__socket_close(db, context);
								context->bridge->cur_address = context->bridge->address_count-1;
							}else{
								COMPAT_CLOSE(context->bridge->primary_retry_sock);
								context->bridge->primary_retry_sock = INVALID_SOCKET;
								context->bridge->primary_retry = now+5;
							}
						}else{
							COMPAT_CLOSE(context->bridge->primary_retry_sock);
							context->bridge->primary_retry_sock = INVALID_SOCKET;
							context->bridge->primary_retry = now+5;
						}
					}
				}
			}
#endif

			/* Local bridges never time out in this fashion. */
			if(!(context->keepalive)
					|| context->bridge
					|| now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){
				/* the client is still considered alive: push its inflight/queued packets */
				if(db__message_write(db, context) == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
					if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
						if(!(context->events & EPOLLOUT)) {
							ev.data.fd = context->sock;
							ev.events = EPOLLIN | EPOLLOUT;
							if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
								if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
									log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLOUT: %s", strerror(errno));
								}
							}
							context->events = EPOLLIN | EPOLLOUT;
						}
						context->ws_want_write = false;
					}else{
						if(context->events & EPOLLOUT) {
							ev.data.fd = context->sock;
							ev.events = EPOLLIN;
							if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
								if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
									log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering to EPOLLIN: %s", strerror(errno));
								}
							}
							context->events = EPOLLIN;
						}
					}
#else
					pollfds[pollfd_index].fd = context->sock;
					pollfds[pollfd_index].events = POLLIN;
					pollfds[pollfd_index].revents = 0;
					if(context->current_out_packet || context->state == mosq_cs_connect_pending || context->ws_want_write){
						pollfds[pollfd_index].events |= POLLOUT;
						context->ws_want_write = false;
					}
					context->pollfd_index = pollfd_index;
					pollfd_index++;
#endif
				}else{
					do_disconnect(db, context);
				}
			}else{
				/* the client has timed out */
				if(db->config->connection_messages == true){
					if(context->id){
						id = context->id;
					}else{
						id = "<unknown>";
					}
					log__printf(NULL, MOSQ_LOG_NOTICE, "Client %s has exceeded timeout, disconnecting.", id);
				}
				/* Client has exceeded keepalive*1.5 */
				do_disconnect(db, context);
			}
		}
	}

#ifdef WITH_BRIDGE
	time_count = 0;
	for(i=0; i<db->bridge_count; i++){
		if(!db->bridges[i]) continue;

		context = db->bridges[i];

		if(context->sock == INVALID_SOCKET){
			if(time_count > 0){
				time_count--;
			}else{
				time_count = 1000;
				now = mosquitto_time();
			}
			/* Want to try to restart the bridge connection */
			if(!context->bridge->restart_t){
				context->bridge->restart_t = now+context->bridge->restart_timeout;
				context->bridge->cur_address++;
				if(context->bridge->cur_address == context->bridge->address_count){
					context->bridge->cur_address = 0;
				}
			}else{
				if((context->bridge->start_type == bst_lazy && context->bridge->lazy_reconnect)
						|| (context->bridge->start_type == bst_automatic && now > context->bridge->restart_t)){

#if defined(__GLIBC__) && defined(WITH_ADNS)
					if(context->adns){
						/* Connection attempted, waiting on DNS lookup */
						rc = gai_error(context->adns);
						if(rc == EAI_INPROGRESS){
							/* Just keep on waiting */
						}else if(rc == 0){
							rc = bridge__connect_step2(db, context);
							if(rc == MOSQ_ERR_SUCCESS){
#ifdef WITH_EPOLL
								ev.data.fd = context->sock;
								ev.events = EPOLLIN;
								if(context->current_out_packet){
									ev.events |= EPOLLOUT;
								}
								if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
									if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
										log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
									}
								}else{
									context->events = ev.events;
								}
#else
								pollfds[pollfd_index].fd = context->sock;
								pollfds[pollfd_index].events = POLLIN;
								pollfds[pollfd_index].revents = 0;
								if(context->current_out_packet){
									pollfds[pollfd_index].events |= POLLOUT;
								}
								context->pollfd_index = pollfd_index;
								pollfd_index++;
#endif
							}else if(rc == MOSQ_ERR_CONN_PENDING){
								context->bridge->restart_t = 0;
							}else{
								context->bridge->cur_address++;
								if(context->bridge->cur_address == context->bridge->address_count){
									context->bridge->cur_address = 0;
								}
								context->bridge->restart_t = 0;
							}
						}else{
							/* Need to retry */
							if(context->adns->ar_result){
								freeaddrinfo(context->adns->ar_result);
							}
							mosquitto__free(context->adns);
							context->adns = NULL;
							context->bridge->restart_t = 0;
						}
					}else{
						rc = bridge__connect_step1(db, context);
						if(rc){
							context->bridge->cur_address++;
							if(context->bridge->cur_address == context->bridge->address_count){
								context->bridge->cur_address = 0;
							}
						}else{
							/* Short wait for ADNS lookup */
							context->bridge->restart_t = 1;
						}
					}
#else
					{
						rc = bridge__connect(db, context);
						context->bridge->restart_t = 0;
						if(rc == MOSQ_ERR_SUCCESS){
							if(context->bridge->round_robin == false && context->bridge->cur_address != 0){
								context->bridge->primary_retry = now + 5;
							}
#ifdef WITH_EPOLL
							ev.data.fd = context->sock;
							ev.events = EPOLLIN;
							if(context->current_out_packet){
								ev.events |= EPOLLOUT;
							}
							if(epoll_ctl(db->epollfd, EPOLL_CTL_ADD, context->sock, &ev) == -1) {
								if((errno != EEXIST)||(epoll_ctl(db->epollfd, EPOLL_CTL_MOD, context->sock, &ev) == -1)) {
									log__printf(NULL, MOSQ_LOG_DEBUG, "Error in epoll re-registering bridge: %s", strerror(errno));
								}
							}else{
								context->events = ev.events;
							}
#else
							pollfds[pollfd_index].fd = context->sock;
							pollfds[pollfd_index].events = POLLIN;
							pollfds[pollfd_index].revents = 0;
							if(context->current_out_packet){
								pollfds[pollfd_index].events |= POLLOUT;
							}
							context->pollfd_index = pollfd_index;
							pollfd_index++;
#endif
						}else{
							context->bridge->cur_address++;
							if(context->bridge->cur_address == context->bridge->address_count){
								context->bridge->cur_address = 0;
							}
						}
					}
#endif
				}
			}
		}
	}
#endif

	now_time = time(NULL);
	if(db->config->persistent_client_expiration > 0 && now_time > expiration_check_time){
		HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){
			if(context->sock == INVALID_SOCKET && context->clean_session == 0){
				/* This is a persistent client, check to see if the
				 * last time it connected was longer than
				 * persistent_client_expiration seconds ago. If so,
				 * expire it and clean up. */
				if(now_time > context->disconnect_t+db->config->persistent_client_expiration){
					if(context->id){
						id = context->id;
					}else{
						id = "<unknown>";
					}
					log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id);
					G_CLIENTS_EXPIRED_INC();
					context->clean_session = true;
					context->state = mosq_cs_expiring;
					do_disconnect(db, context);
				}
			}
		}
		expiration_check_time = time(NULL) + 3600;
	}

#ifndef WIN32
	sigprocmask(SIG_SETMASK, &sigblock, &origsig);
#ifdef WITH_EPOLL
	/* wait for socket events */
	fdcount = epoll_wait(db->epollfd, events, MAX_EVENTS, 100);
#else
	fdcount = poll(pollfds, pollfd_index, 100);
#endif
	sigprocmask(SIG_SETMASK, &origsig, NULL);
#else
	fdcount = WSAPoll(pollfds, pollfd_index, 100);
#endif

#ifdef WITH_EPOLL
	switch(fdcount){
	case -1:
		if(errno != EINTR){
			log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll waiting: %s.", strerror(errno));
		}
		break;
	case 0:
		break;
	default:
		/* loop over the ready socket events */
		for(i=0; i<fdcount; i++){
			for(j=0; j<listensock_count; j++){
				if (events[i].data.fd == listensock[j]) {
					if (events[i].events & (EPOLLIN | EPOLLPRI)){
						/* accept the client connection; net__socket_accept() also creates the client's context */
						while((ev.data.fd = net__socket_accept(db, listensock[j])) != -1){
							ev.events = EPOLLIN;
							if (epoll_ctl(db->epollfd, EPOLL_CTL_ADD, ev.data.fd, &ev) == -1) {
								log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: %s", strerror(errno));
							}
							context = NULL;
							HASH_FIND(hh_sock, db->contexts_by_sock, &(ev.data.fd), sizeof(mosq_sock_t), context);
							if(!context) {
								log__printf(NULL, MOSQ_LOG_ERR, "Error in epoll accepting: no context");
							}
							context->events = EPOLLIN;
						}
					}
					break;
				}
			}
			if (j == listensock_count) {
				loop_handle_reads_writes(db, events[i].data.fd, events[i].events);
			}
		}
	}
#else
	if(fdcount == -1){
		log__printf(NULL, MOSQ_LOG_ERR, "Error in poll: %s.", strerror(errno));
	}else{
		loop_handle_reads_writes(db, pollfds);

		for(i=0; i<listensock_count; i++){
			if(pollfds[i].revents & (POLLIN | POLLPRI)){
				while(net__socket_accept(db, listensock[i]) != -1){
				}
			}
		}
	}
#endif

#ifdef WITH_PERSISTENCE
	if(db->config->persistence && db->config->autosave_interval){
		if(db->config->autosave_on_changes){
			if(db->persistence_changes >= db->config->autosave_interval){
				persist__backup(db, false);
				db->persistence_changes = 0;
			}
		}else{
			if(last_backup + db->config->autosave_interval < mosquitto_time()){
				persist__backup(db, false);
				last_backup = mosquitto_time();
			}
		}
	}
#endif

#ifdef WITH_PERSISTENCE
	if(flag_db_backup){
		persist__backup(db, false);
		flag_db_backup = false;
	}
#endif
	if(flag_reload){
		log__printf(NULL, MOSQ_LOG_INFO, "Reloading config.");
		config__read(db, db->config, true);
		mosquitto_security_cleanup(db, true);
		mosquitto_security_init(db, true);
		mosquitto_security_apply(db);
		log__close(db->config);
		log__init(db->config);
		flag_reload = false;
	}
	if(flag_tree_print){
		sub__tree_print(db->subs, 0);
		flag_tree_print = false;
	}
#ifdef WITH_WEBSOCKETS
	for(i=0; i<db->config->listener_count; i++){
		/* Extremely hacky, should be using the lws provided external poll
		 * interface, but their interface has changed recently and ours
		 * will soon, so for now websockets clients are second class
		 * citizens. */
		if(db->config->listeners[i].ws_context){
			libwebsocket_service(db->config->listeners[i].ws_context, 0);
		}
	}
	if(db->config->have_websockets_listener){
		temp__expire_websockets_clients(db);
	}
#endif
}  /* end while(run) */
```
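The handle__packet() dispatch mentioned in the flow summary above looks roughly like this (src/read_handle.c, lightly abridged and reproduced from memory of the 1.5.x source; treat the exact handler signatures as approximate):

```c
/* Abridged sketch of src/read_handle.c -- not verbatim. */
int handle__packet(struct mosquitto_db *db, struct mosquitto *context)
{
	switch((context->in_packet.command)&0xF0){
		case PINGREQ:     return handle__pingreq(context);
		case PINGRESP:    return handle__pingresp(context);
		case PUBACK:      return handle__pubackcomp(db, context, "PUBACK");
		case PUBCOMP:     return handle__pubackcomp(db, context, "PUBCOMP");
		case PUBLISH:     return handle__publish(db, context);
		case PUBREC:      return handle__pubrec(context);
		case PUBREL:      return handle__pubrel(db, context);
		case CONNECT:     return handle__connect(db, context);
		case DISCONNECT:  return handle__disconnect(db, context);
		case SUBSCRIBE:   return handle__subscribe(db, context);
		case UNSUBSCRIBE: return handle__unsubscribe(db, context);
#ifdef WITH_BRIDGE
		case CONNACK:     return handle__connack(db, context);
		case SUBACK:      return handle__suback(context);
		case UNSUBACK:    return handle__unsuback(context);
#endif
		default:
			return MOSQ_ERR_PROTOCOL;
	}
}
```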
2. mosquitto's Native Access Control
The only place these access macros are explained is a comment in mosquitto_plugin.h:
```c
/*
 * Function: mosquitto_auth_acl_check
 *
 * Called by the broker when topic access must be checked. access will be one
 * of:
 *  MOSQ_ACL_SUBSCRIBE when a client is asking to subscribe to a topic string.
 *                     This differs from MOSQ_ACL_READ in that it allows you to
 *                     deny access to topic strings rather than by pattern. For
 *                     example, you may use MOSQ_ACL_SUBSCRIBE to deny
 *                     subscriptions to '#', but allow all topics in
 *                     MOSQ_ACL_READ. This allows clients to subscribe to any
 *                     topic they want, but not discover what topics are in use
 *                     on the server.
 *  MOSQ_ACL_READ      when a message is about to be sent to a client (i.e.
 *                     whether it can read that topic or not).
 *  MOSQ_ACL_WRITE     when a message has been received from a client (i.e.
 *                     whether it can write to that topic or not).
 */
```
The rest of that comment explains where each check has to be performed in the implementation. The function that actually carries out the check is:
```c
int mosquitto_acl_check(struct mosquitto_db *db, struct mosquitto *context,
		const char *topic, long payloadlen, void *payload,
		int qos, bool retain, int access)
```
Here context is the client being checked, topic, payload, retain and so on describe the current message, and access is the specific permission to check. The shape of this interface suggests that checks are driven by a client context, i.e. by an event on some client's connection (otherwise, how would the caller know which context to pass? Normally you pass whichever context performed the action). That raises a question: can every message always be traced back to a client context? Read on.
- The WRITE permission is checked when the broker receives a message from a client. Note that a last-will message is stored in the client's context, so the broker only publishes it, from that same context, when do_disconnect() is called; the check still works there. Retained messages, however, clearly fall outside this scheme: the broker may have stored the message long ago, and the publishing client's context may have been cleaned up long since. The mosquitto project has since added a way to restrict retain at publish time; see the linked discussion. There are also several mailing-list threads on the access-control design (for the proposal of the SUBSCRIBE check see 1, 2): the author's view was that checking at send time is simple to implement because wildcards don't have to be considered, revocation was not really considered, and the SUBSCRIBE check was added later to block wildcard subscriptions and to improve efficiency. Watching these first-hand discussions is instructive: you can see how the project's permission model was built up step by step and why, and you can even find the authors of a paper discussing their mosquitto-based implementation with the maintainer.
- The SUBSCRIBE permission is checked when a client subscribes; the difference is that it can reject subscriptions such as '#'. Evidently the author did not consider that relying on this check alone makes dynamic revocation problematic.
- The READ permission is checked when a message is about to be placed in a client context's send queue, both for retained messages delivered on subscribe and for every ordinary message about to be sent. Because the check happens at delivery time, an administrator can update the policy dynamically and revoke a client's right to receive a given topic.
To see exactly where each permission is checked, search the whole tree for calls to this function; a few representative call sites are sketched below.
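The call sites look roughly like this (reproduced from memory of the 1.5.x sources; the argument lists are abridged and approximate, so treat them as illustrative only):

```c
/* Illustrative only -- argument lists are approximate. */

/* src/handle_publish.c: the broker received a PUBLISH from `context` */
rc = mosquitto_acl_check(db, context, topic, payloadlen, payload,
		qos, retain, MOSQ_ACL_WRITE);

/* src/handle_subscribe.c: `context` asked to subscribe to topic filter `sub` */
rc = mosquitto_acl_check(db, context, sub, 0, NULL, qos, false,
		MOSQ_ACL_SUBSCRIBE);

/* a message is about to be queued for delivery to `context` */
rc = mosquitto_acl_check(db, context, topic, payloadlen, payload,
		qos, retain, MOSQ_ACL_READ);
```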
3. Improving mosquitto's Native Access Control
As noted in the previous section, because the ACL check needs a client context, and because retained messages are stored on leaf nodes of the subscription tree, the WRITE check is effectively lost for retained messages. This section discusses how to add a permission check for retained messages. First, look at how the broker handles them.
- Receiving and storing a retained message: a retained message arrives via PUBLISH (a last will can also be retained; it is handed in through a separate function). Following the PUBLISH handler, db__messages_easy_queue() calls db__message_store(), which saves the message and its attributes into a stored message, and then sub__messages_queue() pushes it into the matching node of the subscription tree. Ultimately subs__process() places the retained message into the retained field of that node's struct mosquitto__subhier *hier, as sketched just below this list.
- Delivering a retained message: in handle__subscribe(), after the permission check and after the topic has been added to the subscription tree (sub__add()), the broker checks whether the topic has a retained message to deliver, going through sub__retain_queue() and retain__search(), and finally retain__process() sends the message.
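A condensed view of the storage side, paraphrased from the 1.5.x-era subs.c; the local names and the exact reference-counting calls are approximate:

```c
/* Paraphrased sketch of the retain handling inside subs__process(); not verbatim. */
if(retain){
	if(hier->retained){
		db__msg_store_deref(db, &hier->retained);  /* drop the previously retained message */
	}
	if(stored->payloadlen){
		hier->retained = stored;                   /* the node keeps a reference to the new one */
		hier->retained->ref_count++;
	}else{
		hier->retained = NULL;                     /* a zero-length retained PUBLISH clears it */
	}
}
```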
So the idea is: when the message is stored, i.e. in db__message_store(), also save the context of the client that published the retained message (so that mosquitto_acl_check() can be reused); and when the message is about to be sent to a subscriber, i.e. in retain__process(), check the publisher's permission. It sounds simple, but there are many other factors to consider, especially since C leaves memory initialization and release entirely to you, and one slip ends in a segfault. The concrete modifications follow (a rough sketch of the combined change comes after the list):
1. Modify the mosquitto struct in mosquitto_broker_internal.h: add a counter of how many retained messages this client has registered, to make it easier to manage the lifetime of its context. Increment it when a new retained message from the client enters the broker; decrement it when that retained message is replaced. Remember to initialize the field: the context is first created in context__init() in context.c.
2. handle_publish.c shows that the broker stores the message through db__message_store() in database.c. That function should be modified so that the publisher's context is handed to the mosquitto_msg_store.
3. mosquitto_msg_store therefore also needs a struct mosquitto pointer to hold that context. In subs__process() in subs.c you can see that, for a retained message, this stored structure is attached to the current topic node.
4. In retain__process() in subs.c, check the source's permission before handing the retained message to a client.
5. In do_disconnect() in loop.c, before calling context__add_to_disused(), check whether the client has any retained messages registered, i.e. check the counter. A context can only be destroyed by having do_disconnect() called on it.
6. Because such a context might otherwise never be released through do_disconnect(), also check, whenever a stored message is deleted, whether the now-offline client still has retained messages: decrement the counter; if it has reached 0, and the session does not need to be preserved (context->clean_session == true, so clients that keep a session but have no retained messages are not affected), and the client is offline (context->state == mosq_cs_disconnected), call the release function context__add_to_disused().
7. Does any of this affect session resumption?
8. Note that the msg_store has its own maintenance: for example in database.c, when db__msg_store_deref() decides the message must be freed and db__msg_store_remove() runs, decrement the source context's counter as well (the stored message is being discarded at that point).
10. db__message_store() is called from many places across the project; think carefully about exactly when the context should be stored!
11. Remember to initialize the stored message too: everything you add must be initialized and released.
12. Keeping a context alive indefinitely (even just its id) because it owns retained messages: what does that do to the system? What happens when someone wants to reuse the same client id? Do you have to distinguish the online holder of that id from the offline one?
13. Beware of macros (#ifdef) silently keeping your new code out of the build.
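To make the step list concrete, here is a minimal sketch of the core of the change. Everything in it is hypothetical: the field names retain_source and retain_msg_count, the assumption that db__message_store() has been changed to receive the publisher's context, and the exact insertion points are illustrative choices layered on the 1.5.x code, not an actual patch.

```c
/* --- mosquitto_broker_internal.h: hypothetical additions (steps 1-3) -------- */
struct mosquitto {
	/* ...existing fields... */
	int retain_msg_count;             /* how many retained messages this client still owns */
};

struct mosquitto_msg_store {
	/* ...existing fields... */
	struct mosquitto *retain_source;  /* context of the client that published the message */
};

/* --- database.c, inside db__message_store(): hypothetical (step 2) ----------
 * `source` is assumed to be the publishing client's context, passed in by the
 * modified caller in handle_publish.c. */
if(retain && source){
	stored->retain_source = source;
	source->retain_msg_count++;       /* keeps the context from being recycled (step 5) */
}

/* --- subs.c, inside retain__process(): hypothetical (step 4) ----------------
 * Before queueing a retained message for delivery, re-check that its original
 * publisher is still allowed to WRITE to the topic. Payload access is
 * simplified here; the real store keeps it behind UHPA macros. */
if(retained->retain_source){
	rc = mosquitto_acl_check(db, retained->retain_source, retained->topic,
			retained->payloadlen, retained->payload,
			retained->qos, true, MOSQ_ACL_WRITE);
	if(rc != MOSQ_ERR_SUCCESS){
		return rc;                    /* drop delivery instead of leaking a revoked message */
	}
}

/* --- loop.c, inside do_disconnect(): hypothetical (steps 5-6) --------------- */
if(context->retain_msg_count == 0){
	context__add_to_disused(db, context);  /* only recycle contexts that own no retained messages */
}
```

The counter would be decremented wherever the stored message is released (e.g. around db__msg_store_remove(), step 8), and the same "no retained messages, clean session, offline" test then decides whether a lingering context can finally be handed to context__add_to_disused().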