這次,分析處理publish msg的流程。
由protocol開始
publish 類型的packet的處理是:
1 process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) -> 2 %% ACL check 3 ... 4 publish(Packet, State); 5 ... 6 publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId), 7 #proto_state{client_id = ClientId, username = Username, session = Session}) -> 8 %% 處理packet 獲得msg 9 Msg = emqttd_message:from_packet(Username, ClientId, Packet), 10 %% 調用emqttd_session module的publish/2 函數 11 %% subscribe的時候,也是由protocol 進入的emqttd_session module 12 emqttd_session:publish(Session, Msg);
1、ACL 檢查
2、處理packet 獲得msg
3、調用session module進行處理
emqttd_session 模塊處理
和subscribe的處理流程類似,emqttd_session:publish/2 也只是一個接口函數,該函數要根據QoS的不同,決定是
- 自己調用後續函數完成處理(QoS 0、QoS 1)
- call session process 完成後續處理(QoS 2)
%% @doc Publish a message on behalf of a session.
%% QoS 0 and QoS 1 messages are passed straight to emqttd:publish/1 in the
%% caller's process (the original note says that for QoS 1 the client will
%% puback automatically). QoS 2 messages go through the session process via
%% a synchronous call bounded by ?PUBSUB_TIMEOUT.
-spec(publish(pid(), mqtt_message()) -> ok | {error, any()}).
publish(_SessPid, Msg = #mqtt_message{qos = QoS})
  when QoS =:= ?QOS_0; QoS =:= ?QOS_1 ->
    %% No session involvement: publish directly.
    emqttd:publish(Msg);
publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
    %% QoS 2 is published by the session process itself.
    gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT).
直接處理
如果是自己調用後續函數完成處理的話,則繼續調用emqttd:publish/1,然後在emqttd module 中繼續調用emqttd_server:publish/1:
%% @doc Publish a Message -spec(publish(Msg :: mqtt_message()) -> any()). publish(Msg = #mqtt_message{from = From}) -> ... %% 處理topic ... %% publish emqttd_pubsub:publish(Topic, Msg2), ...
還是subscribe處理套路:
emqttd_protocol ---> emqttd_session ---> emqttd ---> emqttd_server ---> emqttd_pubsub
在emqttd_pubsub module中的處理是:
%% @doc Publish message to Topic.
%% Every route returned by emqttd_router:lookup/1 is visited: routes on the
%% local node are dispatched in place, all others are forwarded to their
%% node with an asynchronous rpc:cast/4.
-spec(publish(binary(), any()) -> any()).
publish(Topic, Msg) ->
    LocalNode = node(),
    Forward =
        fun(#mqtt_route{topic = To, node = Node}) ->
                case Node =:= LocalNode of
                    true  -> ?MODULE:dispatch(To, Msg);
                    false -> rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
                end
        end,
    lists:foreach(Forward, emqttd_router:lookup(Topic)).

%% Send {dispatch, Topic, Msg} to every subscriber pid of Topic; when there
%% are no subscribers the topic is reported through dropped/1.
dispatch(Topic, Msg) ->
    Envelope = {dispatch, Topic, Msg},
    case subscribers(Topic) of
        [] ->
            dropped(Topic);
        [OnlyPid] ->
            %% Single subscriber: send directly without lists:foreach/2.
            OnlyPid ! Envelope;
        Pids ->
            lists:foreach(fun(Pid) -> Pid ! Envelope end, Pids)
    end.

%% @private
%% @doc Find all subscribers
%% The membership test comes first (original note: "faster than lookup?");
%% the badarg handler covers a key that is missing when the element lookup
%% runs.
subscribers(Topic) ->
    case ets:member(subscriber, Topic) of
        false ->
            [];
        true ->
            try
                ets:lookup_element(subscriber, Topic, 2)
            catch
                error:badarg -> []
            end
    end.
至此,msg 就已經以{dispatch, Topic, Msg}的形式發送給 clientid 對應的session process了。
那麼,就需要在emqttd_session module中的handle_info callback 函數處進行處理:
%% Dispatch Message
%% Accepts only mqtt_message records; the message is passed through
%% tune_qos/3 together with the session's subscriptions before dispatch/2.
handle_info({dispatch, Topic, Msg = #mqtt_message{}},
            Session = #session{subscriptions = Subscriptions}) ->
    Tuned = tune_qos(Topic, Msg, Subscriptions),
    dispatch(Tuned, Session);
%% (the remaining handle_info/2 clauses are not shown in this excerpt)

%% Deliver qos0 message directly to client
dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->
    ClientPid ! {deliver, Msg},
    hibernate(Session);
%% QoS 1/2: deliver right away when check_inflight/1 allows it, otherwise
%% append the message to the session's message queue.
dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
  when QoS =:= ?QOS1; QoS =:= ?QOS2 ->
    case check_inflight(Session) of
        false ->
            Queued = emqttd_mqueue:in(Msg, MsgQ),
            hibernate(Session#session{message_queue = Queued});
        true ->
            noreply(deliver(Msg, Session))
    end.
繼而,將信息發送給socket controlling process,然後根據QoS的不同,判斷是否需要等待ack。
總結
(流程示意圖待補)