Spark的Rpc模塊是1.x重構出來可,以前的代碼中大量使用了akka的類,為了把akka從項目的依賴中移除,所有添加了該模塊。先看下該模塊的幾個主要的類
使用EA把該模塊所有的類都添加進來了
要看懂該模塊還是要先了解akka, akka有Actor和ActorRef兩個類,一個用於接收消息,一個用於發送消息。正好對應該模塊的RpcEndpoint和RpcEndpointRef兩個類。
下面大致介紹下這幾個類,附帶一些scala的特性
1:RpcAddress
該類是一個case class, 用來表示主機名和端口號 , case class也可以添加方法,以前還以為不可以呢
它的伴生對象用於從URI,String等構造一個RpcAddress對象
2:RpcTimeout
表示一個超時時間,話說該類的職責有點亂,竟然還有下面的一個方法
-
def awaitResult[T](awaitable:Awaitable[T]): T ={ try{ Await.result(awaitable, duration) }catch addMessageIfTimeout }
在規定時間內返回對象, Await是scala並發庫中的一個對象,result在duration時間片內返回Awaitable的執行結果,ready表示
duration時間片內Awaitable的狀態變成complete,兩個方法都是阻塞的,
Awaitable相當java中的future,當然scala也有future類,正是繼承該類。
它的伴生對象主要是配置文件中獲取時間值然后生成該對象
3:RpcEnvFactory
該對象用於創建一個RpcEnv,在RpcEnv中可以看到如何使用該方法
-
private def getRpcEnvFactory(conf:SparkConf):RpcEnvFactory={ // Add more RpcEnv implementations here val rpcEnvNames =Map("akka"->"org.apache.spark.rpc.akka.AkkaRpcEnvFactory") val rpcEnvName = conf.get("spark.rpc","akka") val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName) Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory] }
目前spark.rpc只有akka的實現,如果覺得akka的性能不好也可以自己實現一個Rpc框架。
4: RpcEnv
注解:這是一個RPC環境,所有的RpcEndpont需要注冊到該對象中用於接收消息,注冊時需要指定一個name, RpcEnv將會處理從RpcEndpontRef和遠程節點發送過來的消息(接口里面看不到這塊邏輯),然后發送給相應的Endpoint處理,對於接收到的異常使用RpcCallContext來處理。
看RpcEnv像akka中的ActorSystem對象,所有的actor和acotorred都屬於它,同時有一個根地址,所有RpcEnv有注冊RpcEndpoint的方法,也有一個address返回根地址的方法,RpcEnv有幾個方法用於獲取RpcEndpointRef , 這里說下Endpoint注冊名會成為RpcEndpoint的地址,可以看uriof方法,還有停止和關閉的方法。
RpcEnv的deserialize不明白具體用法,RpcEndpiontRef只能使用RpcEnv解碼,當包含有RpcEndpointRef的對象解碼時,解碼代碼將會被方法包裝
5:RpcEnvConfig
用於構建RpcEnv的配置對象,一個RpcEnv需要host,port,name,附帶sparkconf,securitymananger
host,port,name構造結構入下 akka://host:port/name 大致如此
6:RpcEndpoint
進程間調用的一個端點,當一個消息到來時,方法調用順序為 onStart, receive, onStop
它的生命周期為constructor -> onStart -> receive* -> onStop .當然還有一些其他方法,都是間觸發方法
7:RpcEndpointRef
一個遠程RpcEndpoint的引用,通過它可以給遠程RpcEndpoint發送消息,可以是同步可以是異步,它映射一個地址 ,
