在前面說hadoop整體實現的時候, 說過DataNode的需要完成的首要任務是K-V存儲。
第二個功能是 完成和namenode 通信 ,這個通過IPC 心跳連接實現。此外還有和客戶端 其它datanode之前的信息交換。
第 三個功能是 完成和客戶端還有其它節點的大規模通信,這個需要直接通過socket 協議實現。
下面開始分析源代碼,看看DataNode是如何實現這些功能的。
分析代碼采取自定向下的分析方式, 看看代碼中做了啥,然后分析這些代碼的作用。
首先看Datanode實現的接口。
-
public class DataNode extends Configured
-
implements InterDatanodeProtocol, ClientDatanodeProtocol, FSConstants,
-
Runnable, DataNodeMXBean {
它實現了 InterDatanodeProtocol, ClientDatanodeProtocol, 這兩個重要接口。 作用和之前分析haoop IPC的時候提到過, 為了是客戶端 和其它datanode節點遠程調用本dataNode節點方法的時候,提供方法實際運行的對象。
我們可以看到它並沒有實現和datanode的接口,因為datanode是主動和nameNode聯系,nameNode從來不會主動調用dataNode上的方法。
在main 方法中主要 通過一系列調用創建了datanode對象。
之后datanode的初始化工作主要由 startDataNode()來完成, 這是一個很復雜的方法,我們來一點一點的分析。
-
void startDataNode(Configuration conf,
-
AbstractList<File> dataDirs, SecureResources resources
-
) throws IOException {
-
if(UserGroupInformation.isSecurityEnabled() && resources == null)
-
throw new RuntimeException("Cannot start secure cluster without " +
-
"privileged resources.");
-
// connect to name node
-
this.namenode = (DatanodeProtocol)
-
RPC.waitForProxy(DatanodeProtocol.class,
-
DatanodeProtocol.versionID,
-
nameNodeAddr,
-
conf);
-
這個是通過反射獲取同dataNode節點通信的代理對象
-
// get version and id info from the name-node
-
NamespaceInfo nsInfo = handshake(); //立刻與名字節點通信
-
StartupOption startOpt = getStartupOption(conf);
-
assert startOpt != null : "Startup option must be set.";
-
storage.recoverTransitionRead(nsInfo, dataDirs, startOpt);
-
// adjust
-
this.dnRegistration.setStorageInfo(storage);
-
// initialize data node internal structure
-
this.data = new FSDataset(storage, conf);
-
// 創建數據存儲KV 的對象 這個后面還要再細分析。
-
}
-
this.threadGroup = new ThreadGroup("dataXceiverServer");
-
this.dataXceiverServer = new Daemon(threadGroup,
-
new DataXceiverServer(ss, conf, this));
-
this.threadGroup.setDaemon(true); // 創建流接口服務器 DataXServer 這個需要后面再分析
-
ipcServer = RPC.getServer(this, ipcAddr.getHostName(), ipcAddr.getPort(),
-
conf.getInt("dfs.datanode.handler.count", 3), false, conf, //創建IPC服務器。
-
blockTokenSecretManager);
-
-
}
上面代碼分析中我們留了兩個之后還要分析的方法,現在來看一下。
第一個是FsdataSet.
我們需要考慮的問題是 hadoop以64M大小為單位作為一個文件的大小 存儲在linux 文件系統 上。 當文件多了,就有一個效率問題,同一個文件夾下有過多的文件
和文件目錄過深都不利於檢索速度(這個與linux文件系統inode結構有關,這里暫不討論這個) 。所以我們這里要設計一個結構 需要創建文件夾 但文件夾目錄不能過深。
此外 hadoop 還考慮了一個優化問題,如果一個datanode節點上插有多塊硬盤的話,怎么提高並行吞吐量。好,有了這些我們來看具體實現。
一個FSdir對於着一個存儲目錄,一個FSVolume 對應着一個用戶配置的數據目錄(應該為一個磁盤最好) FsVolumeSet存儲着所有的FSVolume對象。
在FsDataSet中海油一個最重要的成員變量,volumeMap 就是這個成員變量存儲了 每一個Block 和它對應的存儲路徑等信息。
-
HashMap<Block,DatanodeBlockInfo> volumeMap = new HashMap<Block, DatanodeBlockInfo>();;
第二個是 DataXServer
當往數據節點中填入數據或者數據節點之間做負載均衡的時候顯然無法 使用Hdoop IPC 因為hadoop的IPC 在socket之上封裝了方法的調用,如果在這之上封裝一個大規模數據傳輸的方法,顯然效率上不如直接用socket通信。
-
ServerSocket ss;
-
if(secureResources == null) {
-
ss = (socketWriteTimeout > 0) ?
-
ServerSocketChannel.open().socket() : new ServerSocket();
-
Server.bind(ss, socAddr, 0);
-
} else {
-
ss = resources.getStreamingSocket();
-
}
-
ss.setReceiveBufferSize(DEFAULT_DATA_SOCKET_SIZE);
-
//初始化處理類dataXceiverServer
-
this.threadGroup = new ThreadGroup("dataXceiverServer");
-
this.dataXceiverServer = new Daemon(threadGroup, new DataXceiverServer(ss, conf, this));
-
this.threadGroup.setDaemon(true); // auto destroy when empty
DataXceiverServer 是個線程 我們看一下它的ruan方法
-
Socket s = ss.accept();
-
s.setTcpNoDelay(true);
-
new Daemon(datanode.threadGroup,
-
new DataXceiver(s, datanode, this)).start();
我們再看一下 DataXceiver的run方法
-
public void run() {
-
DataInputStream in=null;
-
try {
-
in = new DataInputStream(
-
new BufferedInputStream(NetUtils.getInputStream(s),
-
SMALL_BUFFER_SIZE));
-
short version = in.readShort();
-
if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
-
throw new IOException( "Version Mismatch" );
-
}
-
boolean local = s.getInetAddress().equals(s.getLocalAddress());
-
byte op = in.readByte();
-
// Make sure the xciver count is not exceeded
-
int curXceiverCount = datanode.getXceiverCount();
-
if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
-
throw new IOException("xceiverCount " + curXceiverCount
-
+ " exceeds the limit of concurrent xcievers "
-
+ dataXceiverServer.maxXceiverCount);
-
}
-
long startTime = DataNode.now();
-
switch ( op ) {
-
case DataTransferProtocol.OP_READ_BLOCK:
-
readBlock( in );
-
datanode.myMetrics.addReadBlockOp(DataNode.now() - startTime);
-
if (local)
-
datanode.myMetrics.incrReadsFromLocalClient();
-
else
-
datanode.myMetrics.incrReadsFromRemoteClient();
-
break;
-
case DataTransferProtocol.OP_WRITE_BLOCK:
-
writeBlock( in );
-
datanode.myMetrics.addWriteBlockOp(DataNode.now() - startTime);
-
if (local)
-
datanode.myMetrics.incrWritesFromLocalClient();
-
else
-
datanode.myMetrics.incrWritesFromRemoteClient();
-
break;
-
case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
-
replaceBlock(in);
-
datanode.myMetrics.addReplaceBlockOp(DataNode.now() - startTime);
-
break;
-
case DataTransferProtocol.OP_COPY_BLOCK:
-
// for balancing purpose; send to a proxy source
-
copyBlock(in);
-
datanode.myMetrics.addCopyBlockOp(DataNode.now() - startTime);
-
break;
-
case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
-
getBlockChecksum(in);
-
datanode.myMetrics.addBlockChecksumOp(DataNode.now() - startTime);
-
break;
-
default:
-
throw new IOException("Unknown opcode " + op + " in data stream");
-
}
-
} catch (Throwable t) {
-
LOG.error(datanode.dnRegistration + ":DataXceiver",t);
-
} finally {
-
LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
-
+ datanode.getXceiverCount());
-
IOUtils.closeStream(in);
-
IOUtils.closeSocket(s);
-
dataXceiverServer.childSockets.remove(s);
-
}
-
}
重點在這句
-
byte op = in.readByte();
應該是根據流中的事先約定 來 第一個字節 來決定是