@
Hadoop RPC 框架解析
網絡通信模塊是分布式系統中最底層的模塊。它直接支撐了上層分布式環境下復雜的進程間通信(Inter-Process Communication, IPC)邏輯,是所有分布式系統的基礎。遠程過程調用(Remote Procedure Call, RPC)是一種常用的分布式網絡通信協議。它允許運行於一台計算機的程序調用另一台計算機的子程序,同時將網絡的通信細節隱藏起來,使得用戶無須額外地為這個交互作用編程。
作為一個分布式系統,Hadoop實現了自己的RPC通信協議,它是上層多個分布式子系統(如MapReduce, HDFS, HBase等)公用的網絡通信模塊。本文主要從框架設計和實現方面介紹Hadoop RPC,還有該RPC框架在MapReduce中的應用。
1.Hadoop RPC框架概述
1.1 RPC框架特點
RPC實際上是分布式計算中客戶機/服務器(Client/Server)模型的一個應用實例。對於Hadoop RPC而言,它具有以下幾個特點。
1.透明性:這是所有RPC框架的最根本特征,即當用戶在一台計算機的程序調用另外一台計算機上的子程序時,用戶自身不應感覺到其間涉及跨機器間的通信,而是感覺像是在執行一個本地調用。
2.高性能:Hadoop各個系統(如HDFS, MapReduce)均采用了Master/Slave結構。其中,Master實際上是一個RPC server,它負責處理集群中所有Slave發送的服務請求。為了保證Master的並發處理能力,RPC server應是一個高性能服務器,能夠高效地處理來自多個Client的並發RPC請求。
3.可控性:JDK中已經自帶了一個RPC框架——RMI(Remote Method Invocation,遠程方法調用)。之所以不直接使用該框架,主要是因為考慮到RPC是Hadoop最底層、最核心的模塊之一,保證其輕量級、高性能和可控性顯得尤為重要,而RMI過於重量級且用戶可控之處太少(如網絡連接、超時和緩沖等均難以定制或者修改)。
1.2 Hadoop RPC框架
與其他RPC框架一樣,Hadoop RPC主要分為四個部分,分別是序列化層、函數調用層、網絡傳輸層和服務器端處理框架,具體實現機制如下:
序列化層:序列化層的主要作用是將結構化對象轉為字節流以便於通過網絡進行傳輸或寫入持久存儲。在RPC框架中,它主要用於將用戶請求中的參數或者應答轉化成字節流以便跨機器傳輸。Hadoop自己實現了序列化框架,一個類只要實現Writable接口,即可支持對象序列化與反序列化。
函數調用層:函數調用層的主要功能是定位要調用的函數並執行該函數。HadoopRPC采用Java反射機制與動態代理實現了函數調用。
網絡傳輸層:網絡傳輸層描述了Client與Server之間消息傳輸的方式,Hadoop RPC采用了基於TCP/IP的Socket機制。
服務器端處理框架:服務器端處理框架可被抽象為網絡I/O模型。它描述了客戶端與服務器端間信息交互的方式。它的設計直接決定着服務器端的並發處理能力。常見的網絡I/O模型有阻塞式I/O、非阻塞式I/O、事件驅動I/O等,而Hadoop RPC采用了基於Reactor設計模式的事件驅動I/O模型。
Hadoop RPC總體架構自下而上可分為兩層。
第一層是一個基於Java NIO(New IO)實現的客戶機/服務器(Client/Server)通信模型。其中,客戶端將用戶的調用方法及其參數封裝成請求包后發送到服務器端。服務器端收到請求包后,經解包、調用函數、打包結果等一系列操作后,將結果返回給服務器端。為了增強Server端的擴展性和並發處理能力,Hadoop RPC采用了基於事件驅動的Reactor設計模式,在具體實現時,用到了JDK提供的各種功能包,主要包括java.nio(NIO)、java.lang.reflect(反射機制和動態代理)、java.net(網絡編程庫)等。
第二層是供更上層程序直接調用的RPC接口,這些接口底層即為客戶機/服務器通信模型。
看到這里有些小伙伴說我對於這些Java基礎知識都不是很記得了,沒關系,暖男的我現在就和大家一起來看看相關的這些Java基礎內容。又有一些小伙伴說我沒學過啊,那也沒關系,我們知識大致去了解一些類和這些類有哪些方法可以幫助我們理解RPC就夠了。我們使用Hadoop的時候不也不必關注RPC的細節么,那Java細節不會影響我們使用。對Java基礎反射、網絡編程和NIO很熟悉的小可愛可以直接跳過第二章節
2.Java基礎知識回顧
我們簡要介紹Hadoop RPC中用到的JDK開發工具包中的一些類。了解和掌握這些類的功能和使用方法是深入學習Hadoop RPC的基礎。這些類主要來自以下三個Java包:java.lang.reflect(反射機制和動態代理相關類)、java.net(網絡編程庫)和java.nio(NIO)。
2.1 Java反射機制與動態代理
反射機制是Java語言的一個重要特性,它的重要性也不用多說,在很多的框架中,反射撐起了半邊天。簡言之其作用:允許用戶動態獲取類的信息和動態調用對象的方法。
我們先來看看它提供的主要的類和類對應的功能:
類名&接口 | 功能描述 |
---|---|
Class | 代表一個Java類 |
Field | 代表Java類的屬性 |
Method | 代表Java類的方法 |
Constructor | 代表Java類的構造函數 |
Array | 提供了動態創建數組,以及訪問數組元素的靜態方法 |
Proxy類以及InvocationHandler接口 | 提供了動態生成代理類以及實例的方法 |
我們重點關注Java動態代理。在動態代理之前,我們先一起回顧一下代理概念及代理模式。有小可愛說不知道動態代理我只聽過名字啊,具體是個什么,我不知道呀。沒關系,我先簡單說一下動態代理的核心思想:是為其他對象提供一種代理以控制對這個對象的訪問。代理類負責為委托類進行預處理(如安全檢查,權限檢查等)或者執行完后的后續處理(如轉發給其他代理等)。動態代理的好處就是開發人員通過簡單的指定一組接口及委托類對象,便能動態地獲得代理類,這大大簡化了編寫代理類的步驟。
2.1.1 代理關鍵類&接口信息
在此我們先來了解一下代理的一些關鍵類&接口以及其主要方法:
1)java. lang.reflect.Proxy
這是Java動態代理機制的主類,它提供了一組靜態方法,用於為一組接口動態地生成代理類及其對象。
// Returns the invocation handler for the specified proxy instance.
// Params:proxy – the proxy instance to return the invocation handler for
// Returns:the invocation handler for the proxy instance
// 獲取指定代理對象所關聯的調用處理器
public static InvocationHandler getInvocationHandler(Object proxy)
// 獲取關聯於指定類裝載器和一組接口的動態代理類的對象
public static Class<?> getProxyClass(ClassLoader loader, Class<?>... interfaces)
// 判斷指定的類是不是一個動態代理類
public static boolean isProxyClass(Class<?> cl)
// 為指定類裝載器一組接口及調用處理器生成動態代理類實例
public static Object newProxyInstance(ClassLoader loader, Class<?>[] interfaces, InvocationHandler h)
2)java. lang.reflect.InvocationHandler
這是調用處理器接口。它定義了一個invoke方法,用於處理在動態代理類對象上的方法調用。通常開發人員需實現該接口,並在invoke方法中實現對委托類的代理訪問。
// 該方法負責處理動態代理類上的所有方法調用
// 參數:代理類實例,被調用的方法對象,調用參數
// 調用處理器根據這三個參數進行預處理或分派到委托類實例上執行
public Object invoke(Object proxy, Method method, Object[] args)
2.1.2 動態代理創建對象的過程
一個典型的動態代理創建對象的過程可分為以下4個步驟:
- 步驟1 通過實現InvocationHandler接口創建自己的調用處理器:
- 步驟2 通過為Proxy類指定ClassLoader對象和一組interface創建動態代理類:
- 步驟3 通過反射機制獲取動態代理類的構造函數,其參數類型是調用處理器接口類型:
- 步驟4 通過構造函數創建動態代理類實例,此時需將調用處理器對象作為參數
被傳入:
// step 1通過實現InvocationHandler接口創建自己的調用處理器
InvocationHandler handler=new InvocationHandlerImpl(...);
// 2 通過為Proxy類指定ClassLoader對象和一組interface創建動態代理類
Class clazz=Proxy.getProxyClass(classLoader, new Class[]{……});
// 3 通過反射機制獲取動態代理類的構造函數,其參數類型是調用處理器接口類型
Constructor constructor=clazz.getConstructor(new Class[]{InvocationHandler.class});
// 4通過構造函數創建動態代理類實例,此時需將調用處理器對象作為參數
Interface Proxy=(Interface)constructor.newInstance(new Object[]{handler});
Proxy類中的newInstance方法封裝了步驟2~步驟4,只需兩步即可完成代理對象的創建。
我們通過一個動態代理的例子來加深對於動態代理的理解;
目錄結構如下
定義一個接口協議
實現接口協議 Server類
實現調用處理器接口
測試用例
2.2 Java網絡編程
通常,Java網絡程序建立在TCP/IP協議基礎上,致力於實現應用層。傳輸層向應用層提供了套接字Socket接口,它封裝了下層的數據傳輸細節;應用層的程序可通過Socket與遠程主機建立連接和進行數據傳輸。
JDK提供了3種套接字類:java.net.Socket、java.net.ServerSocket和java.net.DatagramSocket。其中,java.net.Socket和java.net.ServerSocket類建立在TCP協議基礎上,而java.net.DatagramSocket類則建立在UDP協議基礎上。Java網絡程序均采用客戶機/服務器通信模式。下面介紹如何使用java.net.Socket和java.net.ServerSocket編寫客戶端和服務器端程序。
編寫一個客戶端程序需要以下3個步驟
步驟1 創建客戶端Socket:
其中,serverHost為服務器端的host, port為服務器端的監聽端口號。一旦Socket創建成功,則表示客戶端連接服務器成功。
Socket soc=new Socket(serverHost, port);
步驟2 創建輸出、輸入流以向服務器端發送數據和從服務器端接收數據:
//構造數據輸入流,用以接收數據
DataInputStream in=new DataInputStream(soc.getInputStream());
//構造數據輸出流,用以發送數據
DataOutputStream out=new DataOutputStream(soc.getOutputStream());
……
//應用程序發送和接收數據
步驟3 斷開連接:
soc.close();
編寫一個服務器端程序需要以下4個步驟:
步驟1 創建ServerSocket對象:
ServerSocket serverSocket=new ServerSocket(port);
其中,port為服務器端的監聽端口號。當客戶端向服務器端建立連接時,需要知道該端口號。創建ServerSocket對象成功后,操作系統將把當前進程注冊為服務器進程。
步驟2 監聽端口號,等待新連接到達:
Socket soc=serverSocket.accept();
運行函數accept()后,ServerSocket對象會一直處於監聽狀態,等待客戶端的連接請求。一旦有客戶端請求到達,該函數會返回一個Socket對象,該Socket對象與客戶端Socket對象形成一條通信鏈路。
步驟3 創建輸出、輸入流以向客戶端發送數據和從客戶端接收數據。此處的程序和客戶端的一樣,故不再贅述。
步驟4 斷開連接。此處的程序和客戶端的一樣,故不再贅述。
在Client/Server模型中,Server往往需要同時處理大量來自Client的訪問請求,因此Server端需采用支持高並發訪問的架構。一種簡單而又直接的解決方案是“one thread-perconnection”。這是一種基於阻塞式I/O的多線程模型,如下圖所示。在該模型中,Server為每個Client連接創建一個處理線程,每個處理線程阻塞式等待可能到達的數據,一旦數據到達,則立即處理請求、返回處理結果並再次進入等待狀態。由於每個Client連接有一個單獨的處理線程為其服務,因此可保證良好的響應時間。但當系統負載增大(並發請求增多)時,Server端需要的線程數會增加,這將成為系統擴展的瓶頸所在。
2.3 Java NIO
2.3.1 簡介
從J2SE 1.4版本以來,JDK發布了全新的I/O類庫,簡稱NIO(New IO)。它不但引入了全新的高效的I/O機制,同時引入了基於Reactor設計模式的多路復用異步模式。NIO的包中主要包含了以下幾種抽象數據類型:
Channel(通道):NIO把它支持的I/O對象抽象為Channel。它模擬了通信連接,類似於原I/O中的流(Stream),用戶可以通過它讀取和寫入數據。目前已知的實例類有SocketChannel、ServerSocketChannel、DatagramChannel、FileChannel等。
Buffer(緩沖區):Buffer是一塊連續的內存區域,一般作為Channel收發數據的載體出現。所有數據都通過Buffer對象來處理。用戶永遠不會將字節直接寫入通道中,相反,需將數據寫入包含一個或者多個字節的緩沖區;同樣,也不會直接從通道中讀取字節,而是將數據從通道讀入緩沖區,再從緩沖區獲取這個字節。
Selector(選擇器):Selector類提供了監控一個或多個通道當前狀態的機制。只要Channel向Selector注冊了某種特定事件,Selector就會監聽這些事件是否會發生,一旦發生某個事件,便會通知對應的Channel。使用選擇器,借助單一線程,就可對數量龐大的活動I/O通道實施監控和維護,如下圖所示:
2.3.2 常用類
1)Buffer相關類
所有緩沖區包含以下3個屬性:
capacity:緩沖區的末位值。它表明了緩沖區最多可以保存多少數據;
limit:表示緩沖區的當前存放數據的終點。不能對超過limit的區域進行讀寫數據;
position:下一個讀寫單元的位置。每次讀寫緩沖區時,均會修改該值,為下一次讀寫數據做准備。
這三個屬性的大小關系是capacity≥limit≥position≥0
Buffer有兩種不同的工作模式——寫模式和讀模式。在寫模式下,limit與capacity相同,position隨着寫入數據增加,逐漸增加到limit,因此,0到position之間的數據即為已經寫入的數據;在讀模式下,limit初始指向position所在位置,position隨着數據的讀取,逐漸增加到limit,則0到position之間的數據即為已經讀取的數據。
2)Channel相關類
java. nio提供了多種Channel實現,其中,最常用的是以SelectableChannel為基類的通道。SelectableChannel是一種支持阻塞I/O和非阻塞I/O的通道,它的主要方法如下:
- SelectableChannel configureBlocking(boolean block)throws IOException。
- 作用:設置當前SelectableChannel的阻塞模式。
- 參數含義:block表示是否將SelectableChannel設置為阻塞模式。
- 返回值:SelectableChannel對象本身的引用,相當於“return this”。
- SelectionKey register(Selector sel, int ops)throws ClosedChannelException。
- 作用:將當前Channel注冊到一個Selector中。
- 參數含義:sel表示要注冊的Selector;ops表示注冊事件。
- 返回值:與注冊Channel關聯的SelectionKey對象,用於跟蹤被注冊事件。
SelectableChannel的兩個子類是ServerSocketChannel和SocketChannel,它們分別是ServerSocket和Socket的替代類。
ServerSocketChannel主要用於監聽TCP連接,
SocketChannel可看作Socket的替代類,但功能比Socket更加強大。同ServerSocket-Channel類似,它提供了靜態工廠方法open()(創建對象)和socket()方法(返回與SocketChannel關聯的Socket對象)。
3)Selector類
Selector可監聽ServerSocketChannel和SocketChannel注冊的特定事件,一旦某個事件發生,則會通知對應的Channel。SelectableChannel的register()方法負責注冊事件,該方法返回一個SelectionKey對象,該對象即為用於跟蹤這些注冊事件的句柄。
Selector中常用的方法如下。
- static Selector open():一個靜態工廠方法,可用於創建Selector對象。
- int select(long timeout):該方法等待並返回發生的事件。一旦某個注冊的事件發生,就會返回對應的SelectionKey的數目,否則,一直處於阻塞狀態,直到以下四種情況之一發生:
- 至少一個事件發生;
- 其他線程調用了Selector的wakeup()方法;
- 當前執行select()方法的線程被中斷;
- 超出等待時間timeout,如果不設置等待時間,則表示永遠不會超時。
- set selectedKeys():Selector捕獲的已經發生事件對應的SelectionKey集合。
- Selector wakeup():立刻喚醒當前處於阻塞狀態的Selector。常見應用場景是,線程A調用Selector對象的select()方法,阻塞等待某個注冊事件發生,線程B通過調用wakeup()函數可立刻喚醒線程A,使其從select()方法中返回。
4)SelectionKey類
ServerSocketChannel或SocketChannel通過register()方法向Selector注冊事件時,register()方法會創建一個SelectionKey對象,用於跟蹤注冊事件。在SelectionKey中定義了4種事件,分別用以下4個整型常量表示:
通常而言,ServerSocketChannel對象向Selector中注冊SelectionKey.OP_ACCEPT事件,而SocketChannel對象向Selector中注冊SelectionKey.OP_CONNECT、SelectionKey.OP_READ和SelectionKey.OP_WRITE三種事件。
3.Hadoop RPC基本框架分析
3.1 RPC基本概念
RPC是一種通過網絡從遠程計算機上請求服務,但不需要了解底層網絡技術的協議。RPC協議假定某些傳輸協議已經存在,如TCP或UDP等,並通過這些傳輸協議為通信程序之間傳遞訪問請求或者應答信息。在OSI網絡通信模型中,RPC跨越了傳輸層和應用層。RPC使得開發分布式應用程序更加容易。
3.1.1 RPC組成部分
RPC通常采用客戶機/服務器模型。請求程序是一個客戶機,而服務提供程序則是一個服務器。一個典型的RPC框架主要包括以下幾個部分:
通信模塊:兩個相互協作的通信模塊實現請求-應答協議。它們在客戶機和服務器之間傳遞請求和應答消息,一般不會對數據包進行任何處理。
請求-應答協議的實現方式有兩種,分別是同步方式和異步方式。如下圖同步模式下客戶端程序一直阻塞到服務器端發送的應答請求到達本地;而異步模式則不同,客戶端將請求發送到服務器端后,不必等待應答返回,可以做其他事情,待服務器端處理完請求后,主動通知客戶端。在高並發應用場景中,一般采用異步模式以降低訪問延遲和提高帶寬利用率。
Stub程序:客戶端和服務器端均包含Stub程序,可將之看作代理程序。它使得遠程函數調用表現的跟本地調用一樣,對用戶程序完全透明。在客戶端,它表現的就像一個本地程序,但不直接執行本地調用,而是將請求信息通過網絡模塊發送給服務器端。此外,當服務器端發送應答后,它會解碼對應結果。在服務器端,Stub程序依次進行以下處理:解碼請求消息中的參數、調用相應的服務過程和編碼應答結果的返回值。
調度程序:調度程序接收來自通信模塊的請求消息,並根據其中的標識選擇一個Stub程序處理。通常客戶端並發請求量比較大時,會采用線程池提高處理效率。
客戶程序/服務過程:請求的發出者和請求的處理者。如果是單機環境,客戶程序可直接通過函數調用訪問服務過程,但在分布式環境下,需要考慮網絡通信,這不得不增加通信模塊和Stub程序(保證函數調用的透明性)。
3.1.2 RPC工作流程
通常而言,一個RPC請求從發送到獲取處理結果,所經歷的步驟如下:
步驟1 客戶程序以本地方式調用系統產生的Stub程序;
步驟2 該Stub程序將函數調用信息按照網絡通信模塊的要求封裝成消息包,並交給通信模塊發送到遠程服務器端;
步驟3 遠程服務器端接收此消息后,將此消息發送給相應的Stub程序;
步驟4 Stub程序拆封消息,形成被調過程要求的形式,並調用對應的函數;
步驟5 被調用函數按照所獲參數執行,並將結果返回給Stub程序;
步驟6 Stub程序將此結果封裝成消息,通過網絡通信模塊逐級地傳送給客戶程序。
3.2 Hadoop RPC基本框架
在正式介紹Hadoop RPC基本框架之前,先介紹怎么樣使用它。Hadoop RPC主要對外提供了兩種接口。正所謂知其然,然后知其所以然。
- public static VersionedProtocol getProxy/waitForProxy():構造一個客戶端代理對象(該對象實現了某個協議),用於向服務器端發送RPC請求。
- public static Server getServer():為某個協議(實際上是Java接口)實例構造一個服務器對象,用於處理客戶端發送的請求。
3.2.1 構建一個簡單的Hadoop RPC
通常而言,Hadoop RPC使用方法可分為以下幾個步驟。
步驟1 定義RPC協議。RPC協議是客戶端和服務器端之間的通信接口,它定義了服務器端對外提供的服務接口。
步驟2 實現RPC協議。Hadoop RPC協議通常是一個Java接口,用戶需要實現
該接口。
步驟3 構造並啟動RPC Server。
步驟4 構造RPC Client,並發送RPC請求。
這四步沒有實操總覺得比較遙遠,那我們就動手編碼試一下。
// 1. 定義RPC協議
interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol{
//版本號。默認情況下,不同版本號的RPC Client和Server之間不能相互通信
public static final long versionID=1L;
String echo(String value)throws IOException;
int add(int v1,int v2)throws IOException;
}
// 2.實現RPC協議
public static class ClientProtocolImpl implements ClientProtocol{
public long getProtocolVersion(String protocol, long clientVersion){
return ClientProtocol.versionID;
}
public String echo(String value)throws IOException{
return value;
}
public int add(int v1,int v2)throws IOException{
return v1+v2;
}
}
// 3.構造並啟動RPC Server 新建一個類,主方法如下
public static void main(String[] args){
server=RPC.getServer(new ClientProtocolImpl(),serverHost, serverPort,
numHandlers, false, conf);
server.start();
}
// 4.構造RPC Client ,構建客戶端類,方法如下
public static void main(String[] args){
proxy=(ClientProtocol)RPC.getProxy(
ClientProtocol.class, ClientProtocol.versionID, addr, conf);
int result=proxy.add(5,6);
String echoResult=proxy.echo("result");
}
3.2.2 Hadoop RPC 組成類分析
Hadoop RPC主要由三個大類組成,分別是RPC、Client和Server,分別對應對外編程接口、客戶端實現和服務器端實現。
3.2.2.1 RPC類分析
RPC類實際上是對底層客戶機/服務器網絡模型的封裝,以便為程序員提供一套更方便簡潔的編程接口。
RPC類自定義了一個內部類RPC.Server。它繼承Server抽象類,並利用Java反射機制實現了call接口(Server抽象類中並未給出該接口的實現),即根據客戶端請求中的調用方法名稱和對應參數完成方法調用。RPC類包含一個ClientCache類型的成員變量,它根據用戶提供的SocketFactory緩存Client對象,以達到重用Client對象的目的。
3.2.2.2 Client類分析
Client主要完成的功能是發送遠程過程調用信息並接收執行結果。Client類對外提供了兩種接口,一種用於執行單個遠程調用。另外一種用於執行批量遠程調用。
Client內部有兩個重要的內部類,分別是Call和Connection:
Call類:該類封裝了一個RPC請求,它包含五個成員變量,分別是唯一標識id、函數調用信息param、函數執行返回值value、出錯或者異常信息error和執行完成標識符done。由於Hadoop RPC Server采用了異步方式處理客戶端請求,這使得遠程過程調用的發生順序與結果返回順序無直接關系,而Client端正是通過id識別不同的函數調用。當客戶端向服務器端發送請求時,只需填充id和param兩個變量,而剩下的三個變量:value, error和done,則由服務器端根據函數執行情況填充。
Connection類:Client與每個Server之間維護一個通信連接。該連接相關的基本信息及操作被封裝到Connection類中。其中,基本信息主要包括:通信連接唯一標識(remoteId),與Server端通信的Socket(socket),網絡輸入數據流(in),網絡輸出數據流(out),保存RPC請求的哈希表(calls)等
當調用call函數執行某個遠程方法時,Client端需要進行如下幾個步驟:
步驟1 創建一個Connection對象,並將遠程方法調用信息封裝成Call對象,放到Connection對象中的哈希表calls中;
步驟2 調用Connetion類中的sendParam()方法將當前Call對象發送給Server端;
步驟3 Server端處理完RPC請求后,將結果通過網絡返回給Client端,Client端通過receiveResponse()函數獲取結果;
步驟4 Client端檢查結果處理狀態(成功還是失敗),並將對應的Call對象從哈希表中刪除。
3.2.2.3 Server類分析
Hadoop采用了Master/Slave結構。其中,Master是整個系統的單點,如NameNode或JobTracker,這是制約系統性能和可擴展性的最關鍵因素之一,而Master通過ipc.Server接收並處理所有Slave發送的請求,這就要求ipc.Server將高並發和可擴展性作為設計目標。為此,ipc.Server采用了很多具有提高並發處理能力的技術,主要包括線程池、事件驅動和Reactor設計模式等。這些技術均采用了JDK自帶的庫實現。
Reactor是並發編程中的一種基於事件驅動的設計模式。它具有以下兩個特點:
①通過派發/分離I/O操作事件提高系統的並發性能;
②提供了粗粒度的並發控制,使用單線程實現,避免了復雜的同步處理。
一個典型的Reactor模式中主要包括以下幾個角色。
Server的主要功能是接收來自客戶端的RPC請求,經過調用相應的函數獲取結果后,返回給對應的客戶端。為此,ipc.Server被划分成三個階段:接收請求,處理請求和返回結果。各階段實現細節如下:
1)接收請求
該階段的主要任務是接收來自各個客戶端的RPC請求,並將它們封裝成固定的格式(Call類)放到一個共享隊列(callQueue)中,以便進行后續處理。該階段內部又分為兩個子階段:建立連接和接收請求,分別由兩種線程完成:Listener和Reader。
整個Server只有一個Listener線程,統一負責監聽來自客戶端的連接請求。一旦有新的請求到達,它會采用輪詢的方式從線程池中選擇一個Reader線程進行處理。而Reader線程可同時存在多個,它們分別負責接收一部分客戶端連接的RPC請求。至於每個Reader線程負責哪些客戶端連接,完全由Listener決定。當前Listener只是采用了簡單的輪詢分配機制。
Listener和Reader線程內部各自包含一個Selector對象,分別用於監聽SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。對於Listener線程,主循環的實現體是監聽是否有新的連接請求到達,並采用輪詢策略選擇一個Reader線程處理新連接;對於Reader線程,主循環的實現體是監聽(它負責的那部分)客戶端連接中是否有新的RPC請求到達,並將新的RPC請求封裝成Call對象,放到共享隊列callQueue中。
2)處理請求
該階段的主要任務是從共享隊列callQueue中獲取Call對象,執行對應的函數調用,並將結果返回給客戶端,這全部由Handler線程完成。Server端可同時存在多個Handler線程。它們並行從共享隊列中讀取Call對象,經執行對應的函數調用后,將嘗試着直接將結果返回給對應的客戶端。但考慮到某些函數調用返回的結果很大或者網絡速度過慢,可能難以將結果一次性發送到客戶端,此時Handler將嘗試着將后續發送任務交給Responder線程。
3)返回結果
每個Handler線程執行完函數調用后,會嘗試着將執行結果返回給客戶端,但對於特殊情況,比如函數調用返回的結果過大或者網絡異常情況(網速過慢),會將發送任務交給Responder線程。
Server端僅存在一個Responder線程。它的內部包含一個Selector對象,用於監聽SelectionKey.OP_WRITE事件。當Handler沒能夠將結果一次性發送到客戶端時,會向該Selector對象注冊SelectionKey.OP_WRITE事件,進而由Responder線程采用異步方式繼續發送未發送完成的結果。
4. Hadoop RPC的發展與展望
當前存在非常多的開源RPC框架,比較有名的有Thrift, Protocol Buffers和Avro。與Hadoop RPC一樣,它們均由兩部分組成:對象序列化和遠程過程調用。相比於Hadoop RPC,它們有以下幾個特點。
隨着Hadoop版本的不斷演化,Hadoop RPC在跨語言支持和協議兼容性兩個方面存在不足,具體表現為:
1)從長遠發展看,Hadoop RPC應允許某些協議的客戶端或者服務器端采用其他語言實現,比如用戶希望直接使用C/C++語言讀寫HDFS中的文件,這就需要有C/C++語言的HDFS客戶端。
2)當前Hadoop版本較多,而不同版本之間不能通信。
從0.21.0版本開始,Hadoop嘗試着將RPC中的序列化部分剝離開,以便將現有的開源RPC框架集成進來。RPC類變成了一個工廠,它將具體的RPC實現授權給RpcEngine實現類,而現有的開源RPC只要實現RpcEngine接口,便可以集成到Hadoop RPC中。
正如當前的YARN使用的事件處理的方式,能夠大大增強並發性,從而提高系統整體性能。
以及Yarn的RPC通訊方式:
YARN中的序列化框架采用了Google開源的Protocol Buffers。Protocol Buffers的引入使得YARN在兼容性方面向前邁進了一大步。
總結
Hadoop RPC是Hadoop多個子系統公用的網絡通信模塊。其性能和可擴展性直接影響其上層系統的性能和可擴展性,因此扮演着極其重要的角色。
Hadoop RPC分為兩層:上層是直接供外面使用的公共RPC接口;下層是一個客戶機/服務器模型,該模型在實現過程中用到了Java自帶的多個工具包,包括java.lang.reflect(反射機制和動態代理相關類)、java.net(網絡編程庫)和java.nio(NIO)等。
Hadoop RPC主要由三個大類組成,分別是RPC、Client和Server,分別對應對外編程接口、客戶端實現和服務器端實現。其中,Server具有高性能和良好的可擴展性等特點,在具體實現時采用了線程池、事件驅動和Reactor設計模式等機制。
Hadoop MapReduce基於RPC框架實現了6個通信協議,分別是JobSubmissionsProtocol, RefreshUserMappingsProtocol,RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,InterTrackerProtocol和TaskUmbilicalProtocol。這些協議像是系統的“骨架”,支撐起整個MapReduce系統。隨着Hadoop的不斷演化,更多開源的RPC框架不斷和現有RPC機制進行整合,更好的提升Hadoop的並發和處理能力。
好了,今天的文章到這里就結束了,希望對小可愛們有所幫助。
路漫漫其修遠兮,吾將上下而求索。讓我們一起在不斷學習的道路上漸行漸遠漸無書。