emqtt 4 (我要publish消息了)


這次,分析處理publish msg的流程。

由protocol開始

publish 類型的packet的處理是:

 1 process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
 2     %% ACL check
 3     ...
 4     publish(Packet, State);
 5     ...
 6 publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
 7         #proto_state{client_id = ClientId, username = Username, session = Session}) ->
 8     %% 處理packet 獲得msg
 9     Msg = emqttd_message:from_packet(Username, ClientId, Packet),
10     %% 調用emqttd_session module的publish/2 函數
11     %% subscribe的時候,也是由protocol 進入的emqttd_session module
12     emqttd_session:publish(Session, Msg);

1、ACL 檢查

2、處理packet 獲得msg

3、調用session module進行處理

emqttd_session 模塊處理

和subscribe的處理流程類似,emqttd_session:publish/2 也只是一個接口函數,該函數要根據QoS的不同,決定是

  1. 自己調用后續函數完成處理
  2. call session process 完成后續處理
 1 %% @doc Publish message
 2 -spec(publish(pid(), mqtt_message()) -> ok | {error, any()}).
 3 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
 4     %% publish qos0 directly
 5     emqttd:publish(Msg);
 6 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) ->
 7     %% publish qos1 directly, and client will puback automatically
 8     emqttd:publish(Msg);
 9 publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
10     %% publish qos2 by session 
11     gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT). 

直接處理

如果是自己調用后續函數完成處理的話,則繼續調用emqttd:publish/2,則在emqttd module 中繼續調用emqttd_server:publish/1:

%% @doc Publish a Message
-spec(publish(Msg :: mqtt_message()) -> any()).
publish(Msg = #mqtt_message{from = From}) ->
    ...
    %% 處理topic
    ...
    %% pulish
    emqttd_pubsub:publish(Topic, Msg2),
    ...

 

還是subscribe處理套路:

emqttd_protocol ---> emqttd_session ---> emqttd ---> emqttd_server ---> emqttd_pubsub

在emqttd_pubsub module中的處理是:

 1 %% @doc Publish message to Topic.
 2 -spec(publish(binary(), any()) -> any()).
 3 publish(Topic, Msg) ->
 4     lists:foreach(
 5         fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->
 6             ?MODULE:dispatch(To, Msg);
 7            (#mqtt_route{topic = To, node = Node}) ->
 8             rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
 9         end, emqttd_router:lookup(Topic)).
10 
11 dispatch(Topic, Msg) ->
12     case subscribers(Topic) of
13         [] ->
14             dropped(Topic);
15         [SubPid] ->
16             SubPid ! {dispatch, Topic, Msg};
17         SubPids ->
18             lists:foreach(fun(SubPid) ->
19                 SubPid ! {dispatch, Topic, Msg}
20             end, SubPids)
21     end.
22 %% @private
23 %% @doc Find all subscribers
24 subscribers(Topic) ->
25     case ets:member(subscriber, Topic) of
26         true -> %% faster then lookup?
27             try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end;
28         false ->
29             []
30     end.

 

至此,msg 就已經以{dispatch, Topic, Msg}的形式發送給 clientid 對應的session process了。

那么,就需要在emqttd_session module中的handle_info callback 函數處進行處理:

 1 %% Dispatch Message
 2 handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
 3     when is_record(Msg, mqtt_message) ->
 4     dispatch(tune_qos(Topic, Msg, Subscriptions), Session);
 5 
 6 %% Deliver qos0 message directly to client
 7 dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->
 8     ClientPid ! {deliver, Msg},
 9     hibernate(Session);
10 dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
11     when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
12     case check_inflight(Session) of
13         true  ->
14             noreply(deliver(Msg, Session));
15         false ->
16             hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
17     end.

 

繼而,將信息發送給socket controlling process,然后根據QoS的不同,判斷是否需要等待ack。

總結

(流程示意圖待補)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM