Spark HistoryServer日志解析&清理異常


一、背景介紹

用戶在使用 Spark 提交任務時,經常會出現任務完成后在 HistoryServer(Spark 1.6 和 Spark 2.1 HistoryServer 合並,統一由 Spark 2.1 HistoryServer 管控,因此本文的代碼分析都是基於 Spark 2.1 版本的代碼展開的)中找不到 appid 信息,尤其是對於失敗的任務,用戶無法查看日志分析任務失敗的原因。為此,特地對 Spark 2.1 HistoryServer 進行了研究,發現根本問題出在內部的兩個核心數據結構的使用存在異常導致的。

二、eventLog 日志文件及相關參數

2.1 eventLog 日志文件介紹

eventLog 是 Spark 任務在運行過程中,調用 EventLoggingListener#logEvent() 方法來輸出 eventLog 內容,Spark 中定義各種類型的事件,一旦某個事件被觸發,就會構造一個類型的 Event,然后獲取相應的運行信息並設置進去,最終將該 event 對象序列化成 json 字符串,追加到 eventLog 日志文件中。

 

Spark 中 eventLog 默認是不開啟的,由參數 'spark.history.fs.cleaner.enabled' 來控制,開啟這個配置后,任務運行的信息就會寫到 eventLog 日志文件中,日志文件具體保存在參數 'spark.eventLog.dir' 配置的目錄下。 

2.2 相關配置參數

一般這些配置放在 /etc/spark2/conf/spark-defaults.conf 中。

注:但在實際自定義修改 Spark HistoryServer 配置時,spark-defaults.conf 中並沒有寫入(具體原因待看)。但可以通過查看 HistoryServer 進程使用的 spark-history-server.conf 配置查看,在 Spark HistoryServer 所在機器上,通過 'ps -ef |grep HistoryServer' 查看具體配置 '--properties-file /run/cloudera-scm-agent/process/136253-spark2_on_yarn-SPARK2_YARN_HISTORY_SERVER/spark2-conf/spark-history-server.conf',這里會使用自定義更新的 HistoryServer 參數。

參數 默認 含義
spark.history.retainedApplications 50 在內存中保存 Application 歷史記錄的個數,如果超過這個值,舊的應用程序信息將被刪除,當再次訪問已被刪除的應用信息時需要重新構建頁面。
spark.history.fs.update.interval  10s 指定刷新日志的時間,更短的時間可以更快檢測到新的任務以及任務執行情況,但過快會加重服務器負載。
spark.history.ui.maxApplication Int.MaxValue 顯示在總歷史頁面中的程序的數量。如果總歷史頁面未顯示,程序 UI 仍可通過訪問其 URL 來顯示。
spark.history.ui.port 18089(Spark2.1) 指定history-server的網頁UI端口號
spark.history.fs.cleaner.enabled  false 指定history-server的日志是否定時清除,true為定時清除,false為不清除。這個值一定設置成true啊,不然日志文件會越來越大。
spark.history.fs.cleaner.interval 1d 定history-server的日志檢查間隔,默認每一天會檢查一下日志文件
spark.history.fs.cleaner.maxAge 7d 指定history-server日志生命周期,當檢查到某個日志文件的生命周期為7d時,則會刪除該日志文件
spark.eventLog.compress false 設置history-server產生的日志文件是否使用壓縮,true為使用,false為不使用。這個參數務可以成壓縮哦,不然日志文件歲時間積累會過大
spark.history.retainedApplications 50 在內存中保存Application歷史記錄的個數,如果超過這個值,舊的應用程序信息將被刪除,當再次訪問已被刪除的應用信息時需要重新構建頁面。
spark.history.fs.numReplayThreads ceil(cpu核數/4) 解析 eventLog 的線程數量

 

三、eventLog 日志解析及日志清理原理

3.1 兩個定時任務

FsHistoryProvider 類在初始化時,會調用 startPolling() 方法,來啟動兩個定時任務,即日志文件解析任務和日志文件清理任務,兩個任務均是由獨立線程執行。當然,日志文件清理任務是否開啟是由參數 spark.history.fs.cleaner.enabled 控制(默認為 false,線上環境為 true,即開啟了日志文件清理任務)。

//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
  private[history] def initialize(): Thread = {
    if (!isFsInSafeMode()) {
      // 兩個定時任務啟動入口
      startPolling()
      null
    } else {
      startSafeModeCheckThread(None)
    }
  }

  private def startPolling(): Unit = {
    // Validate the log directory.
    val path = new Path(logDir)

    // Disable the background thread during tests.
    if (!conf.contains("spark.testing")) {
      // A task that periodically checks for event log updates on disk.
      logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds")
      // 日志文件解析線程
      pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS)

      if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) {
        // A task that periodically cleans event logs on disk.
        // 日志文件清理線程
        pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS)
      }
    } else {
      logDebug("Background update thread disabled for testing")
    }
  }

 

3.2 eventLog 日志文件解析原理

3.2.1 關鍵數據結構

在介紹日志解析前,先來看看兩個關鍵的數據結構。fileToAppInfo 和 applications。

fileToAppInfo 結構用於保存日志目錄 /user/spark/spark2ApplicationHistory/ 下每一條 eventLog 日志文件。每次 HDFS 目錄下新生成的文件都會更新到該數據結構。

val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]()

applications 結構用於保存每個 App 對應的所有 AppAttempt 運行或完成的日志信息。

@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo] = new mutable.LinkedHashMap()

舉個例子:HDFS 日志目錄下有同一個 App 的兩個 eventLog 文件。

/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_1
/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_2

此時,fileToAppInfo 保存的數據格式為:(兩條記錄)

<'/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_1', AttemptInfo>
<'/user/spark/spark2ApplicationHistory/application_1599034722009_10003548_2', AttemptInfo>

而 applications 保存的數據格式為:(一條記錄)

<'application_1599034722009_10003548', HistoryInfo<Attemp1, Attempt2>>

 

3.2.2 日志文件解析流程

eventLog 日志文件一次完整解析的流程大概分為以下幾個步驟:

  1. 掃描 /user/spark/spark2ApplicationHistory/ 目錄下日志文件是否有更新。(更新有兩個情況:一種是已有的日志文件大小增加,一種是生成了新的日志文件)
  2. 若有更新,則從線程池中啟動一個線程對日志進行初步解析。(解析環節是關鍵,UI 界面無法查看是因為解析出現異常)
  3. 將解析后的日志同時更新到 fileToAppInfo 和 applications 結構中,保證數據維持最新狀態。
  4. 等待解析線程執行完成,更新 HDFS 目錄的掃描時間。(線程池啟動的多個線程會阻塞執行,直到所有解析線程完成才更新掃描時間)

 

源碼分析如下:

這段代碼主要是前兩個步驟的介紹,定期掃描日志目錄(定期時間由參數 spark.history.fs.update.interval  控制,線上環境為 30s),將文件大小有增加和新生成的文件保存在 logInfos 對象中。然后將新文件放到 

replayExecutor 線程池中執行,該線程池大小默認為 機器cpu核數/4,由參數 spark.history.fs.numReplayThreads 控制,線上環境為 50。

//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
  private[history] def checkForLogs(): Unit = {
    try {
      val newLastScanTime = getNewLastScanTime()
      logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
        .getOrElse(Seq[FileStatus]())
      // logInfos 保存所有新的 eventLog 文件(包括大小增加的和新生成的文件)
      // filter:過濾出新的日志文件
      // flatMap:過濾空的entry對象
      // sortWith:根據日志文件更新時間降序排序
      val logInfos: Seq[FileStatus] = statusList
        .filter { entry =>
          try {
            val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
            !entry.isDirectory() &&
              !entry.getPath().getName().startsWith(".") &&
              prevFileSize < entry.getLen()
          } catch {
            case e: AccessControlException =>
              logDebug(s"No permission to read $entry, ignoring.")
              false
          }
        }
        .flatMap { entry => Some(entry) }
        .sortWith { case (entry1, entry2) =>
          entry1.getModificationTime() >= entry2.getModificationTime()
      }

      if (logInfos.nonEmpty) {
        logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
      }

      var tasks = mutable.ListBuffer[Future[_]]()

      try {
        for (file <- logInfos) {
          // 對掃描出來的文件進行解析
          tasks += replayExecutor.submit(new Runnable {
            override def run(): Unit = mergeApplicationListing(file)
          })
        }
      } catch {
        case e: Exception =>
          logError(s"Exception while submitting event log for replay", e)
      }
       ... //省略
 }

 

第三步流程主要在 mergeApplicationListing() 方法中處理。先來看看 fileToAppInfo 結構如何更新,這里的關鍵是 replay() 方法,這里會對 eventLog 進行初步解析,然后將解析后的內容更新到 fileToAppInfo 中。

//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
      // 函數監聽兩個事件:作業開始和作業結束
    val newAttempts = try {
      val eventsFilter: ReplayEventsFilter = { eventString =>
        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
          eventString.startsWith(APPL_END_EVENT_PREFIX)
      }

      val logPath = fileStatus.getPath()

      val appCompleted = isApplicationCompleted(fileStatus)

      // UI 查看的關鍵,對 eventLog 日志文件進行解析回放
      val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)

      // 根據解析的結果構建 FsApplicationAttemptInfo 對象
      if (appListener.appId.isDefined) {
        val attemptInfo = new FsApplicationAttemptInfo(
          logPath.getName(),
          appListener.appName.getOrElse(NOT_STARTED),
          appListener.appId.getOrElse(logPath.getName()),
          appListener.appAttemptId,
          appListener.startTime.getOrElse(-1L),
          appListener.endTime.getOrElse(-1L),
          fileStatus.getModificationTime(),
          appListener.sparkUser.getOrElse(NOT_STARTED),
          appCompleted,
          fileStatus.getLen()
        )
        // 更新 fileToAppInfo 結構
        fileToAppInfo(logPath) = attemptInfo
        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
        Some(attemptInfo)
      } else {
        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
          "The application may have not started.")
        None
      }

    }
    ... // 省略
}

 

那 applications 結構又是如何更新的呢?主要是先找出新的 App 對象,將舊的 App 列表和新的 App 列表進行合並,生成新的對象,並更新到 applications 中。

//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {

      ... // 省略

      val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()

      // 多線程同時更新 applications 對象,這里用 synchronized 實現同步訪問該對象
    applications.synchronized {
      // newAttempts 對象是剛才解析 eventLog 構造的 FsApplicationAttemptInfo 對象列表
      // 這一步的目的就是要過濾出剛才新生成的App對象,並更新已存在但大小有增加的App對象
      newAttempts.foreach { attempt =>
        val appInfo = newAppMap.get(attempt.appId)
          .orElse(applications.get(attempt.appId))
          .map { app =>
            val attempts =
              app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
            new FsApplicationHistoryInfo(attempt.appId, attempt.name,
              attempts.sortWith(compareAttemptInfo))
          }
          .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
        newAppMap(attempt.appId) = appInfo
      }

      val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
        if (!mergedApps.contains(info.id)) {
          mergedApps += (info.id -> info)
        }
      }

      // mergedApps 對象用於保存已有App對象和新生成的App對象進行合並后結果,產生最新的 applications 對象
      val newIterator = newApps.iterator.buffered
      val oldIterator = applications.values.iterator.buffered
      while (newIterator.hasNext && oldIterator.hasNext) {
        if (newAppMap.contains(oldIterator.head.id)) {
          oldIterator.next()
        } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
          addIfAbsent(newIterator.next())
        } else {
          addIfAbsent(oldIterator.next())
        }
      }
      newIterator.foreach(addIfAbsent)
      oldIterator.foreach(addIfAbsent)

      applications = mergedApps
    }
}

 

3.3 eventLog 日志清理原理

了解了前面 fileToAppInfo 和 applications 數據結構,日志清理的原理相對而言就簡單很多,主要是對 applications 對象進行處理。

日志清理大致流程如下:

  1. 獲取 eventLog 日志保留的生命周期事件,由參數 spark.history.fs.cleaner.maxAge 控制,默認 7d,線上 5d。
  2. 掃描 applications 對象,將待清理的日志對象保存在 attemptsToClean 對象,保留的對象保存在 appsToRetain。(一個文件是否可以刪除由函數 shouldClean() 控制)
  3. 更新 applications 對象。
  4. 調用 HDFS api 執行真正的刪除操作。

 

源碼分析:

//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
   private[history] def cleanLogs(): Unit = {
    try {
      // 1、獲取 eventLog 保存的生命周期時間
      val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000

      val now = clock.getTimeMillis()
      val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

      // 判斷函數:超過生命周期並完成(后綴不是 .inprogress 結束)的任務可以正常清理
      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
        now - attempt.lastUpdated > maxAge && attempt.completed
      }

      // 2、掃描 applications 對象,將超過生命周期待清理的 eventLog 保存在 attemptsToClean 對象中,未超過的保存在 appsToRetain 對象中
      applications.values.foreach { app =>
        val (toClean, toRetain) = app.attempts.partition(shouldClean)
        attemptsToClean ++= toClean

        if (toClean.isEmpty) {
          appsToRetain += (app.id -> app)
        } else if (toRetain.nonEmpty) {
          appsToRetain += (app.id ->
            new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
        }
      }

      // 3、更新 applications 對象
      applications = appsToRetain

      val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
      // 4、調用 HDFS api 執行真正的清理操作
      attemptsToClean.foreach { attempt =>
        try {
          fs.delete(new Path(logDir, attempt.logPath), true)
        } catch {
          case e: AccessControlException =>
            logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
          case t: IOException =>
            logError(s"IOException in cleaning ${attempt.logPath}", t)
            leftToClean += attempt
        }
      }
      // 沒有正常清理的對象重新更新到 attemptsToClean 中
      attemptsToClean = leftToClean
    } catch {
      case t: Exception => logError("Exception in cleaning logs", t)
    }
  }

 

四、原因分析&解決方案

上面日志解析和日志清理的邏輯都依賴 fileToAppInfo 和 applications 對象,Spark HistoryServer UI 界面展示的內容也是依賴這兩個對象,所以,UI 無法加載任務信息也是由於這里的數據結構出現了多線程訪問的線程安全問題。

4.1 HashMap 線程同步問題&解決方案

4.1.1 原因分析

fileToAppInfo 對象是 FsHistoryProvider 類的一個對象,數據結構采用 HashMap,是線程不安全的對象,但在多線程調用 mergeApplicationListing() 方法操作 fileToAppInfo 對象並不是同步訪問,導致每次載入所有 eventLog 日志文件,會出現不能保證所有文件都能被正常加載。那為什么會出現這種情況呢?其實就是多線程訪問同一個對象時經常出現的一個問題。

 

下圖是多線程訪問同一對象帶來的線程安全問題的一個簡單例子:

  • 當線程 1 執行 x++ 后將結果更新到內存中,內存中此時 x=1,沒有問題。
  • 但由於線程 1 在讀內存數據時線程 2 同時也讀取內存中 x 的值,當線程 2 執行 x++ 后,將結果更新到內存中,此時內存中 x 的值還是 1。
  • 而預期的結果是 x = 2,這種情況便是多線程訪問同一對象的線程安全問題。

多線程訪問同一對象帶來的線程安全問題

4.1.2 解決方案

HashMap 對象帶來的線程安全問題,解決方法比較簡單,用 ConcurrentHashMap 替代即可。參考 patch:SPARK-21223

var fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]()

 

4.2 Synchronized 鎖同步問題

4.2.1 原因分析

在 Spark HistoryServer 中,applications 更新的玩法是這樣的:

//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
    = new mutable.LinkedHashMap()

applications.synchronized {
  ... // 省略

  val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

  ... // 省略更新 mergedApps 的值

  applications = mergedApps
}

 

咋一看,這樣使用 synchronized 鎖住 applications 對象似乎沒什么問題。但其實是有問題的,我們先來看一個例子。

class Synchronized {
    private List aList = new ArrayList();
    public void anyObject1() {
        // 和 HistoryServer 玩法一致,鎖住 aList 對象,代碼塊中用 aList2 更新 aList 對象值
        synchronized (aList) {
            List aList2 = new ArrayList();
            for (int i = 0; i < 10; i++) {
                System.out.println("anyObject"  + "-" + Thread.currentThread());
                aList2.add(1);
            }
            aList = aList2;
            System.out.println("aList =" + aList.size());
        }
    }
}

public class SynchronizedDemo01 {

    public static void main(String[] args) {
        SynchronizedDemo01 syn = new SynchronizedDemo01();
        syn.anyObjTest();
    }

    public void anyObjTest() {
        final Synchronized syn = new Synchronized();
        // 啟動5個線程去操作aList對象,每次打印10條記錄
        for (int i = 0; i < 5; i++) {
            new Thread() {
                @Override
                public void run() {
                    syn.anyObject1();
                }
            }.start();
        }
    }
}


運行結果:(隨機多運行幾次)
anyObject-Thread[Thread-3,5,main]
anyObject-Thread[Thread-2,5,main]
anyObject-Thread[Thread-3,5,main]
anyObject-Thread[Thread-2,5,main]
anyObject-Thread[Thread-3,5,main]
aList =10
anyObject-Thread[Thread-3,5,main]
anyObject-Thread[Thread-3,5,main]
anyObject-Thread[Thread-3,5,main]
anyObject-Thread[Thread-3,5,main]
anyObject-Thread[Thread-3,5,main]
aList =10
anyObject-Thread[Thread-4,5,main]
anyObject-Thread[Thread-4,5,main]
anyObject-Thread[Thread-4,5,main]

 

通過這個例子,可以看出 Thread-3 在 Thread-2 線程中打印了信息,也就是說通過這種方式鎖住 synchronized(aList 對象)(非 this 對象)是有問題的,線程並沒有真正的鎖住 aList 對象。那為什么會出現這種情況呢?我們接着看。

https://blog.csdn.net/weixin_42762133/article/details/103241439 這篇文章給出了 Synchronized 鎖幾種使用場景。

修飾目標
方法 實例方法 當前對象實例(即方法調用者)
靜態方法 類對象
代碼塊

this

當前對象實例(即方法調用者)
class 對象 類對象
任意 Object 對象 當前對象實例(即方法調用者)

這里重點介紹下 synchronized 修飾目標為 this 和任意 Object 對象這兩種情況。要理解他們之間的區別,就需要搞清楚 synchronized 到底鎖住的是什么?在 https://juejin.im/post/6844903872431915022  這篇文章中,介紹了 synchronized 鎖住的內容有兩種,一種是類,另一種是對象實例。這里的關鍵就在於第二種情況,當使用 synchronized 鎖住的是對象實例時,HistoryServer 和上面 aList 的例子那就有問題了,怎么說呢?我們來看看下面這張圖。

Synchronized 鎖住的對象示意圖

通過這張圖就一目了然,synchronized(aList) 代碼塊鎖住的是 aList 對象指向的堆中的對象實例,當在代碼塊中通過 aList = aList2 賦值后,aList 便指向的新的對象實例,導致原來的對象實例變成了無主狀態,synchronized(aList) 代碼塊的鎖其實也就失去了意義。所以才會出現線程安全的問題。

 

上面那段測試代碼如果采用 synchronized(this) 則不會出現多線程錯亂打印的情況,為什么呢?通過上表中我們知道 synchronized(this) 的鎖是當前對象實例,即方法的調用者,在測試代碼中也就是 "SynchronizedDemo01 syn = new SynchronizedDemo01(); " 這里創建 syn 對象實例,在內存中的表現為:

Synchronized 對象堆內表現示意圖

使用 synchronized(this) 之所以不會出問題,是由於不管 aList 指向哪個對象實例,this 對象(即 syn 對象)指向的對象實例始終沒有變,所以多線程訪問 aList 不會出現線程安全問題。

 

至此,HistoryServer 中的那段代碼塊是有問題的,並不能實現 applications 對象的多線程安全訪問。

 

4.2.2 解決方案

分析清楚了具體原因后,解決方法就比較容易了,將那段代碼的 synchronized 鎖住的對象從 applications 對象改成 this 對象即可。

//位置:core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
    = new mutable.LinkedHashMap()

this.synchronized {
  ... // 省略

  val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()

  ... // 省略更新 mergedApps 的值

  applications = mergedApps
}

 

4.3.3 一點小擴展

上面解決了 synchronized 鎖住 applications 非 this 對象的問題,那 Spark 中為什么不直接用 this 對象呢?這里還是有一點小竅門的。那就是 synchronzied(this) 比 Synchronized(非this) 的效率要低一些,為什么這么說呢?來看兩個例子。

 

例子1:兩個線程使用同一個對象分別訪問 synchronized 方法和 synchronized(str) 代碼塊。

結論:兩個線程是異步執行的,Thread1 鎖住的 'str' Object 對象實例,而 Thread2 鎖住的是 service 對象實例,互不影響。

public class SynchronizedDemo02 {
  static Service service = new Service();

  public static void main(String[] args) {
   new Thread () {
    @Override
    public void run() {
     service.method1();
    }
   }.start();
   new Thread () {
    @Override
    public void run() {
     service.method2();
    }
   }.start();
  }
}

class Service {
 String str = "test";

 public void method1() {
  synchronized (str) {
   System.out.println("method1 begin");

   try {
    Thread.sleep(1000);
   }catch (Exception e) {
    e.printStackTrace();
   }
   System.out.println("method1 end");
  }
 }

 public synchronized void method2() {
  System.out.println("method2 begin");
  try {
   Thread.sleep(1000);
  }catch (Exception e) {
   e.printStackTrace();
  }
  System.out.println("method2 end");
 }
}


結果輸出:
method1 begin
method2 begin
method1 end
method2 end

 

例子2:兩個線程使用同一個對象分別訪問 synchronized 方法和 synchronized(this) 代碼塊。

結論:兩個線程同步執行,鎖住的是同一個 this 對象(即 service 對象),必須一個線程執行完才能執行另一個線程。

public class SynchronizedDemo02 {
  static Service service = new Service();

  public static  void main(String[] args) {
   new Thread () {
    @Override
    public void run() {
     service.method1();
    }
   }.start();
   new Thread () {
    @Override
    public void run() {
     service.method2();
    }
   }.start();
  }
}

class Service {
 String str = "test";

 public void method1() {
  synchronized (this) {
   System.out.println("method1 begin");

   try {
    Thread.sleep(1000);
   }catch (Exception e) {
    e.printStackTrace();
   }
   System.out.println("method1 end");
  }
 }

 public synchronized  void method2() {
  System.out.println("method2 begin");
  try {
   Thread.sleep(1000);
  }catch (Exception e) {
   e.printStackTrace();
  }
  System.out.println("method2 end");
 }
}


結果輸出:
method1 begin
method1 end
method2 begin
method2 end

所以,采用 synchronized(非 this 對象) 會減少當前對象鎖與其他 synchorinzed(this) 代碼塊或 synchronized 方法之間的鎖競爭,與其他 synchronized 代碼異步執行,互不影響,會提高代碼的執行效率。

 

【參考資料】


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM