使用TableSnapshotInputFormat讀取HBase快照數據


根據表名找到該表最新的快照，並讀取HBase快照中的數據。在網上查了好多資料，很少有資料能夠給出清晰的方案，經過自己的摸索終於實現，現將代碼貼出，希望能對大家有所幫助：

/**
 * Reads every row of the latest snapshot of the table named by {@code readerParam.getFileName()}.
 *
 * <p>The table name may be {@code namespace:table} or a bare table name. Candidate snapshots are
 * found by reading the descriptors under the snapshot root directory of {@code hbase.rootdir}.
 * The snapshot with the lexicographically greatest name is chosen. It is restored via
 * {@link TableSnapshotInputFormatImpl#setInput} under
 * {@code <fs.defaultFS>/user/di/restoretmp/<namespace>_<table>} and then read split by split.
 *
 * <p>Each row is converted with {@code result2Record} and throttled by {@code limiter}. Reading stops
 * early once {@code start} becomes false. {@code stop()} is always invoked on exit.
 *
 * @param hadoopConf Hadoop/HBase configuration used to build the job configuration
 * @param pipeline downstream pipeline (not used in this excerpt - see NOTE below)
 * @param readerParam source description: table name, columns, fetch size, paths
 * @param batchSize intended batch size (only used as an initial list capacity here)
 * @throws DiException with {@code CONFIG_ERROR} if no snapshot of the table exists or reading fails
 */
public void read(org.apache.hadoop.conf.Configuration hadoopConf, Pipeline pipeline, ReaderParam readerParam, int batchSize) {
    limiter = RateLimiter.create(readerParam.getFetchSize() * M_BYTE_SIZE);

    // Number of rows read so far.
    AtomicInteger totalCount = new AtomicInteger();

    JobConf conf = new JobConf(hadoopConf);
    String sourceRcFilePath = readerParam.getFilePath();
    logger.info(String.format("Start Read Rcfile [%s].", sourceRcFilePath));

    try {
        // NOTE(review): records are collected here but never handed to `pipeline`, and `batchSize`
        // only sets the initial capacity, so this list grows with the whole snapshot. Presumably the
        // batching/emit step was trimmed from this excerpt - confirm before relying on this method.
        List<Record> recordList = new ArrayList<>(batchSize);

        Scan scan = new Scan();
        scan.setCaching(500);
        // Offline full scans must not pollute the region servers' block cache.
        scan.setCacheBlocks(false);
        conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray()));

        InputFormat<ImmutableBytesWritable, Result> in = new TableSnapshotInputFormat();

        // TableName.valueOf accepts both "ns:table" and a bare "table". Splitting on ':' by hand threw
        // ArrayIndexOutOfBoundsException for unqualified names.
        TableName tableName = TableName.valueOf(readerParam.getFileName());
        String namespaceTable = tableName.getNamespaceAsString() + "_" + tableName.getQualifierAsString();

        // Closed via try-with-resources; previously both the connection and the admin leaked.
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            if (!admin.tableExists(tableName)) {
                // Not fatal: we only need the snapshot, which is looked up independently below.
                logger.warn(String.format("Table [%s] does not exist; looking for its snapshots anyway.", tableName));
            }
        }

        Path rootDir = FSUtils.getRootDir(conf);
        FileSystem fs = rootDir.getFileSystem(conf);
        List<String> snapshotList = findSnapshotNames(fs, rootDir, readerParam.getFileName());
        if (snapshotList.isEmpty()) {
            String message = String.format("讀取Hbase快照信息發生異常,沒有找到對應表快照,請聯系系統管理員。", readerParam.getFilePath());
            logger.error(message);
            throw DiException.asDiException(CommonErrorCode.CONFIG_ERROR, message);
        }
        // NOTE(review): this picks the lexicographically greatest name. That is the newest snapshot only
        // if names embed a sortable timestamp; SnapshotDescription#getCreationTime would be more robust.
        String snapshotName = snapshotList.stream().max(Comparator.naturalOrder()).get();

        String restorTmp = String.format("%s/user/%s/restoretmp/%s", conf.get("fs.defaultFS"), "di", namespaceTable);
        // NOTE(review): nothing here deletes the restored files under this directory after reading.
        TableSnapshotInputFormatImpl.setInput(conf, snapshotName, new Path(restorTmp));

        List<String> columns = Arrays.asList(readerParam.getReadColumns().split(","));

        // The split layout comes from the snapshot itself (presumably one split per region), not from
        // files; the numSplits hint passed here is likely ignored.
        InputSplit[] splits = in.getSplits(conf, 1);
        for (InputSplit split : splits) {
            if (!start) {
                // Stop requested: don't open any further region readers.
                break;
            }
            recordReader = in.getRecordReader(split, conf, Reporter.NULL);
            try {
                ImmutableBytesWritable key = recordReader.createKey();
                Result value = recordReader.createValue();
                while (start && recordReader.next(key, value)) {
                    Record record = result2Record(value, columns);
                    limiter.acquire(record.getMemorySize());
                    recordList.add(record);
                    totalCount.incrementAndGet();
                }
            } finally {
                // Close every split's reader; overwriting the field used to leak all but the last one.
                recordReader.close();
            }
        }
        logger.info(String.format("Finished reading snapshot [%s], %d rows.", snapshotName, totalCount.get()));
    } catch (DiException e) {
        // Already carries a specific message (e.g. "no snapshot found"); don't rewrap it generically.
        throw e;
    } catch (Exception e) {
        String message = String.format("讀取Hbase快照數據發生異常,請聯系系統管理員。", readerParam.getFilePath());
        logger.error(message, e);
        throw DiException.asDiException(CommonErrorCode.CONFIG_ERROR, message, e);
    } finally {
        stop();
    }
}

/**
 * Lists the names of completed snapshots whose source table equals {@code table} (case-insensitive).
 *
 * <p>Entries in the snapshot root directory whose names start with "." are skipped. A snapshot whose
 * descriptor cannot be read is logged and skipped rather than failing the whole lookup.
 *
 * @param fs filesystem holding {@code rootDir}
 * @param rootDir the HBase root directory
 * @param table table name as supplied by the caller (e.g. {@code ns:table})
 * @return the matching snapshot names, possibly empty
 * @throws java.io.IOException if the snapshot root directory cannot be listed
 */
private List<String> findSnapshotNames(FileSystem fs, Path rootDir, String table) throws java.io.IOException {
    List<String> names = new ArrayList<>();
    for (FileStatus status : fs.listStatus(SnapshotDescriptionUtils.getSnapshotRootDir(rootDir))) {
        String dirName = status.getPath().getName();
        if (dirName.startsWith(".")) {
            continue;
        }
        Path snapshotPath = SnapshotDescriptionUtils.getCompletedSnapshotDir(dirName, rootDir);
        try {
            HBaseProtos.SnapshotDescription s = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotPath);
            logger.debug("tableName:" + s.getTable() + "\t snapshot:" + s.getName());
            if (s.getTable().equalsIgnoreCase(table)) {
                names.add(s.getName());
            }
        } catch (CorruptedSnapshotException e) {
            logger.warn(String.format("Skipping unreadable snapshot [%s].", snapshotPath), e);
        }
    }
    return names;
}

  

如果讀取快照數據時，列族使用的是LZO壓縮，可能會遇到LZO解壓縮問題，可以參照：《HBase讀取快照數據-LZO壓縮遇到的問題》。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用，本站對版權不負任何法律責任。如果侵犯了您的隱私權益，請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM