Akka Source Code Analysis - Remote - Location Transparency


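In the previous post, we looked at how a message is sent to a remote actor in remote mode. In the end, every such message goes out through a RemoteActorRef. The official documentation also states explicitly that an ActorRef lets you ignore the network location of the actor it points to. This has two implications: 1. an ActorRef can be serialized and sent across the network; 2. after deserialization, the receiving side can still correctly tell whether it refers to a local or a remote actor. Location transparency therefore hinges on two things: 1. how an ActorRef is serialized; 2. how the receiving side recognizes (resolves) it. Let's examine these two points one at a time.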

In local mode, messages are sent through an InternalActorRef; in remote mode, they are sent through a RemoteActorRef. So what is the difference between the two?

/**
 * INTERNAL API
 * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
 * This reference is network-aware (remembers its origin) and immutable.
 */
private[akka] class RemoteActorRef private[akka] (
  remote:                RemoteTransport,
  val localAddressToUse: Address,
  val path:              ActorPath,
  val getParent:         InternalActorRef,
  props:                 Option[Props],
  deploy:                Option[Deploy])
  extends InternalActorRef with RemoteRef 

As the source shows, RemoteActorRef extends InternalActorRef and also mixes in the RemoteRef trait.

private[akka] trait RemoteRef extends ActorRefScope {
  final def isLocal = false
}

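RemoteRef is simple: all it does is define isLocal as false. So a RemoteActorRef does not differ that much from any other InternalActorRef. When an ActorRef is passed around inside the local JVM, it is not serialized by default, so where does serialization come into play? Let's start with the serialization process.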

Remember the earlier article? In remote mode, messages are written out by EndpointWriter.writeSend:

 def writeSend(s: Send): Boolean = try {
    handle match {
      case Some(h) ⇒
        if (provider.remoteSettings.LogSend && log.isDebugEnabled) {
          def msgLog = s"RemoteMessage: [${s.message}] to [${s.recipient}]<+[${s.recipient.path}] from [${s.senderOption.getOrElse(extendedSystem.deadLetters)}]"
          log.debug("sending message {}", msgLog)
        }

        val pdu = codec.constructMessage(
          s.recipient.localAddressToUse,
          s.recipient,
          serializeMessage(s.message),
          s.senderOption,
          seqOption = s.seqOpt,
          ackOption = lastAck)

        val pduSize = pdu.size
        remoteMetrics.logPayloadBytes(s.message, pduSize)

        if (pduSize > transport.maximumPayloadBytes) {
          val reason = new OversizedPayloadException(s"Discarding oversized payload sent to ${s.recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${s.message.getClass} was ${pdu.size} bytes.")
          log.error(reason, "Transient association error (association remains live)")
          true
        } else {
          val ok = h.write(pdu)
          if (ok) {
            ackDeadline = newAckDeadline
            lastAck = None
          }
          ok
        }

      case None ⇒
        throw new EndpointException("Internal error: Endpoint is in state Writing, but no association handle is present.")
    }
  } catch {
    case e: NotSerializableException ⇒
      log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
      true
    case e: IllegalArgumentException ⇒
      log.error(e, "Serializer not defined for message type [{}]. Transient association error (association remains live)", s.message.getClass)
      true
    case e: MessageSerializer.SerializationException ⇒
      log.error(e, "{} Transient association error (association remains live)", e.getMessage)
      true
    case e: EndpointException ⇒
      publishAndThrow(e, Logging.ErrorLevel)
    case NonFatal(e) ⇒
      publishAndThrow(new EndpointException("Failed to write message to the transport", e), Logging.ErrorLevel)
  }

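Note that serializeMessage is called to serialize the outgoing message, and its result is passed as the payload argument of codec.constructMessage. So if a message sent by the user contains an ActorRef, that ActorRef is necessarily handled in this step. Which messages contain an ActorRef? Remember ActorIdentity? It carries an (optional) ActorRef. And of course, if a user-defined message contains an ActorRef, it will be serialized here as well.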

private def serializeMessage(msg: Any): SerializedMessage = handle match {
    case Some(h) ⇒
      Serialization.currentTransportInformation.withValue(Serialization.Information(h.localAddress, extendedSystem)) {
        MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef])
      }
    case None ⇒
      throw new EndpointException("Internal error: No handle was present during serialization of outbound message.")
  }

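Clearly, EndpointWriter.serializeMessage in turn calls MessageSerializer.serialize to do the actual serialization, and it does so inside Serialization.currentTransportInformation.withValue(...).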

  /**
   * Serialization information needed for serializing local actor refs,
   * or if serializer library e.g. custom serializer/deserializer in Jackson need
   * access to the current `ActorSystem`.
   */
  final case class Information(address: Address, system: ActorSystem)

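The Serialization.Information case class is simple, and its scaladoc explains it clearly, so I won't analyze it in detail. In short, it supplies the basic context the serialization process needs, namely the (transport) address and the current ActorSystem.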

 /**
   * INTERNAL API: This holds a reference to the current transport serialization information used for
   * serializing local actor refs, or if serializer library e.g. custom serializer/deserializer in
   * Jackson need access to the current `ActorSystem`.
   */
  @InternalApi private[akka] val currentTransportInformation = new DynamicVariable[Information](null)

So what exactly is Serialization.currentTransportInformation? It is declared as a DynamicVariable; here is the scaladoc of that class:

/** `DynamicVariables` provide a binding mechanism where the current
 *  value is found through dynamic scope, but where access to the
 *  variable itself is resolved through static scope.
 *
 *  The current value can be retrieved with the value method. New values
 *  should be pushed using the `withValue` method. Values pushed via
 *  `withValue` only stay valid while the `withValue`'s second argument, a
 *  parameterless closure, executes. When the second argument finishes,
 *  the variable reverts to the previous value.
 *
 *  {{{
 *  someDynamicVariable.withValue(newValue) {
 *    // ... code called in here that calls value ...
 *    // ... will be given back the newValue ...
 *  }
 *  }}}
 *
 *  Each thread gets its own stack of bindings.  When a
 *  new thread is created, the `DynamicVariable` gets a copy
 *  of the stack of bindings from the parent thread, and
 *  from then on the bindings for the new thread
 *  are independent of those for the original thread.
 *
 *  @author  Lex Spoon
 *  @version 1.1, 2007-5-21
 */
class DynamicVariable[T](init: T) 

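currentTransportInformation is a dynamic variable. Its purpose and usage are documented clearly in the Scala API docs; you can think of it as a ThreadLocal whose value is inherited by threads created from the current one (it is backed by an InheritableThreadLocal).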

/** Set the value of the variable while executing the specified
    * thunk.
    *
    * @param newval The value to which to set the variable
    * @param thunk The code to evaluate under the new setting
    */
  def withValue[S](newval: T)(thunk: => S): S = {
    val oldval = value
    tl set newval

    try thunk
    finally tl set oldval
  }

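withValue binds newval for the current thread, evaluates thunk, and then restores the previous value in a finally block, even if thunk throws. Because the binding is per thread, thunk runs with a thread-safe, scoped value.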
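To make the scoping behaviour concrete, here is a small sketch using the real `scala.util.DynamicVariable`. `DynVarSketch`, `Info` and the helper methods are hypothetical names standing in for `Serialization.Information` and the serializer code that reads it; only `DynamicVariable` and its `withValue`/`value` members come from the standard library.

```scala
import scala.util.DynamicVariable

object DynVarSketch {
  // Hypothetical stand-in for Serialization.Information.
  final case class Info(address: String, system: String)

  // Same pattern as Serialization.currentTransportInformation: unbound means null.
  val currentInfo = new DynamicVariable[Info](null)

  // Stand-in for serializer code that reads the ambient value instead of taking a parameter.
  def addressForSerialization(): String =
    Option(currentInfo.value).map(_.address).getOrElse("<no transport info>")

  // Stand-in for EndpointWriter.serializeMessage: bind, run the thunk, restore.
  def serializeWith(info: Info): String =
    currentInfo.withValue(info) {
      addressForSerialization()
    }

  // The previous binding is restored in a finally block, so it survives a throwing thunk.
  def failingSerialize(info: Info): Unit =
    currentInfo.withValue(info) {
      throw new IllegalStateException("serializer failed")
    }

  // A thread created inside withValue inherits the binding (InheritableThreadLocal).
  def valueSeenByChildThread(info: Info): String =
    currentInfo.withValue(info) {
      var seen: String = null
      val t = new Thread(() => seen = addressForSerialization())
      t.start()
      t.join()
      seen
    }
}
```

Because the value lives in a thread-local slot, serializations running concurrently on different threads cannot see each other's binding.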

To sum up: while MessageSerializer.serialize(extendedSystem, msg.asInstanceOf[AnyRef]) runs, the value it reads from currentTransportInformation is Serialization.Information(h.localAddress, extendedSystem). Now let's see what serialize does.

/**
   * Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol
   * Throws `NotSerializableException` if serializer was not configured for the message type.
   * Throws `MessageSerializer.SerializationException` if exception was thrown from `toBinary` of the
   * serializer.
   */
  def serialize(system: ExtendedActorSystem, message: AnyRef): SerializedMessage = {
    val s = SerializationExtension(system)
    val serializer = s.findSerializerFor(message)
    val builder = SerializedMessage.newBuilder

    val oldInfo = Serialization.currentTransportInformation.value
    try {
      if (oldInfo eq null)
        Serialization.currentTransportInformation.value = system.provider.serializationInformation

      builder.setMessage(ByteString.copyFrom(serializer.toBinary(message)))
      builder.setSerializerId(serializer.identifier)

      val ms = Serializers.manifestFor(serializer, message)
      if (ms.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(ms))

      builder.build
    } catch {
      case NonFatal(e) ⇒
        throw new SerializationException(s"Failed to serialize remote message [${message.getClass}] " +
          s"using serializer [${serializer.getClass}].", e)
    } finally Serialization.currentTransportInformation.value = oldInfo
  }

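Put simply, serialize uses SerializationExtension to find a serializer for message, uses that serializer to turn message into bytes (that is, to serialize it), then sets the remaining fields (serializer ID and manifest) through SerializedMessage.Builder and finally returns a SerializedMessage. If no transport information is bound yet, it temporarily falls back to system.provider.serializationInformation. How SerializationExtension finds a suitable serializer is therefore the key question.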
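The flow of serialize can be condensed into a small, self-contained sketch. Everything in it is hypothetical: `Envelope` stands in for the protobuf `SerializedMessage`, `Ser` for Akka's `Serializer`, a plain `String` address for `Serialization.Information`, and the serializer id 20 is made up. It shows the two points worth noticing: the envelope carries the bytes plus the serializer id and manifest, and transport information is only filled in with a default when nothing is bound, then restored in `finally`.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.DynamicVariable

object SerializeSketch {
  // Hypothetical envelope with the three fields set on SerializedMessage.Builder.
  final case class Envelope(message: Array[Byte], serializerId: Int, manifest: String)

  // Hypothetical minimal serializer contract.
  trait Ser {
    def identifier: Int
    def manifest(o: AnyRef): String
    def toBinary(o: AnyRef): Array[Byte]
  }

  // Stand-ins for currentTransportInformation and provider.serializationInformation.
  val currentAddress = new DynamicVariable[String](null)
  val defaultAddress = "akka://sys"

  // Toy serializer that records which address was in scope when it ran.
  object StringSer extends Ser {
    val identifier = 20 // made-up id
    def manifest(o: AnyRef): String = ""
    def toBinary(o: AnyRef): Array[Byte] = s"${currentAddress.value}|$o".getBytes(UTF_8)
  }

  def serialize(ser: Ser, message: AnyRef): Envelope = {
    val oldInfo = currentAddress.value
    try {
      // Only fill in a default when the caller has not bound transport information.
      if (oldInfo eq null) currentAddress.value = defaultAddress
      Envelope(ser.toBinary(message), ser.identifier, ser.manifest(message))
    } finally currentAddress.value = oldInfo
  }
}
```

Called outside any `withValue`, the toy serializer sees the default address; called inside one (as EndpointWriter does), it sees the bound transport address.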

/**
   * Returns the configured Serializer for the given Class. The configured Serializer
   * is used if the configured class `isAssignableFrom` from the `clazz`, i.e.
   * the configured class is a super class or implemented interface. In case of
   * ambiguity it is primarily using the most specific configured class,
   * and secondly the entry configured first.
   *
   * Throws java.io.NotSerializableException if no `serialization-bindings` is configured for the class.
   */
  @throws(classOf[NotSerializableException])
  def serializerFor(clazz: Class[_]): Serializer =
    serializerMap.get(clazz) match {
      case null ⇒ // bindings are ordered from most specific to least specific
        def unique(possibilities: immutable.Seq[(Class[_], Serializer)]): Boolean =
          possibilities.size == 1 ||
            (possibilities forall (_._1 isAssignableFrom possibilities(0)._1)) ||
            (possibilities forall (_._2 == possibilities(0)._2))

        val ser = {
          bindings.filter {
            case (c, _) ⇒ c isAssignableFrom clazz
          } match {
            case immutable.Seq() ⇒
              throw new NotSerializableException(s"No configured serialization-bindings for class [${clazz.getName}]")
            case possibilities ⇒
              if (unique(possibilities))
                possibilities.head._2
              else {
                // give JavaSerializer lower priority if multiple serializers found
                val possibilitiesWithoutJavaSerializer = possibilities.filter {
                  case (_, _: JavaSerializer)         ⇒ false
                  case (_, _: DisabledJavaSerializer) ⇒ false
                  case _                              ⇒ true
                }
                if (possibilitiesWithoutJavaSerializer.isEmpty) {
                  // shouldn't happen
                  throw new NotSerializableException(s"More than one JavaSerializer configured for class [${clazz.getName}]")
                }

                if (!unique(possibilitiesWithoutJavaSerializer)) {
                  _log.warning(LogMarker.Security, "Multiple serializers found for [{}], choosing first of: [{}]",
                    clazz.getName,
                    possibilitiesWithoutJavaSerializer.map { case (_, s) ⇒ s.getClass.getName }.mkString(", "))
                }
                possibilitiesWithoutJavaSerializer.head._2

              }

          }
        }

        serializerMap.putIfAbsent(clazz, ser) match {
          case null ⇒
            if (shouldWarnAboutJavaSerializer(clazz, ser)) {
              _log.warning(LogMarker.Security, "Using the default Java serializer for class [{}] which is not recommended because of " +
                "performance implications. Use another serializer or disable this warning using the setting " +
                "'akka.actor.warn-about-java-serializer-usage'", clazz.getName)
            }
            log.debug("Using serializer [{}] for message [{}]", ser.getClass.getName, clazz.getName)
            ser
          case some ⇒ some
        }
      case ser ⇒ ser
    }

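findSerializerFor ultimately calls serializerFor. In short, serializerFor first looks the class up in the serializerMap cache. On a miss, it collects every entry in bindings whose configured class isAssignableFrom the given class. If there is exactly one candidate, or all candidates lie on one inheritance chain, or all map to the same serializer, the first (most specific) candidate is used. If several genuinely different serializers match, any serializer other than JavaSerializer (or DisabledJavaSerializer) is preferred, and a warning is logged if the choice is still ambiguous. The result is then cached in serializerMap. In the Akka version analyzed here, java.io.Serializable is bound to JavaSerializer by default, so any Serializable message always has at least that fallback. I won't go into how the serializers are loaded; it is enough to know they come from configuration. So what does the default configuration look like? Below is an excerpt from reference.conf in the akka-remote module.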

serializers {
      akka-containers = "akka.remote.serialization.MessageContainerSerializer"
      akka-misc = "akka.remote.serialization.MiscMessageSerializer"
      artery = "akka.remote.serialization.ArteryMessageSerializer"
      proto = "akka.remote.serialization.ProtobufSerializer"
      daemon-create = "akka.remote.serialization.DaemonMsgCreateSerializer"
      primitive-long = "akka.remote.serialization.LongSerializer"
      primitive-int = "akka.remote.serialization.IntSerializer"
      primitive-string = "akka.remote.serialization.StringSerializer"
      primitive-bytestring = "akka.remote.serialization.ByteStringSerializer"
      akka-system-msg = "akka.remote.serialization.SystemMessageSerializer"
    }

    serialization-bindings {
      "akka.actor.ActorSelectionMessage" = akka-containers

      "akka.remote.DaemonMsgCreate" = daemon-create

      "akka.remote.artery.ArteryMessage" = artery

      # Since akka.protobuf.Message does not extend Serializable but
      # GeneratedMessage does, need to use the more specific one here in order
      # to avoid ambiguity.
      "akka.protobuf.GeneratedMessage" = proto

      # Since com.google.protobuf.Message does not extend Serializable but
      # GeneratedMessage does, need to use the more specific one here in order
      # to avoid ambiguity.
      # This com.google.protobuf serialization binding is only used if the class can be loaded,
      # i.e. com.google.protobuf dependency has been added in the application project.
      "com.google.protobuf.GeneratedMessage" = proto

      "java.util.Optional" = akka-misc


      # The following are handled by the MiscMessageSerializer, but they are not enabled for
      # compatibility reasons (it was added in Akka 2.5.[8,9,12]). Enable them by adding:
      # akka.actor.serialization-bindings {
      #   "akka.Done"                 = akka-misc
      #   "akka.NotUsed"              = akka-misc
      #   "akka.actor.Address"        = akka-misc
      #   "akka.remote.UniqueAddress" = akka-misc
      # }
    }

    # Additional serialization-bindings that are replacing Java serialization are
    # defined in this section for backwards compatibility reasons. They are included
    # by default but can be excluded for backwards compatibility with Akka 2.4.x.
    # They can be disabled with enable-additional-serialization-bindings=off.
    additional-serialization-bindings {
      "akka.actor.Identify" = akka-misc
      "akka.actor.ActorIdentity" = akka-misc
      "scala.Some" = akka-misc
      "scala.None$" = akka-misc
      "akka.actor.Status$Success" = akka-misc
      "akka.actor.Status$Failure" = akka-misc
      "akka.actor.ActorRef" = akka-misc
      "akka.actor.PoisonPill$" = akka-misc
      "akka.actor.Kill$" = akka-misc
      "akka.remote.RemoteWatcher$Heartbeat$" = akka-misc
      "akka.remote.RemoteWatcher$HeartbeatRsp" = akka-misc
      "akka.actor.ActorInitializationException" = akka-misc

      "akka.dispatch.sysmsg.SystemMessage" = akka-system-msg

      "java.lang.String" = primitive-string
      "akka.util.ByteString$ByteString1C" = primitive-bytestring
      "akka.util.ByteString$ByteString1" = primitive-bytestring
      "akka.util.ByteString$ByteStrings" = primitive-bytestring
      "java.lang.Long" = primitive-long
      "scala.Long" = primitive-long
      "java.lang.Integer" = primitive-int
      "scala.Int" = primitive-int

      # Java Serializer is by default used for exceptions.
      # It's recommended that you implement custom serializer for exceptions that are
      # sent remotely, e.g. in akka.actor.Status.Failure for ask replies. You can add
      # binding to akka-misc (MiscMessageSerializerSpec) for the exceptions that have
      # a constructor with single message String or constructor with message String as
      # first parameter and cause Throwable as second parameter. Note that it's not
      # safe to add this binding for general exceptions such as IllegalArgumentException
      # because it may have a subclass without required constructor.
      "java.lang.Throwable" = java
      "akka.actor.IllegalActorStateException" = akka-misc
      "akka.actor.ActorKilledException" = akka-misc
      "akka.actor.InvalidActorNameException" = akka-misc
      "akka.actor.InvalidMessageException" = akka-misc

      "akka.actor.LocalScope$" = akka-misc
      "akka.remote.RemoteScope" = akka-misc

      "com.typesafe.config.impl.SimpleConfig" = akka-misc
      "com.typesafe.config.Config" = akka-misc

      "akka.routing.FromConfig" = akka-misc
      "akka.routing.DefaultResizer" = akka-misc
      "akka.routing.BalancingPool" = akka-misc
      "akka.routing.BroadcastGroup" = akka-misc
      "akka.routing.BroadcastPool" = akka-misc
      "akka.routing.RandomGroup" = akka-misc
      "akka.routing.RandomPool" = akka-misc
      "akka.routing.RoundRobinGroup" = akka-misc
      "akka.routing.RoundRobinPool" = akka-misc
      "akka.routing.ScatterGatherFirstCompletedGroup" = akka-misc
      "akka.routing.ScatterGatherFirstCompletedPool" = akka-misc
      "akka.routing.SmallestMailboxPool" = akka-misc
      "akka.routing.TailChoppingGroup" = akka-misc
      "akka.routing.TailChoppingPool" = akka-misc
      "akka.remote.routing.RemoteRouterConfig" = akka-misc
    }

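From the configuration above, we can see that ActorRef is serialized by akka-misc, i.e. akka.remote.serialization.MiscMessageSerializer. Note that this binding sits in additional-serialization-bindings, which, as the comment says, is included by default and can be turned off with enable-additional-serialization-bindings=off.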
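To see how serializerFor turns a binding list like the one above into a single serializer, here is a simplified sketch. `BindingSketch`, its `Ser` objects and `demoBindings` are hypothetical; the sketch keeps the `isAssignableFrom` filtering, the three-way `unique` test and the lower priority for the Java serializer, but omits the `serializerMap` cache and the warning logs. As in Akka, it assumes the bindings are already ordered from most to least specific.

```scala
import java.io.NotSerializableException

object BindingSketch {
  sealed trait Ser
  case object JavaSer extends Ser
  case object MiscSer extends Ser
  case object ProtoSer extends Ser

  // Hypothetical bindings, ordered from most to least specific.
  val demoBindings: Seq[(Class[_], Ser)] = Seq(
    classOf[java.lang.Long] -> MiscSer,
    classOf[CharSequence] -> ProtoSer,
    classOf[java.io.Serializable] -> JavaSer
  )

  def serializerFor(bindings: Seq[(Class[_], Ser)], clazz: Class[_]): Ser = {
    // Same three conditions as `unique` in Serialization.serializerFor.
    def unique(ps: Seq[(Class[_], Ser)]): Boolean =
      ps.size == 1 ||
        ps.forall(p => p._1.isAssignableFrom(ps.head._1)) ||
        ps.forall(p => p._2 == ps.head._2)

    bindings.filter { case (c, _) => c.isAssignableFrom(clazz) } match {
      case Seq() =>
        throw new NotSerializableException(s"No configured serialization-bindings for class [${clazz.getName}]")
      case ps if unique(ps) =>
        ps.head._2 // most specific match
      case ps =>
        // Ambiguous: give the Java serializer lower priority.
        ps.filterNot(_._2 == JavaSer).headOption.map(_._2).getOrElse(
          throw new NotSerializableException(s"More than one JavaSerializer configured for class [${clazz.getName}]"))
    }
  }
}
```

With `demoBindings`, a `java.lang.Long` matches both its own entry and `java.io.Serializable`, but since those lie on one inheritance chain the more specific `MiscSer` wins. A `String` matches the unrelated `CharSequence` and `Serializable` entries, so the Java serializer is skipped in favour of `ProtoSer`.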

MiscMessageSerializer.toBinary serializes an ActorRef by calling serializeActorRef:

private def serializeActorRef(ref: ActorRef): Array[Byte] =
    actorRefBuilder(ref).build().toByteArray

 

  private def actorRefBuilder(actorRef: ActorRef): ContainerFormats.ActorRef.Builder =
    ContainerFormats.ActorRef.newBuilder()
      .setPath(Serialization.serializedActorPath(actorRef))

 

/**
   * The serialized path of an actorRef, based on the current transport serialization information.
   * If there is no external address available in the given `ActorRef` then the systems default
   * address will be used and that is retrieved from the ThreadLocal `Serialization.Information`
   * that was set with [[Serialization#withTransportInformation]].
   */
  def serializedActorPath(actorRef: ActorRef): String = {
    val path = actorRef.path
    val originalSystem: ExtendedActorSystem = actorRef match {
      case a: ActorRefWithCell ⇒ a.underlying.system.asInstanceOf[ExtendedActorSystem]
      case _                   ⇒ null
    }
    Serialization.currentTransportInformation.value match {
      case null ⇒ originalSystem match {
        case null ⇒ path.toSerializationFormat
        case system ⇒
          try path.toSerializationFormatWithAddress(system.provider.getDefaultAddress)
          catch { case NonFatal(_) ⇒ path.toSerializationFormat }
      }
      case Information(address, system) ⇒
        if (originalSystem == null || originalSystem == system)
          path.toSerializationFormatWithAddress(address)
        else {
          val provider = originalSystem.provider
          path.toSerializationFormatWithAddress(provider.getExternalAddressFor(address).getOrElse(provider.getDefaultAddress))
        }
    }
  }

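First, Serialization.currentTransportInformation.value cannot be null on this path, because serializeMessage has already bound it, so execution always reaches the Information(address, system) case. Whichever branch of the if is taken, the ActorRef is ultimately serialized by calling toSerializationFormatWithAddress; the branches only differ in which address is passed in: the transport address itself, or the external address the original system uses for it.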
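The effect of toSerializationFormatWithAddress can be illustrated with a simplified model. `PathFormatSketch`, `Address` and `Path` here are hypothetical stand-ins, not Akka's classes, and the modelled behaviour (substitute the transport address only when the path has no host, append a non-zero UID as `#uid`) follows the method's scaladoc as we understand it, so treat it as an approximation.

```scala
object PathFormatSketch {
  // Hypothetical simplified Address: protocol, system name, optional host and port.
  final case class Address(protocol: String, system: String,
                           host: Option[String] = None, port: Option[Int] = None) {
    override def toString: String = (host, port) match {
      case (Some(h), Some(p)) => s"$protocol://$system@$h:$p"
      case _                  => s"$protocol://$system"
    }
  }

  final case class Path(address: Address, elements: List[String], uid: Int)

  // A local path (no host) gets the transport address substituted in,
  // a path that already carries host/port keeps it, and a non-zero uid
  // is appended as a "#uid" fragment.
  def toSerializationFormatWithAddress(path: Path, transportAddress: Address): String = {
    val addr = if (path.address.host.isDefined) path.address else transportAddress
    val base = addr.toString + path.elements.mkString("/", "/", "")
    if (path.uid == 0) base else s"$base#${path.uid}"
  }
}
```

A local `akka://sys/user/echo` with UID 42, serialized over a transport bound to `10.0.0.1:2552`, becomes `akka.tcp://sys@10.0.0.1:2552/user/echo#42`, which is all the receiving side needs to reach the actor again.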

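So when an ActorRef is serialized, its path is converted to the serialization format, which is essentially the String form of its ActorPath. In remote mode this string also carries the protocol (for example akka.tcp), the system name, host:port and, when present, the actor's UID as a #uid fragment. At this point, a sharp reader can already guess what deserialization does: it parses the ActorPath string and turns it into the corresponding RemoteActorRef. We analyzed how an ActorRef is resolved from an ActorPath in an earlier article, but let's walk through it again here.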

override def receive: Receive = {
    case Disassociated(info) ⇒ handleDisassociated(info)

    case InboundPayload(p) if p.size <= transport.maximumPayloadBytes ⇒
      val (ackOption, msgOption) = tryDecodeMessageAndAck(p)

      for (ack ← ackOption; reliableDelivery ← reliableDeliverySupervisor) reliableDelivery ! ack

      msgOption match {
        case Some(msg) ⇒
          if (msg.reliableDeliveryEnabled) {
            ackedReceiveBuffer = ackedReceiveBuffer.receive(msg)
            deliverAndAck()
          } else try
            msgDispatch.dispatch(msg.recipient, msg.recipientAddress, msg.serializedMessage, msg.senderOption)
          catch {
            case e: NotSerializableException ⇒ logTransientSerializationError(msg, e)
            case e: IllegalArgumentException ⇒ logTransientSerializationError(msg, e)
          }

        case None ⇒
      }

    case InboundPayload(oversized) ⇒
      log.error(
        new OversizedPayloadException(s"Discarding oversized payload received: " +
          s"max allowed size [${transport.maximumPayloadBytes}] bytes, actual size [${oversized.size}] bytes."),
        "Transient error while reading from association (association remains live)")

    case StopReading(writer, replyTo) ⇒
      saveState()
      context.become(notReading)
      replyTo ! StoppedReading(writer)

  }

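When EndpointReader.receive gets an InboundPayload within the size limit, it first decodes it with tryDecodeMessageAndAck into an optional ack and an optional Message. If reliable delivery is enabled, the message goes through the acked receive buffer (deliverAndAck); otherwise it is dispatched directly with msgDispatch.dispatch. msgDispatch is an instance of DefaultMessageDispatcher.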

override def dispatch(
    recipient:         InternalActorRef,
    recipientAddress:  Address,
    serializedMessage: SerializedMessage,
    senderOption:      OptionVal[ActorRef]): Unit = {

    import provider.remoteSettings._

    lazy val payload: AnyRef = MessageSerializer.deserialize(system, serializedMessage)
    def payloadClass: Class[_] = if (payload eq null) null else payload.getClass
    val sender: ActorRef = senderOption.getOrElse(system.deadLetters)
    val originalReceiver = recipient.path

    def logMessageReceived(messageType: String): Unit = {
      if (LogReceive && log.isDebugEnabled)
        log.debug(s"received $messageType RemoteMessage: [{}] to [{}]<+[{}] from [{}]", payload, recipient, originalReceiver, sender)
    }

    recipient match {

      case `remoteDaemon` ⇒
        if (UntrustedMode) log.debug(LogMarker.Security, "dropping daemon message in untrusted mode")
        else {
          logMessageReceived("daemon message")
          remoteDaemon ! payload
        }

      case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒
        logMessageReceived("local message")
        payload match {
          case sel: ActorSelectionMessage ⇒
            if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) ||
              sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian))
              log.debug(
                LogMarker.Security,
                "operating in UntrustedMode, dropping inbound actor selection to [{}], " +
                  "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration",
                sel.elements.mkString("/", "/", ""))
            else
              // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor
              ActorSelection.deliverSelection(l, sender, sel)
          case msg: PossiblyHarmful if UntrustedMode ⇒
            log.debug(LogMarker.Security, "operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName)
          case msg: SystemMessage ⇒ l.sendSystemMessage(msg)
          case msg                ⇒ l.!(msg)(sender)
        }

      case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒
        logMessageReceived("remote-destined message")
        if (provider.transport.addresses(recipientAddress))
          // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed)
          r.!(payload)(sender)
        else
          log.error(
            "dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]",
            payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))

      case r ⇒ log.error(
        "dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]",
        payloadClass, r, recipientAddress, provider.transport.addresses.mkString(", "))

    }
  }
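Stripped of logging and security checks, the routing in dispatch depends on the recipient's type and its isLocal flag, which is where RemoteRef's `final def isLocal = false` pays off. The sketch below is a simplified, hypothetical model (`DispatchSketch`, `LocalActor`, `RemoteActor` and plain `String` addresses are not Akka types); the daemon branch, ActorSelectionMessage handling and PossiblyHarmful filtering are left out.

```scala
object DispatchSketch {
  // Hypothetical stand-ins for InternalActorRef, LocalRef and RemoteRef.
  trait Ref { def path: String; def isLocal: Boolean }
  trait RemoteRefLike extends Ref { final def isLocal: Boolean = false } // like RemoteRef
  final case class LocalActor(path: String) extends Ref { val isLocal = true }
  final case class RemoteActor(path: String) extends RemoteRefLike

  // Simplified version of the recipient match in DefaultMessageDispatcher.dispatch.
  def route(recipient: Ref, recipientAddress: String,
            ourAddresses: Set[String], untrustedMode: Boolean): String =
    recipient match {
      case l if l.isLocal => s"deliver to local ${l.path}"
      case r: RemoteRefLike if !untrustedMode =>
        // Addressed to us but remote from our point of view, e.g. remote-deployed.
        if (ourAddresses(recipientAddress)) s"forward to remote-deployed ${r.path}"
        else s"drop: non-local recipient ${r.path}"
      case r => s"drop: unknown recipient ${r.path}"
    }
}
```

The design keeps the decision cheap: a single flag on the reference decides between local delivery and forwarding, without inspecting the path string again.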

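dispatch defines payload as a lazy val backed by MessageSerializer.deserialize(system, serializedMessage), so the message is deserialized the first time payload is used, for example when it is delivered to the recipient.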

  /**
   * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message
   */
  def deserialize(system: ExtendedActorSystem, messageProtocol: SerializedMessage): AnyRef = {
    SerializationExtension(system).deserialize(
      messageProtocol.getMessage.toByteArray,
      messageProtocol.getSerializerId,
      if (messageProtocol.hasMessageManifest) messageProtocol.getMessageManifest.toStringUtf8 else "").get
  }

 

 /**
   * Deserializes the given array of bytes using the specified serializer id,
   * using the optional type hint to the Serializer.
   * Returns either the resulting object or an Exception if one was thrown.
   */
  def deserialize(bytes: Array[Byte], serializerId: Int, manifest: String): Try[AnyRef] =
    Try {
      val serializer = try getSerializerById(serializerId) catch {
        case _: NoSuchElementException ⇒ throw new NotSerializableException(
          s"Cannot find serializer with id [$serializerId]. The most probable reason is that the configuration entry " +
            "akka.actor.serializers is not in synch between the two systems.")
      }
      deserializeByteArray(bytes, serializer, manifest)
    }
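The receiving side only sees a numeric serializer id on the wire, so deserialize boils down to a registry lookup followed by `fromBinary`. A minimal sketch, assuming a hypothetical `ByIdSketch.registry` and a made-up id 20 for a toy UTF-8 string serializer; like the code above, it wraps everything in `Try` and reports an unknown id as `NotSerializableException`.

```scala
import java.io.NotSerializableException
import java.nio.charset.StandardCharsets.UTF_8
import scala.util.Try

object ByIdSketch {
  // Hypothetical minimal deserializer contract.
  trait Ser {
    def identifier: Int
    def fromBinary(bytes: Array[Byte], manifest: String): AnyRef
  }

  // Toy serializer with a made-up id.
  object Utf8StringSer extends Ser {
    val identifier = 20
    def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = new String(bytes, UTF_8)
  }

  // Hypothetical registry; sender and receiver must agree on it.
  val registry: Map[Int, Ser] = Map(Utf8StringSer.identifier -> Utf8StringSer)

  def deserialize(bytes: Array[Byte], serializerId: Int, manifest: String): Try[AnyRef] =
    Try {
      val ser = registry.getOrElse(serializerId, throw new NotSerializableException(
        s"Cannot find serializer with id [$serializerId]."))
      ser.fromBinary(bytes, manifest)
    }
}
```

This is also why the serializer configuration of both systems must match: an id the receiver has never registered cannot be resolved.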

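From the code above, deserialization simply uses serializerId to look up the concrete serializer and then calls deserializeByteArray. Remember which serializer handles a message that is an ActorRef? Right, the one bound as akka-misc: the serializerId on the wire is that serializer's identifier, so deserialization ultimately goes through akka.remote.serialization.MiscMessageSerializer. One more field matters here, though: manifest. What is it? The ActorRef serialization path gives us a clue.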

In MessageSerializer.serialize, the manifest is assigned by this line: val ms = Serializers.manifestFor(serializer, message).

def manifestFor(s: Serializer, message: AnyRef): String = s match {
    case s2: SerializerWithStringManifest ⇒ s2.manifest(message)
    case _                                ⇒ if (s.includeManifest) message.getClass.getName else ""
  }

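It checks whether the current Serializer is a SerializerWithStringManifest. If so, it calls its manifest method; if not, it checks includeManifest: when that is true it returns the message's class name, otherwise an empty string. Let's look at the definition of MiscMessageSerializer.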

class MiscMessageSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest with BaseSerializer

 

private val ActorRefManifest = "G"

Clearly it extends SerializerWithStringManifest, and for an ActorRef the manifest is the string "G".
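The decision made by manifestFor, and the short codes used by MiscMessageSerializer, can be modelled as follows. `Ser`, `SerWithStringManifest`, `MiscLike`, `ClassNameSer`, `NoManifestSer` and `FakeActorRef` are hypothetical stand-ins; only the "G" code for actor references is taken from the source above.

```scala
object ManifestSketch {
  // Hypothetical stand-ins for Serializer and SerializerWithStringManifest.
  trait Ser { def includeManifest: Boolean }
  trait SerWithStringManifest extends Ser {
    final def includeManifest: Boolean = true
    def manifest(o: AnyRef): String
  }

  final case class FakeActorRef(path: String) // hypothetical message type

  // Same decision tree as Serializers.manifestFor.
  def manifestFor(s: Ser, message: AnyRef): String = s match {
    case s2: SerWithStringManifest => s2.manifest(message)
    case _                         => if (s.includeManifest) message.getClass.getName else ""
  }

  // Like MiscMessageSerializer: short codes instead of class names.
  object MiscLike extends SerWithStringManifest {
    def manifest(o: AnyRef): String = o match {
      case _: FakeActorRef => "G"
      case other           => throw new IllegalArgumentException(s"Unsupported: ${other.getClass.getName}")
    }
  }

  object ClassNameSer extends Ser { val includeManifest = true }
  object NoManifestSer extends Ser { val includeManifest = false }
}
```

A one-letter code keeps the wire format compact and, presumably, is also more robust than a class name if classes are renamed or moved between versions.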

private def deserializeByteArray(bytes: Array[Byte], serializer: Serializer, manifest: String): AnyRef = {

    @tailrec def updateCache(cache: Map[String, Option[Class[_]]], key: String, value: Option[Class[_]]): Boolean = {
      manifestCache.compareAndSet(cache, cache.updated(key, value)) ||
        updateCache(manifestCache.get, key, value) // recursive, try again
    }

    withTransportInformation { () ⇒
      serializer match {
        case s2: SerializerWithStringManifest ⇒ s2.fromBinary(bytes, manifest)
        case s1 ⇒
          if (manifest == "")
            s1.fromBinary(bytes, None)
          else {
            val cache = manifestCache.get
            cache.get(manifest) match {
              case Some(cachedClassManifest) ⇒ s1.fromBinary(bytes, cachedClassManifest)
              case None ⇒
                system.dynamicAccess.getClassFor[AnyRef](manifest) match {
                  case Success(classManifest) ⇒
                    val classManifestOption: Option[Class[_]] = Some(classManifest)
                    updateCache(cache, manifest, classManifestOption)
                    s1.fromBinary(bytes, classManifestOption)
                  case Failure(e) ⇒
                    throw new NotSerializableException(
                      s"Cannot find manifest class [$manifest] for serializer with id [${serializer.identifier}].")
                }
            }
          }
      }
    }
  }

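Now let's look at the source of deserializeByteArray. It runs inside withTransportInformation and first checks whether serializer is a SerializerWithStringManifest. For an ActorRef it clearly is, so s2.fromBinary(bytes, manifest) is called. Let's see how fromBinary is implemented.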

  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    fromBinaryMap.get(manifest) match {
      case Some(deserializer) ⇒ deserializer(bytes)
      case None ⇒ throw new NotSerializableException(
        s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
    }

 

 private val fromBinaryMap = Map[String, Array[Byte] ⇒ AnyRef](
    IdentifyManifest → deserializeIdentify,
    ActorIdentityManifest → deserializeActorIdentity,
    StatusSuccessManifest → deserializeStatusSuccess,
    StatusFailureManifest → deserializeStatusFailure,
    ThrowableManifest → throwableSupport.deserializeThrowable,
    ActorRefManifest → deserializeActorRefBytes,
    OptionManifest → deserializeOption,
    OptionalManifest → deserializeOptional,
    PoisonPillManifest → ((_) ⇒ PoisonPill),
    KillManifest → ((_) ⇒ Kill),
    RemoteWatcherHBManifest → ((_) ⇒ RemoteWatcher.Heartbeat),
    DoneManifest → ((_) ⇒ Done),
    NotUsedManifest → ((_) ⇒ NotUsed),
    AddressManifest → deserializeAddressData,
    UniqueAddressManifest → deserializeUniqueAddress,
    RemoteWatcherHBRespManifest → deserializeHeartbeatRsp,
    ActorInitializationExceptionManifest → deserializeActorInitializationException,
    LocalScopeManifest → ((_) ⇒ LocalScope),
    RemoteScopeManifest → deserializeRemoteScope,
    ConfigManifest → deserializeConfig,
    FromConfigManifest → deserializeFromConfig,
    DefaultResizerManifest → deserializeDefaultResizer,
    BalancingPoolManifest → deserializeBalancingPool,
    BroadcastPoolManifest → deserializeBroadcastPool,
    RandomPoolManifest → deserializeRandomPool,
    RoundRobinPoolManifest → deserializeRoundRobinPool,
    ScatterGatherPoolManifest → deserializeScatterGatherPool,
    TailChoppingPoolManifest → deserializeTailChoppingPool,
    RemoteRouterConfigManifest → deserializeRemoteRouterConfig
  )

 

  private def deserializeActorRefBytes(bytes: Array[Byte]): ActorRef =
    deserializeActorRef(ContainerFormats.ActorRef.parseFrom(bytes))

 

private def deserializeActorRef(actorRef: ContainerFormats.ActorRef): ActorRef =
    serialization.system.provider.resolveActorRef(actorRef.getPath)

So first ContainerFormats.ActorRef.parseFrom turns the Array[Byte] into a ContainerFormats.ActorRef (a protobuf message); I won't analyze that step. Then serialization.system.provider.resolveActorRef turns the ActorPath string into an ActorRef. From the context, serialization.system.provider here is the RemoteActorRefProvider.
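The receive-side path for an ActorRef, from manifest lookup down to path resolution, fits in a short sketch. `RefData` stands in for the protobuf `ContainerFormats.ActorRef`, `resolve` for `provider.resolveActorRef`, and the "P" code for `PoisonPill` is made up for illustration; only the "G" manifest and the map-of-functions shape mirror MiscMessageSerializer.

```scala
import java.io.NotSerializableException
import java.nio.charset.StandardCharsets.UTF_8

object FromBinarySketch {
  // Hypothetical stand-in for ContainerFormats.ActorRef: just the path string.
  final case class RefData(path: String)

  // Hypothetical stand-in for provider.resolveActorRef(path).
  def resolve(path: String): String = s"resolved($path)"

  private val ActorRefManifest = "G"
  private val PoisonPillManifest = "P" // made-up code

  // Same shape as fromBinaryMap: manifest code -> bytes-to-object function.
  private val fromBinaryMap: Map[String, Array[Byte] => AnyRef] = Map(
    ActorRefManifest -> ((bytes: Array[Byte]) => resolve(RefData(new String(bytes, UTF_8)).path)),
    PoisonPillManifest -> ((_: Array[Byte]) => "PoisonPill")
  )

  def fromBinary(bytes: Array[Byte], manifest: String): AnyRef =
    fromBinaryMap.get(manifest) match {
      case Some(deserializer) => deserializer(bytes)
      case None => throw new NotSerializableException(
        s"Unimplemented deserialization of message with manifest [$manifest]")
    }
}
```

Keying a map by manifest keeps the dispatch a single hash lookup, and an unknown manifest fails loudly instead of silently producing the wrong type.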

def resolveActorRef(path: String): ActorRef = {
    // using thread local LRU cache, which will call internalResolveActorRef
    // if the value is not cached
    actorRefResolveThreadLocalCache match {
      case null ⇒ internalResolveActorRef(path) // not initialized yet
      case c    ⇒ c.threadLocalCache(this).getOrCompute(path)
    }
  }

resolveActorRef first checks whether actorRefResolveThreadLocalCache has been initialized. It has: as we saw earlier, it is created in RemoteActorRefProvider.init, so the lookup goes through the per-thread cache's getOrCompute.

/**
 * INTERNAL API
 */
private[akka] final class ActorRefResolveCache(provider: RemoteActorRefProvider)
  extends LruBoundedCache[String, ActorRef](capacity = 1024, evictAgeThreshold = 600) {

  override protected def compute(k: String): ActorRef =
    provider.internalResolveActorRef(k)

  override protected def hash(k: String): Int = Unsafe.fastHash(k)

  override protected def isCacheable(v: ActorRef): Boolean = !v.isInstanceOf[EmptyLocalActorRef]
}

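actorRefResolveThreadLocalCache is an ActorSystem Extension that ultimately hands out ActorRefResolveCache instances, one per thread. Does this class look familiar? It is an LruBoundedCache with a capacity of 1024 and an evictAgeThreshold of 600. The first getOrCompute for a given path calls compute, which in turn calls provider.internalResolveActorRef; the result is then cached, unless it is an EmptyLocalActorRef (see isCacheable). I won't go into how the caching itself works. The actual resolution is done in internalResolveActorRef.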
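For readers who want to experiment with the caching idea, here is a plain LRU cache built on `java.util.LinkedHashMap` in access order. Note that this is a swapped-in technique: Akka's `LruBoundedCache` uses its own hash table and an age threshold, not `LinkedHashMap`. The class name `SimpleLruCache` and its `computeCalls` counter are hypothetical; the `isCacheable` parameter mirrors the rule that an `EmptyLocalActorRef` result is not cached.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Not Akka's LruBoundedCache: a plain LRU built on LinkedHashMap's access order.
final class SimpleLruCache[K, V](capacity: Int)(compute: K => V, isCacheable: V => Boolean) {
  // accessOrder = true: get() moves an entry to the most-recently-used end.
  private val map = new JLinkedHashMap[K, V](16, 0.75f, true) {
    override protected def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean = size() > capacity
  }

  var computeCalls = 0 // exposed only to make the caching behaviour observable

  def getOrCompute(k: K): V =
    if (map.containsKey(k)) map.get(k)
    else {
      computeCalls += 1
      val v = compute(k)
      if (isCacheable(v)) map.put(k, v) // e.g. skip "empty" results
      v
    }

  def contains(k: K): Boolean = map.containsKey(k)
}
```

Like Akka's per-thread `ActorRefResolveCache`, this class does no locking, so each instance should be confined to one thread; the thread-local wrapper presumably exists to keep the hot deserialization path free of contention.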

 /**
   * INTERNAL API: This is used by the `ActorRefResolveCache` via the
   * public `resolveActorRef(path: String)`.
   */
  private[akka] def internalResolveActorRef(path: String): ActorRef = path match {
    case ActorPathExtractor(address, elems) ⇒
      if (hasAddress(address)) local.resolveActorRef(rootGuardian, elems)
      else {
        val rootPath = RootActorPath(address) / elems
        try {
          new RemoteActorRef(transport, transport.localAddressForRemote(address),
            rootPath, Nobody, props = None, deploy = None)
        } catch {
          case NonFatal(e) ⇒
            log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
            new EmptyLocalActorRef(this, rootPath, eventStream)
        }
      }
    case _ ⇒
      log.debug("Resolve (deserialization) of unknown (invalid) path [{}], using deadLetters.", path)
      deadLetters
  }

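Remember internalResolveActorRef? It came up when we analyzed actorSelection. It first checks whether this node owns the address in the path (hasAddress). If it does, the path is resolved locally through the LocalActorRefProvider (local.resolveActorRef); otherwise a RemoteActorRef is created. For an ActorRef that points to an actor on the sending node, the address is not ours, so a RemoteActorRef is created as a local proxy for that remote actor. If creating it fails, an EmptyLocalActorRef is returned, and a path that cannot be parsed resolves to deadLetters. This completes the deserialization of a remote ActorRef.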
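The local-versus-remote decision in internalResolveActorRef can be sketched with a regular expression in place of `ActorPathExtractor` and plain strings in place of `Address`. `ResolveSketch`, its `Ref` cases and the regex are hypothetical simplifications (Akka parses the path as a URI and compares `Address` values via `hasAddress`); the fallback to dead letters for an unparseable path mirrors the `case _` branch above.

```scala
object ResolveSketch {
  sealed trait Ref
  final case class LocalRef(elements: List[String]) extends Ref
  final case class RemoteRef(address: String, elements: List[String]) extends Ref
  case object DeadLetters extends Ref

  // Rough stand-in for ActorPathExtractor: "<proto>://<system>[@<host>:<port>]/<elems...>[#uid]".
  private val PathPattern = """^([^:/]+://[^/@]+(?:@[^/:]+:\d+)?)(/[^#]*)?(?:#-?\d+)?$""".r

  def resolve(path: String, localAddresses: Set[String]): Ref = path match {
    case PathPattern(address, elemsOrNull) =>
      // An unmatched optional group comes back as null.
      val elems = Option(elemsOrNull).toList.flatMap(_.split('/').filter(_.nonEmpty).toList)
      if (localAddresses(address)) LocalRef(elems) // like local.resolveActorRef
      else RemoteRef(address, elems)               // like new RemoteActorRef(...)
    case _ => DeadLetters                          // invalid path
  }
}
```

The same string therefore resolves to a proxy on one node and to the real local actor on the node that owns it, which is the heart of location transparency.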

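We have now walked through the complete serialization and deserialization of an ActorRef. Akka's location transparency is built on ActorPath: an ActorRef crosses the network as the String form of its ActorPath (including host, port and so on). The receiving host parses that string after deserialization and creates a local proxy for the remote actor, a RemoteActorRef, through which all further communication goes; if the path instead points back to the receiving node, it resolves to the local actor. Seen this way, the mechanism behind location transparency is fairly simple.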

