這一次,主要分析client subscribe 某個topic 的處理流程。
由protocol開始
是的,還是要從protocol開始,至於為什么,之前就說過了。
subscribe 類型的packet的處理是:
1 %% 直接過濾掉topic 為空的情況 2 process(?SUBSCRIBE_PACKET(PacketId, []), State) -> 3 send(?SUBACK_PACKET(PacketId, []), State); 4 5 process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) -> 6 %% 組裝client 信息 7 Client = client(State), 8 %% 檢查ACL 9 ... 10 %% session 為clientid 對應的session pid 11 %% TopicTable 為 [{TopicName, QoS}] 12 emqttd_session:subscribe(Session, PacketId, TopicTable) 13 ... 14 ; 15
1、過濾掉topictable 為空的情況
2、組裝必要的client 信息,完成ACL檢查
3、獲取clientid 對應的session pid,並調用emqttd_session:subscribe/3 函數
emqttd_session 模塊處理
emqttd_session:subscribe/3 只是一個接口函數,實際的處理邏輯是在emqttd_session 模塊的handle_cast callback 中實現。
-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok). subscribe(SessPid, PacketId, TopicTable) -> From = self(), %%這里的self 是client process id AckFun = fun(GrantedQos) -> From ! {suback, PacketId, GrantedQos} end,
gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).
接口函數的定義如上。
handle_cast callback 的實現如下:
handle_cast({subscribe, TopicTable0, AckFun}, Session = #session{client_id = ClientId,
%% subscription 是dict subscriptions = Subscriptions}) -> %% rewrite topic name 對topic name 做一些處理 Subscriptions1 = lists:foldl( fun({Topic, Qos}, SubDict) -> case dict:find(Topic, SubDict) of {ok, Qos} -> %% 已經存在,並且QoS 未更新,所以什么都不需要做 SubDict; {ok, OldQos} -> %% 已經存在,但是QoS 更新,所以,需要更新一下 emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos), dict:store(Topic, Qos, SubDict); error -> %% 不存在,直接添加 emqttd:subscribe(ClientId, Topic, Qos), dict:store(Topic, Qos, SubDict) end end, Subscriptions, TopicTable),
更新subscribe
更新subscribe,也就是調用emqttd_server:update_subscription/4 。
emqttd_server 也是由pool 組織的gen_server進程,主要作用是subscription 的增刪改查,subscription 信息是保存在 subscription mnesia table 中的,subscription mnesia table的字段信息如下:
-record(mqtt_subscription, {subid :: binary() | atom(), topic :: binary(), qos = 0 :: 0 | 1 | 2 }).
其中,subid 即為subscriber id,也就是clientid,topic 即為topic的名稱。
而,update subscription 的邏輯:
1 %% 外部接口 2 update_subscription(ClientId, Topic, OldQos, NewQos) -> 3 call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}). 4 5 handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) -> 6 if_subsciption(State, fun() -> 7 OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos}, 8 NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos}, 9 %% 使用事物 10 mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]), 11 set_subscription_stats() 12 end), ok(State); 13 14 update_subscription_(OldSub, NewSub) -> 15 %% 刪除舊的 subscription 16 mnesia:delete_object(subscription, OldSub, write), 17 %% 寫入新的 subscription 18 mnesia:write(subscription, NewSub, write).
因為 subscription mnesia table的類型為bag,也就是一個clientid 可能會和多個topic 相對應,所以,不能依據key 進行delete,必須使用delete_object的方式。
創建subscribe
create subscribe的處理略微有些繞,不知道是作者有意而為之,還是其他什么原因。
首先,create subscribe的入口函數在emqttd module中,
-spec(subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}). subscribe(ClientId, Topic, Qos) -> emqttd_server:subscribe(ClientId, Topic, Qos).
在此調用emqttd_server:subscribe/3 函數,並請求emqttd_server 進程,emqttd_server 進程調用handle_call callback 函數,處理請求。
1 %% 外部接口 2 -spec(subscribe(binary(), binary(), mqtt_qos()) -> ok). 3 subscribe(ClientId, Topic, Qos) -> 4 %% 這里的self 是emqttd_session 進程,這個調用是在emqttd_session 5 %% module 中的 handle_cast callback 發起的 6 From = self(), 7 call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}). 8 9 handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) -> 10 %% call pubsub process 11 pubsub_subscribe_(SubPid, Topic), 12 if_subsciption(State, fun() -> 13 %% 將subscription 信息寫入到 subscription mnesia table 中 14 add_subscription_(ClientId, Topic, Qos), 15 set_subscription_stats() 16 end), 17 %% monitor session pid,當起DOWN 之后,去掉subscribe 並移除相關信息 18 ok(monitor_subscriber_(ClientId, SubPid, State)); 19 20 %% @private 21 %% @doc Call pubsub to subscribe 22 pubsub_subscribe_(SubPid, Topic) -> 23 case ets:match(subscribed, {SubPid, Topic}) of 24 [] -> 25 emqttd_pubsub:async_subscribe(Topic, SubPid), 26 ets:insert(subscribed, {SubPid, Topic}); 27 [_] -> 28 false 29 end.
L26處,用了subscribed ets table,記錄session pid subscribe 的所有topic,這樣在 session pid DOWN的時候,就可以移除所有的topic 中session pid 相關的信息了。
而,emqttd_pubsub 同樣是由pool 組織的gen_server 進程。
1 %% 外部接口,發起請求 2 -spec(async_subscribe(binary(), pid()) -> ok). 3 async_subscribe(Topic, SubPid) when is_binary(Topic) -> 4 cast(pick(Topic), {subscribe, Topic, SubPid}). 5 6 handle_cast({subscribe, Topic, SubPid}, State) -> 7 %% 實際的處理函數 8 add_subscriber_(Topic, SubPid), 9 {noreply, setstats(State)}; 10 11 add_subscriber_(Topic, SubPid) -> 12 %% 檢查該Topic 是否已經存在 13 %% 若不存在,則先增加{Topic,Node}信息,為多node 場景服務 14 ... 15 ets:insert(subscriber, {Topic, SubPid}). %% 這里的subscriber 是一張ets table,接下來的publish 主要就是用的這張表 ********
至此,subscribe 操作的處理邏輯就ok了。
總結
應該也不需要,只是這代碼貼的有點多了。(示意圖待補)