Learning Spark: ShutdownHookManager, the JVM Shutdown-Hook Manager


A Java program can die unexpectedly, leaving some state not saved properly; in those cases you need cleanup code that runs as the JVM goes down.

Java's shutdown-hook mechanism provides a good solution for this.

The JDK offers java.lang.Runtime#addShutdownHook(Thread hook), which registers a hook with the JVM. The hook is invoked in the following scenarios:

1. The program exits normally
2. System.exit() is called
3. The program is interrupted with Ctrl+C from the terminal
4. The operating system shuts down
5. The program dies from an OutOfMemoryError
6. The process is killed with kill pid (note: the hook is NOT invoked for kill -9 pid)

Here is the JDK 1.7 documentation for this method:

public void addShutdownHook(Thread hook)

Parameters:
    hook - An initialized but unstarted Thread object
Throws:
    IllegalArgumentException - If the specified hook has already been registered, or if it can be determined that the hook is already running or has already been run
    IllegalStateException - If the virtual machine is already in the process of shutting down
    SecurityException - If a security manager is present and it denies RuntimePermission("shutdownHooks")
Since:
    1.3
See Also:
    removeShutdownHook(java.lang.Thread), halt(int), exit(int)
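
Before running the tests, here is a minimal sketch of the register/unregister pair in Scala (Scala because we move on to Spark's Scala code below; RuntimeHookDemo is a made-up name). The IllegalStateException behavior during shutdown is exactly what Spark's inShutdown(), shown later, relies on:

object RuntimeHookDemo {
  def main(args: Array[String]): Unit = {
    val hook = new Thread(new Runnable {
      override def run(): Unit = println("Execute Hook.....")
    })
    // Register the hook; it would run when the JVM begins shutting down.
    Runtime.getRuntime.addShutdownHook(hook)

    // A hook can be unregistered as long as shutdown has not started; once
    // shutdown is in progress, both add and remove throw IllegalStateException.
    val removed = Runtime.getRuntime.removeShutdownHook(hook)
    println("hook removed: " + removed) // prints true, so the hook never runs
  }
}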

Let's test the first scenario, normal program exit:

package com.hook;

import java.util.concurrent.TimeUnit;

public class HookTest {

    public void start() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Execute Hook.....");
            }
        }));
    }

    public static void main(String[] args) {
        new HookTest().start();
        System.out.println("The Application is doing something");
        try {
            TimeUnit.MILLISECONDS.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output:

The Application is doing something
Execute Hook.....

As the output shows, the shutdown hook is invoked once the main thread finishes.

Next, the fifth scenario (yes, a bit out of order; don't mind that):

package com.hook;

import java.util.concurrent.TimeUnit;

public class HookTest2 {

    public void start() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Execute Hook.....");
            }
        }));
    }

    public static void main(String[] args) {
        new HookTest2().start();
        System.out.println("The Application is doing something");
        byte[] b = new byte[500 * 1024 * 1024]; // triggers OutOfMemoryError under -Xmx20M
        try {
            TimeUnit.MILLISECONDS.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Set the JVM option -Xmx20M when running, which guarantees an OutOfMemoryError.

Output:

The Application is doing something
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at com.hook.HookTest2.main(HookTest2.java:22)
Execute Hook.....

The shutdown hook is invoked as soon as the OutOfMemoryError occurs, unlike the first case, where the hook ran only after the program had finished its 5000 ms sleep and exited normally.

Next, the third scenario:

package com.hook;

import java.util.concurrent.TimeUnit;

public class HookTest3 {

    public void start() {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("Execute Hook.....");
            }
        }));
    }

    public static void main(String[] args) {
        new HookTest3().start();
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    System.out.println("thread is running....");
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        thread.start();
    }
}

Compile from the command line: javac com/hook/HookTest3.java

Run from the command line: java com.hook.HookTest3 (then press Ctrl+C)

Output (expected: the worker thread prints in a loop until Ctrl+C, at which point the hook runs and the JVM exits):

thread is running....
thread is running....
thread is running....
^C
Execute Hook.....

That covers the Java side; now let's look at Spark's ShutdownHookManager.

SparkContext registers a hook with ShutdownHookManager so that cleanup work gets done when the Spark application goes down:

// Registered in SparkContext: when the Spark application goes down, the hook
// calls stop() to close the SparkContext and perform cleanup.
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
  ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  logInfo("Invoking stop() from shutdown hook")
  stop()
}
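
Note that addShutdownHook returns a handle to the registered hook. If memory serves, SparkContext.stop() later hands this _shutdownHookRef back to ShutdownHookManager.removeShutdownHook, so a normal stop() does not leave a stale hook behind. A sketch of the same register-then-unregister pattern (hypothetical code; ShutdownHookManager is private[spark], so anything outside Spark would have to live under the org.apache.spark package):

import org.apache.spark.util.ShutdownHookManager

// Register a cleanup hook and keep the returned handle.
val hookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  println("cleaning up before JVM exit")
}

// If cleanup already ran through the normal code path, unregister the hook
// so it does not run again at JVM exit.
ShutdownHookManager.removeShutdownHook(hookRef)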

Now the full source:

package org.apache.spark.util

import java.io.File
import java.util.PriorityQueue

import scala.util.Try

import org.apache.hadoop.fs.FileSystem

import org.apache.spark.internal.Logging

/**
 * Various utility methods used by Spark.
 */
private[spark] object ShutdownHookManager extends Logging {

  // The default shutdown hook priority.
  val DEFAULT_SHUTDOWN_PRIORITY = 100

  /**
   * The shutdown priority of the SparkContext instance. This is lower than the default
   * priority, so that by default hooks are run before the context is shut down.
   */
  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50

  /**
   * The shutdown priority of temp directory must be lower than the SparkContext shutdown
   * priority. Otherwise cleaning the temp directories while Spark jobs are running can
   * throw undesirable errors at the time of shutdown.
   */
  val TEMP_DIR_SHUTDOWN_PRIORITY = 25

  // Lazily created: install the manager (which registers the task that will
  // run all registered hooks) and return it.
  private lazy val shutdownHooks = {
    val manager = new SparkShutdownHookManager()
    manager.install()
    manager
  }

  private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()

  // Add a shutdown hook to delete the temp dirs when the JVM exits
  logDebug("Adding shutdown hook") // force eager creation of logger
  addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
    logInfo("Shutdown hook called")
    // we need to materialize the paths to delete because deleteRecursively removes items from
    // shutdownDeletePaths as we are traversing through it.
    shutdownDeletePaths.toArray.foreach { dirPath =>
      try {
        logInfo("Deleting directory " + dirPath)
        // Recursively delete the file or directory and its contents;
        // an exception is thrown if deletion fails.
        Utils.deleteRecursively(new File(dirPath))
      } catch {
        case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
      }
    }
  }

  // Register the path to be deleted via shutdown hook
  def registerShutdownDeleteDir(file: File) {
    // Add the file's absolute path to the set of paths to delete.
    val absolutePath = file.getAbsolutePath()
    shutdownDeletePaths.synchronized {
      shutdownDeletePaths += absolutePath
    }
  }

  // Remove the path to be deleted via shutdown hook
  def removeShutdownDeleteDir(file: File) {
    val absolutePath = file.getAbsolutePath()
    shutdownDeletePaths.synchronized {
      shutdownDeletePaths.remove(absolutePath)
    }
  }

  // Is the path already registered to be deleted via a shutdown hook?
  def hasShutdownDeleteDir(file: File): Boolean = {
    val absolutePath = file.getAbsolutePath()
    shutdownDeletePaths.synchronized {
      shutdownDeletePaths.contains(absolutePath)
    }
  }

  // Note: if file is child of some registered path, while not equal to it, then return true;
  // else false. This is to ensure that two shutdown hooks do not try to delete each others
  // paths - resulting in IOException and incomplete cleanup.
  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
    val absolutePath = file.getAbsolutePath()
    val retval = shutdownDeletePaths.synchronized {
      shutdownDeletePaths.exists { path =>
        !absolutePath.equals(path) && absolutePath.startsWith(path)
      }
    }
    if (retval) {
      logInfo("path = " + file + ", already present as root for deletion.")
    }
    retval
  }

  /**
   * Detect whether this thread might be executing a shutdown hook. Will always return true if
   * the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g.
   * if System.exit was just called by a concurrent thread).
   *
   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
   * an IllegalStateException.
   */
  def inShutdown(): Boolean = {
    try {
      val hook = new Thread {
        override def run() {}
      }
      // Add and immediately remove a dummy hook: if the JVM is already shutting
      // down, addShutdownHook throws IllegalStateException, which is caught below.
      // scalastyle:off runtimeaddshutdownhook
      Runtime.getRuntime.addShutdownHook(hook)
      // scalastyle:on runtimeaddshutdownhook
      Runtime.getRuntime.removeShutdownHook(hook)
    } catch {
      case ise: IllegalStateException => return true
    }
    false
  }

  /**
   * Adds a shutdown hook with default priority.
   *
   * @param hook The code to run during shutdown.
   * @return A handle that can be used to unregister the shutdown hook.
   */
  def addShutdownHook(hook: () => Unit): AnyRef = {
    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
  }

  /**
   * Adds a shutdown hook with the given priority. Hooks with higher priority values run
   * first. (The reversed compareTo below is what makes higher values run first.)
   *
   * @param hook The code to run during shutdown.
   * @return A handle that can be used to unregister the shutdown hook.
   */
  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
    shutdownHooks.add(priority, hook)
  }

  /**
   * Remove a previously installed shutdown hook.
   *
   * @param ref A handle returned by `addShutdownHook`.
   * @return Whether the hook was removed.
   */
  def removeShutdownHook(ref: AnyRef): Boolean = {
    shutdownHooks.remove(ref)
  }
}

private[util] class SparkShutdownHookManager {

  // Priority queue of registered hooks.
  private val hooks = new PriorityQueue[SparkShutdownHook]()
  @volatile private var shuttingDown = false

  /**
   * Install a hook to run at shutdown and run all registered hooks in order.
   */
  def install(): Unit = {
    val hookTask = new Runnable() {
      override def run(): Unit = runAll()
    }
    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
  }

  def runAll(): Unit = {
    shuttingDown = true
    var nextHook: SparkShutdownHook = null
    while ({ nextHook = hooks.synchronized { hooks.poll() }; nextHook != null }) {
      Try(Utils.logUncaughtExceptions(nextHook.run()))
    }
  }

  def add(priority: Int, hook: () => Unit): AnyRef = {
    hooks.synchronized {
      if (shuttingDown) {
        throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
      }
      val hookRef = new SparkShutdownHook(priority, hook)
      hooks.add(hookRef)
      hookRef
    }
  }

  def remove(ref: AnyRef): Boolean = {
    hooks.synchronized { hooks.remove(ref) }
  }
}

private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
  extends Comparable[SparkShutdownHook] {

  // Reversed comparison: a hook with a higher priority value sorts first in the
  // (min-heap) PriorityQueue, so it is polled and run first.
  override def compareTo(other: SparkShutdownHook): Int = {
    other.priority - priority
  }

  def run(): Unit = hook()
}
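
To make the priority semantics concrete: since compareTo is reversed, the PriorityQueue (a min-heap) hands out the hook with the highest priority value first. A small sketch, under the assumption that it runs somewhere the private[spark] object is visible (e.g. inside an org.apache.spark package):

// On JVM exit the expected print order is 100, then 50, then 25.
ShutdownHookManager.addShutdownHook(ShutdownHookManager.DEFAULT_SHUTDOWN_PRIORITY) { () =>
  println("first: default priority 100")
}
ShutdownHookManager.addShutdownHook(ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  println("second: SparkContext priority 50")
}
ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
  println("third: temp dir priority 25")
}

This matches the comments on the constants: hooks at the default priority run before the SparkContext is stopped, and temp directories are deleted last, after the context is gone.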

 
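The temp-dir bookkeeping can be exercised the same way (a sketch; the directory path here is made up):

import java.io.File

// Register a temp directory for deletion at JVM shutdown.
val tempDir = new File("/tmp/spark-demo-staging")
tempDir.mkdirs()
ShutdownHookManager.registerShutdownDeleteDir(tempDir)

// Children of a registered path are detected by hasRootAsShutdownDeleteDir,
// which is how two hooks avoid deleting each other's paths.
val child = new File(tempDir, "part-00000")
println(ShutdownHookManager.hasRootAsShutdownDeleteDir(child)) // true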

