第一篇,先簡單分析一下整個emqtt 的大致結構,包括兩個部分:
1、message packet 類型
2、message 流向
message packet 類型
P1:mqtt_packet 的基本結構,其中header 中的type 與variable 的mqtt_packet_* 一一對應。
emqtt 的packet 定義如下:
1 -record(mqtt_packet, 2 {header :: #mqtt_packet_header{}, 3 variable :: #mqtt_packet_connect{} | #mqtt_packet_connack{} 4 | #mqtt_packet_publish{} | #mqtt_packet_puback{} 5 | #mqtt_packet_subscribe{} | #mqtt_packet_suback{} 6 | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{} 7 | mqtt_packet_id() | undefined, 8 payload :: binary() | undefined }).
包括包頭、包的類型、實際的數據消息。
在emqtt 中,message packet 的類型主要分為一下幾類(這些類型,基本上定義在emqttd_protocol.hrl 文件中):
1、connect/connack
P2:connect packet的基本結構
用於客戶端向server端建立鏈接以及server 端向client 確定鏈接。
1 -record(mqtt_packet_connect, 2 {client_id = <<>> :: mqtt_client_id(), 3 proto_ver = ?MQTT_PROTO_V311 :: mqtt_vsn(), 4 proto_name = <<"MQTT">> :: binary(), 5 will_retain = false :: boolean(), 6 will_qos = ?QOS_0 :: mqtt_qos(), 7 will_flag = false :: boolean(), 8 clean_sess = false :: boolean(), 9 keep_alive = 60 :: non_neg_integer(), 10 will_topic = undefined :: undefined | binary(), 11 will_msg = undefined :: undefined | binary(), 12 username = undefined :: undefined | binary(), 13 password = undefined :: undefined | binary()}). 14 15 -record(mqtt_packet_connack, 16 {ack_flags = ?RESERVED :: 0 | 1, 17 return_code :: mqtt_connack() }).
其中,主要的幾個字段,client_id、proto_ver、proto_name以及keep_alive。
client_id在username undefined的情況下,充當username的角色,是客戶端的唯一標識,在接下來的session manage以及 subscription 管理中,具有非常重要的作用。
2、subscribe/suback
P3:subscribe packet的基本結構
用於處理客戶端 subscribe 某個topic,已經server端向客戶端的確認。
1 -record(mqtt_packet_subscribe, 2 {packet_id :: mqtt_packet_id(), 3 topic_table :: list({binary(), mqtt_qos()}) }). 4 -record(mqtt_packet_suback, 5 {packet_id :: mqtt_packet_id(), 6 qos_table :: list(mqtt_qos() | 128) }).
topic_table 字段表示的是所要subscribe的topic 以及對應的QoS。
3、unsubscribe/unsuback
subscribe的反向操作
4、publish/puback
P4: publish packet的基本結構
1 -record(mqtt_packet_publish, 2 {topic_name :: binary(), 3 packet_id :: mqtt_packet_id() }). 4 5 -record(mqtt_packet_puback, 6 {packet_id :: mqtt_packet_id() }).
topic_name指定了將要publish到的topic的名字。
在emqtt中,用Erlanger中record數據類型,定義了以上一種message packet的類型,而且,這些類型的packet的作用都顯而易見。
message 流向
首先,mqtt是基於TCP協議的,因此,emqtt本身也是架構在TCP server上的service。其底層,基於esockd(主要借鑒RabbitMQ networking的實現)。socket controlling 進程的執行邏輯是emqttd_client module。emqttd_client module主要與esockd application 中的esockd_connection module 相關聯,並存在必要的callback 關系。
由於socket controlling 進程的入口以及主要執行邏輯由emqttd_client module實現。
因此:
- message進入到server后,首先由emqttd_client module進行處理
進入到emqtt server的message,主要是二進制socket數據,想要轉換成Erlang的內部數據結構,就必要進行必要的數據解析。
1、socket 數據包接收
1 handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) -> 2 Size = size(Data), 3 ?LOG(debug, "RECV ~p", [Data], State), 4 emqttd_metrics:inc('bytes/received', Size), 5 received(Data, rate_limit(Size, State#client_state{await_recv = false}));
L2,L3,L4這三行代碼主要用於調試和metric。
2、socket 數據包解析
socket 二進制數據包的解析,主要是由emqttd_parser module進行處理,包括二進制協議的解析以及socket 粘包的處理。
- message 由emqttd_parser module進行二進制數據協議的解析
解析成為mqtt_pakcet record 類型的數據結構。
3、mqtt packet 處理
emqtt中,mqtt packet的處理是由emqtt_protocol module完成,其入口函數為received/2:
1 %% A Client can only send the CONNECT Packet once over a Network Connection. 2 -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}). 3 received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) -> 4 process(Packet, State#proto_state{connected = true}); 5 6 received(?PACKET(?CONNECT), State = #proto_state{connected = true}) -> 7 {error, protocol_bad_connect, State}; 8 9 %% Received other packets when CONNECT not arrived. 10 received(_Packet, State = #proto_state{connected = false}) -> 11 {error, protocol_not_connected, State}; 12 13 received(Packet = ?PACKET(_Type), State) -> 14 trace(recv, Packet, State), 15 case validate_packet(Packet) of 16 ok -> 17 process(Packet, State); 18 {error, Reason} -> 19 {error, Reason, State} 20 end.
前三個子函數,主要處理"是否已經login"相關的packet,而最后一個子函數則是在login 之后,處理正常的packet。
在emqttd_protocol module 中,由入口函數received 做簡單的處理之后,則交由本module 中的process/2 函數進行處理,並最后交由后續的實際業務module 進行處理。
- mqtt packet 由emqttd_protocol module進行處理,並交由后續的module
綜上,在emqtt中,message 基本的流向為:
- message進入到server后,首先由emqttd_client module進行處理
- message 由emqttd_parser module進行二進制數據協議的解析
- mqtt packet 由emqttd_protocol module進行處理,並交由后續的module
- 后續根據不同類型的packet,再做不同的處理
圖示如下:
P5: message基本流向示意圖
總結
需要?