Cowboy 源碼分析(八)


  大家好,這篇文章又晚了幾天,這幾天公司的事情比較多,每天晚上都3點多才到家,今天終於稍微不忙,回到家吃個飯,繼續為大家帶來這個系列的第八篇。

  上一篇中,我們講到了 cowboy_acceptor:acceptor/7 方法里的這一行代碼:

  cowboy_listener:add_connection(ListenerPid, default, Pid, OptsVsn),

  我們來看下這個方法的這個參數分別是什么:

  ListenerPid 是cowboy_listener工作進程的進程標識;

  Pid 則是cowboy_requests_sup督程的子進程標識符;

  OptsVsn = 1;

  我們看下 cowboy_listener:add_connection/4 方法:

-spec add_connection(pid(), atom(), pid(), non_neg_integer())
    -> ok | {upgrade, any(), non_neg_integer()}.
add_connection(ServerPid, Pool, ConnPid, OptsVsn) ->
    gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn},
        infinity).

  ServerPid = ListenerPid;這里就是 cowboy_listener工作進程的進程標識,也就是當前模塊。

  Pool = default;

  ConnPid = Pid;

  OptsVsn = 1;

  我們看下 cowboy_listener:handle_call/3 對 上面發出的消息的處理:

%% @private
-spec handle_call(_, _, State)
    -> {reply, ignored, State} | {stop, normal, stopped, State}.
handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{
        req_pools=Pools, reqs_table=ReqsTable,
        queue=Queue, max_conns=MaxConns,
        proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) ->
    {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable),
    State2 = State#state{req_pools=Pools2},
    if    AccOptsVsn =/= LisOptsVsn ->
            {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2};
        NbConns > MaxConns ->
            Queue2 = queue:in(From, Queue),
            {noreply, State2#state{queue=Queue2}};
        true ->
            {reply, ok, State2}
    end;

  不知道大家還記得 這個 gen_server 工作進程,是在什么時候啟動的?並且 init/1 又做了些什么,我們在之前的 Cowboy 源碼分析(五) 已經很詳細的介紹了,大家可以看下,回憶回憶,這里我就不重復再說了,我把初始化的方法再貼下,方便看:

%% @private
-spec init(list()) -> {ok, #state{}}.
init([MaxConns, ProtoOpts]) ->
    ReqsTable = ets:new(requests_table, [set, private]),
    Queue = queue:new(),
    {ok, #state{reqs_table=ReqsTable, max_conns=MaxConns,
        proto_opts=ProtoOpts, queue=Queue}}.

  我們回到 cowboy_listener:handle_call/3 來分析下具體的邏輯:這個方法接受三個參數:

  如果你了解 gen_server OTP設計原則,你應該能知道這三個參數,分別是什么意思,{add_connection, Pool, ConnPid, AccOptsVsn} 這個參數是實際的請求發送過來的,也就是 gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, infinity). 的第二個參數;From 表示調用 gen_server:call/3 的進程標識符;State是gen_server的內部狀態,我們在init/1方法已經初始化了這個狀態。

  我們來重點看下這行代碼:

  {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable),

  我們看下 add_pid/4 方法:

%% @private
-spec add_pid(pid(), atom(), pools(), ets:tid())
    -> {non_neg_integer(), pools()}.
add_pid(ConnPid, Pool, Pools, ReqsTable) ->
    MonitorRef = erlang:monitor(process, ConnPid),
    ConnPid ! {shoot, self()},
    {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of
        false ->
            {1, [{Pool, 1}|Pools]};
        {Pool, NbConns} ->
            NbConns2 = NbConns + 1,
            {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]}
    end,
    ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}),
    {NbConnsRet, Pools2}.

  首先是MonitorRef = erlang:monitor(process, ConnPid),

  erlang doc:http://www.erlang.org/doc/man/erlang.html#monitor-2

  這邊,我還查看了下 《erlang 編程指南》 148頁,關於這個函數的用法如下:

    為了單向監控進程,可以把內置函數 erlang:monitor/2 添加到 Erlang中,如下調用:

      erlang:monitor(process, Proc) 

    這樣就生成了一個調用進程到另一個標記為 Proc 進程的監控進程,Proc 可以是進程標識符也可以是注冊的名字。當帶有進程標識符的進程終止的時候,消息:

      {'DOWN', MonitorRef, Type, Object, Info} 會發送到監控進程。這個消息包含一個對監控進程的引用。

  這個方法就到這,大家具體看看書,我之前看了一遍,沒有理解,抽空再好好看看。

  我們接着看ConnPid ! {shoot, self()},還記得我們上一篇文章中,調用:cowboy_http_protocol:start_link/4 會新生成一個進程,並鏈接到調用進程,這個進程會 調用 init/4 方法,然后這個方法會停在 ok = cowboy:accept_ack(ListenerPid) 方法里,等待 {shoot, ListenerPid} 消息,同時 cowboy_http_protocol:start_link/4 這個方法會返回 {ok, Pid}.  這個 Pid 也就是后來傳遞到 cowboy_listener:add_pid/4 方法中的參數:ConnPid。當我們往這個進程標識發送一條消息:{shoot, self()}

  Pid 停在ok = cowboy:accept_ack(ListenerPid) 的方法,就會接收到消息 {shoot, ListenerPid},那么它就會繼續執行,也就是執行:cowboy_http_protocol:init/4 方法的這一行代碼:

wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport,
        dispatch=Dispatch, max_empty_lines=MaxEmptyLines,
        max_keepalive=MaxKeepalive, max_line_length=MaxLineLength,
        timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse,
        urldecode=URLDec}).

  其他參照代碼如下:

  cowboy_http_protocol

%% @doc Start an HTTP protocol process.
-spec start_link(pid(), inet:socket(), module(), any()) -> {ok, pid()}.
start_link(ListenerPid, Socket, Transport, Opts) ->
    Pid = spawn_link(?MODULE, init, [ListenerPid, Socket, Transport, Opts]),
    {ok, Pid}.

%% FSM.

%% @private
-spec init(pid(), inet:socket(), module(), any()) -> ok.
init(ListenerPid, Socket, Transport, Opts) ->
    Dispatch = proplists:get_value(dispatch, Opts, []),
    MaxEmptyLines = proplists:get_value(max_empty_lines, Opts, 5),
    MaxKeepalive = proplists:get_value(max_keepalive, Opts, infinity),
    MaxLineLength = proplists:get_value(max_line_length, Opts, 4096),
    OnRequest = proplists:get_value(onrequest, Opts),
    OnResponse = proplists:get_value(onresponse, Opts),
    Timeout = proplists:get_value(timeout, Opts, 5000),
    URLDecDefault = {fun cowboy_http:urldecode/2, crash},
    URLDec = proplists:get_value(urldecode, Opts, URLDecDefault),
    ok = cowboy:accept_ack(ListenerPid),
    wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport,
        dispatch=Dispatch, max_empty_lines=MaxEmptyLines,
        max_keepalive=MaxKeepalive, max_line_length=MaxLineLength,
        timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse,
        urldecode=URLDec}).
-spec wait_request(#state{}) -> ok.
wait_request(State=#state{socket=Socket, transport=Transport,
        timeout=T, buffer=Buffer}) ->
    case Transport:recv(Socket, 0, T) of
        {ok, Data} -> parse_request(State#state{
            buffer= << Buffer/binary, Data/binary >>});
        {error, _Reason} -> terminate(State)
    end.

  cowboy

%% @doc Acknowledge the accepted connection.
%%
%% Effectively used to make sure the socket control has been given to
%% the protocol process before starting to use it.
-spec accept_ack(pid()) -> ok.
accept_ack(ListenerPid) ->
    receive {shoot, ListenerPid} -> ok end.

  好了,這篇文章,比較亂,沒關系,大家多看幾遍源碼,我會在以后的文章,嘗試用圖來描述,應該會更好一些,很抱歉,能力有限。

  最后,我將在下一篇繼續從 cowboy_http_protocol:wait_request/1 和 cowboy_listener:add_pid/4 來分解Cowboy代碼,敬請期待,謝謝。

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM