根據表名查找對應的最新快照,再按快照名稱讀取HBase快照中的數據。在網上查了很多資料,很少有資料能給出清晰的方案,經過自己的摸索終於實現了,現將代碼貼出,希望能對大家有所幫助:
public void read(org.apache.hadoop.conf.Configuration hadoopConf, Pipeline pipeline, ReaderParam readerParam, int batchSize) { limiter = RateLimiter.create(readerParam.getFetchSize() * M_BYTE_SIZE); //用於記錄讀取行數 AtomicInteger totalCount = new AtomicInteger(); JobConf conf = new JobConf(hadoopConf); String sourceRcFilePath = readerParam.getFilePath(); logger.info(String.format("Start Read Rcfile [%s].", sourceRcFilePath)); String defaultFS=String.format("hdfs://%s", readerParam.getFsdefaultname()); try { int size = 1; BatchData batchData; List<Record> recordList = new ArrayList<>(batchSize); Scan scan = new Scan(); scan.setCaching(500); scan.setCacheBlocks(false); //離線任務必須設置 conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray())); //序列化 InputFormat<ImmutableBytesWritable, Result> in = new TableSnapshotInputFormat(); Path rootDir = FSUtils.getRootDir(conf); String[] tableNameSplit = readerParam.getFileName().split(":"); String namespace_table = tableNameSplit[0]+"_"+tableNameSplit[1]; Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin(); boolean tableExist = admin.tableExists(TableName.valueOf(readerParam.getFileName())); // List<HBaseProtos.SnapshotDescription> list = admin.listSnapshots("^"+namespace_table); // TableName[] tables = admin.listTableNames(); // List<HBaseProtos.SnapshotDescription> list = admin.listSnapshots(); // for(HBaseProtos.SnapshotDescription snapshotDescription : list){ // String snapshotName = snapshotDescription.getMsg(); // String table = snapshotDescription.getTable(); // } FileSystem fs = FileSystem.get(conf); Path rootPath = new Path(conf.get("hbase.rootdir")); Path snapshotDir = new Path(conf.get("hbase.rootdir")+HBASE_SNAPSHOT_BASE_PATH); snapshotDir = SnapshotDescriptionUtils.getSnapshotRootDir(new Path(conf.get("hbase.rootdir"))); FileStatus[] listStatus = fs.listStatus(snapshotDir); // HBaseProtos.SnapshotDescription snapshotDescription = 
SnapshotDescriptionUtils.readSnapshotInfo(fs, new Path(conf.get("hbase.rootdir")+"/.snapshots/completed")); // Arrays.stream(listStatus).forEach(x-> System.out.println(x.getPath().toString())); // System.out.println("-----------------------------------------"); List<String> snapshotList = new ArrayList<String>(); Arrays.stream(listStatus).filter(x-> !x.getPath().getName().startsWith(".")).forEach(x->{ String snapshotName = x.getPath().getName(); Path snapshotPath = SnapshotDescriptionUtils.getCompletedSnapshotDir(snapshotName, rootPath); try { HBaseProtos.SnapshotDescription s = SnapshotDescriptionUtils.readSnapshotInfo(fs, snapshotPath); System.out.println("tableName:"+ s.getTable()+"\t snapshot:"+s.getName()); if (s.getTable().equalsIgnoreCase(readerParam.getFileName())){ snapshotList.add(s.getName()); } } catch (CorruptedSnapshotException e) { e.printStackTrace(); } }); // List<String> snapshotList = Arrays.stream(listStatus).filter(x-> !x.getPath().getMsg().startsWith(".")).map(x -> String.valueOf(x.getPath())).filter(x -> x.contains(namespace_table)).sorted(Comparator.reverseOrder()).collect(Collectors.toList()); snapshotList.stream().forEach(x -> System.out.println(x)); if (snapshotList.isEmpty()){ String message = String.format("讀取Hbase快照信息發生異常,沒有找到對應表快照,請聯系系統管理員。", readerParam.getFilePath()); logger.error(message); throw DiException.asDiException(CommonErrorCode.CONFIG_ERROR, message); } String snapshotName = snapshotList.stream().sorted(Comparator.reverseOrder()).findFirst().get(); String restorTmp = String.format("%s/user/%s/restoretmp/%s", conf.get("fs.defaultFS"), "di", namespace_table); Path restorPath = new Path(restorTmp); // Path restorPath = new Path("hdfs://RouterSit/user/di/restoretmp/ns_di_snapshot_test2"); TableSnapshotInputFormatImpl.setInput(conf, snapshotName, restorPath); List<String> columns = Arrays.asList(readerParam.getReadColumns().split(",")); //Each file as a split InputSplit[] splits = in.getSplits(conf, 1); for (InputSplit split : 
splits){ recordReader = in.getRecordReader(split, conf, Reporter.NULL); ImmutableBytesWritable key = recordReader.createKey(); Result value = recordReader.createValue(); List<Object> recordFields; while (start && recordReader.next(key, value)) { Record record = result2Record(value, columns); limiter.acquire(record.getMemorySize()); recordList.add(record); size++; } } } catch (Exception e) { String message = String.format("讀取Hbase快照數據發生異常,請聯系系統管理員。", readerParam.getFilePath()); logger.error(message); throw DiException.asDiException(CommonErrorCode.CONFIG_ERROR, message, e); } finally { stop(); } }
如果讀取快照數據時,列族使用了LZO壓縮,可能會遇到LZO解壓縮的問題,可參照:《hbase讀取快照數據-lzo壓縮遇到的問題》。