項目概述
需求
目前大多數的分布式架構底層通信都是通過RPC實現的,RPC框架非常多,比如前我們學過的Hadoop項目的RPC通信框架,但是Hadoop在設計之初就是為了運行長達數小時的批量而設計的,在某些極端的情況下,任務提交的延遲很高,所以Hadoop的RPC顯得有些笨重。
Spark 的RPC是通過Akka類庫實現的,Akka用Scala語言開發,基於Actor並發模型實現,Akka具有高可靠、高性能、可擴展等特點,使用Akka可以輕松實現分布式RPC功能。
Akka簡介
友情鏈接: Actors介紹: https://www.iteblog.com/archives/1154.html
Akka基於Actor模型,提供了一個用於構建可擴展的(Scalable)、彈性的(Resilient)、快速響應的(Responsive)應用程序的平台。
Actor模型:在計算機科學領域,Actor模型是一個並行計算(Concurrent Computation)模型,它把actor作為並行計算的基本元素來對待:為響應一個接收到的消息,一個actor能夠自己做出一些決策,如創建更多的actor,或發送更多的消息,或者確定如何去響應接收到的下一個消息。
Actor是Akka中最核心的概念,它是一個封裝了狀態和行為的對象,Actor之間可以通過交換消息的方式進行通信,每個Actor都有自己的收件箱(Mailbox)。通過Actor能夠簡化鎖及線程管理,可以非常容易地開發出正確地並發程序和並行系統,Actor具有如下特性:
(1)、提供了一種高級抽象,能夠簡化在並發(Concurrency)/並行(Parallelism)應用場景下的編程開發
(2)、提供了異步非阻塞的、高性能的事件驅動編程模型
(3)、超級輕量級事件處理(每GB堆內存幾百萬Actor)
項目實現
實戰一:
利用Akka的actor編程模型,實現2個進程間的通信。
架構圖
重要類介紹
ActorSystem:在Akka中,ActorSystem是一個重量級的結構,他需要分配多個線程,所以在實際應用中,ActorSystem通常是一個單例對象,我們可以使用這個ActorSystem創建很多Actor。
注意:
(1)、ActorSystem是一個進程中的老大,它負責創建和監督actor
(2)、ActorSystem是一個單例對象
(3)、actor負責通信
Actor
在Akka中,Actor負責通信,在Actor中有一些重要的生命周期方法。
(1)preStart()方法:該方法在Actor對象構造方法執行后執行,整個Actor生命周期中僅執行一次。
(2)receive()方法:該方法在Actor的preStart方法執行完成后執行,用於接收消息,會被反復執行。
具體代碼
① Master類
package cn.itcast.rpc import akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.typesafe.config.ConfigFactory //todo:利用akka的actor模型實現2個進程間的通信-----Master端 class Master extends Actor{ //構造代碼塊先被執行 println("master constructor invoked") //prestart方法會在構造代碼塊執行后被調用,並且只被調用一次 override def preStart(): Unit = { println("preStart method invoked") } //receive方法會在prestart方法執行后被調用,表示不斷的接受消息 override def receive: Receive = { case "connect" =>{ println("a client connected") //master發送注冊成功信息給worker sender ! "success" } } } object Master{ def main(args: Array[String]): Unit = { //master的ip地址 val host=args(0) //master的port端口 val port=args(1) //准備配置文件信息 val configStr= s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin //配置config對象 利用ConfigFactory解析配置文件,獲取配置信息 val config=ConfigFactory.parseString(configStr) // 1、創建ActorSystem,它是整個進程中老大,它負責創建和監督actor,它是單例對象 val masterActorSystem = ActorSystem("masterActorSystem",config) // 2、通過ActorSystem來創建master actor val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master),"masterActor") // 3、向master actor發送消息 //masterActor ! "connect" } }
② Worker類
package cn.itcast.rpc import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory //todo:利用akka中的actor實現2個進程間的通信-----Worker端 class Worker extends Actor{ println("Worker constructor invoked") //prestart方法會在構造代碼塊之后被調用,並且只會被調用一次 override def preStart(): Unit = { println("preStart method invoked") //獲取master actor的引用 //ActorContext全局變量,可以通過在已經存在的actor中,尋找目標actor //調用對應actorSelection方法, // 方法需要一個path路徑:1、通信協議、2、master的IP地址、3、master的端口 4、創建master actor老大 5、actor層級 val master: ActorSelection = context.actorSelection("akka.tcp://masterActorSystem@172.16.43.63:8888/user/masterActor") //向master發送消息 master ! "connect" } //receive方法會在prestart方法執行后被調用,不斷的接受消息 override def receive: Receive = { case "connect" =>{ println("a client connected") } case "success" =>{ println("注冊成功") } } } object Worker{ def main(args: Array[String]): Unit = { //定義worker的IP地址 val host=args(0) //定義worker的端口 val port=args(1) //准備配置文件 val configStr= s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin //通過configFactory來解析配置信息 val config=ConfigFactory.parseString(configStr) // 1、創建ActorSystem,它是整個進程中的老大,它負責創建和監督actor val workerActorSystem = ActorSystem("workerActorSystem",config) // 2、通過actorSystem來創建 worker actor val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker),"workerActor") //向worker actor發送消息 workerActor ! "connect" } }
③ 運行
使用idea開發工具,配置參數時,多個參數之間用空格隔開
啟動Master
啟動Worker
實戰二
使用Akka實現一個簡易版的spark通信框架
架構圖
具體代碼
① Master類
package cn.itcast.spark import akka.actor.{Actor, ActorRef, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.concurrent.duration._ //todo:利用akka實現簡易版的spark通信框架-----Master端 class Master extends Actor{ //構造代碼塊先被執行 println("master constructor invoked") //定義一個map集合,用於存放worker信息 private val workerMap = new mutable.HashMap[String,WorkerInfo]() //定義一個list集合,用於存放WorkerInfo信息,方便后期按照worker上的資源進行排序 private val workerList = new ListBuffer[WorkerInfo] //master定時檢查的時間間隔 val CHECK_OUT_TIME_INTERVAL=15000 //15秒 //prestart方法會在構造代碼塊執行后被調用,並且只被調用一次 override def preStart(): Unit = { println("preStart method invoked") //master定時檢查超時的worker //需要手動導入隱式轉換 import context.dispatcher context.system.scheduler.schedule(0 millis,CHECK_OUT_TIME_INTERVAL millis,self,CheckOutTime) } //receive方法會在prestart方法執行后被調用,表示不斷的接受消息 override def receive: Receive = { //master接受worker的注冊信息 case RegisterMessage(workerId,memory,cores) =>{ //判斷當前worker是否已經注冊 if(!workerMap.contains(workerId)){ //保存信息到map集合中 val workerInfo = new WorkerInfo(workerId,memory,cores) workerMap.put(workerId,workerInfo) //保存workerinfo到list集合中 workerList +=workerInfo //master反饋注冊成功給worker sender ! RegisteredMessage(s"workerId:$workerId 注冊成功") } } //master接受worker的心跳信息 case SendHeartBeat(workerId)=>{ //判斷worker是否已經注冊,master只接受已經注冊過的worker的心跳信息 if(workerMap.contains(workerId)){ //獲取workerinfo信息 val workerInfo: WorkerInfo = workerMap(workerId) //獲取當前系統時間 val lastTime: Long = System.currentTimeMillis() workerInfo.lastHeartBeatTime=lastTime } } case CheckOutTime=>{ //過濾出超時的worker 判斷邏輯: 獲取當前系統時間 - worker上一次心跳時間 >master定時檢查的時間間隔 val outTimeWorkers: ListBuffer[WorkerInfo] = workerList.filter(x => System.currentTimeMillis() -x.lastHeartBeatTime > CHECK_OUT_TIME_INTERVAL) //遍歷超時的worker信息,然后移除掉超時的worker for(workerInfo <- outTimeWorkers){ //獲取workerid val workerId: String = workerInfo.workerId //從map集合中移除掉超時的worker信息 workerMap.remove(workerId) //從list集合中移除掉超時的workerInfo信息 workerList -= workerInfo println("超時的workerId:" +workerId) } println("活着的worker總數:" + workerList.size) //master按照worker內存大小進行降序排列 println(workerList.sortBy(x => x.memory).reverse.toList) } } } object Master{ def main(args: Array[String]): Unit = { //master的ip地址 val host=args(0) //master的port端口 val port=args(1) //准備配置文件信息 val configStr= s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin //配置config對象 利用ConfigFactory解析配置文件,獲取配置信息 val config=ConfigFactory.parseString(configStr) // 1、創建ActorSystem,它是整個進程中老大,它負責創建和監督actor,它是單例對象 val masterActorSystem = ActorSystem("masterActorSystem",config) // 2、通過ActorSystem來創建master actor val masterActor: ActorRef = masterActorSystem.actorOf(Props(new Master),"masterActor") // 3、向master actor發送消息 //masterActor ! "connect" } }
② Worker類
package cn.itcast.spark import java.util.UUID import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ //todo:利用akka實現簡易版的spark通信框架-----Worker端 class Worker(val memory:Int,val cores:Int,val masterHost:String,val masterPort:String) extends Actor{ println("Worker constructor invoked") //定義workerId private val workerId: String = UUID.randomUUID().toString //定義發送心跳的時間間隔 val SEND_HEART_HEAT_INTERVAL=10000 //10秒 //定義全局變量 var master: ActorSelection=_ //prestart方法會在構造代碼塊之后被調用,並且只會被調用一次 override def preStart(): Unit = { println("preStart method invoked") //獲取master actor的引用 //ActorContext全局變量,可以通過在已經存在的actor中,尋找目標actor //調用對應actorSelection方法, // 方法需要一個path路徑:1、通信協議、2、master的IP地址、3、master的端口 4、創建master actor老大 5、actor層級 master= context.actorSelection(s"akka.tcp://masterActorSystem@$masterHost:$masterPort/user/masterActor") //向master發送注冊信息,將信息封裝在樣例類中,主要包含:workerId,memory,cores master ! RegisterMessage(workerId,memory,cores) } //receive方法會在prestart方法執行后被調用,不斷的接受消息 override def receive: Receive = { //worker接受master的反饋信息 case RegisteredMessage(message) =>{ println(message) //向master定期的發送心跳 //worker先自己給自己發送心跳 //需要手動導入隱式轉換 import context.dispatcher context.system.scheduler.schedule(0 millis,SEND_HEART_HEAT_INTERVAL millis,self,HeartBeat) } //worker接受心跳 case HeartBeat =>{ //這個時候才是真正向master發送心跳 master ! SendHeartBeat(workerId) } } } object Worker{ def main(args: Array[String]): Unit = { //定義worker的IP地址 val host=args(0) //定義worker的端口 val port=args(1) //定義worker的內存 val memory=args(2).toInt //定義worker的核數 val cores=args(3).toInt //定義master的ip地址 val masterHost=args(4) //定義master的端口 val masterPort=args(5) //准備配置文件 val configStr= s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin //通過configFactory來解析配置信息 val config=ConfigFactory.parseString(configStr) // 1、創建ActorSystem,它是整個進程中的老大,它負責創建和監督actor val workerActorSystem = ActorSystem("workerActorSystem",config) // 2、通過actorSystem來創建 worker actor val workerActor: ActorRef = workerActorSystem.actorOf(Props(new Worker(memory,cores,masterHost,masterPort)),"workerActor") //向worker actor發送消息 workerActor ! "connect" } }
③ WorkerInfo類
package cn.itcast.spark //封裝worker信息 class WorkerInfo(val workerId:String,val memory:Int,val cores:Int) { //定義一個變量用於存放worker上一次心跳時間 var lastHeartBeatTime:Long=_ override def toString: String = { s"workerId:$workerId , memory:$memory , cores:$cores" } }
④ 樣例類
package cn.itcast.spark trait RemoteMessage extends Serializable{} //worker向master發送注冊信息,由於不在同一進程中,需要實現序列化 case class RegisterMessage(val workerId:String,val memory:Int,val cores:Int) extends RemoteMessage //master反饋注冊成功信息給worker,由於不在同一進程中,也需要實現序列化 case class RegisteredMessage(message:String) extends RemoteMessage //worker向worker發送心跳 由於在同一進程中,不需要實現序列化 case object HeartBeat //worker向master發送心跳,由於不在同一進程中,需要實現序列化 case class SendHeartBeat(val workerId:String) extends RemoteMessage //master自己向自己發送消息,由於在同一進程中,不需要實現序列化 case object CheckOutTime
⑤ 運行
配置參數時,多個參數之間用空格隔開
首先啟動Master_Spark
啟動work_spark-01
啟動work_spark-02,然后關閉
觀察Master_Spark 輸出