To keep data consistent, cluster nodes not only synchronize data but also exchange heartbeats to verify that their peers are alive. So what happens when inter-node communication breaks down, how does the system keep serving requests, and what strategy does it use to recover?
RabbitMQ offers two strategies for handling split-brain: autoheal and pause_minority.
autoheal means that once a partition is detected and connectivity is restored, the partition with the most client connections is chosen as the winner, and every node in the losing partitions is restarted.
pause_minority means that after a partition each node checks whether it belongs to the majority, i.e., whether its partition contains more than half of the cluster's nodes (length(AliveNodes) / length(Nodes) > 0.5). If it is in the majority it keeps working; otherwise it calls rabbit:stop() and then checks once per second whether it has rejoined the majority.
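The majority test above can be sketched as follows (a simplified Python model of the predicate, not RabbitMQ code; the node names are hypothetical):

```python
def in_majority(alive_nodes, all_nodes):
    """A node keeps serving only if its partition holds a strict
    majority of the cluster: length(Alive) / length(All) > 0.5."""
    return len(alive_nodes) / len(all_nodes) > 0.5

cluster = ["rabbit@a", "rabbit@b", "rabbit@c"]
print(in_majority(["rabbit@a", "rabbit@b"], cluster))  # 2/3 -> True, keep running
print(in_majority(["rabbit@c"], cluster))              # 1/3 -> False, rabbit:stop()
```

Note that a strict majority is required: in a two-node cluster a lone node has exactly 1/2, not more, so both sides of a partition would pause, which is why pause_minority is only meaningful with an odd number of nodes (or at least three).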
The rest of this article focuses on how autoheal is implemented.
First, the boot sequence in rabbit.erl shows:
-rabbit_boot_step({rabbit_node_monitor,
                   [{description, "node monitor"},
                    {mfa,         {rabbit_sup, start_restartable_child,
                                   [rabbit_node_monitor]}},
                    {requires,    [rabbit_alarm, guid_generator]},
                    {enables,     core_initialized}]}).
This starts rabbit_node_monitor, which subscribes to Mnesia system events (rabbit_node_monitor.erl):
{ok, _} = mnesia:subscribe(system),
When a network partition heals, Mnesia receives {nodeup, Node} and consults the remote node's record of nodes it saw go down. If both nodes believe the other was down, Mnesia emits the {inconsistent_database, Context, Node} system event [1], and once all nodes are up again autoheal begins (rabbit_node_monitor.erl):
handle_info({mnesia_system_event,
             {inconsistent_database, running_partitioned_network, Node}},
            State = #state{partitions = Partitions,
                           monitors   = Monitors}) ->
    %% We will not get a node_up from this node - yet we should treat it as
    %% up (mostly).
    State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
                 true  -> State;
                 false -> State#state{
                            monitors = pmon:monitor({rabbit, Node}, Monitors)}
             end,
    ok = handle_live_rabbit(Node),
    Partitions1 = lists:usort([Node | Partitions]),
    {noreply, maybe_autoheal(State1#state{partitions = Partitions1})};
maybe_autoheal(State = #state{autoheal = AState}) ->
    case all_nodes_up() of
        true  -> State#state{autoheal = rabbit_autoheal:maybe_start(AState)};
        false -> State
    end.
When autoheal resolves a split-brain, the nodes first agree on a leader, and that leader then decides the winner impartially.
maybe_start(not_healing) ->
    case enabled() of
        true  -> Leader = leader(),
                 send(Leader, {request_start, node()}),
                 rabbit_log:info("Autoheal request sent to ~p~n", [Leader]),
                 not_healing;
        false -> not_healing
    end;
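Note that leader() needs no election protocol: to the best of my understanding of the rabbit_autoheal sources, the leader is chosen deterministically as the node that sorts first in the full cluster member list, so every node independently arrives at the same answer. Assuming that, the idea can be sketched in Python (node names hypothetical):

```python
def leader(all_cluster_nodes):
    # Deterministic choice: every node sorts the same member list the
    # same way, so all nodes agree on the leader without exchanging
    # any election messages.
    return sorted(all_cluster_nodes)[0]

print(leader(["rabbit@c", "rabbit@a", "rabbit@b"]))  # rabbit@a
```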
The leader then examines all reported partitions and picks a winner and the losers. The decision is based on the number of client connections held by each partition; if the connection counts are equal, the partition with the most nodes wins.
make_decision(AllPartitions) ->
    Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
    [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
    {Winner, lists:append(Rest)}.

partition_value(Partition) ->
    Connections = [Res || Node <- Partition,
                          Res <- [rpc:call(Node, rabbit_networking,
                                           connections_local, [])],
                          is_list(Res)],
    {length(lists:append(Connections)), length(Partition)}.
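The ranking logic above can be modeled in a few lines of Python (a sketch, not RabbitMQ code: connections_of stands in for the rpc:call to rabbit_networking:connections_local/0, and for simplicity the whole winning partition is returned rather than its head node):

```python
def make_decision(all_partitions, connections_of):
    """Rank partitions by (total client connections, partition size),
    mirroring make_decision/partition_value: ties on connection count
    fall back to the partition with more nodes."""
    def partition_value(partition):
        return (sum(connections_of(node) for node in partition),
                len(partition))
    ranked = sorted(all_partitions, key=partition_value, reverse=True)
    winner, loser_partitions = ranked[0], ranked[1:]
    # Flatten the losing partitions into one list of nodes to restart.
    return winner, [node for p in loser_partitions for node in p]

# One node with 5 connections beats three nodes with 3 connections total.
conns = {"a": 5, "b": 1, "c": 1, "d": 1}
winner, losers = make_decision([["a"], ["b", "c", "d"]], conns.get)
print(winner, losers)  # ['a'] ['b', 'c', 'd']
```

This makes the policy explicit: autoheal optimizes for disturbing the fewest clients, not for preserving the largest partition, and node count only breaks ties.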
Afterwards the losers restart, which may cause data loss on those nodes.
References
[1] The inconsistent_database event in Mnesia. http://my.oschina.net/hncscwc/blog/174416
[2] RabbitMQ's handling of network partitions. http://my.oschina.net/hncscwc/blog/174417