文/朱季謙
Dubbo如何實現優雅下線?
這個問題困擾了我一陣,既然有優雅下線這種說法,那么,是否有非優雅下線的說法呢?
這,還真有。
可以從linux進程關閉說起,其實,我們經常使用到殺進程的指令背后,就涉及到是否優雅下線的理念。
在日常開發當中,經常用到kill來關掉正在運行的進程,可能你曾看到過一些文章是不推薦使用kill -9 pid的指令來刪除進程。當執行該執行時,系統會發出一個SIGKILL信號給將被關掉的進程,接收到該信號的進程,都立即結束運行,假如此時內部仍有請求還沒有執行完,那怎么辦?你想,整個進程都被立即殺死了,線程作為進程里的某一部分,還能活嗎?
打個比方,假如你正在吃東西,物業突然打電話給你,說房子立馬就要被炸掉了,你必須立馬關門離開,這時,你只能把還沒吃完的飯丟下,什么貴重的東西都來不及打理,立馬就被迫關門跑路了。
這樣強制執行的后果,可能就會造成一些貴重東西的丟失。
這種,就屬於非優雅下線,簡單,粗暴,不管三七二十一,統統停止關閉。
一般而言,是不推薦使用kill -9 pid來強制殺死進程。
在線上環境,用到更多的,是kill pid指令,這個指令,等同於kill -15 pid指令,因此,當你在網上看到一些介紹kill -15 pid指令時,不用糾結好像沒用到過,其實,就是你用到最多的kill pid指令。使用這個指令時,系統會對pid進程發送一個SIGTERM信號,就像給pid打了一個電話,告訴他,你的房子就要到期了,麻煩快點清理好東西搬走。這時,你仍有充裕的時間,把自己的東西打包好,好好清理下房間,沒問題了,再搬出去。
換到具體程序代碼中,就是執行kill pid指令后,該程序不會立馬被強制關閉,而是會接受到一個通知,可以在這個通知方法內,做一些清理操作,若是Dubbo容器,則可以關閉zookeeper注冊,暫停新的請求,可以把已經執行一半的請求先執行完成,等等。
這種下線操作,就屬於優雅下線。
指令kill -15 pid是操作系統級別的優雅下線操作,那么,在具體進程當中,是如何根據SIGTERM信號來進行具體的優雅下線處理呢?
在Dubbo官網上,關於優雅停機的操作有相關介紹:
優雅停機
Dubbo 是通過 JDK 的 ShutdownHook 來完成優雅停機的,所以如果用戶使用 kill -9 PID
等強制關閉指令,是不會執行優雅停機的,只有通過 kill PID
時,才會執行。
原理
服務提供方
- 停止時,先標記為不接收新請求,新請求過來時直接報錯,讓客戶端重試其它機器。
- 然后,檢測線程池中的線程是否正在運行,如果有,等待所有線程執行完成,除非超時,則強制關閉。
服務消費方
- 停止時,不再發起新的調用請求,所有新的調用在客戶端即報錯。
- 然后,檢測有沒有請求的響應還沒有返回,等待響應返回,除非超時,則強制關閉。
設置方式
設置優雅停機超時時間,缺省超時時間是 10 秒,如果超時則強制關閉。
# dubbo.properties
dubbo.service.shutdown.wait=15000
如果 ShutdownHook 不能生效,可以自行調用,使用tomcat等容器部署的場景,建議通過擴展ContextListener等自行調用以下代碼實現優雅停機:
ProtocolConfig.destroyAll();
根據以上信息可以得知,其實Dubbo的優雅實現其實是依賴了JVM的ShutdownHook來實現的,JDK提供了一個在JVM關閉時會執行的方法,可以在該方法當中,執行ProtocolConfig.destroyAll()來實現Dubbo的優雅停機操作,而這個JDK的 ShutdownHook方法,正是在系統執行kill -15 pid時,會執行的方法,這樣,我們就可以在該方法里做一些關閉前的清理工作了。
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
ProtocolConfig.destroyAll();
}));
這幾行代碼具體都實現了什么呢?
簡單而言,這里通過JDK注冊了一個shutdownHook鈎子函數,一旦應用停機就會觸發該方法,進而執行ProtocolConfig.destroyAll()。
這個ProtocolConfig.destroyAll()源碼如下:
public static void destroyAll() {
//1.注銷注冊中心
AbstractRegistryFactory.destroyAll();
ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
Iterator var1 = loader.getLoadedExtensions().iterator();
// 2.循環獲取存活的協議
while(var1.hasNext()) {
String protocolName = (String)var1.next();
try {
Protocol protocol = (Protocol)loader.getLoadedExtension(protocolName);
if (protocol != null) {
//關閉暴露協議
protocol.destroy();
}
} catch (Throwable var4) {
logger.warn(var4.getMessage(), var4);
}
這個destroyAll()里邊主要做了兩件事:
- 首先注銷注冊中心,即斷開與注冊中心的連接,Dubbo注冊到ZK的是臨時節點,故而當連接斷開后,臨時節點及底下的數據就會被自動刪除;
- 關閉provider和consumer暴露的協議接口,這樣,新的請求就無法再繼續進行;
下面主要按照這兩個模塊大體介紹下其底層邏輯:
一、注銷注冊中心
public static void destroyAll() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Close all registries " + getRegistries());
}
//加鎖,防止關閉多次
LOCK.lock();
try {
Iterator var0 = getRegistries().iterator();
//關閉所有已創建的注冊中心
while(var0.hasNext()) {
Registry registry = (Registry)var0.next();
try {
registry.destroy();
} catch (Throwable var6) {
LOGGER.error(var6.getMessage(), var6);
}
}
REGISTRIES.clear();
} finally {
//釋放鎖
LOCK.unlock();
}
}
首先獲取到所有的注冊中心連接,封裝成迭代器模式
Iterator var0 = getRegistries().iterator();
接下來,迭代獲取每一個注冊連接對象進行關閉:
registry.destroy();
該destroy方法定義在接口Node當中,其具體實現將會在對應的Dubbo注冊對象里:
public interface Node {
URL getUrl();
boolean isAvailable();
void destroy();
}
這里Dubbo使用的注冊中心是Zookeeper,故而destroy會在ZookeeperRegistry類中具體實現:
進入到ZookeeperRegistry類,找到registry.destroy()對應的destroy()方法,可以看到,調用destroy(),其本質是關閉zk客戶端連接,當客戶端關閉之后,其注冊到zk里的生產者或者消費者信息,都會被自動刪除。
public void destroy() {
super.destroy();
try {
// 關閉zk客戶端
this.zkClient.close();
} catch (Exception var2) {
logger.warn("Failed to close zookeeper client " + this.getUrl() + ", cause: " + var2.getMessage(), var2);
}
}
在這里,還有一個需要進一步研究的地方,即 super.destroy(),這個方法實現了什么功能呢?從源碼當中,可以看出,其有一行這樣的 this.retryFuture.cancel(true)代碼,這行代碼大概意思是,將失敗重試取消方式設置為true,即取消了失敗重試的操作,我的理解是,這里是關閉了失敗重試,可以在下線過程當中,避免出現因RPC生產者接口缺少而發生反復的失敗重試操作,因為到這一步,已經不需要再有失敗重試的操作了。
public void destroy() {
//移除內存中已經注冊的服務,取消所有服務訂閱
super.destroy();
try {
//取消失敗重試
this.retryFuture.cancel(true);
} catch (Throwable var2) {
this.logger.warn(var2.getMessage(), var2);
}
}
注意一點,這里在取消失敗重試機制之前,還執行了一行 super.destroy()代碼,這行代碼的主要功能包括兩個:
第一是移除內存中已經注冊的服務,第二是取消所有服務訂閱。
我們先來看一下其方法詳情:
public void destroy() {
if (this.logger.isInfoEnabled()) {
this.logger.info("Destroy registry:" + this.getUrl());
}
// 1.移除內存中已經注冊的服務
Set<URL> destroyRegistered = new HashSet(this.getRegistered());
if (!destroyRegistered.isEmpty()) {
Iterator var2 = (new HashSet(this.getRegistered())).iterator();
while(var2.hasNext()) {
URL url = (URL)var2.next();
if (url.getParameter("dynamic", true)) {
try {
this.unregister(url);
if (this.logger.isInfoEnabled()) {
this.logger.info("Destroy unregister url " + url);
}
} catch (Throwable var10) {
this.logger.warn("Failed to unregister url " + url + " to registry " + this.getUrl() + " on destroy, cause: " + var10.getMessage(), var10);
}
}
}
}
//2.取消所有的服務訂閱
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap(this.getSubscribed());
if (!destroySubscribed.isEmpty()) {
Iterator var12 = destroySubscribed.entrySet().iterator();
while(var12.hasNext()) {
Map.Entry<URL, Set<NotifyListener>> entry = (Map.Entry)var12.next();
URL url = (URL)entry.getKey();
Iterator var6 = ((Set)entry.getValue()).iterator();
while(var6.hasNext()) {
NotifyListener listener = (NotifyListener)var6.next();
try {
this.unsubscribe(url, listener);
if (this.logger.isInfoEnabled()) {
this.logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable var9) {
this.logger.warn("Failed to unsubscribe url " + url + " to registry " + this.getUrl() + " on destroy, cause: " + var9.getMessage(), var9);
}
}
}
}
}
1.移除內存中已經注冊的服務
// 1.移除內存中已經注冊的服務
Set<URL> destroyRegistered = new HashSet(this.getRegistered());
if (!destroyRegistered.isEmpty()) {
Iterator var2 = (new HashSet(this.getRegistered())).iterator();
while(var2.hasNext()) {
URL url = (URL)var2.next();
if (url.getParameter("dynamic", true)) {
try {
this.unregister(url);
if (this.logger.isInfoEnabled()) {
this.logger.info("Destroy unregister url " + url);
}
} catch (Throwable var10) {
this.logger.warn("Failed to unregister url " + url + " to registry " + this.getUrl() + " on destroy, cause: " + var10.getMessage(), var10);
}
}
}
}
這部分代碼主要是將內存當中的注冊信息移除,這部分緩存記錄,是在容器啟動時,當向注冊中心訂閱成功后,會同步緩存一份到內存當中。可見,若注冊中心掛掉了,Dubbo仍然可以通過緩存獲取到遠程RPC服務,但是無法獲取到新增的RPC服務。
這里主要分析兩個方法:this.getRegistered()和 this.unregister(url)。
this.getRegistered()——
private final Set<URL> registered = new ConcurrentHashSet();
public Set<URL> getRegistered() {
return this.registered;
}
這是獲取緩存URL的集合。
this.unregister(url)——
public void unregister(URL url) {
if (url == null) {
throw new IllegalArgumentException("unregister url == null");
} else {
if (this.logger.isInfoEnabled()) {
this.logger.info("Unregister: " + url);
}
this.registered.remove(url);
}
}
這是將URL從Set集合當中移除的操作。這部分代碼其實我有點想明白,為何還需要從Set獲取到所有URL,然后再通過迭代器方式一個一個取出去進行移除,直接將Set置空不是更好些嗎?當然,這里面應該還有一些我沒有考慮到的細節,還有待進一步進行研究。
2.取消所有服務訂閱
//2.取消所有的服務訂閱
Map<URL, Set<NotifyListener>> destroySubscribed = new HashMap(this.getSubscribed());
if (!destroySubscribed.isEmpty()) {
Iterator var12 = destroySubscribed.entrySet().iterator();
while(var12.hasNext()) {
Map.Entry<URL, Set<NotifyListener>> entry = (Map.Entry)var12.next();
URL url = (URL)entry.getKey();
Iterator var6 = ((Set)entry.getValue()).iterator();
while(var6.hasNext()) {
NotifyListener listener = (NotifyListener)var6.next();
try {
this.unsubscribe(url, listener);
if (this.logger.isInfoEnabled()) {
this.logger.info("Destroy unsubscribe url " + url);
}
} catch (Throwable var9) {
this.logger.warn("Failed to unsubscribe url " + url + " to registry " + this.getUrl() + " on destroy, cause: " + var9.getMessage(), var9);
}
}
}
}
這部分邏輯與移除內存url都很類型,都是先從緩存里把所有訂閱信息都取出來,然后再跌代移除。
二、關閉protocol協議
這部分個關閉,主要是關閉provider和consumer,即對應前邊提到的,服務提供方會先標記不再接受新請求,新請求過來直接報錯,然后,檢查線程池中的線程是否還在運行,如果有,等待線程完成,若超時,則強制關閉;服務消費者則不再發起新請求,同時檢測看還有沒有請求的響應沒有返回,若有,等待返回,若超時,則強制關閉。
下面大概分析一下其源碼邏輯。
protocol.destroy(),其方法在接口里定義,具體實現是在RegistryProtocol當中。
@SPI("dubbo")
public interface Protocol {
int getDefaultPort();
@Adaptive
<T> Exporter<T> export(Invoker<T> var1) throws RpcException;
@Adaptive
<T> Invoker<T> refer(Class<T> var1, URL var2) throws RpcException;
void destroy();
}
RegistryProtocol的具體實現如下:
public void destroy() {
List<Exporter<?>> exporters = new ArrayList(this.bounds.values());
Iterator var2 = exporters.iterator();
while(var2.hasNext()) {
Exporter<?> exporter = (Exporter)var2.next();
exporter.unexport();
}
this.bounds.clear();
}
這里的核心方法是exporter.unexport(),根據命名就可以推測出,大概就是說不暴露對外接口協議的方法,也就是關閉那些對外暴露的服務。
該exporter.unexport()方法具體實現有兩類,一個是DubboExporter
AbstractExporter
public void unexport() {
if (!this.unexported) {
this.unexported = true;
this.getInvoker().destroy();
}
}
this.getInvoker().destroy()的實現如下:
public void destroy() {
Iterator var1 = (new ArrayList(this.serverMap.keySet())).iterator();
String key;
//關停所有的Server,provider不再接收新的請求
while(var1.hasNext()) {
key = (String)var1.next();
ExchangeServer server = (ExchangeServer)this.serverMap.remove(key);
if (server != null) {
try {
if (this.logger.isInfoEnabled()) {
this.logger.info("Close dubbo server: " + server.getLocalAddress());
}
// HeaderExchangeServer中會停止發送心態的任務,關閉channel
server.close(getServerShutdownTimeout());
} catch (Throwable var7) {
this.logger.warn(var7.getMessage(), var7);
}
}
}
var1 = (new ArrayList(this.referenceClientMap.keySet())).iterator();
ExchangeClient client;
//關停所有Client,consumer將不再發送新的請求
while(var1.hasNext()) {
key = (String)var1.next();
client = (ExchangeClient)this.referenceClientMap.remove(key);
if (client != null) {
try {
if (this.logger.isInfoEnabled()) {
this.logger.info("Close dubbo connect: " + client.getLocalAddress() + "-->" + client.getRemoteAddress());
}
// HeaderExchangeClient中會停止發送心態的任務,關閉channel
client.close();
} catch (Throwable var6) {
this.logger.warn(var6.getMessage(), var6);
}
}
}
......
}
總結一下,Dubbo的優雅下線,若是通過JDK的shutdownHook來完成優雅停機的,這時當用戶對該Dubbo進行執行kill pid后,在關閉JVM時會發起一個線程執行ShutdownHook,進而執行 ProtocolConfig.destroyAll()方法,該方法在關掉進行前,主要做了以下一些清理工作:
1、關閉zk客戶端
2、 客戶端斷開ZK連接后,ZK會自動刪除臨時注冊節點
3、 取消重試機制
4 、清除內存中已經注冊的服務
5、 取消所有的服務訂閱
6、關閉provider和consumer,停止新的請求
后面還有一步沒分析到,是若仍有在執行的線程,會等待其執行完成。
最后,在清理完一系列工作后,就可以關閉該進程了。
這就是Dubbo的優雅下線大概的原理。