Akka源碼分析-ActorSystem


  由於本人對Akka比較感興趣,也用Akka開發了一些系統,但對Akka的源碼還沒有具體分析過,希望研究源碼的同時寫一點博客跟大家分享。有不當之處還請指正。我准備采取Debug的方式來研究Akka的運行過程,從入口開始,直至分析Akka是如何運轉的。這樣雖然會有點亂,但比較直接,大家湊合着看吧。

  使用Akka首先要創建的一個對象就是ActorSystem,那么我們就先分析這個類及其相關的技術細節。

val system = ActorSystem("WhilePattern1",ConfigFactory.load())

   第一步就是創建ActorSystem,很明顯,這是調用了ActorSystem伴生對象的apply方法。ActorSystem的伴生對象並不復雜,有很多的apply和create方法來創建ActorSystem的實例。apply/create分別供scala和java開發使用。其他字段都是一些環境變量,例如version、envHome、systemHome。還有一個內部類Settings,主要是用來給ActorSystem提供參數配置。

  下面我們來看ActorSystem類,這是一個抽象類,它繼承了ActorRefFactory特質,下面是源碼中對該特質的描述。很明顯,這個特質是用來創建Actor實例的。我們常用的actorFor和actorSelection是該特質提供的比較重要的方法,當然還有與創建actor有關的其他函數和字段。ActorSystem是一個抽象類,除了繼承ActorRefFactory特質的函數和字段之外,定義了一些其他字段和方法,但也都沒有具體的實現。

/**
* Interface implemented by ActorSystem and ActorContext, the only two places
* from which you can get fresh actors.
*/

  通過跟蹤AcotSystem的apply我們發現最終調用了以下代碼,主要涉及了兩個對象:ActorSystemSetup、ActorSystemImpl。其中源碼中對ActorSystemSetup的描述是“A set of setup settings for programmatic configuration of the actor system.”很明顯主要是提供一些可編程的配置,我們不再深入這個類。ActorSystemImpl則是我們需要關心的類,因為ActorSystem.apply最終創建了這個類的實例。而ActorSystemImpl由繼承了ExtendedActorSystem,ExtendedActorSystem抽象類提供了有限的幾個函數,暴露了ActorRefFactory中本來是protected的函數,也並沒有具體的實現,我們也暫時忽略。

/**
   * Scala API: Creates a new actor system with the specified name and settings
   * The core actor system settings are defined in [[BootstrapSetup]]
   */
  def apply(name: String, setup: ActorSystemSetup): ActorSystem = {
    val bootstrapSettings = setup.get[BootstrapSetup]
    val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader())
    val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl))
    val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext)

    new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start()
  }

   由於ActorSystemImpl代碼比較多,如果從頭到尾讀一遍代碼效率比較低。而且從上面代碼可以看出,apply在創建ActorSystemImpl實例之后,調用了start函數,那么我們就從start切入,看看做了哪些操作。

private lazy val _start: this.type = try {
    registerOnTermination(stopScheduler())
    // the provider is expected to start default loggers, LocalActorRefProvider does this
    provider.init(this)
    // at this point it should be initialized "enough" for most extensions that we might want to guard against otherwise
    _initialized = true

    if (settings.LogDeadLetters > 0)
      logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener"))
    eventStream.startUnsubscriber()
    loadExtensions()
    if (LogConfigOnStart) logConfiguration()
    this
  } catch {
    case NonFatal(e) ⇒
      try terminate() catch { case NonFatal(_) ⇒ Try(stopScheduler()) }
      throw e
  }

   其實start的代碼還是比較清晰的,首先用registerOnTermination注冊了stopScheduler(),也就是給ActorSystem的退出注冊了一個回調函數stopScheduler(),這一點也不再具體分析。而provider.init(this)這段代碼比較重要,從provider的類型來看,它是一個ActorRefProvider,前面我們已經分析過,這是一個用來創建actor的工廠類。provider初始化完成意味着就可以創建actor了,源碼注釋中也明確的說明了這一點。

val provider: ActorRefProvider = try {
    val arguments = Vector(
      classOf[String] → name,
      classOf[Settings] → settings,
      classOf[EventStream] → eventStream,
      classOf[DynamicAccess] → dynamicAccess)

    dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get
  } catch {
    case NonFatal(e) ⇒
      Try(stopScheduler())
      throw e
  }

   上面是provider的創建過程,最重要的一段代碼是dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get,它使用DynamicAccess創建了ActorRefProvider對象的實例。跟蹤dynamicAccess創建我們發現這是一個ReflectiveDynamicAccess實例,其實這個類也比較簡單,就是從ClassLoader中根據ProviderClass字段加載對應的類並創建對應的實例。ProviderClass定義如下,這是配置文件中經常看到的配置。目前的provider一共有三種:LocalActorRefProvider、akka.remote.RemoteActorRefProvider、akka.cluster.ClusterActorRefProvider,當然我們也可以自定義。

final val ProviderClass: String =
      setup.get[BootstrapSetup]
        .flatMap(_.actorRefProvider).map(_.identifier)
        .getOrElse(getString("akka.actor.provider")) match {
          case "local"   ⇒ classOf[LocalActorRefProvider].getName
          // these two cannot be referenced by class as they may not be on the classpath
          case "remote"  ⇒ "akka.remote.RemoteActorRefProvider"
          case "cluster" ⇒ "akka.cluster.ClusterActorRefProvider"
          case fqcn      ⇒ fqcn
        }

  自此provider創建結束,簡單來說就是根據配置,通過Class.forName加載了對應的ActorRefProvider實現類,並把當前的參數傳給它,調用對應的構造函數,完成實例的創建。provider創建完成后調用init完成初始化,就可以創建actor了。

  start函數還創建了一個DeadLetterListener類型的actor,這也是我們經常會遇到的。如果給一個不存在的目標actor發消息,或者發送消息超時,都會把消息轉發給這個DeadLetter。這就是一個普通的actor,主要用來接收沒有發送成功的消息,並把消息打印出來。后面還調用了eventStream.startUnsubscriber(),由於eventStream也不是我們關注的重點,先忽略。loadExtensions()功能也比較單一,就是根據配置加載ActorSystem的擴展類,並進行注冊,關於Extensions也不再深入分析。

private def loadExtensions() {
    /**
     * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility)
     */
    def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = {
      immutableSeq(settings.config.getStringList(key)) foreach { fqcn ⇒
        dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match {
          case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup())
          case Success(p: ExtensionId[_])      ⇒ registerExtension(p)
          case Success(other) ⇒
            if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn)
            else throw new RuntimeException(s"[$fqcn] is not an 'ExtensionIdProvider' or 'ExtensionId'")
          case Failure(problem) ⇒
            if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn)
            else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem)
        }
      }
    }

    // eager initialization of CoordinatedShutdown
    CoordinatedShutdown(this)

    loadExtensions("akka.library-extensions", throwOnLoadFail = true)
    loadExtensions("akka.extensions", throwOnLoadFail = false)
  }

   至此,我們就對ActorSystem的創建和啟動分析完畢,但還有一些細節需要說明,在start之前還是有一些其他字段的初始化。由於這些字段同樣重要,且初始化的順序沒有太大關聯,我就按照代碼結構從上至下依次分析幾個重要的字段。

final val threadFactory: MonitorableThreadFactory =
    MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler)

   threadFactory這是一個線程工廠類,默認是MonitorableThreadFactory,我們只需要記住這是一個線程工廠類,默認創建ForkJoinWorkerThread的線程就好了。

val scheduler: Scheduler = createScheduler()

   scheduler是一個調度器,主要用來定時發送一些消息,這個我們也會經常遇到,但不是此次分析的重點,略過就好。

val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters)

   mailboxes是一個非常重要的字段,它是Mailboxes一個實例,用來創建對應的Mailbox,Mailbox用來接收消息,並通過dispatcher分發給對應的actor。

val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
    threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))

  val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher

   dispatchers是Dispatchers的一個實例,它用來創建、查詢對應的MessageDispatcher。它有一個默認的全局dispatcher,從代碼來看,它從配置中讀取akka.actor.default-dispatcher,並創建MessageDispatcher實例。MessageDispatcher也是一個非常重要的類,我們后面再具體分析。

/**
   * The one and only default dispatcher.
   */
  def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)

 

object Dispatchers {
  /**
   * The id of the default dispatcher, also the full key of the
   * configuration of the default dispatcher.
   */
  final val DefaultDispatcherId = "akka.actor.default-dispatcher"
}

   到這里我們就算分析完了ActorSystem的創建過程及其技術細節,當然ActorSystem創建只是第一步,后面需要創建actor,actor如何收到dispatcher的消息,還是需要進一步研究的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM