HDFS租約實踐


一、租約詳解

Why租約

HDFS的讀寫模式為 "write-once-read-many",為了實現write-once,需要設計一種互斥機制,租約應運而生
租約本質上是一個有時間約束的鎖,即:在一定時間內對租約持有者(也就是客戶端)賦予一定的權限

HDFS租約模型

<Lease>
Lease和DFSClient的對應關系為一對一(即:在Hdfs-Server端,為每個DFSClient建立一個Lease),Lease包含的主要信息有:
  * holder:租約持有者(即:DFSClient)
  * lastUpdate:上次續約時間
  * paths:該租約持有的文件路徑
  * softLimit和hardLimit
    > 當前時間減去Lease的lastUpdate超過softLimit,允許其它DFSClient搶占該Client持有的filepath(softLimit的默認值為1min)
    > 當前時間減去Lease的astUpdate超過hardLimit,允許LeaseManager強制將該租約回收銷毀(hardLimit的默認值為1hour)

<LeaseManager>
顧名思義,LeaseManager是租約的管理者,運行於HDFS-Server端,其主要功能特性有:
  * 維護了DFSClient和Lease的映射關系(參見leases屬性)
  * 維護了filePath和Lease的映射關系(參見sortedLeasesByPath屬性)
  * 對租約進行生命周期和狀態的管理:
    > 創建租約或正常情況下的銷毀租約
    > 賦予(或撤銷)FilePath權限給租約(撤銷FilePath,如:執行文件流的close方法)
    > 接受續約請求,對租約進行續約處理(即:更新Lease的lastUpdate字段)
    > 對超過hardLimit的租約進行銷毀處理(參見:LeaseManager.Monitor類)

<LeaseRenewer>
顧名思義,LeaseRenewer是租約的續約者,運行於HDFS-Client端,其主要功能特性有:
  * LeaseRenewer維護了一個DFSClient列表和一個定時線程,循環不斷的為各個DFSClient進行續約操作
  * LeaseRenew本質上是一個heartbeat,方便對超時的DFSClient進行容錯處理
  * 從client到server端續約的主流程如下:LeaseRenewer -> DFSClient -> NameNodeRpcServer -> FSNamesystem -> LeaseManager
  * 其它細節此處不再闡述,直接看源碼即可

<FSNamesystem>
LeaseManager用來管理租約,那么,FSNamesystem用來協調租約,其主要功能特性有:
  * FSNamesystem中和租約相關最核心的一個方法是recoverLeaseInternal,startFile方法、appendFile方法和recoverLease方法都會調用該方法,該方法主要功能有:
    > 驗證ReCreate
     如果待操作的文件path已經存在於該DFSClient的Lease的paths列表中,則拋AlreadyBeingCreatedException,
     提示 "current leaseholder is trying to recreate file"
    > 驗證OtherCreate
     如果待操作的文件path已經存在於其它DFSClient的Lease的paths列表中,此時有兩種策略:如果那個DFSClient的Lease的softLimit已經過期,
     系統會嘗試進行Lease-Recovery,然后把path從那個DFSClient的Lease的paths中remove掉,這樣新的Client便獲取了該path的占有權限;
     如果那個DFSClient的Lease的softLimit還未過期,則拋AlreadyBeingCreatedException,提示 "because this file is already being created by ... on ..."    
    > 驗證Recovery
     這個比較簡單,如果待操作的文件還處於租約的Recovery狀態,則拋異常RecoveryInProgressException,提示稍后重試
    > ForceRecovery
     recoverLeaseInternal方法提供了force參數,如果force為true,系統會強制進行Lease-Recovery,具體功能見recoverLease方法的注釋即可,如下:
       * Immediately revoke the lease of the current lease holder and start lease
       * recovery so that the file can be forced to be closed.
      
force recovery的使用場景下文會有介紹

Recovery機制

recovery是一種容錯機制,主要分為block的recovery和lease的recovery,此處不詳述,具體可參考下面的鏈接:
http://blog.cloudera.com/blog/2015/02/understanding-hdfs-recovery-processes-part-1/ 

二、場景介紹

如上圖所示,我們的應用場景介紹如下:
* Worker:一個Worker是一個進程,每個Worker負責運行一批Task
* Task:Task負責把抓取到的數據源源不斷的實時同步到Hdfs,每個Task負責管理Hdfs中的N個文件(如上圖,Task-1在hdfs中對應了Task-1-file-1和Task-1-file-2)
* (Re-)balance:Task和Worker之間的關系是動態的(即:Task在Worker上是平均分配的),當新Worker加入,現有Worker退出、新增Task和刪除Task的時候,會觸發Rebalance,Task重新分配。比如上圖中,增加一個Worker-3,Reblance完成之后的結果為:worker-1運行Task-1和Task-2,worker-2運行Task-3和Task-4,worker-3運行Task-5

三、設計方案

結合租約的特點和我們的場景需求,需要進行針對性的設計,才能避免觸發【租約異常】,下面以問答的形式闡述核心設計方案

一個進程內如何同時訪問多個hadoop集群?

 若要一個進程內同時訪問多個hadoop集群,那就需要針對每個集群分別創建各自的FileSystem實例,需要做的有兩點:
* 其一:保證針對這多個集群的Configuration實例中的 "fs.defaultFS" 的配置是不同的
* 其二:HADOOP_USER_NAME屬性不能通過System.setProperty方法設置,應該調用FileSystem的get方法時動態傳入

對文件流應該怎樣管理?

Ps:DFSClient是一個進程級實例,即對應同一個hadoop集群,在一個worker進程中只有一個DFSClient
一個文件對應了一個文件流,創建文件流時FSNamesystem會把流對應的path放到Lease中,關閉文件流時FSNamesystem會把流對應的path從Lease中移除,對文件流的管理需要保證以下幾個原則
* 其一:流的生命周期應該和Task保持一致,Task運行過程中流隨用隨創建,Task關閉時把其占有的所有流也關閉,這樣才能保證在發生Reblance后,不會出現租約被其它DFSClient占用的問題
* 其二:超時不用的流要及時清理,保證其它使用者有機會獲取權限,比如發生日切之后,所有的數據都寫到新文件中了,前一天的文件不會再有寫入操作,那么應該及時關閉前一天的文件流

如何解決Other-Create問題?

何時會觸發Other-Create問題?
其一:Worker宕機,其負責的Task漂移到其它Worker,漂移后的Task便會收到Other-Create異常,只有當超過softLimit之后,異常才會解除,即恢復時間需要1分鍾
其二:其它程序原因,如:在發生Reblance時,Task會先被關閉再漂移,如果Task在關閉的過程中流關閉的有問題(比如觸發了超時),也可能會觸發Other-Create異常
如何應對?
Other-Create異常中包含了other-Dfsclient的IP信息,我們可以調用other-worker提供的接口,遠程關閉出問題的流,如果關閉失敗或者訪問出現超時(宕機的時候會超時),再進行force recovery操作

如何解決Re-Create問題?

流關閉時可能會出現異常,如果出現異常,需要進行force recovery操作,否則的話租約將一直不可釋放,一直報Re-Create異常

四、源碼解析

【設計方案】部分的描述比較抽象,下面我們結合源碼進行更詳細的介紹,所有的關鍵描述都放到了源碼注釋里,直接看注釋即可

package com.ucar.hdfs.lease.demo;

import com.ucar.hdfs.lease.demo.stream.FileStreamHolder;
import com.ucar.hdfs.lease.demo.stream.FileStreamToken;
import com.ucar.hdfs.lease.demo.util.FileLockUtils;
import com.ucar.hdfs.lease.demo.util.HdfsConfig;
import com.ucar.hdfs.lease.demo.util.RemoteUtil;
import org.apache.hadoop.fs.FSDataOutputStream;

import java.text.MessageFormat;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 一個Demo類,可以自己寫一些單元測試類,驗證租約相關的原理
 */
public class TaskDemo {

    private final FileStreamHolder fileStreamHolder;
    private final HdfsConfig hdfsConfig;
    private final String filePathPrefix;

    public TaskDemo(FileStreamHolder fileStreamHolder, HdfsConfig hdfsConfig, String filePathPrefix) {
        this.fileStreamHolder = fileStreamHolder;
        this.hdfsConfig = hdfsConfig;
        this.filePathPrefix = filePathPrefix;
    }

    public void start() {
        fileStreamHolder.start();
    }

    public void stop() {
        fileStreamHolder.close();
    }

    public void writeData(String fileName, List<String> content) throws Exception {
        String hdfsFilePath = this.hdfsConfig.getHdfsAddress() + filePathPrefix + fileName + ".txt";

        ReentrantLock lock = FileLockUtils.getLock(hdfsFilePath);
        FileStreamToken fileStreamToken = null;
        try {
            lock.lock();
            fileStreamToken = fileStreamHolder.getStreamToken(hdfsFilePath, this.hdfsConfig);
            writeDataInternal(fileStreamToken.getFileStream(), content);
        } catch (Exception e) {
            if (fileStreamToken != null) {
                // 出現異常的時候,必須把流回收一下,否則的話異常會持續不斷的報,並且無法自動恢復
                // 我們曾經遇到過的根本無法自動恢復的異常有:

                /*
                Caused by: java.net.ConnectException: Connection timed out
                at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_121]
                at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_121]
                at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) ~[na:na]
                at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:530) ~[na:na]
                at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1610) ~[na:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1123) ~[na:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1112) ~[na:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1253) ~[na:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:594) ~[na:na]
                */

                /*
                Caused by: java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being available to try. (Nodes: current=[xxx:50010, xxx:50010], original=[xxx:50010, xxx:50010]). The current failed datanode replacement policy is DEFAULT, and a client may configure this via 'dfs.client.block.write.replace-datanode-on-failure.policy' in its configuration.
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.findNewDatanode(DFSOutputStream.java:1040) ~[hadoop-hdfs-2.6.3.jar:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1106) ~[hadoop-hdfs-2.6.3.jar:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1253) ~[hadoop-hdfs-2.6.3.jar:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1004) ~[hadoop-hdfs-2.6.3.jar:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:548) ~[hadoop-hdfs-2.6.3.jar:na]
                */

                /*
                Caused by: java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/xxx:36061 remote=/xxx:50010]
                at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164) ~[hadoop-common-2.6.3.jar:na]
                at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161) ~[hadoop-common-2.6.3.jar:na]
                at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131) ~[hadoop-common-2.6.3.jar:na]
                at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118) ~[hadoop-common-2.6.3.jar:na]
                at java.io.FilterInputStream.read(FilterInputStream.java:83) ~[na:1.8.0_121]
                at java.io.FilterInputStream.read(FilterInputStream.java:83) ~[na:1.8.0_121]
                at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2205) ~[hadoop-hdfs-2.6.3.jar:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:1142) ~[hadoop-hdfs-2.6.3.jar:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:1112) ~[hadoop-hdfs-2.6.3.jar:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1253) ~[hadoop-hdfs-2.6.3.jar:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:1004) ~[hadoop-hdfs-2.6.3.jar:na]
                at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:548) ~[hadoop-hdfs-2.6.3.jar:na]
                */
                fileStreamHolder.close(hdfsFilePath);
            } else {
                //可能是Other-Create異常,嘗試遠程關閉
                RemoteUtil.tryRemoteClose(hdfsFilePath, e);
            }
            throw new RuntimeException(MessageFormat.format("Data Append failed for file - {0}.", hdfsFilePath), e);
        } finally {
            if (fileStreamToken != null) {
                fileStreamToken.setLastUpdateTime(System.currentTimeMillis());
            }
            if (lock != null) {
                lock.unlock();
            }
        }
    }

    private void writeDataInternal(FSDataOutputStream fsOut, List<String> transferData) throws Exception {
        StringBuffer sb = new StringBuffer();
        for (String row : transferData) {
            sb.append(row);
            sb.append("\n");
        }

        byte[] bytes = sb.toString().getBytes("UTF-8");
        fsOut.write(bytes);
        fsOut.hsync();
    }
}
package com.ucar.hdfs.lease.demo.util;

import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.ipc.RemoteException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RemoteUtil {
    private static final Logger logger = LoggerFactory.getLogger(RemoteUtil.class);
    private static final String RECREATE_IDENTIFIER = "because this file is already being created by";

    public static void tryRemoteClose(String hdfsFilePath, Exception e) {
        try {
            if (e instanceof RemoteException) {
                RemoteException re = (RemoteException) e;
                String className = re.getClassName();
                if (className.equals(AlreadyBeingCreatedException.class.getName()) && e.getMessage().contains(RECREATE_IDENTIFIER)) {
                    logger.info("stream remote close begin for file : " + hdfsFilePath);
                    colseInternal(hdfsFilePath, parseIp(e.getMessage()));
                    logger.info("stream remote close end for file : " + hdfsFilePath);
                }
            }
        } catch (Exception ex) {
            logger.error("stream remote close failed for file : " + hdfsFilePath, ex);
        }
    }

    static void colseInternal(String hdfsFilePath, String address) {
        //TODO
        // 遠程調用其它進程,進行流關閉操作
        // 此處應該進行更細致的處理
        // 1. 如果流關閉失敗了,我們也應該執行一下FileSystem的recoverLease方法
        // 2. 如果訪問超時了,那么應該出現了宕機的情況,雖然通過softLimit會自動恢復,但是如果實時性要求高,應該也執行一下FileSystem的recoverLease方法
        // 3. 或者根據自己的場景,壓根兒就不遠程關閉,強制執行recoverLease方法也行
    }

    private static String parseIp(String message) {
        Pattern p = Pattern.compile("\\[.*?\\]");
        Matcher m = p.matcher(message);
        String ip = null;
        while (m.find()) {
            ip = m.group().replace("[", "").replace("]", "");
        }
        return ip;
    }
}
package com.ucar.hdfs.lease.demo.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

import java.net.URI;

/**
 * e.g.
 *
 * hdfsAddress      ->  hdfs://hadoop2cluster
 * zkUrl            ->  192.168.0.1,192.168.0.2,192.168.0.3
 * zkPort           ->  2181
 * hadoopUser       ->  hadoop
 * haNameNode1      ->  namenode01.10101111.com:8020
 * haNameNode2      ->  namenode02.10101111.com:8020
 * hdfsPacketSize   ->  20971520
 *
 */
public class HdfsConfig {
    private volatile String hdfsAddress;
    private volatile URI hdfsUri;
    private volatile String zkUrl;
    private volatile String zkPort;
    private volatile String hadoopUser;
    private volatile String haNameNode1;
    private volatile String haNameNode2;
    private volatile Long hdfsPacketSize;
    private volatile Configuration configuration;

    public HdfsConfig(String hdfsAddress, String zkUrl, String zkPort, String hadoopUser, String haNameNode1, String haNameNode2, long hdfsPacketSize) {
        this.hdfsAddress = hdfsAddress;
        this.zkUrl = zkUrl;
        this.zkPort = zkPort;
        this.hadoopUser = hadoopUser;
        this.haNameNode1 = haNameNode1;
        this.haNameNode2 = haNameNode2;
        this.hdfsPacketSize = hdfsPacketSize;
        this.hdfsUri = URI.create(this.hdfsAddress);
        this.buildConfiguration();
    }

    private void buildConfiguration() {
        this.configuration = HBaseConfiguration.create();
        this.configuration.set("fs.defaultFS", this.hdfsAddress);
        this.configuration.set("dfs.support.append", "true");
        this.configuration.set("hbase.zookeeper.quorum", this.zkUrl);
        this.configuration.set("hbase.zookeeper.property.clientPort", this.zkPort);
        this.configuration.set("dfs.client-write-packet-size", String.valueOf(hdfsPacketSize));

        // 高可用設置
        String key = hdfsUri.getAuthority();
        this.configuration.set("dfs.nameservices", key);
        this.configuration.set(String.format("dfs.ha.namenodes.%s", key), "nn1,nn2");
        this.configuration.set(String.format("dfs.namenode.rpc-address.%s.nn1", key), this.haNameNode1);
        this.configuration.set(String.format("dfs.namenode.rpc-address.%s.nn2", key), this.haNameNode2);
        this.configuration.set(String.format("dfs.client.failover.proxy.provider.%s", key), "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    }

    public String getHdfsAddress() {
        return hdfsAddress;
    }

    public URI getHdfsUri() {
        return hdfsUri;
    }

    public String getZkUrl() {
        return zkUrl;
    }

    public String getZkPort() {
        return zkPort;
    }

    public String getHadoopUser() {
        return hadoopUser;
    }

    public String getHaNameNode1() {
        return haNameNode1;
    }

    public String getHaNameNode2() {
        return haNameNode2;
    }

    public Configuration getConfiguration() {
        return configuration;
    }

    public long getHdfsPacketSize() {
        return hdfsPacketSize;
    }
}
package com.ucar.hdfs.lease.demo.util;

import com.google.common.cache.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class FileLockUtils {

    private static final Logger logger = LoggerFactory.getLogger(FileLockUtils.class);

    private static final LoadingCache<String, ReentrantLock> lockCache = CacheBuilder
            .newBuilder()
            .expireAfterAccess(24, TimeUnit.HOURS)
            .removalListener(new RemovalListener<Object, Object>() {
                @Override
                public void onRemoval(RemovalNotification<Object, Object> notification) {
                    logger.info(String.format("Lock for [%s] was removed , cause is [%s]",
                                    notification.getKey(),
                                    notification.getCause())
                    );
                }
            })
            .build(new CacheLoader<String, ReentrantLock>() {
                @Override
                public ReentrantLock load(String key) throws Exception {
                    return new ReentrantLock();
                }
            });

    public static ReentrantLock getLock(String hdfsFilePath) {
        return lockCache.getUnchecked(hdfsFilePath);
    }
}
package com.ucar.hdfs.lease.demo.stream;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.ucar.hdfs.lease.demo.util.HdfsConfig;
import org.apache.hadoop.fs.FileSystem;

import java.util.concurrent.TimeUnit;

public class FileSystemManager {

    private static final LoadingCache<HdfsConfig, FileSystem> fileSystemCache = CacheBuilder
            .newBuilder()
            .expireAfterAccess(24, TimeUnit.HOURS)
            .build(new CacheLoader<HdfsConfig, FileSystem>() {
                @Override
                public FileSystem load(HdfsConfig hdfsConfig) throws Exception {
                    // FileSystem一共有兩個get方法,需要調用這個get方法才能支持"一個進程內同時訪問多個hadoop集群"
                    return FileSystem.get(
                            hdfsConfig.getHdfsUri(),
                            hdfsConfig.getConfiguration(),
                            hdfsConfig.getHadoopUser());
                }
            });

    public static FileSystem getFileSystem(HdfsConfig hdfsConfig) {
        return fileSystemCache.getUnchecked(hdfsConfig);
    }
}
package com.ucar.hdfs.lease.demo.stream;


import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class FileStreamToken {

    private volatile String pathString;
    private volatile Path path;
    private volatile DistributedFileSystem fileSystem;
    private volatile FSDataOutputStream fileStream;
    private volatile long lastUpdateTime;

    public FileStreamToken(String pathString, Path path, DistributedFileSystem fileSystem, FSDataOutputStream fileStream) {
        this.pathString = pathString;
        this.path = path;
        this.fileSystem = fileSystem;
        this.fileStream = fileStream;
        this.lastUpdateTime = System.currentTimeMillis();
    }

    public String getPathString() {
        return pathString;
    }

    public void setPathString(String pathString) {
        this.pathString = pathString;
    }

    public Path getPath() {
        return path;
    }

    public void setPath(Path path) {
        this.path = path;
    }

    public DistributedFileSystem getFileSystem() {
        return fileSystem;
    }

    public void setFileSystem(DistributedFileSystem fileSystem) {
        this.fileSystem = fileSystem;
    }

    public FSDataOutputStream getFileStream() {
        return fileStream;
    }

    public void setFileStream(FSDataOutputStream fileStream) {
        this.fileStream = fileStream;
    }

    public long getLastUpdateTime() {
        return lastUpdateTime;
    }

    public void setLastUpdateTime(long lastUpdateTime) {
        this.lastUpdateTime = lastUpdateTime;
    }
}
package com.ucar.hdfs.lease.demo.stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FileStreamKeeper {
    private static final Logger logger = LoggerFactory.getLogger(FileStreamKeeper.class);
    private static final long CLOSE_CHECK_PERIOD = 60000;// 單位ms
    private static final long STREAM_LEISURE_LIMIT = 60000;//單位ms

    private static ScheduledExecutorService executorService;
    private static List<FileStreamHolder> holders = new ArrayList<>();

    //定時關閉不用的流
    public static void start() {
        executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(
                FileStreamKeeper::leisureCheck,
                CLOSE_CHECK_PERIOD,
                CLOSE_CHECK_PERIOD,
                TimeUnit.MILLISECONDS
        );
        logger.info("File Stream Keeper is started.");
    }

    public static void closeStreamLocal(String hdfsFilePath) {
        if (holders != null && !holders.isEmpty()) {
            holders.stream().forEach(h -> {
                Set<Map.Entry<String, FileStreamToken>> set = h.getTokens().entrySet();
                for (Map.Entry<String, FileStreamToken> entry : set) {
                    try {
                        if (hdfsFilePath.equals(entry.getKey())) {
                            h.close(entry.getKey());
                            return;
                        }
                    } catch (Throwable t) {
                        logger.error("stream close failed for file : " + entry.getKey());
                    }
                }
            });
        }
    }

    static synchronized void register(FileStreamHolder fileStreamHolder) {
        holders.add(fileStreamHolder);
    }

    static synchronized void unRegister(FileStreamHolder fileStreamHolder) {
        holders.remove(fileStreamHolder);
    }

    private static void leisureCheck() {
        try {
            if (holders != null && !holders.isEmpty()) {
                holders.stream().forEach(h -> {

                    logger.info("timer stream close begin.");
                    Set<Map.Entry<String, FileStreamToken>> set = h.getTokens().entrySet();
                    for (Map.Entry<String, FileStreamToken> entry : set) {
                        try {
                            FileStreamToken vo = entry.getValue();
                            if (vo.getLastUpdateTime() + STREAM_LEISURE_LIMIT < System.currentTimeMillis()) {
                                h.close(entry.getKey());//超時關閉
                            }
                        } catch (Throwable t) {
                            logger.error("timer stream close failed for file : " + entry.getKey());
                        }
                    }
                    logger.info("timer stream close end.");
                });
            }
        } catch (Throwable t) {
            logger.error("something goes wrong when do leisure check.", t);
        }
    }
}
package com.ucar.hdfs.lease.demo.stream;


import com.ucar.hdfs.lease.demo.util.FileLockUtils;
import com.ucar.hdfs.lease.demo.util.HdfsConfig;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class FileStreamHolder {

    private static final Logger logger = LoggerFactory.getLogger(FileStreamHolder.class);

    private final Map<String, FileStreamToken> tokens;
    private final ReentrantReadWriteLock readWriteLock;
    private volatile boolean running;

    public FileStreamHolder() {
        this.tokens = new ConcurrentHashMap<>();
        this.readWriteLock = new ReentrantReadWriteLock();
    }

    public void start() {
        this.running = true;
        FileStreamKeeper.register(this);
        logger.info("FileStreamHolder is started.");
    }

    public void close() {
        try {
            this.readWriteLock.writeLock().lock();
            this.running = false;
            if (this.tokens.size() > 0) {
                this.tokens.keySet().forEach(this::close);
            }
        } finally {
            FileStreamKeeper.unRegister(this);
            this.readWriteLock.writeLock().unlock();
        }
        logger.info("FileStreamHolder is closed.");
    }

    public FileStreamToken getStreamToken(String pathString, HdfsConfig hdfsConfig)
            throws Exception {
        try {
            this.readWriteLock.readLock().lock();
            if (!running) {
                throw new RuntimeException("FileStreamHolder has closed, StreamToken gotten failed.");
            }

            return getStreamTokenInternal(pathString, hdfsConfig);
        } finally {
            this.readWriteLock.readLock().unlock();
        }
    }

    private FileStreamToken getStreamTokenInternal(String pathString, HdfsConfig hdfsConfig)
            throws Exception {
        DistributedFileSystem hadoopFS = (DistributedFileSystem) FileSystemManager.getFileSystem(hdfsConfig);

        ReentrantLock lock = FileLockUtils.getLock(pathString);
        try {
            lock.lock();
            FileStreamToken token = tokens.get(pathString);
            if (token == null) {
                FSDataOutputStream fileStream;
                Path path = new Path(pathString);

                if (!hadoopFS.exists(path)) {
//create方法最終會調用server端FSNamesystem的startFile方法 fileStream
= hadoopFS.create(path, false, hdfsConfig.getConfiguration().getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT), (short) 3, 64 * 1024 * 1024L); logger.info("stream create succeeded for file : " + pathString); } else {
//append方法最終會調用server端FSNamesystem的appendFile方法 fileStream
= hadoopFS.append(path); logger.info("stream append succeeded for file : " + pathString); } token = new FileStreamToken(pathString, path, hadoopFS, fileStream); tokens.put(pathString, token); } return token; } finally { lock.unlock(); } } public void close(String pathString) { ReentrantLock lock = FileLockUtils.getLock(pathString); try { lock.lock(); FileStreamToken vo = tokens.get(pathString); if (vo != null) { try { vo.getFileStream().close(); logger.info("stream close succeeded for file : " + pathString); } catch (Throwable e) { logger.error("stream close failed for file : " + pathString, e); try { //流關閉失敗的時候,必須執行一下recoverLease方法(即:force recovery) //出現異常,流肯定不能接着用了,我們也不知道服務端究竟有沒有感知到關閉操作 //如果沒有感知到close動作,租約一直沒有被release,將導致Re-Create問題 vo.getFileSystem().recoverLease(vo.getPath()); logger.info("lease recover succeeded for file : " + pathString); } catch (Exception ex) { logger.error("lease recover failed for file : " + pathString, ex); } } finally { //不管有沒有異常,我們都需要進行remove //沒有異常,說明流關閉成功(嚴格意義上講,沒有異常也不代表流關閉成功了,假設第一次關的時候出異常了,沒有進行處理,隨后再次執行close,就不會報異常,具體可參考流的close方法),正常remove沒有問題 //有異常,說明流關閉失敗,關了一半兒,流已經有問題了,不能再用了,必須remove掉,不remove的話,后續接着用會報ClosedChannelException異常 /*Caused by: java.nio.channels.ClosedChannelException at org.apache.hadoop.hdfs.DFSOutputStream.checkClosed(DFSOutputStream.java:1622) at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:104) at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:58) at java.io.DataOutputStream.write(DataOutputStream.java:107) at java.io.FilterOutputStream.write(FilterOutputStream.java:97) at com.ucar.datalink.writer.hdfs.handle.RdbEventRecordHandler.writeData(RdbEventRecordHandler.java:246) at com.ucar.datalink.writer.hdfs.handle.RdbEventRecordHandler.doWriteData(RdbEventRecordHandler.java:221) at com.ucar.datalink.writer.hdfs.handle.RdbEventRecordHandler.toWriteData(RdbEventRecordHandler.java:187)*/ tokens.remove(pathString); } } } finally { if (lock != null) { lock.unlock(); } } } Map<String, FileStreamToken> getTokens() { return tokens; } }

五、參考資料

http://blog.csdn.net/androidlushangderen/article/details/48012001

https://www.tuicool.com/articles/meuuaqU

https://www.tuicool.com/articles/IJjq6v

http://blog.cloudera.com/blog/2015/02/understanding-hdfs-recovery-processes-part-1/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM