從這篇開始,這一系列主要分析在開源社區中,Erlang 相關pool 的管理和使用.
在開源社區,Emysql 是Erlang 較為受歡迎的一個MySQL 驅動. Emysql 對pool 的管理和使用是非常典型的,pool 的管理角色中,主要有available(記錄當前pool 中可供使用的成員),locked(記錄當前pool 中正在被使用的成員),waiting(記錄當前正在處理等待該pool 的用戶).用戶進程在使用pool 過程中, pool 中的成員在這三個角色中來回遷移.
pool 數據結構
Emysql pool 的數據結構如下:
1 -record(pool, {pool_id :: atom(), 2 size :: number(), 3 user :: string(), 4 password :: string(), 5 host :: string(), 6 port :: number(), 7 database :: string(), 8 encoding :: utf8 | latin1 | {utf8, utf8_unicode_ci} | {utf8, utf8_general_ci}, 9 available=queue:new() :: queue(), 10 locked=gb_trees:empty() :: gb_tree(), 11 waiting=queue:new() :: queue(), 12 start_cmds=[] :: string(), 13 conn_test_period=0 :: number(), 14 connect_timeout=infinity :: number() | infinity, 15 warnings=false :: boolean()}).
L1 處的pool_id 為pool 的標識
L2 定義了pool 中成員的數量
L3 L4 為用以連接MySQL 數據庫的用戶名和密碼
L5 L6 L7 L8 為連接MySQL 數據庫的IP, 端口, 默認數據庫, 編碼
L9 用以記錄當前pool 中可用的成員
L10 用以記錄當前pool 中正在被使用的成員
L11 用以記錄當前等待pool 中成員的用戶
L12 為在與數據庫建立連接后的初始化命令
L14 是用於gen_tcp:connect 時的超時參數
pool 添加操作
在Emysql 項目中,emysql module 定義了所有外部操作的API, 其中添加操作的API有:
1, add_pool/2
2, add_pool/8
3, add_pool/9
以下代碼片段為add_pool 的實質性處理邏輯:
1 add_pool(#pool{pool_id=PoolId,size=Size,user=User,password=Password,host=Host,port=Port, 2 database=Database,encoding=Encoding,start_cmds=StartCmds, 3 connect_timeout=ConnectTimeout,warnings=Warnings}=PoolSettings)-> 4 config_ok(PoolSettings), 5 case emysql_conn_mgr:has_pool(PoolId) of 6 true -> 7 {error,pool_already_exists}; 8 false -> 9 Pool = #pool{ 10 pool_id = PoolId, 11 size = Size, 12 user = User, 13 password = Password, 14 host = Host, 15 port = Port, 16 database = Database, 17 encoding = Encoding, 18 start_cmds = StartCmds, 19 connect_timeout = ConnectTimeout, 20 warnings = Warnings 21 }, 22 Pool2 = case emysql_conn:open_connections(Pool) of 23 {ok, Pool1} -> Pool1; 24 {error, Reason} -> throw(Reason) 25 end, 26 emysql_conn_mgr:add_pool(Pool2) 27 end.
處理邏輯主要有:
1, 確認參數的數據類型
2, 檢查當前是否已經有相同ID的 pool
3, 與MySQL server 建立connection
4, 在emysql_conn_mgr 中添加 該pool
在當前的Emysql 項目中,emysql_conn_mgr 是用來管理所有pool 的gen_server 進程. 對於所有的pool 而言,其內部成員的狀態管理, 都是由emysql_conn_mgr 調度的, 包括某個pool 中connection 成員的使用,歸還等.
確認參數的數據類型
確認參數的數據類型主要使用erlang:is_{{type}} bif func, 在Erlang VM 內存,對於每種數據類型都是以后綴來識別的.
1 config_ok(#pool{pool_id=PoolId,size=Size,user=User,password=Password,host=Host,port=Port, 2 database=Database,encoding=Encoding,start_cmds=StartCmds, 3 connect_timeout=ConnectTimeout,warnings=Warnings}) 4 when is_atom(PoolId), 5 is_integer(Size), 6 is_list(User), 7 is_list(Password), 8 is_list(Host), 9 is_integer(Port), 10 is_list(Database) orelse Database == undefined, 11 is_list(StartCmds), 12 is_integer(ConnectTimeout) orelse ConnectTimeout == infinity, 13 is_boolean(Warnings) -> 14 encoding_ok(Encoding); 15 config_ok(_BadOptions) -> 16 erlang:error(badarg). 17 18 encoding_ok(Enc) when is_atom(Enc) -> ok; 19 encoding_ok({Enc, Coll}) when is_atom(Enc), is_atom(Coll) -> ok; 20 encoding_ok(_) -> erlang:error(badarg).
因此, 這部分的操作足夠高效.以 is_list/1為例:
1 #define TAG_PRIMARY_LIST 0x1 2 #define is_list(x) (((x) & _TAG_PRIMARY_MASK) == TAG_PRIMARY_LIST) 3 #define is_not_list(x) (!is_list((x)))
與MySQL server 建立鏈接
此處與MySQL server 建立鏈接是調用emysql_conn module 中的open_connections 函數.
1 %% @doc Opens connections for the necessary pool. 2 %% 3 %% If connection opening fails, removes all connections from the pool 4 %% Does not remove pool from emysql_conn_mgr due to a possible deadlock. 5 %% Caller must do it by itself. 6 open_connections(Pool) -> 7 %-% io:format("open connections loop: .. "), 8 case (queue:len(Pool#pool.available) + gb_trees:size(Pool#pool.locked)) < Pool#pool.size of 9 true -> 10 case catch open_connection(Pool) of 11 #emysql_connection{} = Conn -> 12 open_connections(Pool#pool{available = queue:in(Conn, Pool#pool.available)}); 13 {'EXIT', Reason} -> 14 AllConns = lists:append( 15 queue:to_list(Pool#pool.available), 16 gb_trees:values(Pool#pool.locked) 17 ), 18 lists:foreach(fun emysql_conn:close_connection/1, AllConns), 19 {error, Reason} 20 end; 21 false -> 22 {ok, Pool} 23 end.
鏈接的總數為pool 結構中的size 字段(L8),成功建立鏈接后,將Conn 放入 available queue 中(L12).
在emysql_conn_mgr 中添加 pool
當pool 與MySQL server 建立鏈接完成后,需要將pool 添加到emysql_conn_mgr 中, 以便emysql_conn_mgr gen_server 進程對pool 進行管理.
添加pool add_pool/1 的操作:
1 add_pool(Pool) -> 2 do_gen_call({add_pool, Pool}). 3 4 ... 5 6 handle_call({add_pool, Pool}, _From, State) -> 7 case find_pool(Pool#pool.pool_id, State#state.pools) of 8 {_, _} -> 9 {reply, {error, pool_already_exists}, State}; 10 undefined -> 11 {reply, ok, State#state{pools = [Pool|State#state.pools]}} 12 end;
如果當前emysql_conn_mgr gen_server 進程中,並未記錄(L8)該pool 的話,就將該pool 添加到emysql_conn_mgr gen_server 進程的state 數據中(L11).
has_pool/1 的操作:
1 has_pool(Pool) -> 2 do_gen_call({has_pool, Pool}). 3 4 .... 5 6 handle_call({has_pool, PoolID}, _From, State) -> 7 case find_pool(PoolID, State#state.pools) of 8 {_, _} -> 9 {reply, true, State}; 10 undefined -> 11 {reply, false, State} 12 end;
以上,即為add_pool 操作的整個流程. emysql_conn_mgr gen_server 進程是管理pool 的非常重要的進程.
pool 使用管理
取出connection
在execute 執行一條SQL語句時, 用戶進程需要先請求emysql_conn_mgr gen_server 進程從給定pool_id 的pool 中取出一個成員.
1 execute(PoolId, Query, Args, Timeout) when (is_list(Query) orelse is_binary(Query)) andalso is_list(Args) andalso (is_integer(Timeout) orelse Timeout == infinity) -> 2 Connection = emysql_conn_mgr:wait_for_connection(PoolId), 3 monitor_work(Connection, Timeout, [Connection, Query, Args]);
1 wait_for_connection(PoolId ,Timeout)-> 2 %% try to lock a connection. if no connections are available then 3 %% wait to be notified of the next available connection 4 %-% io:format("~p waits for connection to pool ~p~n", [self(), PoolId]), 5 case do_gen_call({lock_connection, PoolId, true, self()}) of 6 unavailable -> 7 %-% io:format("~p is queued~n", [self()]), 8 receive 9 {connection, Connection} -> Connection 10 after Timeout -> 11 do_gen_call({abort_wait, PoolId}), 12 receive 13 {connection, Connection} -> Connection 14 after 15 0 -> exit(connection_lock_timeout) 16 end 17 end; 18 Connection -> 19 %-% io:format("~p gets connection~n", [self()]), 20 Connection 21 end.
然后對於wait_for_connection/2 函數的操作, 首先會調用emysql_conn_mgr gen_server handle_call 操作 lock_connection
而lock_next_connection 函數的主要功能是從pool 的available queue 中out 一個元素, 並monitor 調用進程(以防調用進程異常退出而沒有歸還conn).
1 lock_next_connection(Available ,Locked, Who) -> 2 case queue:out(Available) of 3 {{value, Conn}, OtherAvailable} -> 4 MonitorRef = erlang:monitor(process, Who), 5 NewConn = connection_locked_at(Conn, MonitorRef), 6 MonitorTuple = {MonitorRef, 7 {NewConn#emysql_connection.pool_id, NewConn#emysql_connection.id}}, 8 NewLocked = gb_trees:enter(NewConn#emysql_connection.id, NewConn, Locked), 9 {ok, NewConn, OtherAvailable, NewLocked, MonitorTuple}; 10 {empty, _} -> 11 unavailable 12 end.
L2 處會嘗試從available queue 中取出元素conn, 如果queue 此時不為空, emysql_conn_mgr 進程就會monitor (L4)用戶進程,然后將該conn gb_tress enter 到locked tree 中(L8).
在"無可用的conn"的情況下, emysql_conn_mgr gen_server 進程會將用戶進程寫入到pool 的waiting queue中, 並且返回'unavailable', 用戶進程就會等待conn 的其他使用者歸還conn .
歸還connection
在用戶使用完conn 之后,應該及時歸還給pool, 以防鏈接資源泄露.
在 emysql_conn_mgr module 中定義了pass_connection/1 函數以及在 handl_call callback 中實現了 handle_call({{replace_connection, Kind}, OldConn, NewConn}, _From, State).
在handle_call callback 函數中, 首先會從locked tree 中delete 掉該conn.然后從waiting queue 中取出之前等待的用戶進程ID, 將conn 發送給alive 的等待進程, 並更新locked tree waiting queue. 如果waiting queue 中無alive 等待進程, 就將conn 還回給available queue, 並更新相關的管理角色.
1 serve_waiting_pids(Waiting, Available, Locked, MonitorRefs) -> 2 case queue:is_empty(Waiting) of 3 false -> 4 Who = queue:get(Waiting), 5 case lock_next_connection(Available, Locked, Who) of 6 {ok, Connection, OtherAvailable, NewLocked, NewRef} -> 7 {{value, Pid}, OtherWaiting} = queue:out(Waiting), 8 case erlang:is_process_alive(Pid) of 9 true -> 10 erlang:send(Pid, {connection, Connection}), 11 serve_waiting_pids(OtherWaiting, OtherAvailable, NewLocked, [NewRef | MonitorRefs]); 12 _ -> 13 serve_waiting_pids(OtherWaiting, Available, Locked, MonitorRefs) 14 end; 15 unavailable -> 16 {Waiting, Available, Locked, MonitorRefs} 17 end; 18 true -> 19 {Waiting, Available, Locked, MonitorRefs} 20 end.
conn 使用進程退出
如果某用戶進程在從pool 中取出conn 使用, 但是在使用過程中, 用戶進程異常退出, 無法調用pass_connection/1 函數歸還conn , 就會出現資源泄露的問題.
在emysql_conn_mgr module 中, 是使用monitor 用戶進程的方式處理的. 在用戶進程獲得一個conn 之后, emysql_conn_mgr 會使用BIF erlang:monitor/2 函數 monitor 用戶進程.當用戶進程異常退出后,emysql_conn_mgr 進程就會收到'DOWN' 的message.然后在emysql_conn_mgr module 中的handle_info callback 函數中處理:
1 handle_info({'DOWN', MonitorRef, _, _, _}, State) -> 2 case dict:find(MonitorRef, State#state.lockers) of 3 {ok, {PoolId, ConnId}} -> 4 case find_pool(PoolId, State#state.pools) of 5 {Pool, _} -> 6 case gb_trees:lookup(ConnId, Pool#pool.locked) of 7 {value, Conn} -> async_reset_conn(State#state.pools, Conn); 8 _ -> ok 9 end; 10 _ -> 11 ok 12 end; 13 _ -> 14 ok 15 end, 16 {noreply, State};
可以看出,在emysql_conn_mgr 進程接收到'DOWN' 的message 之后, 會在進程dict 的locker 中 查找poolID和 connID, 繼而重置conn .
等待進程 abort_wait
某進程在獲取pool 中 conn 時, 在Timeout 之后, 用戶進程會調用abort_wait, emysql_conn_mgr 進程就會從waiting queue 中, 將不再等待的用戶進程remove
1 handle_call({abort_wait, PoolId}, {From, _Mref}, State) -> 2 case find_pool(PoolId, State#state.pools) of 3 {Pool, OtherPools} -> 4 %% Remove From from the wait queue 5 QueueNow = queue:filter( 6 fun(Pid) -> Pid =/= From end, 7 Pool#pool.waiting), 8 PoolNow = Pool#pool{ waiting = QueueNow }, 9 %% See if the length changed to know if From was removed. 10 OldLen = queue:len(Pool#pool.waiting), 11 NewLen = queue:len(QueueNow), 12 if 13 OldLen =:= NewLen -> 14 Reply = not_waiting; 15 true -> 16 Reply = ok 17 end, 18 {reply, Reply, State#state{pools=[PoolNow|OtherPools]}}; 19 undefined -> 20 {reply, {error, pool_not_found}, State} 21 end;
這樣的設計同樣是為了盡可能的保證避免資源的泄露,試想如果emysql_conn_mgr 進程將conn 發送給已經不再等待(不再需要)的進程,那該conn 就不可能再被歸還.
在大多數情況下,這種設計是可以保證conn 不會被發送給不再等待的用戶進程,但是在達成"gen_server 進程的處理是順序性"這樣共識的前提下考慮以下情況:
也就是在處理pass_connection , emysql_conn_mgr 進程從waiting queue 取出了ProcessA進程的同時, ProcessA 進程因為Timeout 調用了abort_wait 且exit 退出. emysql_conn_mgr 的順序性處理,使得只有在處理完pass_connection 之后, 才能處理abort_wait 操作. 最終導致的結果就是將conn 發送給已經exit (但是還沒有從waiting queue remove) 的用戶進程ProcessA, 是conn 不能再被歸還.
當這種情況出現的時候,就應該在發送conn 給 ProcessA 進程之前, 判斷ProcessA 是否alive(這個pr 已經被merged了).
總結
在Emysql 的pool 管理中,主要使用了:
1, available queue 記錄所有可用的pool 成員
2, locked tree 記錄所有正在被使用的pool 成員
3, waiting queue 記錄所有正在等待的用戶進程
4, monitor 所有正在使用pool 成員的用戶進程, 處理異常退出的case
5, 處理等待進程的abort_wait 請求, 更新waiting queue
6, 在發送pool 成員之前, 應該判斷用戶進程是否alive, 防止資源泄露
遺漏
現在的emysql_conn_mgr gen_server 進程屬於單點,也就是所有的pool 的管理調度都是由一個進程來完成.
------------------------------
覺得寫的還可以,就掃個碼,打個賞唄。