轉自:http://blog.csdn.net/androidlushangderen/article/details/52850349
在HDFS中,當每次客戶端用戶往某個文件中寫入數據的時候,為了保持數據的一致性,此時其它客戶端程序是不允許向此文件同時寫入數據的。那么HDFS是如何做到這一點的呢?答案是租約(Lease)。換句話說,租約是HDFS給予客戶端的一個寫文件操作的臨時許可證,無此證件者將不被允許操作此文件。本文我們將要深入分析HDFS內部的租約機制,包括租約的添加、移除、管理操作等等。
HDFS租約的概念
HDFS租約可能很多使用HDFS的人都或多或少都知道一些,大致的理解一般如下:“客戶端在每次讀寫HDFS文件的時候獲取租約對文件進行讀寫,文件讀取完畢了,然后再釋放此租約”。但是是否有人仔細研究過這個租約內部到底包含了什么信息呢,它與租約持有者也就是客戶端用戶是一種怎樣的邏輯關系呢?首先我們需要了解的就是這個問題。下面是HDFS租約的相關定義:
- 每個客戶端用戶持有一個租約。
- 每個租約內部包含有一個租約持有者信息,還有此租約對應的文件Id列表,表示當前租約持有者正在寫這些文件Id對應的文件。
- 每個租約內包含有一個最新近更新時間,最近更新時間將會決定此租約是否已過期。過期的租約會導致租約持有者無法繼續執行寫數據到文件中,除非進行租約的更新。
綜合上述3點,租約的結構關系如圖1-1所示。
圖 1-1 租約內部結構
租約類代碼中的定義如下:
class Lease { // 租約持有者 private final String holder; // 最近更新時間 private long lastUpdate; // 當前租約持有者打開的文件 private final HashSet<Long> files = new HashSet<>(); ...
每次當客戶端用戶新寫入一個文件的時候,它會將此文件Id加入到它所對應的租約中,同時更新lastUpdateTime值。講述完租約的概念,下面我們在來看租約的管理。
HDFS租約的管理
在HDFS中,每天會有許許多多的應用程序在讀寫文件,於是就會有各個租約的生成。那么這些租約是如何管理的呢?如果有些客戶端用戶寫某文件后未及時關閉此文件,導致此租約一直未釋放,從而造成其他用戶無法對此文件進行寫操作,面對這種情況,HDFS中的做法是怎樣的呢?以上提到的問題就是租約管理的內容了。
LeaseManager租約管理器
HDFS租約管理的操作集中在一個類上:LeaseManager。它與CacheManager(緩存管理類),SnapshotManager(快照管理類)類似,是一個中心管理類,運行在Active NameNode的服務中。租約類的定義就是在LeaseManager中的。在LeaseManager租約管理器中,它所做的事情主要歸納為兩類。
第一個,維護HDFS內部當前所有的租約,並以多種映射關系進行保存。保存的映射關系分為如下3種:
- 租約持有者對租約的映射關系。
- 文件Id對租約的映射關系。
- 按照時間排序進行租約集合的保存,此關系並不是一種映射關系。
以上3種關系的代碼定義如下:
public class LeaseManager { ... // 租約持有者對租約的映射圖 private final SortedMap<String, Lease> leases = new TreeMap<>(); // 按照時間進行排序的租約隊列 private final PriorityQueue<Lease> sortedLeases = new PriorityQueue<>(512, new Comparator<Lease>() { @Override public int compare(Lease o1, Lease o2) { return Long.signum(o1.getLastUpdate() - o2.getLastUpdate()); } }); // 文件Id對租約的映射圖 private final HashMap<Long, Lease> leasesById = new HashMap<>(); ...
圖形展示效果如圖1-2所示。
圖 1-2 HDFS內部保存的租約映射關系
HDFS保存多種映射關系是為了方便租約的多維度查詢,至少目前來看,按照租約持有者,正在寫的文件Id都可以直接查到對應的租約對象。
在LeaseManager租約管理器中,還有一件重要的事情是定期釋放過期的租約對象。這個操作可以避免文件租約長期不釋放導致其他客戶端文件無法寫文件的問題。
因為在某些異常情況下,客戶端程序可能在寫完文件后,沒有正常關閉文件,導致文件始終處於正在寫的狀態中,此文件在對應的租約中沒有被真正的移除掉。
LeaseManager中的解決辦法是啟動一個定時的監控線程,來釋放過期的租約。周期檢測線程主方法如下:
public void run() { for(; shouldRunMonitor && fsnamesystem.isRunning(); ) { boolean needSync = false; try { fsnamesystem.writeLockInterruptibly(); try { // 如果當前NameNode已經離開安全模式 if (!fsnamesystem.isInSafeMode()) { // 則進行租約進行檢測操作 needSync = checkLeases(); } } finally { ... } // 進行租約間隔檢測時間的睡眠,默認2秒 Thread.sleep(fsnamesystem.getLeaseRecheckIntervalMs()); } catch(InterruptedException ie) { if (LOG.isDebugEnabled()) { LOG.debug(name + " is interrupted", ie); } } catch(Throwable e) { LOG.warn("Unexpected throwable: ", e); } } }
從上面的代碼我們可以看出,這是一個持續運行的操作,我們進入checkLease方法,先來看checkLease的頭幾行代碼的執行邏輯,
synchronized boolean checkLeases() { boolean needSync = false; assert fsnamesystem.hasWriteLock(); // 獲取租約檢測的起始時間 long start = monotonicNow(); // 滿足一下3個條件,則進入租約釋放操作: // 1.如果租約隊列不為空 // 2.租約隊列中最老的租約已經出現了超時 // 3.沒到租約檢測的最大時間期限 while(!sortedLeases.isEmpty() && sortedLeases.peek().expiredHardLimit() && !isMaxLockHoldToReleaseLease(start)) { // 獲取更新時間最老的租約,同樣也是已過期的租約時間 Lease leaseToCheck = sortedLeases.peek(); LOG.info(leaseToCheck + " has expired hard limit"); ...
因為sortedLeases租約隊列已經是按最近更新時間值排序好的,所以取出的Lease對象就是最舊的一個租約。在這里還要介紹上面的第3個條件的意思,也就是下面這行代碼的意思:
isMaxLockHoldToReleaseLease(start)
因為HDFS為了避免每次租約檢測花費過長的時間,在此進行租約檢測時間的判斷,如果時間超過了,則終止當前的操作,等待下一次的checkLease操作。
我們繼續來看while循環內下半部分的代碼:
... final List<Long> removing = new ArrayList<>(); // 獲取待釋放租約中包含的文件Id Collection<Long> files = leaseToCheck.getFiles(); Long[] leaseINodeIds = files.toArray(new Long[files.size()]); FSDirectory fsd = fsnamesystem.getFSDirectory(); String p = null; // 遍歷這些文件Id for(Long id : leaseINodeIds) { try { // 獲取這些文件Id對應的INode path對象 INodesInPath iip = INodesInPath.fromINode(fsd.getInode(id)); p = iip.getPath(); // Sanity check to make sure the path is correct if (!p.startsWith("/")) { throw new IOException("Invalid path in the lease " + p); } // 進行文件的關閉,在此過程中,此文件Id將從此租約中移除 boolean completed = fsnamesystem.internalReleaseLease( leaseToCheck, p, iip, HdfsServerConstants.NAMENODE_LEASE_HOLDER); ... } catch (IOException e) { LOG.error("Cannot release the path " + p + " in the lease " + leaseToCheck, e); // 如果在關閉文件的過程中發生異常,則將文件Id加入到移除列表中 removing.add(id); } // 如果發現租約檢測時間到了,則終止當前操作 if (isMaxLockHoldToReleaseLease(start)) { LOG.debug("Breaking out of checkLeases after " + fsnamesystem.getMaxLockHoldToReleaseLeaseMs() + "ms."); break; } } // 從租約中移除異常文件Id for(Long id : removing) { // 如果此租約中已無文件Id,則此租約將從HDFS中徹底移除 removeLease(leaseToCheck, id); } } return needSync; }
通過上述代碼,租約檢測的操作可以歸納為如下步驟:
- 第1步,獲取最老的已過期的租約。
- 第2步,得到此租約中保存的文件Id。
- 第3步,關閉這些文件Id對應的文件,並將這些文件Id從此租約中移除。
- 第4步,如果此租約中已經沒有打開的文件Id,則將此租約從系統中進行移除。
租約檢測過程如圖1-3所示。
圖 1-3 HDFS租約的周期性檢測過程
LeaseRenewer租約更新器
LeaseRenewer對象的作用在於定時更新DFSClient用戶所持有的租約。每個用戶對應一個LeaseRenewer更新器對象,而每個LeaseRenewer對象內部會維護一個DFSClient客戶端列表。在LeaseRenewer的主方法中,會定期的執行DFSClient客戶端對應租約的renew操作。當DFSClient端所操作的文件都被關閉了,此DFSClient將從LeaseRenewer的客戶端列表中進行移除,這就意味着此DFSClient所對應的租約將不再被更新,最后將會被LeaseManager進行過期移除操作。
HDFS租約的添加、檢測、釋放
講述完租約的概念以及管理之后,我們來分析租約的添加到釋放的過程。以我們對於租約的一個傳統概念應該是這樣一個過程:首先在進行文件寫操作時,進行租約的添加,然后操作結束之后,進行租約的釋放。但是猜想歸猜想,事實上究竟是否如此呢?下面我們從HDFS代碼層面對此進行分析。
首先是HDFS租約的添加,租約的添加的確是在每次HDFS寫文件操作的時候進行的,以追加寫操作為例:
static LocatedBlock prepareFileForAppend(final FSNamesystem fsn, final INodesInPath iip, final String leaseHolder, final String clientMachine, final boolean newBlock, final boolean writeToEditLog, final boolean logRetryCache) throws IOException { assert fsn.hasWriteLock(); final INodeFile file = iip.getLastINode().asFile(); final QuotaCounts delta = verifyQuotaForUCBlock(fsn, file, iip); file.recordModification(iip.getLatestSnapshotId()); file.toUnderConstruction(leaseHolder, clientMachine); // 在追加寫操作之前進行租約的添加 fsn.getLeaseManager().addLease( file.getFileUnderConstructionFeature().getClientName(), file.getId()); ...
類似的方法還有FSDirWriteFileOp的startFile方法。對於租約的移除,本人在查閱相關代碼時,並沒有明顯發現在關閉文件操作的時候進行租約的移除動作。所以租約的移除並不是一個簡單的過程,此過程的移除還是依賴於LeaseManager的租約過期移除操作。文件在關閉的過程中,會將自身從相應的DFSClient客戶端對象中進行移除,繼而使得此DFSClient從LeaseRenewer對象中移除,最后讓它的租約不再更新。此過程原理見上小節LeaseRenewer對象的原理介紹。在DFSClient端的寫文件操作方法中,會執行LeaseRenewer的添加動作,代碼如下:
private DFSOutputStream append(String src, int buffersize, EnumSet<CreateFlag> flag, String[] favoredNodes, Progressable progress) throws IOException { checkOpen(); final DFSOutputStream result = callAppend(src, flag, progress, favoredNodes); // 將當前文件Id加入, beginFileLease(result.getFileId(), result); return result; }
這里的beginFileLease操作的意思不是添加新租約的意思,而是說開始對此文件所屬的租約開啟定時更新操作,執行的更新操作是LeaseRenewer的run方法。
最后我們來看租約的檢查,我們看看HDFS如何利用租約來保證只有一個客戶端程序可以寫數據到某個文件的。HDFS租約的檢查方法為FSNamesystem的checkLease方法。此方法在getAdditionalDatanode和fsync方法中被調用,這表明了租約檢查發生以下在兩個時候:
- 第一個,為新寫的block選擇目標存儲節點時,進行租約的檢查。
- 第二個,進行數據同步到磁盤的時候,又一次進行租約的檢查。
這里我們以getAdditionalDatanode方法為例:
LocatedBlock getAdditionalDatanode(String src, long fileId, final ExtendedBlock blk, final DatanodeInfo[] existings, final String[] storageIDs, final Set<Node> excludes, final int numAdditionalNodes, final String clientName ) throws IOException { ... readLock(); try { ... //進行租約的檢查 final INodeFile file = checkLease(iip, clientName, fileId); ...
最后進入checkLease租約檢查方法,
INodeFile checkLease(INodesInPath iip, String holder, long fileId) throws LeaseExpiredException, FileNotFoundException { String src = iip.getPath(); INode inode = iip.getLastINode(); assert hasReadLock(); if (inode == null) { throw new FileNotFoundException("File does not exist: " + leaseExceptionString(src, fileId, holder)); } ... // 獲取當前文件的操作者即租約持有者 final String owner = file.getFileUnderConstructionFeature().getClientName(); // 如果當前操作者不是租約持有者,則拋出異常 if (holder != null && !owner.equals(holder)) { throw new LeaseExpiredException("Client (=" + holder + ") is not the lease owner (=" + owner + ": " + leaseExceptionString(src, fileId, holder)); } return file; }
所以當我們在HDFS的日志中看到諸如“Client (=xxx) is not the lease owner…”這種錯誤的時候,就表明當前有多個客戶端程序同時在寫某個文件。
OK,以上就是本文所要講述的HDFS租約的相關內容了,希望本文能讓大家對HDFS租約有一個更深入的了解。個人感覺HDFS的整個租約邏輯還是有一定復雜度的,還需要大家進行反復地閱讀,理解。