emqtt 1 (初初初初稿)


第一篇,先簡單分析一下整個emqtt 的大致結構,包括兩個部分:

1、message packet 類型

2、message 流向

message packet 類型

 P1:mqtt_packet 的基本結構,其中header 中的type 與variable 的mqtt_packet_* 一一對應。

emqtt 的packet 定義如下:

1 -record(mqtt_packet,
2         {header    :: #mqtt_packet_header{},
3          variable  :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
4                     | #mqtt_packet_publish{} | #mqtt_packet_puback{}
5                     | #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
6                     | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{}
7                     | mqtt_packet_id() | undefined,
8          payload   :: binary() | undefined }).

包括包頭、包的類型、實際的數據消息。

在emqtt 中,message packet 的類型主要分為一下幾類(這些類型,基本上定義在emqttd_protocol.hrl 文件中):

1、connect/connack

P2:connect packet的基本結構

用於客戶端向server端建立鏈接以及server 端向client 確定鏈接。

 1 -record(mqtt_packet_connect,
 2         {client_id   = <<>>              :: mqtt_client_id(),
 3          proto_ver   = ?MQTT_PROTO_V311  :: mqtt_vsn(),
 4          proto_name  = <<"MQTT">>        :: binary(),
 5          will_retain = false             :: boolean(),
 6          will_qos    = ?QOS_0            :: mqtt_qos(),
 7          will_flag   = false             :: boolean(),
 8          clean_sess  = false             :: boolean(),
 9          keep_alive  = 60                :: non_neg_integer(),
10          will_topic  = undefined         :: undefined | binary(),
11          will_msg    = undefined         :: undefined | binary(),
12          username    = undefined         :: undefined | binary(),
13          password    = undefined         :: undefined | binary()}).
14 
15 -record(mqtt_packet_connack,
16         {ack_flags = ?RESERVED   :: 0 | 1,
17          return_code             :: mqtt_connack() }).

其中,主要的幾個字段,client_id、proto_ver、proto_name以及keep_alive。

client_id在username undefined的情況下,充當username的角色,是客戶端的唯一標識,在接下來的session manage以及 subscription 管理中,具有非常重要的作用。

2、subscribe/suback

 

P3:subscribe packet的基本結構

 

用於處理客戶端 subscribe 某個topic,已經server端向客戶端的確認。

1 -record(mqtt_packet_subscribe,
2         {packet_id   :: mqtt_packet_id(),
3          topic_table :: list({binary(), mqtt_qos()}) }).
4 -record(mqtt_packet_suback,
5         {packet_id   :: mqtt_packet_id(),
6          qos_table   :: list(mqtt_qos() | 128) }).

topic_table 字段表示的是所要subscribe的topic 以及對應的QoS。

3、unsubscribe/unsuback

subscribe的反向操作

4、publish/puback

P4: publish packet的基本結構

1 -record(mqtt_packet_publish,
2         {topic_name  :: binary(),
3          packet_id   :: mqtt_packet_id() }).
4 
5 -record(mqtt_packet_puback,
6         {packet_id   :: mqtt_packet_id() }).

topic_name指定了將要publish到的topic的名字。

 

在emqtt中,用Erlanger中record數據類型,定義了以上一種message packet的類型,而且,這些類型的packet的作用都顯而易見。

message 流向

首先,mqtt是基於TCP協議的,因此,emqtt本身也是架構在TCP server上的service。其底層,基於esockd(主要借鑒RabbitMQ networking的實現)。socket controlling 進程的執行邏輯是emqttd_client module。emqttd_client module主要與esockd application 中的esockd_connection module 相關聯,並存在必要的callback 關系。

由於socket controlling 進程的入口以及主要執行邏輯由emqttd_client module實現。

因此:

  • message進入到server后,首先由emqttd_client module進行處理

進入到emqtt server的message,主要是二進制socket數據,想要轉換成Erlang的內部數據結構,就必要進行必要的數據解析。

1、socket 數據包接收

1 handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
2     Size = size(Data),
3     ?LOG(debug, "RECV ~p", [Data], State),
4     emqttd_metrics:inc('bytes/received', Size),
5     received(Data, rate_limit(Size, State#client_state{await_recv = false}));

L2,L3,L4這三行代碼主要用於調試和metric。

2、socket 數據包解析

socket 二進制數據包的解析,主要是由emqttd_parser module進行處理,包括二進制協議的解析以及socket 粘包的處理。

  • message 由emqttd_parser module進行二進制數據協議的解析

解析成為mqtt_pakcet record 類型的數據結構。

3、mqtt packet 處理

emqtt中,mqtt packet的處理是由emqtt_protocol module完成,其入口函數為received/2:

 1 %% A Client can only send the CONNECT Packet once over a Network Connection. 
 2 -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
 3 received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
 4     process(Packet, State#proto_state{connected = true});
 5 
 6 received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
 7     {error, protocol_bad_connect, State};
 8 
 9 %% Received other packets when CONNECT not arrived.
10 received(_Packet, State = #proto_state{connected = false}) ->
11     {error, protocol_not_connected, State};
12 
13 received(Packet = ?PACKET(_Type), State) ->
14     trace(recv, Packet, State),
15     case validate_packet(Packet) of
16         ok ->
17             process(Packet, State);
18         {error, Reason} ->
19             {error, Reason, State}
20     end.

前三個子函數,主要處理"是否已經login"相關的packet,而最后一個子函數則是在login 之后,處理正常的packet。

在emqttd_protocol module 中,由入口函數received 做簡單的處理之后,則交由本module 中的process/2 函數進行處理,並最后交由后續的實際業務module 進行處理。

  • mqtt packet 由emqttd_protocol module進行處理,並交由后續的module 

綜上,在emqtt中,message 基本的流向為:

  1. message進入到server后,首先由emqttd_client module進行處理
  2. message 由emqttd_parser module進行二進制數據協議的解析
  3. mqtt packet 由emqttd_protocol module進行處理,並交由后續的module 
  4. 后續根據不同類型的packet,再做不同的處理

圖示如下:

P5: message基本流向示意圖

總結

需要?

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM