Spark的Rpct模塊的學習


Spark的Rpc模塊是1.x重構出來可,以前的代碼中大量使用了akka的類,為了把akka從項目的依賴中移除,所有添加了該模塊。先看下該模塊的幾個主要的類


 
使用EA把該模塊所有的類都添加進來了
 
要看懂該模塊還是要先了解akka,  akka有Actor和ActorRef兩個類,一個用於接收消息,一個用於發送消息。正好對應該模塊的RpcEndpoint和RpcEndpointRef兩個類。
下面大致介紹下這幾個類,附帶一些scala的特性
1:RpcAddress 
該類是一個case class, 用來表示主機名和端口號 ,  case class也可以添加方法,以前還以為不可以呢
它的伴生對象用於從URI,String等構造一個RpcAddress對象
 
2:RpcTimeout
表示一個超時時間,話說該類的職責有點亂,竟然還有下面的一個方法
  1. def awaitResult[T](awaitable:Awaitable[T]): T ={
    try{
    Await.result(awaitable, duration)
    }catch addMessageIfTimeout
    }

     

在規定時間內返回對象, Await是scala並發庫中的一個對象,result在duration時間片內返回Awaitable的執行結果,ready表示 duration時間片內Awaitable的狀態變成complete,兩個方法都是阻塞的, Awaitable相當java中的future,當然scala也有future類,正是繼承該類。
它的伴生對象主要是配置文件中獲取時間值然后生成該對象
 
3:RpcEnvFactory
該對象用於創建一個RpcEnv,在RpcEnv中可以看到如何使用該方法
  1. private def getRpcEnvFactory(conf:SparkConf):RpcEnvFactory={
    // Add more RpcEnv implementations here
    val rpcEnvNames =Map("akka"->"org.apache.spark.rpc.akka.AkkaRpcEnvFactory")
    val rpcEnvName = conf.get("spark.rpc","akka")
    val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
    Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
    }

     

目前spark.rpc只有akka的實現,如果覺得akka的性能不好也可以自己實現一個Rpc框架。
 
 
4: RpcEnv
 注解:這是一個RPC環境,所有的RpcEndpont需要注冊到該對象中用於接收消息,注冊時需要指定一個name, RpcEnv將會處理從RpcEndpontRef和遠程節點發送過來的消息(接口里面看不到這塊邏輯),然后發送給相應的Endpoint處理,對於接收到的異常使用RpcCallContext來處理。
 
看RpcEnv像akka中的ActorSystem對象,所有的actor和acotorred都屬於它,同時有一個根地址,所有RpcEnv有注冊RpcEndpoint的方法,也有一個address返回根地址的方法,RpcEnv有幾個方法用於獲取RpcEndpointRef ,  這里說下Endpoint注冊名會成為RpcEndpoint的地址,可以看uriof方法,還有停止和關閉的方法。
RpcEnv的deserialize不明白具體用法,RpcEndpiontRef只能使用RpcEnv解碼,當包含有RpcEndpointRef的對象解碼時,解碼代碼將會被方法包裝
 
5:RpcEnvConfig
用於構建RpcEnv的配置對象,一個RpcEnv需要host,port,name,附帶sparkconf,securitymananger
host,port,name構造結構入下  akka://host:port/name  大致如此
 
 
6:RpcEndpoint
進程間調用的一個端點,當一個消息到來時,方法調用順序為  onStart, receive, onStop
它的生命周期為constructor -> onStart -> receive* -> onStop  .當然還有一些其他方法,都是間觸發方法
 
7:RpcEndpointRef
一個遠程RpcEndpoint的引用,通過它可以給遠程RpcEndpoint發送消息,可以是同步可以是異步,它映射一個地址 ,
 
 






免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM