emqtt 3 (我要subscribe 這個topic)


這一次,主要分析client subscribe 某個topic 的處理流程。

由protocol開始

是的,還是要從protocol開始,至於為什么,之前就說過了。

subscribe 類型的packet的處理是:

 1 %% 直接過濾掉topic 為空的情況
 2 process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
 3     send(?SUBACK_PACKET(PacketId, []), State);
 4 
 5 process(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{session = Session}) ->
 6     %% 組裝client 信息
 7     Client = client(State),
 8     %% 檢查ACL
 9     ...
10     %% session 為clientid 對應的session pid
11     %% TopicTable 為 [{TopicName, QoS}] 
12     emqttd_session:subscribe(Session, PacketId, TopicTable)
13     ...
14     ;
15     

1、過濾掉topictable 為空的情況

2、組裝必要的client 信息,完成ACL檢查

3、獲取clientid 對應的session pid,並調用emqttd_session:subscribe/3 函數

emqttd_session 模塊處理

emqttd_session:subscribe/3 只是一個接口函數,實際的處理邏輯是在emqttd_session 模塊的handle_cast callback 中實現。

-spec(subscribe(pid(), mqtt_packet_id(), [{binary(), mqtt_qos()}]) -> ok).
subscribe(SessPid, PacketId, TopicTable) ->
    From   = self(), %%這里的self 是client process id
    AckFun = fun(GrantedQos) ->
               From ! {suback, PacketId, GrantedQos}
             end,
gen_server2:cast(SessPid, {subscribe, TopicTable, AckFun}).

接口函數的定義如上。

handle_cast callback 的實現如下:

handle_cast({subscribe, TopicTable0, AckFun},
            Session = #session{client_id     = ClientId,
                   %% subscription 是dict subscriptions
= Subscriptions}) -> %% rewrite topic name 對topic name 做一些處理 Subscriptions1 = lists:foldl( fun({Topic, Qos}, SubDict) -> case dict:find(Topic, SubDict) of {ok, Qos} -> %% 已經存在,並且QoS 未更新,所以什么都不需要做 SubDict; {ok, OldQos} -> %% 已經存在,但是QoS 更新,所以,需要更新一下 emqttd_server:update_subscription(ClientId, Topic, OldQos, Qos), dict:store(Topic, Qos, SubDict); error -> %% 不存在,直接添加 emqttd:subscribe(ClientId, Topic, Qos), dict:store(Topic, Qos, SubDict) end end, Subscriptions, TopicTable),

更新subscribe

更新subscribe,也就是調用emqttd_server:update_subscription/4 。

emqttd_server 也是由pool 組織的gen_server進程,主要作用是subscription 的增刪改查,subscription 信息是保存在 subscription mnesia table 中的,subscription mnesia table的字段信息如下:

-record(mqtt_subscription,
        {subid   :: binary() | atom(),
         topic   :: binary(),
         qos = 0 :: 0 | 1 | 2
        }).

其中,subid 即為subscriber id,也就是clientid,topic 即為topic的名稱。

而,update subscription 的邏輯:

 1 %% 外部接口
 2 update_subscription(ClientId, Topic, OldQos, NewQos) ->
 3     call(server(self()), {update_subscription, ClientId, Topic, ?QOS_I(OldQos), ?QOS_I(NewQos)}).
 4 
 5 handle_call({update_subscription, ClientId, Topic, OldQos, NewQos}, _From, State) ->
 6     if_subsciption(State, fun() ->
 7         OldSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = OldQos},
 8         NewSub = #mqtt_subscription{subid = ClientId, topic = Topic, qos = NewQos},
 9         %% 使用事物
10         mnesia:transaction(fun update_subscription_/2, [OldSub, NewSub]),
11         set_subscription_stats()
12     end), ok(State);
13 
14 update_subscription_(OldSub, NewSub) ->
15     %% 刪除舊的 subscription
16     mnesia:delete_object(subscription, OldSub, write),
17     %% 寫入新的 subscription
18     mnesia:write(subscription, NewSub, write).

因為 subscription mnesia table的類型為bag,也就是一個clientid 可能會和多個topic 相對應,所以,不能依據key 進行delete,必須使用delete_object的方式。

創建subscribe

create subscribe的處理略微有些繞,不知道是作者有意而為之,還是其他什么原因。

首先,create subscribe的入口函數在emqttd module中,

-spec(subscribe(binary(), binary(), mqtt_qos()) -> {ok, mqtt_qos()}).
subscribe(ClientId, Topic, Qos) ->
    emqttd_server:subscribe(ClientId, Topic, Qos).

在此調用emqttd_server:subscribe/3 函數,並請求emqttd_server 進程,emqttd_server 進程調用handle_call callback 函數,處理請求。

 1 %% 外部接口
 2 -spec(subscribe(binary(), binary(), mqtt_qos()) -> ok).
 3 subscribe(ClientId, Topic, Qos) ->
 4     %% 這里的self 是emqttd_session 進程,這個調用是在emqttd_session 
 5     %% module 中的 handle_cast callback 發起的
 6     From = self(),
 7     call(server(From), {subscribe, From, ClientId, Topic, ?QOS_I(Qos)}).
 8 
 9 handle_call({subscribe, SubPid, ClientId, Topic, Qos}, _From, State) ->
10     %% call pubsub process
11     pubsub_subscribe_(SubPid, Topic),
12     if_subsciption(State, fun() ->
13         %% 將subscription 信息寫入到 subscription mnesia table 中
14         add_subscription_(ClientId, Topic, Qos),
15         set_subscription_stats()
16     end),
17     %% monitor session pid,當起DOWN 之后,去掉subscribe 並移除相關信息
18     ok(monitor_subscriber_(ClientId, SubPid, State));
19 
20 %% @private
21 %% @doc Call pubsub to subscribe
22 pubsub_subscribe_(SubPid, Topic) ->
23     case ets:match(subscribed, {SubPid, Topic}) of
24         [] ->
25             emqttd_pubsub:async_subscribe(Topic, SubPid),
26             ets:insert(subscribed, {SubPid, Topic});
27         [_] ->
28             false
29     end.

L26處,用了subscribed ets table,記錄session pid subscribe 的所有topic,這樣在 session pid DOWN的時候,就可以移除所有的topic 中session pid 相關的信息了。

而,emqttd_pubsub 同樣是由pool 組織的gen_server 進程。

 1 %% 外部接口,發起請求
 2 -spec(async_subscribe(binary(), pid()) -> ok).
 3 async_subscribe(Topic, SubPid) when is_binary(Topic) ->
 4     cast(pick(Topic), {subscribe, Topic, SubPid}).
 5 
 6 handle_cast({subscribe, Topic, SubPid}, State) ->
 7     %% 實際的處理函數
 8     add_subscriber_(Topic, SubPid),
 9     {noreply, setstats(State)};
10 
11 add_subscriber_(Topic, SubPid) ->
12     %% 檢查該Topic 是否已經存在
13     %% 若不存在,則先增加{Topic,Node}信息,為多node 場景服務
14     ...
15     ets:insert(subscriber, {Topic, SubPid}). %% 這里的subscriber 是一張ets table,接下來的publish 主要就是用的這張表 ********

至此,subscribe 操作的處理邏輯就ok了。

總結

應該也不需要,只是這代碼貼的有點多了。(示意圖待補)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM