大家好,這篇文章又晚了幾天,這幾天公司的事情比較多,每天晚上都3點多才到家,今天終於稍微不忙,回到家吃個飯,繼續為大家帶來這個系列的第八篇。
上一篇中,我們講到了 cowboy_acceptor:acceptor/7 方法里的這一行代碼:
cowboy_listener:add_connection(ListenerPid, default, Pid, OptsVsn),
我們來看下這個方法的這個參數分別是什么:
ListenerPid 是cowboy_listener工作進程的進程標識;
Pid 則是cowboy_requests_sup督程的子進程標識符;
OptsVsn = 1;
我們看下 cowboy_listener:add_connection/4 方法:
-spec add_connection(pid(), atom(), pid(), non_neg_integer()) -> ok | {upgrade, any(), non_neg_integer()}. add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, infinity).
ServerPid = ListenerPid;這里就是 cowboy_listener工作進程的進程標識,也就是當前模塊。
Pool = default;
ConnPid = Pid;
OptsVsn = 1;
我們看下 cowboy_listener:handle_call/3 對 上面發出的消息的處理:
%% @private -spec handle_call(_, _, State) -> {reply, ignored, State} | {stop, normal, stopped, State}. handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ req_pools=Pools, reqs_table=ReqsTable, queue=Queue, max_conns=MaxConns, proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable), State2 = State#state{req_pools=Pools2}, if AccOptsVsn =/= LisOptsVsn -> {reply, {upgrade, ProtoOpts, LisOptsVsn}, State2}; NbConns > MaxConns -> Queue2 = queue:in(From, Queue), {noreply, State2#state{queue=Queue2}}; true -> {reply, ok, State2} end;
不知道大家還記得 這個 gen_server 工作進程,是在什么時候啟動的?並且 init/1 又做了些什么,我們在之前的 Cowboy 源碼分析(五) 已經很詳細的介紹了,大家可以看下,回憶回憶,這里我就不重復再說了,我把初始化的方法再貼下,方便看:
%% @private -spec init(list()) -> {ok, #state{}}. init([MaxConns, ProtoOpts]) -> ReqsTable = ets:new(requests_table, [set, private]), Queue = queue:new(), {ok, #state{reqs_table=ReqsTable, max_conns=MaxConns, proto_opts=ProtoOpts, queue=Queue}}.
我們回到 cowboy_listener:handle_call/3 來分析下具體的邏輯:這個方法接受三個參數:
如果你了解 gen_server OTP設計原則,你應該能知道這三個參數,分別是什么意思,{add_connection, Pool, ConnPid, AccOptsVsn} 這個參數是實際的請求發送過來的,也就是 gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, infinity). 的第二個參數;From 表示調用 gen_server:call/3 的進程標識符;State 則是gen_server的內部狀態,我們在init/1方法已經初始化了這個狀態。
我們來重點看下這行代碼:
{NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable),
我們看下 add_pid/4 方法:
%% @private -spec add_pid(pid(), atom(), pools(), ets:tid()) -> {non_neg_integer(), pools()}. add_pid(ConnPid, Pool, Pools, ReqsTable) -> MonitorRef = erlang:monitor(process, ConnPid), ConnPid ! {shoot, self()}, {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of false -> {1, [{Pool, 1}|Pools]}; {Pool, NbConns} -> NbConns2 = NbConns + 1, {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} end, ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), {NbConnsRet, Pools2}.
首先是:MonitorRef = erlang:monitor(process, ConnPid),
erlang doc:http://www.erlang.org/doc/man/erlang.html#monitor-2
這邊,我還查看了下 《erlang 編程指南》 148頁,關於這個函數的用法如下:
為了單向監控進程,可以把內置函數 erlang:monitor/2 添加到 Erlang中,如下調用:
erlang:monitor(process, Proc)
這樣就生成了一個調用進程到另一個標記為 Proc 進程的監控進程,Proc 可以是進程標識符也可以是注冊的名字。當帶有進程標識符的進程終止的時候,消息:
{'DOWN', MonitorRef, Type, Object, Info} 會發送到監控進程。這個消息包含一個對監控進程的引用。
這個方法就到這,大家具體看看書,我之前看了一遍,沒有理解,抽空再好好看看。
我們接着看:ConnPid ! {shoot, self()},還記得我們上一篇文章中,調用:cowboy_http_protocol:start_link/4 會新生成一個進程,並鏈接到調用進程,這個進程會 調用 init/4 方法,然后這個方法會停在 ok = cowboy:accept_ack(ListenerPid) 方法里,等待 {shoot, ListenerPid} 消息,同時 cowboy_http_protocol:start_link/4 這個方法會返回 {ok, Pid}. 這個 Pid 也就是后來傳遞到 cowboy_listener:add_pid/4 方法中的參數:ConnPid。當我們往這個進程標識發送一條消息:{shoot, self()}。
Pid 停在ok = cowboy:accept_ack(ListenerPid) 的方法,就會接收到消息 {shoot, ListenerPid},那么它就會繼續執行,也就是執行:cowboy_http_protocol:init/4 方法的這一行代碼:
wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport, dispatch=Dispatch, max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive, max_line_length=MaxLineLength, timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse, urldecode=URLDec}).
其他參照代碼如下:
cowboy_http_protocol:
%% @doc Start an HTTP protocol process. -spec start_link(pid(), inet:socket(), module(), any()) -> {ok, pid()}. start_link(ListenerPid, Socket, Transport, Opts) -> Pid = spawn_link(?MODULE, init, [ListenerPid, Socket, Transport, Opts]), {ok, Pid}. %% FSM. %% @private -spec init(pid(), inet:socket(), module(), any()) -> ok. init(ListenerPid, Socket, Transport, Opts) -> Dispatch = proplists:get_value(dispatch, Opts, []), MaxEmptyLines = proplists:get_value(max_empty_lines, Opts, 5), MaxKeepalive = proplists:get_value(max_keepalive, Opts, infinity), MaxLineLength = proplists:get_value(max_line_length, Opts, 4096), OnRequest = proplists:get_value(onrequest, Opts), OnResponse = proplists:get_value(onresponse, Opts), Timeout = proplists:get_value(timeout, Opts, 5000), URLDecDefault = {fun cowboy_http:urldecode/2, crash}, URLDec = proplists:get_value(urldecode, Opts, URLDecDefault), ok = cowboy:accept_ack(ListenerPid), wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport, dispatch=Dispatch, max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive, max_line_length=MaxLineLength, timeout=Timeout, onrequest=OnRequest, onresponse=OnResponse, urldecode=URLDec}).
-spec wait_request(#state{}) -> ok. wait_request(State=#state{socket=Socket, transport=Transport, timeout=T, buffer=Buffer}) -> case Transport:recv(Socket, 0, T) of {ok, Data} -> parse_request(State#state{ buffer= << Buffer/binary, Data/binary >>}); {error, _Reason} -> terminate(State) end.
cowboy:
%% @doc Acknowledge the accepted connection. %% %% Effectively used to make sure the socket control has been given to %% the protocol process before starting to use it. -spec accept_ack(pid()) -> ok. accept_ack(ListenerPid) -> receive {shoot, ListenerPid} -> ok end.
好了,這篇文章,比較亂,沒關系,大家多看幾遍源碼,我會在以后的文章,嘗試用圖來描述,應該會更好一些,很抱歉,能力有限。
最后,我將在下一篇繼續從 cowboy_http_protocol:wait_request/1 和 cowboy_listener:add_pid/4 來分解Cowboy代碼,敬請期待,謝謝。