Java RMI 實現一個簡單的GFS(谷歌文件系統)——演示與實現篇


本文主要是使用Java RMI 實現一個簡單的GFS(谷歌文件系統,google file system),這里提供演示運行視頻、系統實現以及源代碼相關。

[為了更好的閱讀以及查看其他篇章,請查看原文:https://www.cnblogs.com/maogen/p/gfs_2.html]

🧨 大年初二,走親訪友🏮 🧧

🏮祝大家新年快樂!🏮
ʰᵅᵖᵖʸ ⁿeᵚ ʸᵉᵅʳ

家人閑坐 燈火可親
辭舊迎新 新年可期

系統整體介紹、背景以及設計信息:

介紹篇:https://www.cnblogs.com/maogen/p/gfs_0.html

背景與設計篇:https://www.cnblogs.com/maogen/p/gfs_1.html

作者:晨星1032-博客園:https://www.cnblogs.com/maogen/


演示運行視頻

[MyGFS演示--Java RMI 實現一個簡單的GFS_騰訊視頻]:https://v.qq.com/x/page/h3226nudkk0.html

1. 系統組織結構

3_gfs_1

如圖所示,整個MyGFS分布式文件系統由SPI、Common API,Master,ChunkServer和Client五個模塊組成:

  • SPI:定義Master與ChunkServer需要實現的接口,並實現存放Chunk及其信息的抽象類。MasterApi與ChunkServerApi均繼承自Remote接口,標識着這是一個遠程接口。

  • Master:實現遠程接口實現類MasterEngine,繼承自UnicastRemoteObject類並實現MasterApi接口,負責與Client的通信與對ChunkServer的管理。

  • ChunkServer:實現遠程接口實現類ChunkServerEngine,繼承自UnicastRemoteObject類並實現ChunkServerApi接口,接收Master的調度並負責對Chunk的管理。

  • Client:使用分布式文件系統的本地端,通過與Master直接通信來間接地對文件系統進行操作。

  • Common:該模塊負責實現工具類與配置文件,例如生成UUID,將文件讀入內存等操作。

其具體的三方通訊流程如下圖所示:

2. Master模塊

2.1 心跳機制

​ 使用Java RMI方式,在Master端檢測每個ChunkServer是否在線。具體操作如下:

  1. 通過RMI方式來檢測Chunk服務器的心跳,直接以try-catch方式判斷。若服務器宕機則加入failedChunkServerList中。

    1. 檢查正常ChunkServer上的所有Chunk的Hash值,若不一致則加入到Chunk失敗列表中。
  2. 最后進行錯誤處理。

public synchronized void heartbeatScan() {
        System.out.println("heartbeat checking...");
        // 錯誤Chunk列表
        Map<String, List<ChunkInfo>> failedChunkMap = new LinkedHashMap<>();
        // 錯誤Server列表
        List<String> failedChunkServerList = new ArrayList<>();

        ChunkServerApi chunkServerApi;
        Map<Long, String> hashMap;
        int index = 0;
        for(String chunkServer : chunkServerList) {
            // 使用RMI檢測心跳
            try{
                chunkServerApi = (ChunkServerApi) Naming.lookup("rmi://" + chunkServer + "/chunkServer");
                // 獲取Hash,用來檢測Chunk錯誤
                hashMap = chunkServerApi.getHashMap();
            } catch (Exception e) {
                // 服務器宕機
                System.out.println("ChunkServer: " + chunkServer + " is down!");
                failedChunkServerList.add(chunkServer);
            }

             try {
                 List<ChunkInfo> failedList = new ArrayList<>();
                 for (ChunkInfo chunkInfo : serverInfoMap.get(chunkServer)) {
                     String hash = hashMap.get(chunkInfo.getChunk().getChunkId());

                     if (hash == null || !hash.equals(chunkInfo.getHash())) {
                         System.out.println("chunk:" + chunkInfo.getChunk().getChunkFileName() + " ERROR!");
                         chunkInfo.removeReplicaServerName(chunkServer);
                         int idx = nameNodeList.indexOf(chunkInfo.getNameNode());

                         nameNodeList.get(idx).setChunkInfo(chunkInfo, chunkInfo.getSeq());
                         serverInfoMap.get(chunkServer).set(index, chunkInfo);

                         failedList.add(chunkInfo);
                     }
                     index++;
                 }
                 failedChunkMap.put(chunkServer, failedList);
             }catch (Exception e) {
                 System.out.println("檢測chunk失敗...");
             }
        }//for

        // 錯誤處理
        handleFaults(failedChunkMap, failedChunkServerList);
        System.out.println("heartbeat check end...");
    }

2.2 故障恢復和容錯機制

​ 若ChunkServer掉線,則需分配新的服務器負載均衡,並將取出該ChunkServer上對應的Chunk文件,對其進行復制。

System.out.println("正在處理宕機的服務器:" + serverName + "...");
// 當宕機服務器沒有Chunk時,直接去除
if(serverInfoMap.get(serverName).size() == 0) {
    // 去除此服務器
    chunkServerList.remove(serverName);
    System.out.println("處理宕機服務器成功");
}

for(ChunkInfo chunkInfo : serverInfoMap.get(serverName)) {
    System.out.println("備份failed chunkServer" + serverName + "中的Chunk "
                       + chunkInfo.getChunk().getChunkFileName());
    try {
        chunkServerList.remove(serverName);
        chunkInfo.removeReplicaServerName(serverName);
        // 服務器節點分配
        allocateNode(chunkInfo, chunkInfo.getFirstReplicaServerName());
        // 處理NameNode
        int idx = nameNodeList.indexOf(chunkInfo.getNameNode());
        nameNodeList.get(idx).setChunkInfo(chunkInfo, chunkInfo.getSeq());

        if(chunkInfo.getFirstReplicaServerName() == null) {
            continue;
        }

        chunkServerApi = (ChunkServerApi) Naming.lookup(
            "rmi://" + chunkInfo.getFirstReplicaServerName() + "/chunkServer");
        chunkServerApi.backupChunk(chunkInfo.getChunk(), chunkInfo.getLastReplicaServerName());
        System.out.println("處理宕機服務器成功");
    }catch (Exception e) {
        System.out.println("處理宕機服務器失敗!");
        e.printStackTrace();
    }
}

​ 若該ChunkServer上的Chunk文件的Hash數據與Master上不一致則使用該Chunk文件的副本對其進行替換。

// chunk failed! 本地文件恢復
for(Map.Entry<String, List<ChunkInfo>> failedChunk : failedChunkMap.entrySet()) {
    String serverName = failedChunk.getKey();
    List<ChunkInfo> chunkInfos = failedChunk.getValue();
    for(ChunkInfo chunkInfo : chunkInfos) {
        System.out.println("從服務器" + serverName + "上正在恢復錯誤的Chunk:" + chunkInfo.getChunk().getChunkFileName());
        try {
            if(chunkInfo.getFirstReplicaServerName() == null || 
               chunkInfo.getFirstReplicaServerName().equals(serverName)){
                System.out.println("沒有備份,恢復失敗!");
                continue;
            }

            chunkInfo.setLastReplicaServerName(serverName);
            int idx = nameNodeList.indexOf(chunkInfo.getNameNode());
            nameNodeList.get(idx).setChunkInfo(chunkInfo, chunkInfo.getSeq());

            chunkServerApi = (ChunkServerApi) Naming.lookup(
                "rmi://" + chunkInfo.getFirstReplicaServerName() + "/chunkServer");
            chunkServerApi.backupChunk(chunkInfo.getChunk(), serverName);
            System.out.println(chunkInfo.getChunk().getChunkFileName() + "恢復成功!");
        }catch (Exception e) {
            System.out.println("恢復失敗!");
            e.printStackTrace();
        }
    }
}

​ 運行截圖如圖所示:

3_gfs_3

3. ChunkServer模塊

3.1 內存命中機制

public class ChunkServerMemory {
   private final LinkedList<ChunkMemory> memoryList;
   private final int maxContain;

   public ChunkServerMemory(int maxContain) {
       this.memoryList = new LinkedList<>();
       this.maxContain = maxContain;
   }

   public void push(Chunk chunk,byte[] data) {
       if(memoryList.size()>maxContain){
           memoryList.removeLast();
       }
       memoryList.push(new ChunkMemory(chunk,data));
   }

   public ChunkMemory search(Chunk chunk){
       ChunkMemory res=null;
       for (int i = 0; i < memoryList.size(); i++) {
           if(memoryList.get(i).isMatch(chunk)){
               res=memoryList.get(i);
               moveToHead(i);
               System.out.println(chunk.getChunkFileName()+"內存命中");
           }
       }
       return res;
   }

   private void moveToHead(int i){
       ChunkMemory tmp=memoryList.get(i);
       memoryList.remove(i);
       memoryList.push(tmp);
   }

   public void remove(long chunkId){
       for (int i = 0; i < memoryList.size(); i++) {
           if(memoryList.get(i).isMatch(chunkId)){
               memoryList.remove(i);
               return;
           }
       }
   }
}

3.2 狀態維護

​ 一分鍾更新一次本地Chunk的Hash值。

try{
   Thread.sleep(60000);
   System.out.println("開始檢查Chunk信息");
   for(Long chunkId : chunkIdList) {
       String md5Str = SecurityUtil.getMd5(filePath + getChunkName(chunkId));
       if(md5Str == null) {
           md5Str = "check error: no file!";
       }
       chunkHash.put(chunkId, md5Str);
   }
   System.out.println("檢查Chunk信息結束");
} catch (Exception e) {
   e.printStackTrace();
   break;
}

3.3副本管理

​ GFS默認Chunk主副本三個,但為了實際演示方便,這里設置為主副本各一個,下圖為windows服務器和Linux服務器上的存儲。

3_gfs_4

4. Client模塊

4.1 上傳

​ 在Client端上傳文件時,會先將文件相關信息添加到Master中,同時Master會分配服務器到各個Chunk文件,然后Client通過分配的信息向指定的ChunkServer進行傳送數據流。

public void upLoadFile(String fileAddr) {
   System.out.println("文件正在上傳...");
   try{
       int length, seq = 0;
       byte[] buffer = new byte[CHUNK_SIZE];

       File file = new File(fileAddr);
       // 向Master添加該Name結點
       masterApi.addNameNode(file.getName());

       InputStream input = new FileInputStream(file);
       input.skip(0);
       while ((length = input.read(buffer, 0, CHUNK_SIZE)) > 0) {
           byte[] upLoadBytes = new byte[length];
           System.arraycopy(buffer, 0, upLoadBytes, 0, length);
           String hash = SecurityUtil.getMd5(upLoadBytes);
           uploadChunk(file.getName(), seq, length, upLoadBytes, hash);
           seq++;
       }
       input.close();
       System.out.println("文件已上傳!");
   } catch (Exception e) {
       System.out.println("文件上傳失敗");
       System.out.println(e.getLocalizedMessage());
   }
}

​ 演示效果如圖所示,分別為Client端和ChunkServer端的情況。

4.2 下載

​ 用戶在Client端下載文件時,會先向Master請求所下載文件的信息,然后通過Master返回的Chunk所在ChunkServer信息進行數據請求獲取。

public String downloadFile(String fileName) throws Exception {
   System.out.println("文件正在下載...");
   String fileAddr = prefixPath + "new_" + fileName;
   File localFile = new File(fileAddr);
   OutputStream output = new FileOutputStream(localFile);

   List<ChunkInfo> chunkInfoList = masterApi.getChunkInfos(fileName);
   for(ChunkInfo chunkInfo : chunkInfoList) {
       output.write(downloadChunk(chunkInfo.getChunk(), chunkInfo.getFirstReplicaServerName()));
   }
   output.close();

   return fileAddr;
}

4.3 追加

​ 每一個Chunk默認最大為64Mb,追加操作需要對最后一個Chunk的剩余空間進行判斷:

  1. 若最后一個Chunk剩余空間 > 所追加文件大小,則直接添加最后一個即可。
  2. 若最后一個Chunk剩余空間 < 所追加文件大小,則首先將最后一個Chunk空間加滿,然后再新建Chunk直到 > 所追加文件大小
public void appendFile(String fileName, String appendFileAddr) throws Exception {
    List<ChunkInfo> chunkInfoList = masterApi.getChunkInfos(fileName);
    if(chunkInfoList.isEmpty()) {
        System.out.println("Master找不到該文件!");
        return;
    }
    System.out.println("文件正在進行修改...");

    byte[] bytes = ConvertUtil.file2Byte(appendFileAddr);

    // 獲取最后一個Chunk信息
    int num = chunkInfoList.size();
    ChunkInfo chunkInfo = chunkInfoList.get(num-1);
    int chunkLen = (int)chunkInfo.getChunk().getByteSize();
    int appendLen = bytes.length;

    int len = CHUNK_SIZE - chunkLen;
    // 可以繼續追加
    if(len >= appendLen) {
        byte[] newBytes = new byte[appendLen];
        System.arraycopy(bytes, 0, newBytes, 0, appendLen);
        chunkServerApi = (ChunkServerApi) Naming.lookup("rmi://" + chunkInfo.getFirstReplicaServerName() + "/chunkServer");
        chunkServerApi.appendChunk(chunkInfo.getChunk(), newBytes, chunkInfo.getLastReplicaServerName());
        masterApi.updateNameNode(fileName, chunkLen + appendLen);
    }else {
        // 需要新建Chunk
        // 最后一個Chunk剩余大小->加滿
        byte[] leftBytes = new byte[len];
        System.arraycopy(bytes, 0, leftBytes, 0, len);
        // 更新chunkServer
        chunkServerApi = (ChunkServerApi) Naming.lookup("rmi://" + chunkInfo.getFirstReplicaServerName() + "/chunkServer");
        chunkServerApi.appendChunk(chunkInfo.getChunk(), leftBytes, chunkInfo.getLastReplicaServerName());
        // Master更新
        masterApi.updateNameNode(fileName, CHUNK_SIZE);

        // 其余處理
        String hash;
        while (len + CHUNK_SIZE <= appendLen) {
            leftBytes = new byte[CHUNK_SIZE];
            System.arraycopy(bytes, len, leftBytes, 0, CHUNK_SIZE);
            hash = SecurityUtil.getMd5(leftBytes);
            uploadChunk(fileName, num, CHUNK_SIZE, leftBytes, hash);
            len += CHUNK_SIZE;
            num++;
        }
        if (len < appendLen) {
            int lastSize = appendLen - len;
            leftBytes = new byte[lastSize];
            System.arraycopy(bytes, len, leftBytes, 0, lastSize);
            hash = SecurityUtil.getMd5(leftBytes);
            uploadChunk(fileName, num, lastSize, leftBytes, hash);
        }
    }
    System.out.println("文件已修改!");
}

4.4 刪除

​ 刪除文件僅將Master上的信息進行刪除,ChunkServer本地上的文件未刪除(軟刪除)

public void deleteFile(String fileName) throws Exception {
    masterApi.deleteNameNode(fileName);
    System.out.println("文件刪除成功!");
}

4.5 文件列表

public void getFileList() throws Exception {
    List<String> fileList = masterApi.getFileList();
    if(fileList.size() == 0) {
        System.out.println("空");
    }
    for(String fileName : fileList) {
        System.out.println(fileName);
    }
}

源代碼

​ 具體詳情請查看源代碼

​ daisy-RMI實現GFS:https://gitee.com/maogen_ymg/daisy


系統整體介紹、背景以及設計信息,盡在其他篇章:

介紹篇:https://www.cnblogs.com/maogen/p/gfs_0.html

背景與設計篇:https://www.cnblogs.com/maogen/p/gfs_1.html

作者:晨星1032-博客園:https://www.cnblogs.com/maogen/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM