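In the previous post we looked at how a message is sent to a remote actor in remote mode; whatever the entry point, the message ultimately goes out through a RemoteActorRef. The official documentation also states that an ActorRef is location transparent, which implies two things: 1. an ActorRef can be serialized and sent across the network; 2. after deserialization, the receiving side can correctly tell whether the ActorRef is local or remote. Location transparency therefore rests on two key points: 1. how an ActorRef is serialized; 2. how an ActorRef is resolved. We examine them in turn below.
In local mode messages are sent through a local InternalActorRef (such as LocalActorRef), while in remote mode they are sent through a RemoteActorRef. How do the two differ?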
/**
 * INTERNAL API
 * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
 * This reference is network-aware (remembers its origin) and immutable.
 */
private[akka] class RemoteActorRef private[akka] (
  remote:                RemoteTransport,
  val localAddressToUse: Address,
  val path:              ActorPath,
  val getParent:         InternalActorRef,
  props:                 Option[Props],
  deploy:                Option[Deploy])
  extends InternalActorRef with RemoteRef
The source shows that RemoteActorRef extends InternalActorRef and also mixes in the RemoteRef trait.
private[akka] trait RemoteRef extends ActorRefScope { final def isLocal = false }
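RemoteRef is simple: it just defines isLocal as false. So a RemoteActorRef is still an InternalActorRef, and the visible difference is the isLocal flag. An ActorRef that is passed around locally does not need to be serialized by default, so where does serialization come in? Let's look at the serialization path first.
Recall from the earlier post that in remote mode messages are sent by EndpointWriter.writeSend.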
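Before following the serialization path, here is a minimal sketch of how a single isLocal flag lets callers distinguish local from remote references without inspecting the concrete class. All names below (RefScope, LocalRefModel, RemoteRefModel, describe) are simplified stand-ins invented for illustration, not Akka's real classes.

```scala
// Simplified stand-ins for ActorRefScope / LocalRef / RemoteRef; not the real Akka types.
object RefScopeSketch {
  trait RefScope { def isLocal: Boolean }
  trait LocalScopeRef extends RefScope { final def isLocal = true }
  trait RemoteScopeRef extends RefScope { final def isLocal = false }

  final case class LocalRefModel(path: String) extends LocalScopeRef
  final case class RemoteRefModel(path: String) extends RemoteScopeRef

  // Callers branch on the flag rather than on the concrete class.
  def describe(ref: RefScope): String =
    if (ref.isLocal) "deliver in-process" else "hand over to the transport"
}
```

Because the flag is final in each trait, a reference cannot change its scope after construction, which matches the "immutable" wording in the RemoteActorRef scaladoc.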
def writeSend(s: Send): Boolean = try {
  handle match {
    case Some(h) ⇒
      if (provider.remoteSettings.LogSend && log.isDebugEnabled) {
        def msgLog = s"RemoteMessage: [${s.message}] to [${s.recipient}]<+[${s.recipient.path}] from [${s.senderOption.getOrElse(extendedSystem.deadLetters)}]"
        log.debug("sending message {}", msgLog)
      }

      val pdu = codec.constructMessage(
        s.recipient.localAddressToUse,
        s.recipient,
        serializeMessage(s.message),
        s.senderOption,
        seqOption = s.seqOpt,
        ackOption = lastAck)

      val pduSize = pdu.size
      remoteMetrics.logPayloadBytes(s.message, pduSize)

      if (pduSize > transport.maximumPayloadBytes) {
        val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${s.recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${s.message.getClass} was ${pdu.size} bytes.")
        log.error(reason, "Transient association error (association remains live)")
        true
      } else {
        val ok = h.write(pdu)
        if (ok) {
          ackDeadline = newAckDeadline
          lastAck = None
        }
        ok
      }

    case None ⇒
      throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.")
  }
} catch {
  case e: NotSerializableException ⇒
    log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
    true
  case e: IllegalArgumentException ⇒
    log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
    true
  case e: MessageSerializer.SerializationException ⇒
    log.error(e, "{} Transient association error (association remains live)", e.getMessage)
    true
  case e: EndpointException ⇒
    publishAndThrow(e, Logging.ErrorLevel)
  case NonFatal(e) ⇒
    publishAndThrow(new EndpointException("Failed to write message to the transport", e), Logging.ErrorLevel)
}
writeSend calls serializeMessage on the outgoing message and passes the result to codec.constructMessage, so if the user's message contains an ActorRef, it is handled here. Which messages contain an ActorRef? ActorIdentity is one example. And of course any user-defined message that contains an ActorRef gets serialized as well.
private def serializeMessage(msg: Any): SerializedMessage = handle match {
  case Some(h) ⇒
    Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, extendedSystem)) {
      MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])
    }
  case None ⇒
    throw new EndpointException("Internal error: No handle was present during serialization of outbound message.")
}
EndpointWriter.serializeMessage in turn calls MessageSerializer.serialize to do the actual serialization.
/**
 * Serialization information needed for serializing local actor refs,
 * or if serializer library e.g. custom serializer/deserializer in Jackson need
 * access to the current `ActorSystem`.
 */
final case class Information(address: Address, system: ActorSystem)
The Serialization.Information case class is simple and its scaladoc is clear, so we won't analyze it in detail. In short, it carries the basic values that serialization needs: the address and the current ActorSystem.
/**
 * INTERNAL API: This holds a reference to the current transport serialization information used for
 * serializing local actor refs, or if serializer library e.g. custom serializer/deserializer in
 * Jackson need access to the current `ActorSystem`.
 */
@InternalApi private[akka] val currentTransportInformation = new DynamicVariable[Information](null)
And what is Serialization.currentTransportInformation?
/** `DynamicVariables` provide a binding mechanism where the current
 *  value is found through dynamic scope, but where access to the
 *  variable itself is resolved through static scope.
 *
 *  The current value can be retrieved with the value method. New values
 *  should be pushed using the `withValue` method. Values pushed via
 *  `withValue` only stay valid while the `withValue`'s second argument, a
 *  parameterless closure, executes. When the second argument finishes,
 *  the variable reverts to the previous value.
 *
 *  {{{
 *  someDynamicVariable.withValue(newValue) {
 *    // ... code called in here that calls value ...
 *    // ... will be given back the newValue ...
 *  }
 *  }}}
 *
 *  Each thread gets its own stack of bindings. When a
 *  new thread is created, the `DynamicVariable` gets a copy
 *  of the stack of bindings from the parent thread, and
 *  from then on the bindings for the new thread
 *  are independent of those for the original thread.
 *
 *  @author Lex Spoon
 *  @version 1.1, 2007-5-21
 */
class DynamicVariable[T](init: T)
currentTransportInformation is a dynamic variable. The Scala documentation explains its behavior and usage clearly; you can think of it as a ThreadLocal whose value is inherited from the parent thread (it is backed by an InheritableThreadLocal).
/** Set the value of the variable while executing the specified
 *  thunk.
 *
 *  @param newval The value to which to set the variable
 *  @param thunk The code to evaluate under the new setting
 */
def withValue[S](newval: T)(thunk: => S): S = {
  val oldval = value
  tl set newval

  try thunk
  finally tl set oldval
}
withValue binds the new value on the current thread for the duration of thunk and restores the previous value afterwards, even if thunk throws. The thunk therefore runs in its own per-thread variable environment.
To sum up, while MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]) executes, the value read from currentTransportInformation is Serialization.Information(h.localAddress, extendedSystem). Now let's see what serialize does.
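The scoping behavior of withValue can be tried out with scala.util.DynamicVariable directly. This is only a sketch: Info, current and render are hypothetical names standing in for Serialization.Information, currentTransportInformation and the code that reads the binding.

```scala
import scala.util.DynamicVariable

object DynamicVariableSketch {
  // Hypothetical stand-in for Serialization.Information.
  final case class Info(address: String)

  // Starts out unbound (null), like currentTransportInformation.
  val current = new DynamicVariable[Info](null)

  // Reads whatever binding is active on the calling thread.
  def render(path: String): String = current.value match {
    case null       => path
    case Info(addr) => s"$addr$path"
  }

  // Returns the rendering before, inside and after a withValue block.
  def demo(): (String, String, String) = {
    val before = render("/user/a")
    val inside = current.withValue(Info("akka.tcp://sys@host:2552")) {
      render("/user/a")
    }
    val after = render("/user/a")
    (before, inside, after)
  }
}
```

The binding is visible only inside the block; once the block finishes, the variable reverts to its previous value.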
/**
 * Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol
 * Throws `NotSerializableException` if serializer was not configured for the message type.
 * Throws `MessageSerializer.SerializationException` if exception was thrown from `toBinary` of the
 * serializer.
 */
def serialize(system: ExtendedActorSystem, message: AnyRef): SerializedMessage = {
  val s = SerializationExtension(system)
  val serializer = s.findSerializerFor(message)
  val builder = SerializedMessage.newBuilder

  val oldInfo = Serialization.currentTransportInformation.value
  try {
    if (oldInfo eq null)
      Serialization.currentTransportInformation.value = system.provider.serializationInformation

    builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
    builder.setSerializerId(serializer.identifier)

    val ms = Serializers.manifestFor(serializer, message)
    if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms))

    builder.build
  } catch {
    case NonFatal(e) ⇒
      throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " +
        s"using serializer [${serializer.getClass}].", e)
  } finally Serialization.currentTransportInformation.value = oldInfo
}
In short, serialize uses SerializationExtension to find a serializer for the message and uses that serializer to turn the message into bytes, that is, to serialize it. It then sets the serializer id and the manifest on the SerializedMessage.Builder and returns the resulting SerializedMessage. How SerializationExtension finds a suitable serializer is therefore the important part.
/**
 * Returns the configured Serializer for the given Class. The configured Serializer
 * is used if the configured class `isAssignableFrom` from the `clazz`, i.e.
 * the configured class is a super class or implemented interface. In case of
 * ambiguity it is primarily using the most specific configured class,
 * and secondly the entry configured first.
 *
 * Throws java.io.NotSerializableException if no `serialization-bindings` is configured for the class.
 */
@throws(classOf[NotSerializableException])
def serializerFor(clazz: Class[_]): Serializer =
  serializerMap.get(clazz) match {
    case null ⇒ // bindings are ordered from most specific to least specific
      def unique(possibilities: immutable.Seq[(Class[_], Serializer)]): Boolean =
        possibilities.size == 1 ||
          (possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) ||
          (possibilities forall (_._2 == possibilities(0)._2))

      val ser = {
        bindings.filter {
          case (c, _) ⇒ c isAssignableFrom clazz
        } match {
          case immutable.Seq() ⇒
            throw new NotSerializableException(s"No configured serialization-bindings for class [${clazz.getName}]")
          case possibilities ⇒
            if (unique(possibilities))
              possibilities.head._2
            else {
              // give JavaSerializer lower priority if multiple serializers found
              val possibilitiesWithoutJavaSerializer = possibilities.filter {
                case (_, _: JavaSerializer)         ⇒ false
                case (_, _: DisabledJavaSerializer) ⇒ false
                case _                              ⇒ true
              }
              if (possibilitiesWithoutJavaSerializer.isEmpty) {
                // shouldn't happen
                throw new NotSerializableException(s"More than one JavaSerializer configured for class [${clazz.getName}]")
              }

              if (!unique(possibilitiesWithoutJavaSerializer)) {
                _log.warning(LogMarker.Security, "Multiple serializers found for [{}], choosing first of: [{}]",
                  clazz.getName,
                  possibilitiesWithoutJavaSerializer.map { case (_, s) ⇒ s.getClass.getName }.mkString(", "))
              }
              possibilitiesWithoutJavaSerializer.head._2
            }
        }
      }

      serializerMap.putIfAbsent(clazz, ser) match {
        case null ⇒
          if (shouldWarnAboutJavaSerializer(clazz, ser)) {
            _log.warning(LogMarker.Security, "Using the default Java serializer for class [{}] which is not recommended because of " +
              "performance implications. Use another serializer or disable this warning using the setting " +
              "'akka.actor.warn-about-java-serializer-usage'", clazz.getName)
          }
          log.debug("Using serializer [{}] for message [{}]", ser.getClass.getName, clazz.getName)
          ser
        case some ⇒ some
      }
    case ser ⇒ ser
  }
findSerializerFor ends up calling serializerFor. In short, serializerFor first checks serializerMap, which caches the serializer already chosen for a class. On a miss, it filters bindings for the configured classes that satisfy isAssignableFrom for the message class. If the candidates are unambiguous (a single match, or all matches lead to the same serializer), the first one is used; if several different serializers match, those other than JavaSerializer take priority. With the default configuration, any class that implements java.io.Serializable can fall back to JavaSerializer, while a class with no matching binding at all causes a NotSerializableException. We won't analyze how the serializers are loaded; it is enough to know that they come from configuration. So what does the default configuration look like? Below is an excerpt from reference.conf in the akka-remote package.
serializers {
  akka-containers = "akka.remote.serialization.MessageContainerSerializer"
  akka-misc = "akka.remote.serialization.MiscMessageSerializer"
  artery = "akka.remote.serialization.ArteryMessageSerializer"
  proto = "akka.remote.serialization.ProtobufSerializer"
  daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
  primitive-long = "akka.remote.serialization.LongSerializer"
  primitive-int = "akka.remote.serialization.IntSerializer"
  primitive-string = "akka.remote.serialization.StringSerializer"
  primitive-bytestring = "akka.remote.serialization.ByteStringSerializer"
  akka-system-msg = "akka.remote.serialization.SystemMessageSerializer"
}

serialization-bindings {
  "akka.actor.ActorSelectionMessage" = akka-containers

  "akka.remote.DaemonMsgCreate" = daemon-create

  "akka.remote.artery.ArteryMessage" = artery

  # Since akka.protobuf.Message does not extend Serializable but
  # GeneratedMessage does, need to use the more specific one here in order
  # to avoid ambiguity.
  "akka.protobuf.GeneratedMessage" = proto

  # Since com.google.protobuf.Message does not extend Serializable but
  # GeneratedMessage does, need to use the more specific one here in order
  # to avoid ambiguity.
  # This com.google.protobuf serialization binding is only used if the class can be loaded,
  # i.e. com.google.protobuf dependency has been added in the application project.
  "com.google.protobuf.GeneratedMessage" = proto

  "java.util.Optional" = akka-misc

  # The following are handled by the MiscMessageSerializer, but they are not enabled for
  # compatibility reasons (it was added in Akka 2.5.[8,9,12]). Enable them by adding:
  # akka.actor.serialization-bindings {
  #   "akka.Done" = akka-misc
  #   "akka.NotUsed" = akka-misc
  #   "akka.actor.Address" = akka-misc
  #   "akka.remote.UniqueAddress" = akka-misc
  # }
}

# Additional serialization-bindings that are replacing Java serialization are
# defined in this section for backwards compatibility reasons. They are included
# by default but can be excluded for backwards compatibility with Akka 2.4.x.
# They can be disabled with enable-additional-serialization-bindings=off.
additional-serialization-bindings {
  "akka.actor.Identify" = akka-misc
  "akka.actor.ActorIdentity" = akka-misc
  "scala.Some" = akka-misc
  "scala.None$" = akka-misc
  "akka.actor.Status$Success" = akka-misc
  "akka.actor.Status$Failure" = akka-misc
  "akka.actor.ActorRef" = akka-misc
  "akka.actor.PoisonPill$" = akka-misc
  "akka.actor.Kill$" = akka-misc
  "akka.remote.RemoteWatcher$Heartbeat$" = akka-misc
  "akka.remote.RemoteWatcher$HeartbeatRsp" = akka-misc
  "akka.actor.ActorInitializationException" = akka-misc

  "akka.dispatch.sysmsg.SystemMessage" = akka-system-msg

  "java.lang.String" = primitive-string
  "akka.util.ByteString$ByteString1C" = primitive-bytestring
  "akka.util.ByteString$ByteString1" = primitive-bytestring
  "akka.util.ByteString$ByteStrings" = primitive-bytestring
  "java.lang.Long" = primitive-long
  "scala.Long" = primitive-long
  "java.lang.Integer" = primitive-int
  "scala.Int" = primitive-int

  # Java Serializer is by default used for exceptions.
  # It's recommended that you implement custom serializer for exceptions that are
  # sent remotely, e.g. in akka.actor.Status.Failure for ask replies. You can add
  # binding to akka-misc (MiscMessageSerializerSpec) for the exceptions that have
  # a constructor with single message String or constructor with message String as
  # first parameter and cause Throwable as second parameter. Note that it's not
  # safe to add this binding for general exceptions such as IllegalArgumentException
  # because it may have a subclass without required constructor.
  "java.lang.Throwable" = java
  "akka.actor.IllegalActorStateException" = akka-misc
  "akka.actor.ActorKilledException" = akka-misc
  "akka.actor.InvalidActorNameException" = akka-misc
  "akka.actor.InvalidMessageException" = akka-misc

  "akka.actor.LocalScope$" = akka-misc
  "akka.remote.RemoteScope" = akka-misc

  "com.typesafe.config.impl.SimpleConfig" = akka-misc
  "com.typesafe.config.Config" = akka-misc

  "akka.routing.FromConfig" = akka-misc
  "akka.routing.DefaultResizer" = akka-misc
  "akka.routing.BalancingPool" = akka-misc
  "akka.routing.BroadcastGroup" = akka-misc
  "akka.routing.BroadcastPool" = akka-misc
  "akka.routing.RandomGroup" = akka-misc
  "akka.routing.RandomPool" = akka-misc
  "akka.routing.RoundRobinGroup" = akka-misc
  "akka.routing.RoundRobinPool" = akka-misc
  "akka.routing.ScatterGatherFirstCompletedGroup" = akka-misc
  "akka.routing.ScatterGatherFirstCompletedPool" = akka-misc
  "akka.routing.SmallestMailboxPool" = akka-misc
  "akka.routing.TailChoppingGroup" = akka-misc
  "akka.routing.TailChoppingPool" = akka-misc
  "akka.remote.routing.RemoteRouterConfig" = akka-misc
}
From the configuration above we can see that ActorRef is bound to akka-misc, that is, it is serialized by akka.remote.serialization.MiscMessageSerializer.
MiscMessageSerializer.toBinary calls serializeActorRef to serialize an ActorRef.
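The lookup rule that serializerFor applies to such bindings can be sketched with plain classes. This is a deliberately reduced model under stated assumptions: serializers are represented by their names only, the bindings list and the name number-ser are made up for illustration, the list is assumed to be pre-sorted from most specific to least specific, and the serializerMap cache and the uniqueness check of the real code are left out.

```scala
import java.io.NotSerializableException

object SerializerLookupSketch {
  // (configured class, serializer name); illustrative bindings only,
  // assumed to be ordered from most specific to least specific.
  val bindings: List[(Class[_], String)] = List(
    classOf[java.lang.Integer] -> "primitive-int",
    classOf[java.lang.Number] -> "number-ser",
    classOf[java.io.Serializable] -> "java"
  )

  def serializerFor(clazz: Class[_]): String =
    bindings.filter { case (c, _) => c.isAssignableFrom(clazz) } match {
      case Nil =>
        throw new NotSerializableException(s"No binding for class [${clazz.getName}]")
      case possibilities =>
        // Give the Java serializer lower priority when something else also matches.
        val withoutJava = possibilities.filterNot(_._2 == "java")
        (if (withoutJava.nonEmpty) withoutJava else possibilities).head._2
    }
}
```

With these bindings, java.lang.Integer resolves to primitive-int, java.lang.Double to number-ser (through Number), and java.lang.String falls back to java because it only matches java.io.Serializable.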
private def serializeActorRef(ref: ActorRef): Array[Byte] = actorRefBuilder(ref).build().toByteArray
private def actorRefBuilder(actorRef: ActorRef): ContainerFormats.ActorRef.Builder = ContainerFormats.ActorRef.newBuilder() .setPath(Serialization.serializedActorPath(actorRef))
/**
 * The serialized path of an actorRef, based on the current transport serialization information.
 * If there is no external address available in the given `ActorRef` then the systems default
 * address will be used and that is retrieved from the ThreadLocal `Serialization.Information`
 * that was set with [[Serialization#withTransportInformation]].
 */
def serializedActorPath(actorRef: ActorRef): String = {
  val path = actorRef.path
  val originalSystem: ExtendedActorSystem = actorRef match {
    case a: ActorRefWithCell ⇒ a.underlying.system.asInstanceOf[ExtendedActorSystem]
    case _                   ⇒ null
  }
  Serialization.currentTransportInformation.value match {
    case null ⇒ originalSystem match {
      case null ⇒ path.toSerializationFormat
      case system ⇒
        try path.toSerializationFormatWithAddress(system.provider.getDefaultAddress)
        catch { case NonFatal(_) ⇒ path.toSerializationFormat }
    }
    case Information(address, system) ⇒
      if (originalSystem == null || originalSystem == system)
        path.toSerializationFormatWithAddress(address)
      else {
        val provider = originalSystem.provider
        path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress))
      }
  }
}
On this code path Serialization.currentTransportInformation.value is never null, because it was bound earlier in serializeMessage. So the Information(address, system) case is always taken, and whichever branch of the if runs, the ActorRef is serialized by calling toSerializationFormatWithAddress.
So when an ActorRef is serialized, its path is converted into the serialization format, which is simply the String form of the ActorPath. In remote mode that string also carries the protocol (for example akka.tcp) and host:port. At this point the deserialization process is easy to guess: parse the ActorPath string and turn it into the corresponding RemoteActorRef. We have covered how a reference is resolved from an ActorPath in an earlier post, but let's walk through it once more.
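Before walking through the receiving side, the shape of the serialized string can be illustrated with a small model. Addr and Path below are simplified stand-ins for Akka's Address and ActorPath, written only for this sketch; in particular the model always appends #uid, which is a simplification of the real format.

```scala
object PathFormatSketch {
  // Simplified stand-in for akka.actor.Address; a local address has no host/port.
  final case class Addr(protocol: String, system: String, host: Option[String], port: Option[Int]) {
    override def toString: String = {
      val hostPort = for (h <- host; p <- port) yield s"@$h:$p"
      s"$protocol://$system${hostPort.getOrElse("")}"
    }
  }

  // Simplified stand-in for ActorPath.
  final case class Path(address: Addr, elements: List[String], uid: Int) {
    // If the path has no host of its own, the supplied address is used instead,
    // so the receiver learns where the actor actually lives.
    def toSerializationFormatWithAddress(fallback: Addr): String = {
      val addr = if (address.host.isDefined) address else fallback
      s"$addr/${elements.mkString("/")}#$uid"
    }
  }
}
```

For example, a local path akka://sys/user/a serialized with the transport address akka.tcp://sys@10.0.0.1:2552 becomes akka.tcp://sys@10.0.0.1:2552/user/a#42 in this model, which is the information the remote side needs to reach the actor.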
override def receive: Receive = {
  case Disassociated(info) ⇒ handleDisassociated(info)

  case InboundPayload(p) if p.size <= transport.maximumPayloadBytes ⇒
    val (ackOption, msgOption) = tryDecodeMessageAndAck(p)

    for (ack ← ackOption; reliableDelivery ← reliableDeliverySupervisor) reliableDelivery ! ack

    msgOption match {
      case Some(msg) ⇒
        if (msg.reliableDeliveryEnabled) {
          ackedReceiveBuffer = ackedReceiveBuffer.receive(msg)
          deliverAndAck()
        } else try
          msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption)
        catch {
          case e: NotSerializableException ⇒ logTransientSerializationError(msg, e)
          case e: IllegalArgumentException ⇒ logTransientSerializationError(msg, e)
        }

      case None ⇒
    }

  case InboundPayload(oversized) ⇒
    log.error(
      new OversizedPayloadException(s"Discarding oversized payload received: " +
        s"max allowed size [${transport.maximumPayloadBytes}] bytes, actual size [${oversized.size}] bytes."),
      "Transient error while reading from association (association remains live)")

  case StopReading(writer, replyTo) ⇒
    saveState()
    context.become(notReading)
    replyTo ! StoppedReading(writer)
}
When EndpointReader.receive gets an InboundPayload, it first decodes it into a Message and then hands it to msgDispatch.dispatch; msgDispatch is a DefaultMessageDispatcher instance.
override def dispatch(
  recipient:         InternalActorRef,
  recipientAddress:  Address,
  serializedMessage: SerializedMessage,
  senderOption:      OptionVal[ActorRef]): Unit = {

  import provider.remoteSettings._

  lazy val payload: AnyRef = MessageSerializer.deserialize(system, serializedMessage)
  def payloadClass: Class[_] = if (payload eq null) null else payload.getClass
  val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
  val originalReceiver = recipient.path

  def logMessageReceived(messageType: String): Unit = {
    if (LogReceive && log.isDebugEnabled)
      log.debug(s"received $messageType RemoteMessage: [{}] to [{}]<+[{}] from [{}]", payload, recipient, originalReceiver, sender)
  }

  recipient match {

    case `remoteDaemon` ⇒
      if (UntrustedMode) log.debug(LogMarker.Security, "dropping daemon message in untrusted mode")
      else {
        logMessageReceived("daemon message")
        remoteDaemon ! payload
      }

    case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒
      logMessageReceived("local message")
      payload match {
        case sel: ActorSelectionMessage ⇒
          if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
            sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
            log.debug(
              LogMarker.Security,
              "operating in UntrustedMode, dropping inbound actor selection to [{}], " +
                "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
              sel.elements.mkString("/", "/", ""))
          else
            // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
            ActorSelection.deliverSelection(l, sender, sel)
        case msg: PossiblyHarmful if UntrustedMode ⇒
          log.debug(LogMarker.Security, "operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName)
        case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
        case msg                ⇒ l.!(msg)(sender)
      }

    case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒
      logMessageReceived("remote-destined message")
      if (provider.transport.addresses(recipientAddress))
        // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
        r.!(payload)(sender)
      else
        log.error(
          "dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
          payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))

    case r ⇒ log.error(
      "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
      payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))

  }
}
dispatch deserializes the message through MessageSerializer.deserialize(system, serializedMessage). Note that payload is a lazy val, so deserialization actually happens the first time the payload is accessed.
/**
 * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message
 */
def deserialize(system: ExtendedActorSystem, messageProtocol: SerializedMessage): AnyRef = {
  SerializationExtension(system).deserialize(
    messageProtocol.getMessage.toByteArray,
    messageProtocol.getSerializerId,
    if (messageProtocol.hasMessageManifest) messageProtocol.getMessageManifest.toStringUtf8 else "").get
}
/**
 * Deserializes the given array of bytes using the specified serializer id,
 * using the optional type hint to the Serializer.
 * Returns either the resulting object or an Exception if one was thrown.
 */
def deserialize(bytes: Array[Byte], serializerId: Int, manifest: String): Try[AnyRef] =
  Try {
    val serializer = try getSerializerById(serializerId) catch {
      case _: NoSuchElementException ⇒ throw new NotSerializableException(
        s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
          "akka.actor.serializers is not in synch between the two systems.")
    }
    deserializeByteArray(bytes, serializer, manifest)
  }
The code above shows that deserialization simply looks up the concrete serializer by serializerId and then calls deserializeByteArray. If the message is an ActorRef, which serializer does that id point to? It is the numeric identifier of the serializer registered under the name akka-misc, so the bytes end up being deserialized by akka.remote.serialization.MiscMessageSerializer. One more field matters here: manifest. What is it? The serialization side of ActorRef gives us a clue.
In MessageSerializer.serialize, one line assigns the manifest: val ms = Serializers.manifestFor(serializer, message).
def manifestFor(s: Serializer, message: AnyRef): String = s match {
  case s2: SerializerWithStringManifest ⇒ s2.manifest(message)
  case _                                ⇒ if (s.includeManifest) message.getClass.getName else ""
}
It checks whether the serializer is a SerializerWithStringManifest. If so, it calls manifest; otherwise it checks includeManifest and returns the class name of the message when that is true, or an empty string when it is not. Let's look at how MiscMessageSerializer is defined.
class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer
private val ActorRefManifest = "G"
Clearly it extends SerializerWithStringManifest, and for an ActorRef the manifest value is the string "G".
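The three outcomes of manifestFor can be reproduced with a small model. Ser and StringManifestSer are simplified stand-ins for Serializer and SerializerWithStringManifest, and the manifest codes "S" and "X" are hypothetical values chosen for this sketch.

```scala
object ManifestSketch {
  // Simplified stand-ins for Serializer and SerializerWithStringManifest.
  trait Ser { def includeManifest: Boolean = false }
  trait StringManifestSer extends Ser { def manifest(o: AnyRef): String }

  // Chooses a short string manifest per message type (codes are made up).
  object MiscLike extends StringManifestSer {
    def manifest(o: AnyRef): String = o match {
      case _: String => "S"
      case _         => "X"
    }
  }
  // Plain serializer that asks for the class name as manifest.
  object ClassNameSer extends Ser { override def includeManifest: Boolean = true }
  // Plain serializer that needs no manifest at all.
  object NoManifestSer extends Ser

  def manifestFor(s: Ser, message: AnyRef): String = s match {
    case s2: StringManifestSer => s2.manifest(message)
    case _                     => if (s.includeManifest) message.getClass.getName else ""
  }
}
```

A short string manifest keeps the wire format compact compared with a fully qualified class name, which is presumably why MiscMessageSerializer uses single letters such as "G".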
private def deserializeByteArray(bytes: Array[Byte], serializer: Serializer, manifest: String): AnyRef = {

  @tailrec def updateCache(cache: Map[String, Option[Class[_]]], key: String, value: Option[Class[_]]): Boolean = {
    manifestCache.compareAndSet(cache, cache.updated(key, value)) ||
      updateCache(manifestCache.get, key, value) // recursive, try again
  }

  withTransportInformation { () ⇒
    serializer match {
      case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest)
      case s1 ⇒
        if (manifest == "")
          s1.fromBinary(bytes, None)
        else {
          val cache = manifestCache.get
          cache.get(manifest) match {
            case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest)
            case None ⇒
              system.dynamicAccess.getClassFor[AnyRef](manifest) match {
                case Success(classManifest) ⇒
                  val classManifestOption: Option[Class[_]] = Some(classManifest)
                  updateCache(cache, manifest, classManifestOption)
                  s1.fromBinary(bytes, classManifestOption)
                case Failure(e) ⇒
                  throw new NotSerializableException(
                    s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].")
              }
          }
        }
    }
  }
}
This is the source of deserializeByteArray. It first checks whether the serializer is a SerializerWithStringManifest. For an ActorRef the serializer is MiscMessageSerializer, which is a subclass of SerializerWithStringManifest, so let's see how its fromBinary is implemented.
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
  fromBinaryMap.get(manifest) match {
    case Some(deserializer) ⇒ deserializer(bytes)
    case None ⇒ throw new NotSerializableException(
      s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
  }
private val fromBinaryMap = Map[String, Array[Byte] ⇒ AnyRef](
  IdentifyManifest → deserializeIdentify,
  ActorIdentityManifest → deserializeActorIdentity,
  StatusSuccessManifest → deserializeStatusSuccess,
  StatusFailureManifest → deserializeStatusFailure,
  ThrowableManifest → throwableSupport.deserializeThrowable,
  ActorRefManifest → deserializeActorRefBytes,
  OptionManifest → deserializeOption,
  OptionalManifest → deserializeOptional,
  PoisonPillManifest → ((_) ⇒ PoisonPill),
  KillManifest → ((_) ⇒ Kill),
  RemoteWatcherHBManifest → ((_) ⇒ RemoteWatcher.Heartbeat),
  DoneManifest → ((_) ⇒ Done),
  NotUsedManifest → ((_) ⇒ NotUsed),
  AddressManifest → deserializeAddressData,
  UniqueAddressManifest → deserializeUniqueAddress,
  RemoteWatcherHBRespManifest → deserializeHeartbeatRsp,
  ActorInitializationExceptionManifest → deserializeActorInitializationException,
  LocalScopeManifest → ((_) ⇒ LocalScope),
  RemoteScopeManifest → deserializeRemoteScope,
  ConfigManifest → deserializeConfig,
  FromConfigManifest → deserializeFromConfig,
  DefaultResizerManifest → deserializeDefaultResizer,
  BalancingPoolManifest → deserializeBalancingPool,
  BroadcastPoolManifest → deserializeBroadcastPool,
  RandomPoolManifest → deserializeRandomPool,
  RoundRobinPoolManifest → deserializeRoundRobinPool,
  ScatterGatherPoolManifest → deserializeScatterGatherPool,
  TailChoppingPoolManifest → deserializeTailChoppingPool,
  RemoteRouterConfigManifest → deserializeRemoteRouterConfig
)
private def deserializeActorRefBytes(bytes: Array[Byte]): ActorRef = deserializeActorRef(ContainerFormats.ActorRef.parseFrom(bytes))
private def deserializeActorRef(actorRef: ContainerFormats.ActorRef): ActorRef = serialization.system.provider.resolveActorRef(actorRef.getPath)
So the bytes are first turned into a ContainerFormats.ActorRef by ContainerFormats.ActorRef.parseFrom, which we won't analyze further. Then serialization.system.provider.resolveActorRef converts the ActorPath string into an ActorRef. From the context, serialization.system.provider should be the RemoteActorRefProvider.
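The manifest-driven dispatch in fromBinary can be sketched end to end with a toy serializer. Everything here is an assumption made for illustration: RefModel stands in for ActorRef, the payload is just the UTF-8 bytes of the path rather than a protobuf message, and the second manifest "T" is hypothetical.

```scala
import java.io.NotSerializableException
import java.nio.charset.StandardCharsets

object ManifestDispatchSketch {
  final case class RefModel(path: String) // stand-in for ActorRef

  private val RefManifest = "G"
  private val TextManifest = "T" // hypothetical second manifest

  def manifest(o: AnyRef): String = o match {
    case _: RefModel => RefManifest
    case _: String   => TextManifest
    case other       => throw new IllegalArgumentException(s"Cannot serialize ${other.getClass.getName}")
  }

  def toBinary(o: AnyRef): Array[Byte] = o match {
    case RefModel(p) => p.getBytes(StandardCharsets.UTF_8)
    case s: String   => s.getBytes(StandardCharsets.UTF_8)
    case other       => throw new IllegalArgumentException(s"Cannot serialize ${other.getClass.getName}")
  }

  // One deserializer function per manifest, looked up at read time.
  private val fromBinaryMap = Map[String, Array[Byte] => AnyRef](
    RefManifest -> ((bytes: Array[Byte]) => RefModel(new String(bytes, StandardCharsets.UTF_8))),
    TextManifest -> ((bytes: Array[Byte]) => new String(bytes, StandardCharsets.UTF_8))
  )

  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    fromBinaryMap.get(manifest) match {
      case Some(deserializer) => deserializer(bytes)
      case None               => throw new NotSerializableException(s"Unknown manifest [$manifest]")
    }
}
```

The same bytes decode to different types depending on the manifest, which is why the manifest has to travel with the payload.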
def resolveActorRef(path: String): ActorRef = {
  // using thread local LRU cache, which will call internalRresolveActorRef
  // if the value is not cached
  actorRefResolveThreadLocalCache match {
    case null ⇒ internalResolveActorRef(path) // not initalized yet
    case c    ⇒ c.threadLocalCache(this).getOrCompute(path)
  }
}
resolveActorRef first checks whether the actorRefResolveThreadLocalCache cache has been initialized. It has: as analyzed before, the cache is created during RemoteActorRefProvider.init.
/**
 * INTERNAL API
 */
private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider)
  extends LruBoundedCache[String, ActorRef](capacity = 1024, evictAgeThreshold = 600) {

  override protected def compute(k: String): ActorRef =
    provider.internalResolveActorRef(k)

  override protected def hash(k: String): Int = Unsafe.fastHash(k)

  override protected def isCacheable(v: ActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
}
actorRefResolveThreadLocalCache is an ActorSystem Extension that ultimately hands out ActorRefResolveCache instances, a class we have met before. It is an LruBoundedCache with a capacity of 1024 and an evictAgeThreshold of 600; as far as I can tell from LruBoundedCache, that threshold is an entry age counted in cache operations rather than a time in seconds. On the first getOrCompute for a path, compute is called, which in turn calls provider.internalResolveActorRef, and the result is cached unless it is an EmptyLocalActorRef. We won't analyze the caching itself. Next, internalResolveActorRef.
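Before turning to internalResolveActorRef, the getOrCompute behavior can be sketched with a small LRU cache. Note the assumptions: this sketch is built on java.util.LinkedHashMap in access order, which is not how Akka's LruBoundedCache is implemented, it has no age threshold, and it is not thread-safe (Akka sidesteps that by keeping one cache per thread). LruCache and computeCount are names invented here.

```scala
import java.util.{LinkedHashMap, Map => JMap}

object ResolveCacheSketch {
  // Bounded cache that computes missing values and evicts the least recently used entry.
  final class LruCache[K, V](capacity: Int)(compute: K => V) {
    // Counts how often compute ran, to make cache hits observable.
    var computeCount = 0

    // accessOrder = true keeps entries ordered from least to most recently used.
    private val entries = new LinkedHashMap[K, V](16, 0.75f, true) {
      override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
        size() > capacity
    }

    def getOrCompute(k: K): V =
      if (entries.containsKey(k)) entries.get(k)
      else {
        computeCount += 1
        val v = compute(k)
        entries.put(k, v)
        v
      }
  }
}
```

In this model, resolving the same path twice runs the compute function once; a path is recomputed only after it has been evicted.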
/**
 * INTERNAL API: This is used by the `ActorRefResolveCache` via the
 * public `resolveActorRef(path: String)`.
 */
private[akka] def internalResolveActorRef(path: String): ActorRef = path match {
  case ActorPathExtractor(address, elems) ⇒
    if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems)
    else {
      val rootPath = RootActorPath(address) / elems
      try {
        new RemoteActorRef(transport, transport.localAddressForRemote(address),
          rootPath, Nobody, props = None, deploy = None)
      } catch {
        case NonFatal(e) ⇒
          log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
          new EmptyLocalActorRef(this, rootPath, eventStream)
      }
    }
  case _ ⇒
    log.debug("Resolve (deserialization) of unknown (invalid) path [{}], using deadLetters.", path)
    deadLetters
}
Do you remember internalResolveActorRef? It appeared when we analyzed actorSelection. It first checks whether the address in the path is one of the local system's own addresses. If so, it looks the actor up through the LocalActorRefProvider; otherwise it creates a RemoteActorRef. Because the ActorRef in the message points at an actor on a remote node, a RemoteActorRef is created as the proxy for that remote actor. This completes the deserialization of a remote ActorRef.
We have now gone through the complete serialization and deserialization of an ActorRef. Akka's location transparency is built on ActorPath: an ActorRef crosses the network as the String form of its ActorPath (including host and port), and the host that receives the serialized ActorRef uses the deserialized ActorPath string to create a local proxy for the remote actor, a RemoteActorRef. All later communication goes through that RemoteActorRef. Seen this way, location transparency is fairly simple.
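The local-versus-remote decision can be sketched with java.net.URI doing the parsing. This is an illustrative model only: LocalModel, RemoteModel and DeadLetters are made-up stand-ins for the references Akka would return, addresses are compared as plain strings, and the actor uid fragment is ignored.

```scala
import java.net.{URI, URISyntaxException}

object ResolveSketch {
  sealed trait Ref { def isLocal: Boolean }
  final case class LocalModel(elements: List[String]) extends Ref { val isLocal = true }
  final case class RemoteModel(address: String, elements: List[String]) extends Ref { val isLocal = false }
  case object DeadLetters extends Ref { val isLocal = true }

  // localAddresses plays the role of the addresses the local system listens on.
  def resolve(path: String, localAddresses: Set[String]): Ref =
    try {
      val uri = new URI(path)
      if (uri.getScheme == null || uri.getHost == null || uri.getUserInfo == null)
        DeadLetters // not a full remote-style actor path
      else {
        val address = s"${uri.getScheme}://${uri.getUserInfo}@${uri.getHost}:${uri.getPort}"
        val elems = uri.getPath.split('/').filter(_.nonEmpty).toList
        // Our own address: look the actor up locally. Otherwise: create a proxy.
        if (localAddresses(address)) LocalModel(elems) else RemoteModel(address, elems)
      }
    } catch {
      case _: URISyntaxException => DeadLetters
    }
}
```

The same path string therefore resolves to a local reference on the node that owns the address and to a remote proxy everywhere else, which is the essence of the resolution step described above.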