SPARK如何使用AKKA實現進程、節點通信


SPARK如何使用AKKA實現進程、節點通信

《深入理解Spark:核心思想與源碼分析》一書前言的內容請看鏈接《深入理解SPARK:核心思想與源碼分析》一書正式出版上市

《深入理解Spark:核心思想與源碼分析》一書第一章的內容請看鏈接《第1章 環境准備》

《深入理解Spark:核心思想與源碼分析》一書第二章的內容請看鏈接《第2章 SPARK設計理念與基本架構》

《深入理解Spark:核心思想與源碼分析》一書第三章第一部分的內容請看鏈接《深入理解Spark:核心思想與源碼分析》——SparkContext的初始化(伯篇)》

《深入理解Spark:核心思想與源碼分析》一書第三章第二部分的內容請看鏈接《深入理解Spark:核心思想與源碼分析》——SparkContext的初始化(仲篇)》

《深入理解Spark:核心思想與源碼分析》一書第三章第三部分的內容請看鏈接《深入理解Spark:核心思想與源碼分析》——SparkContext的初始化(叔篇)》

《深入理解Spark:核心思想與源碼分析》一書第三章第四部分的內容請看鏈接《深入理解Spark:核心思想與源碼分析》——SparkContext的初始化(季篇)》

AKKA簡介

  Scala認為Java線程通過共享數據以及通過鎖來維護共享數據的一致性是糟糕的做法,容易引起鎖的爭用,而且線程的上下文切換會帶來不少開銷,降低並發程序的性能,甚至會引入死鎖的問題。在Scala中只需要自定義類型繼承Actor,並且提供act方法,就如同Java里實現Runnable接口,需要實現run方法一樣。但是不能直接調用act方法,而是通過發送消息的方式(Scala發送消息是異步的),傳遞數據。如:
Actor ! message
Akka是Actor編程模型的高級類庫,類似於JDK 1.5之后越來越豐富的並發工具包,簡化了程序員並發編程的難度。Akka是一款提供了用於構建高並發的、分布式的、可伸縮的、基於Java虛擬機的消息驅動應用的工具集和運行時環境。從下面Akka官網提供的一段代碼示例,可以看出Akka並發編程的簡約。

復制代碼
case class Greeting(who: String)
class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) ⇒ log.info("Hello " + who)
  }
}
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
greeter ! Greeting("Charlie Parker")
復制代碼

Akka提供了分布式的框架,意味着用戶不需要考慮如何實現分布式部署,Akka官網提供了下面的示例演示如何獲取遠程Actor的引用。

復制代碼
// config on all machines
akka {
  actor {
    provider = akka.remote.RemoteActorRefProvider
    deployment {
      /greeter {
        remote = akka.tcp://MySystem@machine1:2552
      }
    }
  }
}
// ------------------------------
// define the greeting actor and the greeting message
case class Greeting(who: String) extends Serializable
class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) ⇒ log.info("Hello " + who)
  }
}
// ------------------------------
// on machine 1: empty system, target for deployment from machine 2
val system = ActorSystem("MySystem")
// ------------------------------
// on machine 2: Remote Deployment - deploying on machine1
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
// ------------------------------
// on machine 3: Remote Lookup (logical home of “greeter” is machine2, remote deployment is transparent)
val system = ActorSystem("MySystem")
val greeter = system.actorSelection("akka.tcp://MySystem@machine2:2552/user/greeter")
greeter ! Greeting("Sonny Rollins")
復制代碼

Actor之間最終會構成一棵樹,作為父親的Actor應當對所有兒子的異常失敗進行處理(監管)Akka給出了簡單的示例,代碼如下。

復制代碼
class Supervisor extends Actor {
  override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException ⇒ Resume
    case _: NullPointerException ⇒ Restart
    case _: Exception ⇒ Escalate
  }
  val worker = context.actorOf(Props[Worker])
  def receive = {
    case n: Int => worker forward n
  }
}
復制代碼

Akka的更多信息請訪問官方網站:http://akka.io/

基於AKKA的分布式消息系統ACTORSYSTEM

  Spark使用Akka提供的消息系統實現並發:ActorSystem是Spark中最基礎的設施,Spark既使用它發送分布式消息,又用它實現並發編程。正是因為Actor輕量級的並發編程、消息發送以及ActorSystem支持分布式消息發送等特點,Spark選擇了ActorSystem。
SparkEnv中創建ActorSystem時用到了AkkaUtils工具類,代碼如下。

復制代碼
val (actorSystem, boundPort) =
Option(defaultActorSystem) match {
  case Some(as) => (as, port)
  case None =>
    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
}
復制代碼

AkkaUtils.createActorSystem方法用於啟動ActorSystem,代碼如下。

復制代碼
def createActorSystem(
  name: String,
  host: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager): (ActorSystem, Int) = {
  val startService: Int => (ActorSystem, Int) = { actualPort =>
    doCreateActorSystem(name, host, actualPort, conf, securityManager)
  }
  Utils.startServiceOnPort(port, startService, conf, name)
}
復制代碼

AkkaUtils使用了Utils的靜態方法startServiceOnPort, startServiceOnPort最終會回調方法startService: Int=> (T, Int),此處的startService實際是方法doCreateActorSystem。真正啟動ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具體實現細節請見AkkaUtils的詳細介紹。關於startServiceOnPort的實現,請參閱[《Spark中常用工具類Utils的簡明介紹》](http://blog.csdn.net/beliefer/article/details/50904662)一文的內容。

AKKAUTILS

  AkkaUtils是Spark對Akka相關API的又一層封裝,這里對其常用的功能進行介紹。

(1)doCreateActorSystem

功能描述:創建ActorSystem。

復制代碼
private def doCreateActorSystem(
  name: String,
  host: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager): (ActorSystem, Int) = {

  val akkaThreads = conf.getInt("spark.akka.threads", 4)
  val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
  val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
  val akkaFrameSize = maxFrameSizeBytes(conf)
  val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
  val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
  if (!akkaLogLifecycleEvents) {
    Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
  }
  val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
  val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
  val akkaFailureDetector =
    conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
  val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
  val secretKey = securityManager.getSecretKey()
  val isAuthOn = securityManager.isAuthenticationEnabled()
  if (isAuthOn && secretKey == null) {
    throw new Exception("Secret key is null with authentication on")
  }
  val requireCookie = if (isAuthOn) "on" else "off"
  val secureCookie = if (isAuthOn) secretKey else ""
  logDebug("In createActorSystem, requireCookie is: " + requireCookie)
  val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
    ConfigFactory.parseString(
    s"""
    |akka.daemonic = on
    |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
    |akka.stdout-loglevel = "ERROR"
    |akka.jvm-exit-on-fatal-error = off
    |akka.remote.require-cookie = "$requireCookie"
    |akka.remote.secure-cookie = "$secureCookie"
    |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
    |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
    |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
    |akka.remote.netty.tcp.hostname = "$host"
    |akka.remote.netty.tcp.port = $port
    |akka.remote.netty.tcp.tcp-nodelay = on
    |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
    |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
    |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
    |akka.actor.default-dispatcher.throughput = $akkaBatchSize
    |akka.log-config-on-start = $logAkkaConfig
    |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
    |akka.log-dead-letters = $lifecycleEvents
    |akka.log-dead-letters-during-shutdown = $lifecycleEvents
    """.stripMargin))
  val actorSystem = ActorSystem(name, akkaConf)
  val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
  val boundPort = provider.getDefaultAddress.port.get
  (actorSystem, boundPort)
}
復制代碼

(2)makeDriverRef

功能描述:從遠端ActorSystem中查找已經注冊的某個Actor。

復制代碼
def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
  val driverActorSystemName = SparkEnv.driverActorSystemName
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost, "Expected hostname")
  val url = s"akka.tcp://$driverActorSystemName@$driverHost:$driverPort/user/$name"
  val timeout = AkkaUtils.lookupTimeout(conf)
  logInfo(s"Connecting to $name: $url")
  Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}
復制代碼
道生一,一生二,二生三,三生萬物。
 
分類:  ScalaSpark大數據


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM