hadoop IPC 源代碼分析


      

如圖所示, 在hadoop中客戶端需要和服務端通信 。 首先我們看一下需求是啥。

舉一個例子,在客戶端想要往hadoop集群中寫數據的時候,它需要先和namenode通信,以便獲得 諸一個blockID。

這時 ,我們希望在客戶端可以做到 諸如 調用一個方法 如 getBlockID() 則就獲得了服務端的發過來的ID ,如果調用本地方法一樣。

需求搞定,我們看現實有的條件 服務端通信我們有的能力為socket,這個是已經封裝在linux內核之中, JAVA對linux內核通信又進行了封裝,有了自己的

Socket ServerSocket 通信, 同時在JAVA Nio中又提出了 異步方式的IO。

好,我們有的資源和需要達到的目標都已經有了,下面是實現中間件來彌補兩者之間的鴻溝。

首先從客戶端來看。 客戶端調用服務端的服務,肯定需要底層通信處理,而且這些通信處理需要集中處理,不能每次遠程調用,都需重新處理一遍底層連接。

有什么方法可以達到這個目的么 ? 動態代理。

  1. public Object invoke(Object proxy, Method method, Object[] args)
  2.   throws Throwable {
  3.      
  4.   ObjectWritable value = (ObjectWritable)
  5.     client.call(new Invocation(method, args), remoteId);
  6.     
  7.   return value.get();
  8. }

一般我們看到的動態代理的invoke()方法中總會有 method.invoke(ac, arg);  這句代碼。而上面代碼中卻沒有,這是為什么呢?其實使用 method.invoke(ac, arg); 是在本地JVM中調用;

在客戶端這邊並沒有Proxy對象,我們需要到服務找到對應的對象然后調用相應的方法。在hadoop中,是將數據發送給服務端,服務端將處理的結果再返回給客戶端,所以這里的invoke()方法必然需要進行網絡通信。

到這里我們可以再一次從圖形來表示一下我們需要達到的目標。

    

             

                 

    

    

    

下面這句代碼就是服務端真正的調用相應方法的語句, 其中的instance對象,是運行在服務端的對象,call是客戶端傳遞過來的參數。 通過反射機制,進行方法調用。

  1. Object value = method.invoke(instance, call.getParameters());

    

上面我們把大體的框架搭建起來了,下面一步步進行細節分析。

    

在上面所示的invok()方法中,最終調用的方法為

  1. client.call(new Invocation(method, args), remoteId);
  2. value.get();

我們需求分析 Client端是如何通過 這兩個方法 調用了遠程服務器的方法,並且獲取返回值得。

需要解決的三個問題是

  1. 客戶端和服務端的連接是怎樣建立的?
  2. . 客戶端是怎樣給服務端發送數據的?
  3. 客戶端是怎樣獲取服務端的返回數據的?
  4. public Writable call(Writable param, ConnectionId remoteId)
  5.                        throws InterruptedException, IOException {
  6.     Call call = new Call(param); //將傳入的數據封裝成call對象
  7.     Connection connection = getConnection(remoteId, call); //獲得一個連接
  8.     connection.sendParam(call); // 向服務端發送call對象
  9.     boolean interrupted = false;
  10.     synchronized (call) {
  11.       while (!call.done) {
  12.         try {
  13.           call.wait(); // 等待結果的返回,在Call類的callComplete()方法里有notify()方法用於喚醒線程
  14.         } catch (InterruptedException ie) {
  15.           // 因中斷異常而終止,設置標志interrupted為true
  16.           interrupted = true;
  17.         }
  18.       }
  19.       if (interrupted) {
  20.         Thread.currentThread().interrupt();
  21.       }
  22.     
  23.       if (call.error != null) {
  24.         if (call.error instanceof RemoteException) {
  25.           call.error.fillInStackTrace();
  26.           throw call.error;
  27.         } else { // 本地異常
  28.           throw wrapException(remoteId.getAddress(), call.error);
  29.         }
  30.       } else {
  31.         return call.value; //返回結果數據
  32.       }
  33.     }
  34.   }

網絡通信有關的代碼只會是下面的兩句了:

  1. Connection connection = getConnection(remoteId, call); //獲得一個連接
  2. connection.sendParam(call); // 向服務端發送call對象

先看看是怎么獲得一個到服務端的連接吧,下面貼出ipc.Client類中的getConnection()方法。

  1. private Connection getConnection(ConnectionId remoteId,
  2.                                    Call call)
  3.                                    throws IOException, InterruptedException {
  4.     if (!running.get()) {
  5.       // 如果client關閉了
  6.       throw new IOException("The client is stopped");
  7.     }
  8.     Connection connection;
  9. //如果connections連接池中有對應的連接對象,就不需重新創建了;如果沒有就需重新創建一個連接對象。
  10. //但請注意,該//連接對象只是存儲了remoteId的信息,其實還並沒有和服務端建立連接。
  11.     do {
  12.       synchronized (connections) {
  13.         connection = connections.get(remoteId);
  14.         if (connection == null) {
  15.           connection = new Connection(remoteId);
  16.           connections.put(remoteId, connection);
  17.         }
  18.       }
  19.     } while (!connection.addCall(call)); //將call對象放入對應連接中的calls池,就不貼出源碼了
  20.    //這句代碼才是真正的完成了和服務端建立連接哦~
  21.     connection.setupIOstreams();
  22.     return connection;
  23.   }

下面貼出Client.Connection類中的setupIOstreams()方法:

  1. private synchronized void setupIOstreams() throws InterruptedException {
  2. ???
  3.     try {
  4.      ???
  5.       while (true) {
  6.         setupConnection(); //建立連接
  7.         InputStream inStream = NetUtils.getInputStream(socket); //獲得輸入流
  8.         OutputStream outStream = NetUtils.getOutputStream(socket); //獲得輸出流
  9.         writeRpcHeader(outStream);
  10.         ???
  11.         this.in = new DataInputStream(new BufferedInputStream
  12.             (new PingInputStream(inStream))); //將輸入流裝飾成DataInputStream
  13.         this.out = new DataOutputStream
  14.         (new BufferedOutputStream(outStream)); //將輸出流裝飾成DataOutputStream
  15.         writeHeader();
  16.         // 跟新活動時間
  17.         touch();
  18.         //當連接建立時,啟動接受線程等待服務端傳回數據,注意:Connection繼承了Tread
  19.         start();
  20.         return;
  21.       }
  22.     } catch (IOException e) {
  23.       markClosed(e);
  24.       close();
  25.     }
  26.   }

再有一步我們就知道客戶端的連接是怎么建立的啦,下面貼出Client.Connection類中的setupConnection()方法:

  1. private synchronized void setupConnection() throws IOException {
  2.     short ioFailures = 0;
  3.     short timeoutFailures = 0;
  4.     while (true) {
  5.       try {
  6.         this.socket = socketFactory.createSocket(); //終於看到創建socket的方法了
  7.         this.socket.setTcpNoDelay(tcpNoDelay);
  8.        ???
  9.         // 設置連接超時為20s
  10.         NetUtils.connect(this.socket, remoteId.getAddress(), 20000);
  11.         this.socket.setSoTimeout(pingInterval);
  12.         return;
  13.       } catch (SocketTimeoutException toe) {
  14.         /* 設置最多連接重試為45次。
  15.          * 總共有20s*45 = 15 分鍾的重試時間。
  16.          */
  17.         handleConnectionFailure(timeoutFailures++, 45, toe);
  18.       } catch (IOException ie) {
  19.         handleConnectionFailure(ioFailures++, maxRetries, ie);
  20.       }
  21.     }
  22.   }

終於,我們知道了客戶端的連接是怎樣建立的了,其實就是創建一個普通的socket進行通信。

問題2:客戶端是怎樣給服務端發送數據的? 

第一句為了完成連接的建立,我們已經分析完畢;而第二句是為了發送數據,呵呵,分析下去,看能不能解決我們的問題呢。下面貼出Client.Connection類的sendParam()方法吧:

  1. public void sendParam(Call call) {
  2.       if (shouldCloseConnection.get()) {
  3.         return;
  4.       }
  5.       DataOutputBuffer d=null;
  6.       try {
  7.         synchronized (this.out) {
  8.           if (LOG.isDebugEnabled())
  9.             LOG.debug(getName() + " sending #" + call.id);
  10.           //創建一個緩沖區
  11.           d = new DataOutputBuffer();
  12.           d.writeInt(call.id);
  13.           call.param.write(d);
  14.           byte[] data = d.getData();
  15.           int dataLength = d.getLength();
  16.           out.writeInt(dataLength); //首先寫出數據的長度
  17.           out.write(data, 0, dataLength); //向服務端寫數據
  18.           out.flush();
  19.         }
  20.       } catch(IOException e) {
  21.         markClosed(e);
  22.       } finally {
  23.         IOUtils.closeStream(d);
  24.       }
  25.     }

問題3:客戶端是怎樣獲取服務端的返回數據的? 

,當連接建立時會啟動一個線程用於處理服務端返回的數據,我們看看這個處理線程是怎么實現的吧,下面貼出Client.Connection類和Client.Call類中的相關方法吧:

  1. 方法一:
  2.   public void run() {
  3.       ???
  4.       while (waitForWork()) {
  5.         receiveResponse(); //具體的處理方法
  6.       }
  7.       close();
  8.      ???
  9. }
  10.     
  11. 方法二:
  12. private void receiveResponse() {
  13.       if (shouldCloseConnection.get()) {
  14.         return;
  15.       }
  16.       touch();
  17.       try {
  18.         int id = in.readInt(); // 阻塞讀取id
  19.         if (LOG.isDebugEnabled())
  20.           LOG.debug(getName() + " got value #" + id);
  21.           Call call = calls.get(id); //在calls池中找到發送時的那個對象
  22.         int state = in.readInt(); // 阻塞讀取call對象的狀態
  23.         if (state == Status.SUCCESS.state) {
  24.           Writable value = ReflectionUtils.newInstance(valueClass, conf);
  25.           value.readFields(in); // 讀取數據
  26.         //將讀取到的值賦給call對象,同時喚醒Client等待線程,貼出setValue()代碼方法三
  27.           call.setValue(value);
  28.           calls.remove(id); //刪除已處理的call
  29.         } else if (state == Status.ERROR.state) {
  30.         ???
  31.         } else if (state == Status.FATAL.state) {
  32.         ???
  33.         }
  34.       } catch (IOException e) {
  35.         markClosed(e);
  36.       }
  37. }
  38.     
  39. 方法三:
  40. public synchronized void setValue(Writable value) {
  41.       this.value = value;
  42.       callComplete(); //具體實現
  43. }
  44. protected synchronized void callComplete() {
  45.       this.done = true;
  46.       notify(); // 喚醒client等待線程
  47.     }

    

客戶端的代碼分析就到這里,我們可以發現 ,客戶端使用 普通的socket 連接把客戶端的方法調用 名稱 參數 (形參 和實參) 傳遞到服務端了。

下面分析服務端的代碼。

對於ipc.Server,我們先分析一下它的幾個內部類吧:

     

Call :用於存儲客戶端發來的請求
Listener 
監聽類,用於監聽客戶端發來的請求,同時Listener內部還有一個靜態類,Listener.Reader,當監聽器監聽到用戶請求,便讓Reader讀取用戶請求。
Responder 
:響應RPC請求類,請求處理完畢,由Responder發送給請求客戶端。
Connection 
:連接類,真正的客戶端請求讀取邏輯在這個類中。
Handler 
:請求處理類,會循環阻塞讀取callQueue中的call對象,並對其進行操作。

     

你會發現其實ipc.Server是一個abstract修飾的抽象類。那隨之而來的問題就是:hadoop是怎樣初始化RPCServer端的呢?Namenode初始化時一定初始化了RPCSever端,那我們去看看Namenode的初始化源碼吧:

  1. private void initialize(Configuration conf) throws IOException {
  2.    ???
  3.     // 創建 rpc server
  4.     InetSocketAddress dnSocketAddr = getServiceRpcServerAddress(conf);
  5.     if (dnSocketAddr != null) {
  6.       int serviceHandlerCount =
  7.         conf.getInt(DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY,
  8.                     DFSConfigKeys.DFS_NAMENODE_SERVICE_HANDLER_COUNT_DEFAULT);
  9.       //獲得serviceRpcServer
  10.       this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
  11.           dnSocketAddr.getPort(), serviceHandlerCount,
  12.           false, conf, namesystem.getDelegationTokenSecretManager());
  13.       this.serviceRPCAddress = this.serviceRpcServer.getListenerAddress();
  14.       setRpcServiceServerAddress(conf);
  15. }
  16. //獲得server
  17.     this.server = RPC.getServer(this, socAddr.getHostName(),
  18.         socAddr.getPort(), handlerCount, false, conf, namesystem
  19.         .getDelegationTokenSecretManager());
  20.     
  21.    ???
  22.     this.server.start(); //啟動 RPC server Clients只允許連接該server
  23.     if (serviceRpcServer != null) {
  24.       serviceRpcServer.start(); //啟動 RPC serviceRpcServer 為HDFS服務的server
  25.     }
  26.     startTrashEmptier(conf);
  27.   }
    1.     this.serviceRpcServer = RPC.getServer(this, dnSocketAddr.getHostName(),
  28.           dnSocketAddr.getPort(), serviceHandlerCount,

    

這里面我們需要重點關注的是這個上面這個方法, 可以看到這里面傳遞過去的第一個參數是this .我們在前面說服務端最終是需要調用在服務端的某個對象來實際運行方法的。

現在這個this對象,及namenode對象就是服務端的相應對象。我們就有疑問,那么客戶端有那么多接口 ,namenode都實現了相應的對象么?是的都實現了。這也好理解,客戶端

會調用什么方法,肯定都是服務端和客戶端事先約定好的,服務端肯定把相應的對象創建好了來等待客戶端的調用。我們可以看一下namenode實現的端口,就很明晰了。

  1. public class NameNode implements ClientProtocol, DatanodeProtocol,
  2.                                  NamenodeProtocol, FSConstants,
  3.                                  RefreshAuthorizationPolicyProtocol,
  4.                                  RefreshUserMappingsProtocol {

    

下面我們來分析服務端是如何處理請求的。

分析過ipc.Client源碼后,我們知道Client端的底層通信直接采用了阻塞式IO編程。但hadoop是單中心結構,所以服務端不可以這么做,而是采用了java  NIO來實現Server端,那Server端采用java NIO是怎么建立連接的呢?分析源碼得知,Server端采用Listener監聽客戶端的連接,下面先分析一下Listener的構造函數吧:

  1. public Listener() throws IOException {
  2.   address = new InetSocketAddress(bindAddress, port);
  3.   // 創建ServerSocketChannel,並設置成非阻塞式
  4.   acceptChannel = ServerSocketChannel.open();
  5.   acceptChannel.configureBlocking(false);
  6.     
  7.   // 將server socket綁定到本地端口
  8.   bind(acceptChannel.socket(), address, backlogLength);
  9.   port = acceptChannel.socket().getLocalPort();
  10.   // 獲得一個selector
  11.   selector= Selector.open();
  12.   readers = new Reader[readThreads];
  13.   readPool = Executors.newFixedThreadPool(readThreads);
  14.   //啟動多個reader線程,為了防止請求多時服務端響應延時的問題
  15.   for (int i = 0; i < readThreads; i++) {
  16.     Selector readSelector = Selector.open();
  17.     Reader reader = new Reader(readSelector);
  18.     readers[i] = reader;
  19.     readPool.execute(reader);
  20.   }
  21.   // 注冊連接事件
  22.   acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
  23.   this.setName("IPC Server listener on " + port);
  24.   this.setDaemon(true);
  25. }

在啟動Listener線程時,服務端會一直等待客戶端的連接,下面貼出Server.Listener類的run()方法:

  1. public void run() {
  2.    ???
  3.     while (running) {
  4.       SelectionKey key = null;
  5.       try {
  6.         selector.select();
  7.         Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
  8.         while (iter.hasNext()) {
  9.           key = iter.next();
  10.           iter.remove();
  11.           try {
  12.             if (key.isValid()) {
  13.               if (key.isAcceptable())
  14.                 doAccept(key); //具體的連接方法
  15.             }
  16.           } catch (IOException e) {
  17.           }
  18.           key = null;
  19.         }
  20.       } catch (OutOfMemoryError e) {
  21.      ???
  22.   }

下面貼出Server.Listener類中doAccept ()方法中的關鍵源碼吧:

  1.     void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
  2.       Connection c = null;
  3.       ServerSocketChannel server = (ServerSocketChannel) key.channel();
  4.       SocketChannel channel;
  5.       while ((channel = server.accept()) != null) { //建立連接
  6.         channel.configureBlocking(false);
  7.         channel.socket().setTcpNoDelay(tcpNoDelay);
  8.         Reader reader = getReader(); //從readers池中獲得一個reader
  9.         try {
  10.           reader.startAdd(); // 激活readSelector,設置adding為true
  11.           SelectionKey readKey = reader.registerChannel(channel);//將讀事件設置成興趣事件
  12.           c = new Connection(readKey, channel, System.currentTimeMillis());//創建一個連接對象
  13.           readKey.attach(c); //將connection對象注入readKey
  14.           synchronized (connectionList) {
  15.             connectionList.add(numConnections, c);
  16.             numConnections++;
  17.           }
  18.         ???
  19.         } finally {
  20. //設置adding為false,采用notify()喚醒一個reader,其實代碼十三中啟動的每個reader都使
  21. //用了wait()方法等待。因篇幅有限,就不貼出源碼了。
  22.           reader.finishAdd();
  23.         }
  24.       }
  25.     }

reader被喚醒,reader接着執行doRead()方法。

下面貼出Server.Listener.Reader類中的doRead()方法和Server.Connection類中的readAndProcess()方法源碼:

    

  1. 方法一:
  2.  void doRead(SelectionKey key) throws InterruptedException {
  3.       int count = 0;
  4.       Connection c = (Connection)key.attachment(); //獲得connection對象
  5.       if (c == null) {
  6.         return;
  7.       }
  8.       c.setLastContact(System.currentTimeMillis());
  9.       try {
  10.         count = c.readAndProcess(); // 接受並處理請求
  11.       } catch (InterruptedException ieo) {
  12.        ???
  13.       }
  14.      ???
  15. }
  16.     
  17. 方法二:
  18. public int readAndProcess() throws IOException, InterruptedException {
  19.       while (true) {
  20.         ???
  21.         if (!rpcHeaderRead) {
  22.           if (rpcHeaderBuffer == null) {
  23.             rpcHeaderBuffer = ByteBuffer.allocate(2);
  24.           }
  25.          //讀取請求頭
  26.           count = channelRead(channel, rpcHeaderBuffer);
  27.           if (count < 0 || rpcHeaderBuffer.remaining() > 0) {
  28.             return count;
  29.           }
  30.         // 讀取請求版本號
  31.           int version = rpcHeaderBuffer.get(0);
  32.           byte[] method = new byte[] {rpcHeaderBuffer.get(1)};
  33.         ???
  34.     
  35.           data = ByteBuffer.allocate(dataLength);
  36.         }
  37.         // 讀取請求
  38.         count = channelRead(channel, data);
  39.     
  40.         if (data.remaining() == 0) {
  41.          ???
  42.           if (useSasl) {
  43.          ???
  44.           } else {
  45.             processOneRpc(data.array());//處理請求
  46.           }
  47.         ???
  48.           }
  49.         }
  50.         return count;
  51.       }
  52.     }

獲得call對象 
下面貼出Server.Connection類中的processOneRpc()方法和processData()方法的源碼

  1. 方法一:
  2.  private void processOneRpc(byte[] buf) throws IOException,
  3.         InterruptedException {
  4.       if (headerRead) {
  5.         processData(buf);
  6.       } else {
  7.         processHeader(buf);
  8.         headerRead = true;
  9.         if (!authorizeConnection()) {
  10.           throw new AccessControlException("Connection from " + this
  11.               + " for protocol " + header.getProtocol()
  12.               + " is unauthorized for user " + user);
  13.         }
  14.       }
  15. }
  16. 方法二:
  17.     private void processData(byte[] buf) throws IOException, InterruptedException {
  18.       DataInputStream dis =
  19.         new DataInputStream(new ByteArrayInputStream(buf));
  20.       int id = dis.readInt(); // 嘗試讀取id
  21.       Writable param = ReflectionUtils.newInstance(paramClass, conf);//讀取參數
  22.       param.readFields(dis);
  23.     
  24.       Call call = new Call(id, param, this); //封裝成call
  25.       callQueue.put(call); // 將call存入callQueue
  26.       incRpcCount(); // 增加rpc請求的計數
  27.     }

處理call對象 
你還記得Server類中還有個Handler內部類嗎?呵呵,對call對象的處理就是它干的。下面貼出Server.Handler類中run()方法中的關鍵代碼:

  1. while (running) {
  2.       try {
  3.         final Call call = callQueue.take(); //彈出call,可能會阻塞
  4.         ???
  5.         //調用ipc.Server類中的call()方法,但該call()方法是抽象方法,具體實現在RPC.Server類中
  6.         value = call(call.connection.protocol, call.param, call.timestamp);
  7.         synchronized (call.connection.responseQueue) {
  8.           setupResponse(buf, call,
  9.                       (error == null) ? Status.SUCCESS : Status.ERROR,
  10.                       value, errorClass, error);
  11.            ???
  12.           //給客戶端響應請求
  13.           responder.doRespond(call);
  14.         }
  15. }

終於看到了call 方法 我們下面看看服務端實際的call方法是怎么執行的吧

  1. public Writable call(Class<?> protocol, Writable param, long receivedTime)
  2.     throws IOException {
  3.       try {
  4.         Invocation call = (Invocation)param;
  5.         if (verbose) log("Call: " + call);
  6.     
  7.         Method method =
  8.           protocol.getMethod(call.getMethodName(),
  9.                                    call.getParameterClasses());
  10.         method.setAccessible(true);
  11.     
  12.         long startTime = System.currentTimeMillis();
  13.         Object value = method.invoke(instance, call.getParameters());

最后一句我們發現實際上是用了反射。 反射中的那個實際對象 instance 就是在namenode起來的時候創建的namenode對象。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM