查看Curator
框架 為實現對 連接狀態ConnectionState
的管理與監聽是怎么構造的。后面我們也可以應用到業務的各種監聽中。
Curator2.13實現
接口 Listener
Listener
接口,給用戶實現stateChange()
傳入新的狀態,用戶實現對這新的狀態要做什么邏輯處理。
public interface ConnectionStateListener
{
/**
* Called when there is a state change in the connection
* @param client the client
* @param newState the new state
*/
public void stateChanged(CuratorFramework client, ConnectionState newState);
}
接口 Listenable
提供一個監聽對象容器的接口
// Abstracts a listenable object
public interface Listenable<T>
{
/**
* Add the given listener. The listener will be executed in the containing instance's thread.
*
* @param listener listener to add
*/
public void addListener(T listener);
public void addListener(T listener, Executor executor);
public void removeListener(T listener);
}
ListenerContainer<T> implements Listenable<T>
/**
* Abstracts an object that has listeners 裝Listener的容器
* <T> Listener類型
*/
public class ListenerContainer<T> implements Listenable<T>
{
private final Map<T, ListenerEntry<T>> listeners = Maps.newConcurrentMap();
@Override
public void addListener(T listener)
{
addListener(listener, MoreExecutors.sameThreadExecutor());
}
@Override
public void addListener(T listener, Executor executor)
{
listeners.put(listener, new ListenerEntry<T>(listener, executor));
}
/**
* 對 Listener 列表的遍歷進行封裝
* Utility - apply the given function to each listener.
* @param function function to call for each listener
*/
public void forEach(final Function<T, Void> function)
{
for ( final ListenerEntry<T> entry : listeners.values() )
{
entry.executor.execute
(
new Runnable()
{
@Override
public void run()
{
try
{
function.apply(entry.listener);
}
catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
}
}
}
);
}
}
public void clear()
{
listeners.clear();
}
public int size()
{
return listeners.size();
}
}
ConnectionStateManager
// to manage connection state
public class ConnectionStateManager {
// 又是隊列? 玩消息什么的都是用隊列。現在是存放 ConnectionState
BlockingQueue<ConnectionState> eventQueue = new ArrayBlockingQueue<ConnectionState>(QUEUE_SIZE);
// 持有 ListenerContainer
private final ListenerContainer<ConnectionStateListener> listeners = new ListenerContainer<ConnectionStateListener>();
/**
* Start the manager,起一個線程去執行 processEvents(),要是這線程掛了怎么辦?異常怎么處理的?框架怎么處理的。。
*/
public void start()
{
service.submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
processEvents();
return null;
}
}
);
}
@Override
public void close()
{
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
service.shutdownNow();
listeners.clear();
}
}
// 對不斷產生的 ConnectionState 進行處理,生產者?
private void processEvents(){
// 當 ConnectionStateManager 啟動完成
while ( state.get() == State.STARTED )
{
// 不斷從隊列拿 Conection 狀態
final ConnectionState newState = eventQueue.take();
// 對每個 狀態監聽接口 應用 Function, 狀態監聽接口作為 主語
// forEach 是 listeners封裝的 遍歷所有 listener 的方法而已。。。
listeners.forEach(
new Function<ConnectionStateListener, Void>() {
// ConnectionStateListener是我們自己要實現的接口,stateChanged是要實現的方法
@Override
public Void apply(ConnectionStateListener listener)
{
listener.stateChanged(client, newState);
return null;
}
}
);
/**
上面這段
如果沒有封裝 Listener 到 ListenerContainer 的話, 所有 Listener 就是個 List列表,就直接調 Listener 的 stateChanged 方法了吧。
for Listener {
listener.stateChanged(client, newState);
}
因為 封裝 Listener 到 ListenerContainer了, 上面的 forEach 方法內部就可以有些內部實現,比如 對每個 Listener 都是用對應的 executor 來執行。
**/
}
}
// 上面的方法是處理 ConnectionState 的,那 ConnectionState 是怎么傳進來的呢? 生產者?
/**
* Post a state change. If the manager is already in that state the change
* is ignored. Otherwise the change is queued for listeners.
*
* @param newConnectionState new state
* @return true if the state actually changed, false if it was already at that state
*/
public synchronized boolean addStateChange(ConnectionState newConnectionState)
{
// 先判斷 ConnectionStateManager 是否已經啟動好, state 是內部 Enum
if ( state.get() != State.STARTED )
{
return false;
}
ConnectionState previousState = currentConnectionState;
if ( previousState == newConnectionState )
{
return false;
}
ConnectionState localState = newConnectionState;
// !!!
notifyAll();
while ( !eventQueue.offer(state) )
{
eventQueue.poll();
log.warn("ConnectionStateManager queue full - dropping events to make room");
}
return true;
}
}
調用
啟動
// 啟動 connectionStateManager,不斷檢測 connectionState 變化
connectionStateManager.start(); // must be called before client.start()
// 來個匿名默認的 ConnectionStateListener
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
{
logAsErrorConnectionErrors.set(true);
}
}
};
this.getConnectionStateListenable().addListener(listener);
生產 ConnectionState
,把zk
那里拿到的state
轉一下,然后addStateChange
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
suspendConnection();
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
復用?
還有其他各種Listener
,都可以放到 ListenerContainer
private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
/**
* Receives notifications about errors and background events
*/
public interface CuratorListener {
/**
* Called when a background task has completed or a watch has triggered
* @param event the event
* @throws Exception any errors
*/
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}
public interface UnhandledErrorListener
{
/**
* Called when an exception is caught in a background thread, handler, etc. Before this
* listener is called, the error will have been logged and a {@link ConnectionState#LOST}
* event will have been queued for any {@link ConnectionStateListener}s.
* @param message Source message
* @param e exception
*/
public void unhandledError(String message, Throwable e);
}
總結一下源碼技巧
ConnectionState
的監聽和管理在類ConnectionStateManager
中, 就是個 生產者消費者模式的代碼,特點就是:public addStateChange()
暴露給外部用戶生產ConnectionState
,通過隊列eventQueue
傳遞,private processEvents()
在內部對ConnectionState
進行消費。- 直接
new
匿名類,對接口進行默認實現。 - 對
Listener
列表對象進行Container
封裝,然后 封裝foreach
方法,傳入Function
接口 就是foreach
每個元素要執行的業務邏輯,方法體就可以加一些其他福利。
Curator4.2.0實現
curator4.2.0對ConnectionStateListener
進行了一些改進,如下:
1. 裝飾ConnectionStateListener,Ccurator-505
當網絡斷開時,zk會傳送很多 connection/disconnection event
,為防止 Curator
因此而不斷reseting state
,該issue提供了 一種對監聽回路
進行切斷或者連接的ConnectionStateListener
, 它會對ConnectionStateListeners
進行裝飾代理。當它接受到 ConnectionState.SUSPENDED
,監聽回路會切斷變成open
狀態,這段時間會組織發送state
給所有Listener
,就忽略了對connection state
的改變。時間到期后監聽回路會變成close
狀態,並且發送當前的connection state
。簡而言之,當不斷出現up/down/up/down/up/down
的中間狀態,使用者將只看到first down
,然后N ms之后連接能夠修復的話將只看到重連狀態。
Create a circuit breaker style
ConnectionStateListener
. It would proxy any ConnectionStateListeners used by Curator recipe/classes such that when the connection is lost the circuit would open for a period of time and, while open, ignore any changes in state. After the time period expires the circuit would close and send whatever the current connection state is. This way, if the connection is going up/down/up/down/up/down, the application would only see the first down and then N ms later hopefully the connection is repaired and the application would only see the reconnection.
curator-505,結合源碼中的testcase更易於理解。
接口ConnectionStateListenerDecorator
該接口用來統一返回是否裝飾的ConnectionStateListener
public interface ConnectionStateListenerDecorator
{
ConnectionStateListener decorateListener(CuratorFramework client, ConnectionStateListener actual);
// 不對 ConnectionStateListener 進行裝飾
ConnectionStateListenerDecorator standard = (__, actual) -> actual;
/**
* Decorates the listener with circuit breaking behavior
* @param retryPolicy the circuit breaking policy to use
* @return new decorator
*/
static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy) {
return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy);
}
/**
* Decorates the listener with circuit breaking behavior
* @param retryPolicy the circuit breaking policy to use
* @param service the scheduler to use
* @return new decorator
*/
static ConnectionStateListenerDecorator circuitBreaking(RetryPolicy retryPolicy, ScheduledExecutorService service)
{
return (client, actual) -> new CircuitBreakingConnectionStateListener(client, actual, retryPolicy, service);
}
}
CircuitBreakingConnectionStateListener
對ConnectionStateListener
進行裝飾:正常情況下,decorator
為close
為連接狀態,會將state
傳給所有Listener
。當decorator
接收到第一個disconnected state
,監聽回路變為open
即斷開狀態,不再將state
傳給各Listener
(第一個disconnected state
狀態會傳),而是decorator
起線程來通過RetryPolicy
進行重試,當重試次數達到一定程度會將close
斷路 。
public class CircuitBreakingConnectionStateListener implements ConnectionStateListener
{
// 對該 Listener 進行裝飾
private final ConnectionStateListener listener;
private final CircuitBreaker circuitBreaker;
// guarded by sync
private boolean circuitLostHasBeenSent;
private ConnectionState circuitLastState;
private ConnectionState circuitInitialState;
// 同步方法?
@Override
public synchronized void stateChanged(CuratorFramework client, ConnectionState newState)
{
// 斷路器被打開了,newState 將不傳給 Listener 處理
if ( circuitBreaker.isOpen() ) {
handleOpenStateChange(newState);
} else {
handleClosedStateChange(newState);
}
}
private synchronized void handleOpenStateChange(ConnectionState newState)
{
if ( circuitLostHasBeenSent || (newState != ConnectionState.LOST) )
{
// Circuit is open. Ignoring state change !!狀態變化都自己收了。。
circuitLastState = newState;
}
else
{
// Circuit is open. State changed to LOST. Sending to listener (第一次 Lost 信息還是會傳給 Listener,也就是用個boolean來判斷)
circuitLostHasBeenSent = true;
circuitLastState = circuitInitialState = ConnectionState.LOST;
callListener(ConnectionState.LOST);
}
}
// 正常狀態下走這里,將state傳給Listener
private synchronized void handleClosedStateChange(ConnectionState newState)
{
// 接收到不是connection狀態的就嘗試打開斷路器
if ( !newState.isConnected() ) {
// checkCloseCircuit 作為 調Runnable的匿名函數
if ( circuitBreaker.tryToOpen(this::checkCloseCircuit) )
{
circuitLastState = circuitInitialState = newState;
circuitLostHasBeenSent = (newState == ConnectionState.LOST);
}
else{
log.debug("Could not open circuit breaker. State: {}", newState);
}
}
callListener(newState);
}
// 通過線程池執行該方法,作為Runnable 匿名函數傳入
private synchronized void checkCloseCircuit() {
// 連接上了就關閉斷路
if ( (circuitLastState == null) || circuitLastState.isConnected() ) {
closeCircuit();
}
// 沒連接上,還在 retry 的時候就 保持斷路
else if ( circuitBreaker.tryToRetry(this::checkCloseCircuit) ) {
// Circuit open is continuing due to retry
}
// 經過 retryPolicy 的 不斷 retry后還沒連上,累了,就關閉斷路
else {
// Circuit is closing due to retries exhausted
closeCircuit();
}
}
}
2. 重構 ListenerContainer
ListenerManager
這個ListenerManager
就是從ListenerContainer
演進而來的,PR上是說為了不再依賴Guava
並且支持mappingmapping
。和原來一樣也封裝了對Listener
列表遍歷的forEach(Consumer<V> function)
/**
* Upgraded version of {@link org.apache.curator.framework.listen.ListenerContainer} that
* doesn't leak Guava's internals and also supports mapping/wrapping of listeners
*/
public class MappingListenerManager<K, V> implements ListenerManager<K, V> {
@Override
public void forEach(Consumer<V> function)
{
for ( ListenerEntry<V> entry : listeners.values() )
{
entry.executor.execute(() -> {
try {
function.accept(entry.listener);
}
catch ( Throwable e ) {
ThreadUtils.checkInterrupted(e);
log.error(String.format("Listener (%s) threw an exception", entry.listener), e);
}
});
}
}
}
源碼技巧
- 斷路器?就是裝飾器模式,增加了一些額外的
if-else
邏輯。 - 將原來Guava的
Function
換成了原生的Consumer
,實現傳入function
進行處理的邏輯。