輕松使用Hadoop RPC


Hadoop RPC是Hadoop的一個重要部分,提供分布式環境下的對象調用功能,源碼在org.apache.hadoop.ipc中。而HBase也幾乎完全copy了這部分的源碼,只是在配置項上面有所改動。

關於Hadoop RPC的機制分析和源碼解讀,網上已經有許多資料,一搜一大把,這里就不在描述了。本文通過一個小例子,介紹如何調用Hadoop RPC。

1. 應用場景

Hadoop RPC在整個Hadoop中應用非常廣泛,Client、DataNode、NameNode之間的通訊全靠它了。

舉個例子,我們平時操作HDFS的時候,使用的是FileSystem類,它的內部有個DFSClient對象,這個對象負責與NameNode打交道。在運行時,DFSClient在本地創建一個NameNode的代理,然后就操作這個代理,這個代理就會通過網絡,遠程調用到NameNode的方法,也能返回值。

在我的應用場景中,需要一個元數據服務器,各節點經常需要去查詢元數據,可以使用這套RPC機制。

2. Protocol

被遠程訪問的類,也就是Server端,必須實現VersionedProtocol接口,這個接口只有一個方法getProtocolVersion,用來判斷Server和Client端調用的是不是一個版本的,一般Server的代碼修改一次,版本號就得改一次。

在例子中,我們定義一個接口MyProtocol,繼承VersionedProtocol,里面定義Server端需要實現的方法。

這里MyProtocol接口只有一個方法println,輸入一個Text,打印出來,並返回一個Text。

MyProtocol.java代碼如下:

1 import org.apache.hadoop.io.Text;
2 import org.apache.hadoop.ipc.VersionedProtocol;
3
4 public interface MyProtocol extends VersionedProtocol {
5 public Text println(Text t);
6 }

3. Server

Server端實現上述的Protocol接口,里面需要啟動一個RPC.Server,它是一個Thread。

構造方法是RPC.getServer(Object instance, String bindAddress, int port, Configuration conf)

  • instance:表示提供遠程訪問的對象,一般Server都會傳入this作為參數;
  • bindAddress:Server綁定的ip地址;
  • port:Server綁定的端口;
  • conf:Configuration對象,不用解釋了吧。

MyServer實現了MyProtocol接口中定義的println方法,將參數打印到控制台,並返回finish。

MyServer.java代碼如下:

 1 import java.io.IOException;
2 import java.net.UnknownHostException;
3
4 import org.apache.hadoop.conf.Configuration;
5 import org.apache.hadoop.io.Text;
6 import org.apache.hadoop.ipc.RPC;
7 import org.apache.hadoop.ipc.RPC.Server;
8
9 public class MyServer implements MyProtocol{
10 private Server server;
11
12 public MyServer(){
13 try {
14 server = RPC.getServer(this, "localhost", 8888, new Configuration());
15 server.start();
16 server.join();
17 } catch (UnknownHostException e) {
18 e.printStackTrace();
19 } catch (IOException e) {
20 e.printStackTrace();
21 } catch (InterruptedException e) {
22 e.printStackTrace();
23 }
24 }
25
26 @Override
27 public Text println(Text t){
28 System.out.println(t);
29 return new Text("finish");
30 }
31
32 @Override
33 public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
34 return 1;
35 }
36
37 public static void main(String[] args) {
38 new MyServer();
39 }
40
41 }

4. Client

Client端需要創建一個Server的遠程代理,並可以通過操作這個代理,來調用到Server端的方法。

創建代理可以調用RPC.waitForProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf)

  • protocol:一個Protocol的class,它必須是繼承VersionedProtocol的接口;
  • clientVersion:客戶端的版本號,如果與服務端不一致,則會拋錯;
  • addr:一個InetSocketAddress對象,包含了ip和port;
  • conf:不解釋。

這個方法會返回一個VersionedProtocol類型的代理對象,將它強制轉型成自己定義的Protocol,接下來就可以操作創建好的代理了。在例子中,我們通過代理來讓Server端打印字符串到控制台,並接受返回的消息。

MyClient.java代碼如下:

 1 import java.io.IOException;
2 import java.net.InetSocketAddress;
3
4 import org.apache.hadoop.conf.Configuration;
5 import org.apache.hadoop.io.Text;
6 import org.apache.hadoop.ipc.RPC;
7
8 public class MyClient {
9
10 private MyProtocol proxy;
11
12 public MyClient(){
13 InetSocketAddress addr = new InetSocketAddress("localhost",8888);
14 try {
15 proxy = (MyProtocol) RPC.waitForProxy(MyProtocol.class, 1, addr , new Configuration());
16 } catch (IOException e) {
17 e.printStackTrace();
18 }
19 }
20
21 public void println(String s){
22 System.out.println(proxy.println(new Text(s)));
23 }
24
25 public void close(){
26 RPC.stopProxy(proxy);
27 }
28
29 public static void main(String[] args) {
30 MyClient c = new MyClient();
31 c.println("123");
32 c.close();
33 }
34 }

 5. 運行

運行MyServer,控制台顯示:

2011-12-30 18:49:56 -[INFO] Initializing RPC Metrics with hostName=MyServer, port=8888
2011-12-30 18:49:56 -[INFO] IPC Server listener on 8888: starting
2011-12-30 18:49:56 -[INFO] IPC Server Responder: starting
2011-12-30 18:49:56 -[INFO] IPC Server handler 0 on 8888: starting

運行MyClient,控制台顯示:

finish

MyServer端會追加顯示:

123 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM