本文主要是使用Java RMI 實現一個簡單的GFS(谷歌文件系統,google file system),這里提供演示運行視頻、系統實現以及源代碼相關。
[為了更好的閱讀以及查看其他篇章,請查看原文:https://www.cnblogs.com/maogen/p/gfs_2.html]
🧨 大年初二,走親訪友🏮 🧧
🏮祝大家新年快樂!🏮
ʰᵅᵖᵖʸ ⁿeᵚ ʸᵉᵅʳ家人閑坐 燈火可親
辭舊迎新 新年可期
系統整體介紹、背景以及設計信息:
介紹篇:
https://www.cnblogs.com/maogen/p/gfs_0.html
背景與設計篇:
https://www.cnblogs.com/maogen/p/gfs_1.html
作者:晨星1032-博客園:https://www.cnblogs.com/maogen/
演示運行視頻
[MyGFS演示--Java RMI 實現一個簡單的GFS_騰訊視頻]:https://v.qq.com/x/page/h3226nudkk0.html
1. 系統組織結構
如圖所示,整個MyGFS分布式文件系統由SPI、Common API,Master,ChunkServer和Client五個模塊組成:
-
SPI
:定義Master與ChunkServer需要實現的接口,並實現存放Chunk及其信息的抽象類。MasterApi與ChunkServerApi均繼承自Remote接口,標識着這是一個遠程接口。 -
Master
:實現遠程接口實現類MasterEngine,繼承自UnicastRemoteObject類並實現MasterApi接口,負責與Client的通信與對ChunkServer的管理。 -
ChunkServer
:實現遠程接口實現類ChunkServerEngine,繼承自UnicastRemoteObject類並實現ChunkServerApi接口,接收Master的調度並負責對Chunk的管理。 -
Client
:使用分布式文件系統的本地端,通過與Master直接通信來間接地對文件系統進行操作。 -
Common
:該模塊負責實現工具類與配置文件,例如生成UUID,將文件讀入內存等操作。
其具體的三方通訊流程如下圖所示:

2. Master模塊
2.1 心跳機制
使用Java RMI方式,在Master端檢測每個ChunkServer是否在線。具體操作如下:
-
通過RMI方式來檢測Chunk服務器的心跳,直接以try-catch方式判斷。若服務器宕機則加入failedChunkServerList中。
- 檢查正常ChunkServer上的所有Chunk的Hash值,若不一致則加入到Chunk失敗列表中。
-
最后進行錯誤處理。
public synchronized void heartbeatScan() {
System.out.println("heartbeat checking...");
// 錯誤Chunk列表
Map<String, List<ChunkInfo>> failedChunkMap = new LinkedHashMap<>();
// 錯誤Server列表
List<String> failedChunkServerList = new ArrayList<>();
ChunkServerApi chunkServerApi;
Map<Long, String> hashMap;
int index = 0;
for(String chunkServer : chunkServerList) {
// 使用RMI檢測心跳
try{
chunkServerApi = (ChunkServerApi) Naming.lookup("rmi://" + chunkServer + "/chunkServer");
// 獲取Hash,用來檢測Chunk錯誤
hashMap = chunkServerApi.getHashMap();
} catch (Exception e) {
// 服務器宕機
System.out.println("ChunkServer: " + chunkServer + " is down!");
failedChunkServerList.add(chunkServer);
}
try {
List<ChunkInfo> failedList = new ArrayList<>();
for (ChunkInfo chunkInfo : serverInfoMap.get(chunkServer)) {
String hash = hashMap.get(chunkInfo.getChunk().getChunkId());
if (hash == null || !hash.equals(chunkInfo.getHash())) {
System.out.println("chunk:" + chunkInfo.getChunk().getChunkFileName() + " ERROR!");
chunkInfo.removeReplicaServerName(chunkServer);
int idx = nameNodeList.indexOf(chunkInfo.getNameNode());
nameNodeList.get(idx).setChunkInfo(chunkInfo, chunkInfo.getSeq());
serverInfoMap.get(chunkServer).set(index, chunkInfo);
failedList.add(chunkInfo);
}
index++;
}
failedChunkMap.put(chunkServer, failedList);
}catch (Exception e) {
System.out.println("檢測chunk失敗...");
}
}//for
// 錯誤處理
handleFaults(failedChunkMap, failedChunkServerList);
System.out.println("heartbeat check end...");
}
2.2 故障恢復和容錯機制
若ChunkServer掉線,則需分配新的服務器負載均衡,並將取出該ChunkServer上對應的Chunk文件,對其進行復制。
System.out.println("正在處理宕機的服務器:" + serverName + "...");
// 當宕機服務器沒有Chunk時,直接去除
if(serverInfoMap.get(serverName).size() == 0) {
// 去除此服務器
chunkServerList.remove(serverName);
System.out.println("處理宕機服務器成功");
}
for(ChunkInfo chunkInfo : serverInfoMap.get(serverName)) {
System.out.println("備份failed chunkServer" + serverName + "中的Chunk "
+ chunkInfo.getChunk().getChunkFileName());
try {
chunkServerList.remove(serverName);
chunkInfo.removeReplicaServerName(serverName);
// 服務器節點分配
allocateNode(chunkInfo, chunkInfo.getFirstReplicaServerName());
// 處理NameNode
int idx = nameNodeList.indexOf(chunkInfo.getNameNode());
nameNodeList.get(idx).setChunkInfo(chunkInfo, chunkInfo.getSeq());
if(chunkInfo.getFirstReplicaServerName() == null) {
continue;
}
chunkServerApi = (ChunkServerApi) Naming.lookup(
"rmi://" + chunkInfo.getFirstReplicaServerName() + "/chunkServer");
chunkServerApi.backupChunk(chunkInfo.getChunk(), chunkInfo.getLastReplicaServerName());
System.out.println("處理宕機服務器成功");
}catch (Exception e) {
System.out.println("處理宕機服務器失敗!");
e.printStackTrace();
}
}
若該ChunkServer上的Chunk文件的Hash數據與Master上不一致則使用該Chunk文件的副本對其進行替換。
// chunk failed! 本地文件恢復
for(Map.Entry<String, List<ChunkInfo>> failedChunk : failedChunkMap.entrySet()) {
String serverName = failedChunk.getKey();
List<ChunkInfo> chunkInfos = failedChunk.getValue();
for(ChunkInfo chunkInfo : chunkInfos) {
System.out.println("從服務器" + serverName + "上正在恢復錯誤的Chunk:" + chunkInfo.getChunk().getChunkFileName());
try {
if(chunkInfo.getFirstReplicaServerName() == null ||
chunkInfo.getFirstReplicaServerName().equals(serverName)){
System.out.println("沒有備份,恢復失敗!");
continue;
}
chunkInfo.setLastReplicaServerName(serverName);
int idx = nameNodeList.indexOf(chunkInfo.getNameNode());
nameNodeList.get(idx).setChunkInfo(chunkInfo, chunkInfo.getSeq());
chunkServerApi = (ChunkServerApi) Naming.lookup(
"rmi://" + chunkInfo.getFirstReplicaServerName() + "/chunkServer");
chunkServerApi.backupChunk(chunkInfo.getChunk(), serverName);
System.out.println(chunkInfo.getChunk().getChunkFileName() + "恢復成功!");
}catch (Exception e) {
System.out.println("恢復失敗!");
e.printStackTrace();
}
}
}
運行截圖如圖所示:
3. ChunkServer模塊
3.1 內存命中機制
public class ChunkServerMemory {
private final LinkedList<ChunkMemory> memoryList;
private final int maxContain;
public ChunkServerMemory(int maxContain) {
this.memoryList = new LinkedList<>();
this.maxContain = maxContain;
}
public void push(Chunk chunk,byte[] data) {
if(memoryList.size()>maxContain){
memoryList.removeLast();
}
memoryList.push(new ChunkMemory(chunk,data));
}
public ChunkMemory search(Chunk chunk){
ChunkMemory res=null;
for (int i = 0; i < memoryList.size(); i++) {
if(memoryList.get(i).isMatch(chunk)){
res=memoryList.get(i);
moveToHead(i);
System.out.println(chunk.getChunkFileName()+"內存命中");
}
}
return res;
}
private void moveToHead(int i){
ChunkMemory tmp=memoryList.get(i);
memoryList.remove(i);
memoryList.push(tmp);
}
public void remove(long chunkId){
for (int i = 0; i < memoryList.size(); i++) {
if(memoryList.get(i).isMatch(chunkId)){
memoryList.remove(i);
return;
}
}
}
}
3.2 狀態維護
一分鍾更新一次本地Chunk的Hash值。
try{
Thread.sleep(60000);
System.out.println("開始檢查Chunk信息");
for(Long chunkId : chunkIdList) {
String md5Str = SecurityUtil.getMd5(filePath + getChunkName(chunkId));
if(md5Str == null) {
md5Str = "check error: no file!";
}
chunkHash.put(chunkId, md5Str);
}
System.out.println("檢查Chunk信息結束");
} catch (Exception e) {
e.printStackTrace();
break;
}
3.3副本管理
GFS默認Chunk主副本三個,但為了實際演示方便,這里設置為主副本各一個,下圖為windows服務器和Linux服務器上的存儲。
4. Client模塊
4.1 上傳
在Client端上傳文件時,會先將文件相關信息添加到Master中,同時Master會分配服務器到各個Chunk文件,然后Client通過分配的信息向指定的ChunkServer進行傳送數據流。
public void upLoadFile(String fileAddr) {
System.out.println("文件正在上傳...");
try{
int length, seq = 0;
byte[] buffer = new byte[CHUNK_SIZE];
File file = new File(fileAddr);
// 向Master添加該Name結點
masterApi.addNameNode(file.getName());
InputStream input = new FileInputStream(file);
input.skip(0);
while ((length = input.read(buffer, 0, CHUNK_SIZE)) > 0) {
byte[] upLoadBytes = new byte[length];
System.arraycopy(buffer, 0, upLoadBytes, 0, length);
String hash = SecurityUtil.getMd5(upLoadBytes);
uploadChunk(file.getName(), seq, length, upLoadBytes, hash);
seq++;
}
input.close();
System.out.println("文件已上傳!");
} catch (Exception e) {
System.out.println("文件上傳失敗");
System.out.println(e.getLocalizedMessage());
}
}
演示效果如圖所示,分別為Client端和ChunkServer端的情況。

4.2 下載
用戶在Client端下載文件時,會先向Master請求所下載文件的信息,然后通過Master返回的Chunk所在ChunkServer信息進行數據請求獲取。
public String downloadFile(String fileName) throws Exception {
System.out.println("文件正在下載...");
String fileAddr = prefixPath + "new_" + fileName;
File localFile = new File(fileAddr);
OutputStream output = new FileOutputStream(localFile);
List<ChunkInfo> chunkInfoList = masterApi.getChunkInfos(fileName);
for(ChunkInfo chunkInfo : chunkInfoList) {
output.write(downloadChunk(chunkInfo.getChunk(), chunkInfo.getFirstReplicaServerName()));
}
output.close();
return fileAddr;
}
4.3 追加
每一個Chunk默認最大為64Mb,追加操作需要對最后一個Chunk的剩余空間進行判斷:
- 若最后一個Chunk剩余空間 > 所追加文件大小,則直接添加最后一個即可。
- 若最后一個Chunk剩余空間 < 所追加文件大小,則首先將最后一個Chunk空間加滿,然后再新建Chunk直到 > 所追加文件大小
public void appendFile(String fileName, String appendFileAddr) throws Exception {
List<ChunkInfo> chunkInfoList = masterApi.getChunkInfos(fileName);
if(chunkInfoList.isEmpty()) {
System.out.println("Master找不到該文件!");
return;
}
System.out.println("文件正在進行修改...");
byte[] bytes = ConvertUtil.file2Byte(appendFileAddr);
// 獲取最后一個Chunk信息
int num = chunkInfoList.size();
ChunkInfo chunkInfo = chunkInfoList.get(num-1);
int chunkLen = (int)chunkInfo.getChunk().getByteSize();
int appendLen = bytes.length;
int len = CHUNK_SIZE - chunkLen;
// 可以繼續追加
if(len >= appendLen) {
byte[] newBytes = new byte[appendLen];
System.arraycopy(bytes, 0, newBytes, 0, appendLen);
chunkServerApi = (ChunkServerApi) Naming.lookup("rmi://" + chunkInfo.getFirstReplicaServerName() + "/chunkServer");
chunkServerApi.appendChunk(chunkInfo.getChunk(), newBytes, chunkInfo.getLastReplicaServerName());
masterApi.updateNameNode(fileName, chunkLen + appendLen);
}else {
// 需要新建Chunk
// 最后一個Chunk剩余大小->加滿
byte[] leftBytes = new byte[len];
System.arraycopy(bytes, 0, leftBytes, 0, len);
// 更新chunkServer
chunkServerApi = (ChunkServerApi) Naming.lookup("rmi://" + chunkInfo.getFirstReplicaServerName() + "/chunkServer");
chunkServerApi.appendChunk(chunkInfo.getChunk(), leftBytes, chunkInfo.getLastReplicaServerName());
// Master更新
masterApi.updateNameNode(fileName, CHUNK_SIZE);
// 其余處理
String hash;
while (len + CHUNK_SIZE <= appendLen) {
leftBytes = new byte[CHUNK_SIZE];
System.arraycopy(bytes, len, leftBytes, 0, CHUNK_SIZE);
hash = SecurityUtil.getMd5(leftBytes);
uploadChunk(fileName, num, CHUNK_SIZE, leftBytes, hash);
len += CHUNK_SIZE;
num++;
}
if (len < appendLen) {
int lastSize = appendLen - len;
leftBytes = new byte[lastSize];
System.arraycopy(bytes, len, leftBytes, 0, lastSize);
hash = SecurityUtil.getMd5(leftBytes);
uploadChunk(fileName, num, lastSize, leftBytes, hash);
}
}
System.out.println("文件已修改!");
}
4.4 刪除
刪除文件僅將Master上的信息進行刪除,ChunkServer本地上的文件未刪除(軟刪除)
public void deleteFile(String fileName) throws Exception {
masterApi.deleteNameNode(fileName);
System.out.println("文件刪除成功!");
}
4.5 文件列表
public void getFileList() throws Exception {
List<String> fileList = masterApi.getFileList();
if(fileList.size() == 0) {
System.out.println("空");
}
for(String fileName : fileList) {
System.out.println(fileName);
}
}
源代碼
具體詳情請查看源代碼
daisy-RMI實現GFS:https://gitee.com/maogen_ymg/daisy
系統整體介紹、背景以及設計信息,盡在其他篇章:
介紹篇:
https://www.cnblogs.com/maogen/p/gfs_0.html
背景與設計篇:
https://www.cnblogs.com/maogen/p/gfs_1.html
作者:晨星1032-博客園:https://www.cnblogs.com/maogen/