由於本人對Akka比較感興趣,也用Akka開發了一些系統,但對Akka的源碼還沒有具體分析過,希望研究源碼的同時寫一點博客跟大家分享。有不當之處還請指正。我准備采取Debug的方式來研究Akka的運行過程,從入口開始,直至分析Akka是如何運轉的。這樣雖然會有點亂,但比較直接,大家湊合着看吧。
使用Akka首先要創建的一個對象就是ActorSystem,那么我們就先分析這個類及其相關的技術細節。
val system = ActorSystem("WhilePattern1",ConfigFactory.load())
第一步就是創建ActorSystem,很明顯,這是調用了ActorSystem伴生對象的apply方法。ActorSystem的伴生對象並不復雜,有很多的apply和create方法來創建ActorSystem的實例。apply/create分別供scala和java開發使用。其他字段都是一些環境變量,例如version、envHome、systemHome。還有一個內部類Settings,主要是用來給ActorSystem提供參數配置。
下面我們來看ActorSystem類,這是一個抽象類,它繼承了ActorRefFactory特質,下面是源碼中對該特質的描述。很明顯,這個特質是用來創建Actor實例的。我們常用的actorFor和actorSelection是該特質提供的比較重要的方法,當然還有與創建actor有關的其他函數和字段。ActorSystem是一個抽象類,除了繼承ActorRefFactory特質的函數和字段之外,定義了一些其他字段和方法,但也都沒有具體的實現。
/**
* Interface implemented by ActorSystem and ActorContext, the only two places
* from which you can get fresh actors.
*/
通過跟蹤AcotSystem的apply我們發現最終調用了以下代碼,主要涉及了兩個對象:ActorSystemSetup、ActorSystemImpl。其中源碼中對ActorSystemSetup的描述是“A set of setup settings for programmatic configuration of the actor system.”很明顯主要是提供一些可編程的配置,我們不再深入這個類。ActorSystemImpl則是我們需要關心的類,因為ActorSystem.apply最終創建了這個類的實例。而ActorSystemImpl由繼承了ExtendedActorSystem,ExtendedActorSystem抽象類提供了有限的幾個函數,暴露了ActorRefFactory中本來是protected的函數,也並沒有具體的實現,我們也暫時忽略。
/** * Scala API: Creates a new actor system with the specified name and settings * The core actor system settings are defined in [[BootstrapSetup]] */ def apply(name: String, setup: ActorSystemSetup): ActorSystem = { val bootstrapSettings = setup.get[BootstrapSetup] val cl = bootstrapSettings.flatMap(_.classLoader).getOrElse(findClassLoader()) val appConfig = bootstrapSettings.flatMap(_.config).getOrElse(ConfigFactory.load(cl)) val defaultEC = bootstrapSettings.flatMap(_.defaultExecutionContext) new ActorSystemImpl(name, appConfig, cl, defaultEC, None, setup).start() }
由於ActorSystemImpl代碼比較多,如果從頭到尾讀一遍代碼效率比較低。而且從上面代碼可以看出,apply在創建ActorSystemImpl實例之后,調用了start函數,那么我們就從start切入,看看做了哪些操作。
private lazy val _start: this.type = try { registerOnTermination(stopScheduler()) // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) // at this point it should be initialized "enough" for most extensions that we might want to guard against otherwise _initialized = true if (settings.LogDeadLetters > 0) logDeadLetterListener = Some(systemActorOf(Props[DeadLetterListener], "deadLetterListener")) eventStream.startUnsubscriber() loadExtensions() if (LogConfigOnStart) logConfiguration() this } catch { case NonFatal(e) ⇒ try terminate() catch { case NonFatal(_) ⇒ Try(stopScheduler()) } throw e }
其實start的代碼還是比較清晰的,首先用registerOnTermination注冊了stopScheduler(),也就是給ActorSystem的退出注冊了一個回調函數stopScheduler(),這一點也不再具體分析。而provider.init(this)這段代碼比較重要,從provider的類型來看,它是一個ActorRefProvider,前面我們已經分析過,這是一個用來創建actor的工廠類。provider初始化完成意味着就可以創建actor了,源碼注釋中也明確的說明了這一點。
val provider: ActorRefProvider = try { val arguments = Vector( classOf[String] → name, classOf[Settings] → settings, classOf[EventStream] → eventStream, classOf[DynamicAccess] → dynamicAccess) dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get } catch { case NonFatal(e) ⇒ Try(stopScheduler()) throw e }
上面是provider的創建過程,最重要的一段代碼是dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get,它使用DynamicAccess創建了ActorRefProvider對象的實例。跟蹤dynamicAccess創建我們發現這是一個ReflectiveDynamicAccess實例,其實這個類也比較簡單,就是從ClassLoader中根據ProviderClass字段加載對應的類並創建對應的實例。ProviderClass定義如下,這是配置文件中經常看到的配置。目前的provider一共有三種:LocalActorRefProvider、akka.remote.RemoteActorRefProvider、akka.cluster.ClusterActorRefProvider,當然我們也可以自定義。
final val ProviderClass: String = setup.get[BootstrapSetup] .flatMap(_.actorRefProvider).map(_.identifier) .getOrElse(getString("akka.actor.provider")) match { case "local" ⇒ classOf[LocalActorRefProvider].getName // these two cannot be referenced by class as they may not be on the classpath case "remote" ⇒ "akka.remote.RemoteActorRefProvider" case "cluster" ⇒ "akka.cluster.ClusterActorRefProvider" case fqcn ⇒ fqcn }
自此provider創建結束,簡單來說就是根據配置,通過Class.forName加載了對應的ActorRefProvider實現類,並把當前的參數傳給它,調用對應的構造函數,完成實例的創建。provider創建完成后調用init完成初始化,就可以創建actor了。
start函數還創建了一個DeadLetterListener類型的actor,這也是我們經常會遇到的。如果給一個不存在的目標actor發消息,或者發送消息超時,都會把消息轉發給這個DeadLetter。這就是一個普通的actor,主要用來接收沒有發送成功的消息,並把消息打印出來。后面還調用了eventStream.startUnsubscriber(),由於eventStream也不是我們關注的重點,先忽略。loadExtensions()功能也比較單一,就是根據配置加載ActorSystem的擴展類,並進行注冊,關於Extensions也不再深入分析。
private def loadExtensions() { /** * @param throwOnLoadFail Throw exception when an extension fails to load (needed for backwards compatibility) */ def loadExtensions(key: String, throwOnLoadFail: Boolean): Unit = { immutableSeq(settings.config.getStringList(key)) foreach { fqcn ⇒ dynamicAccess.getObjectFor[AnyRef](fqcn) recoverWith { case _ ⇒ dynamicAccess.createInstanceFor[AnyRef](fqcn, Nil) } match { case Success(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()) case Success(p: ExtensionId[_]) ⇒ registerExtension(p) case Success(other) ⇒ if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) else throw new RuntimeException(s"[$fqcn] is not an 'ExtensionIdProvider' or 'ExtensionId'") case Failure(problem) ⇒ if (!throwOnLoadFail) log.error(problem, "While trying to load extension [{}], skipping...", fqcn) else throw new RuntimeException(s"While trying to load extension [$fqcn]", problem) } } } // eager initialization of CoordinatedShutdown CoordinatedShutdown(this) loadExtensions("akka.library-extensions", throwOnLoadFail = true) loadExtensions("akka.extensions", throwOnLoadFail = false) }
至此,我們就對ActorSystem的創建和啟動分析完畢,但還有一些細節需要說明,在start之前還是有一些其他字段的初始化。由於這些字段同樣重要,且初始化的順序沒有太大關聯,我就按照代碼結構從上至下依次分析幾個重要的字段。
final val threadFactory: MonitorableThreadFactory = MonitorableThreadFactory(name, settings.Daemonicity, Option(classLoader), uncaughtExceptionHandler)
threadFactory這是一個線程工廠類,默認是MonitorableThreadFactory,我們只需要記住這是一個線程工廠類,默認創建ForkJoinWorkerThread的線程就好了。
val scheduler: Scheduler = createScheduler()
scheduler是一個調度器,主要用來定時發送一些消息,這個我們也會經常遇到,但不是此次分析的重點,略過就好。
val mailboxes: Mailboxes = new Mailboxes(settings, eventStream, dynamicAccess, deadLetters)
mailboxes是一個非常重要的字段,它是Mailboxes一個實例,用來創建對應的Mailbox,Mailbox用來接收消息,並通過dispatcher分發給對應的actor。
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext)) val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
dispatchers是Dispatchers的一個實例,它用來創建、查詢對應的MessageDispatcher。它有一個默認的全局dispatcher,從代碼來看,它從配置中讀取akka.actor.default-dispatcher,並創建MessageDispatcher實例。MessageDispatcher也是一個非常重要的類,我們后面再具體分析。
/** * The one and only default dispatcher. */ def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)
object Dispatchers { /** * The id of the default dispatcher, also the full key of the * configuration of the default dispatcher. */ final val DefaultDispatcherId = "akka.actor.default-dispatcher" }
到這里我們就算分析完了ActorSystem的創建過程及其技術細節,當然ActorSystem創建只是第一步,后面需要創建actor,actor如何收到dispatcher的消息,還是需要進一步研究的。