sparksql讀寫hbase


  1         //寫入hbase(hfile方式)
  2         org.apache.hadoop.hbase.client.Connection conn = null;
  3         try {
  4              SparkLog.debug("開始讀取hbase信息...");
  5             if (StringUtils.isNotBlank(type) && type.equalsIgnoreCase("hbase")) {
  6                 SparkLog.debug("==========================================");
  7                 String hbasetable = dict.getStringItem("table", "");
  8                 String hbase_site_path = dict.getStringItem("path_site", "");
  9                 String hfile_path = dict.getStringItem("hfile_path", "");
 10                 Configuration  conf = ss.sparkContext().hadoopConfiguration(); 11                 
 12                 if (StringUtils.isBlank(hbase_site_path)) {
 13                     SparkLog.warn("參數配置錯誤,未配置hbase-site信息!");
 14                 }
 15                 if (StringUtils.isNotBlank(hbase_site_path)) {
 16                     hbase_site_path = hbase_site_path + (hbase_site_path.contains("hbase-site.xml") ? "" : "/hbase-site.xml");
 17                     conf.addResource(new Path(hbase_site_path));
 18                 }
 19                 
 20                 SparkLog.debug("讀取hbase信息完成");
 21                 conf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "60000");
 22                 if (!P_Spark.delHDFSDir(hfile_path)) {
 23                     return TCResult.newFailureResult("SPARK_ERROR", "刪除舊文件失敗");
 24                 }
 25                 conf.setInt("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", 1024);
 26                 SparkLog.debug(conf);
 27                 
 28                 SparkLog.debug("創建hbase的鏈接...");
 29                 // 創建hbase的鏈接,利用默認的配置文件,實際上讀取的hbase的master地址
 30                 conn = ConnectionFactory.createConnection(conf);
 31                 
 32                 SparkLog.debug("開始生成hfile文件...");
 33                 //必須按照key排序
 34                 data.flatMapToPair(new PairFlatMapFunction<Row, ImmutableBytesWritable, KeyValue>() {
 35 
 36                 private static final long serialVersionUID = -8033772725296906227L;
 37     
 38                 @Override
 39                 public Iterator<Tuple2<ImmutableBytesWritable, KeyValue>> call(Row s) throws Exception {
 40                         byte[] rowkey = Bytes.toBytes((new SimpleDateFormat("yyyyMMddHHmmss.sss")).format(System.currentTimeMillis())+Math.random()*100); 
 41                         List<Tuple2<ImmutableBytesWritable,KeyValue>> cols = new ArrayList<>();
 42                         byte[] family = Bytes.toBytes("fm");
 43                         
 44                         String hostname =s.get(0)==null ? "":s.getString(0);
 45                         String request_date =s.get(1)==null ? "":s.getString(1);
 46                         String post_id=s.get(2)==null ? "":Integer.toString(s.getInt(2));
 47                         String title=s.get(3)==null ? "":s.getString(3);
 48                         String author=s.get(4)==null ? "":s.getString(4);
 49                         String country=s.get(5)==null ? "":s.getString(5);
 50                         String category=s.get(6)==null ? "":s.getString(6);
 51                         
 52 
 53                         //String filds[] =s.schema().fieldNames();//獲取列名
 54 
 55 
 56                         //列名需要排序,按順序加入
 57                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "author".getBytes(),  Bytes.toBytes(author))));
 58                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "category".getBytes(),  Bytes.toBytes(category))));
 59                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "country".getBytes(),  Bytes.toBytes(country))));
 60                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "hostname".getBytes(),  Bytes.toBytes(hostname))));
 61                          cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "post_id".getBytes(),  Bytes.toBytes(post_id))));
 62                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "request_date".getBytes(),  Bytes.toBytes(request_date))));
 63                         cols.add(new Tuple2<ImmutableBytesWritable, KeyValue>(new ImmutableBytesWritable(rowkey),new KeyValue(rowkey, family, "title".getBytes(),  Bytes.toBytes(title))));
 64                         return cols.iterator();
 65                 } 
 66             }).sortByKey().saveAsNewAPIHadoopFile(hfile_path, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, conf);
 67                 SparkLog.debug("生成hfile文件成功");
 68                 LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
 69                 // 根據表名獲取表
 70                 SparkLog.debug("根據表名獲取表...");
 71                 Table table = conn.getTable(TableName.valueOf(hbasetable));
 72                 SparkLog.debug(table.toString());
 73 
 74                 // 獲取hbase表的region分布
 75                 SparkLog.debug("獲取hbase表的region分布...");
 76                 RegionLocator regionLocator = conn.getRegionLocator(TableName
 77                         .valueOf(hbasetable));
 78                 // 創建一個hadoop的mapreduce的job
 79                 Job job = Job.getInstance(conf);
 80                 // 設置job名稱
 81                 job.setJobName("DumpFile");
 82                 // 此處最重要,需要設置文件輸出的key,因為我們要生成HFil,所以outkey要用ImmutableBytesWritable
 83                 job.setMapOutputKeyClass(ImmutableBytesWritable.class);
 84                 // 輸出文件的內容KeyValue
 85                 job.setMapOutputValueClass(KeyValue.class);
 86                 // 配置HFileOutputFormat2的信息
 87                 HFileOutputFormat2.configureIncrementalLoad(job, table,
 88                         regionLocator);
 89 
 90                 // 開始導入
 91                 SparkLog.debug("開始導入...");
 92                 load.doBulkLoad(new Path(hfile_path), conn.getAdmin(), table,
 93                         regionLocator);
 94                 // load.doBulkLoad(new Path(path),new , table,regionLocator);
 95                 // load.doBulkLoad(new Path(path), (HTable)table);這個目前也可用
 96                 table.close();
 97 
 98             }
 99 
100         } catch (Throwable e) {
101             return TCResult.newFailureResult("SPARK_ERROR", e);
102         } finally {
103             try {
104                 conn.close();
105             } catch (Throwable e) {
106                 return TCResult.newFailureResult("SPARK_ERROR", e);
107             }
108         }

 1         //讀取hbase
 2         Configuration  conf = sc.hadoopConfiguration(); 
3
if (null != hbase_site_path) { 4 hbase_site_path = hbase_site_path.contains("hbase-site.xml") ? hbase_site_path : hbase_site_path 5 + "/hbase-site.xml"; 6 conf.addResource(new Path(hbase_site_path)); 7 } else { 8 if(zn_parent == null || zn_parent.equals("")){ 9 zn_parent="/hbase"; 10 } 11 conf.set("hbase.zookeeper.quorum", quorum); 12 conf.set("hbase.zookeeper.property.clientPort", zkport); 13 conf.set("zookeeper.znode.parent", zn_parent); 14 15 } 16 conf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "60000"); 17 JavaRDD<String> javardd = null; 18 try { 19 conf.set(TableInputFormat.INPUT_TABLE, tablename); 20 JavaPairRDD<ImmutableBytesWritable, Result> hbRDD = sc 21 .newAPIHadoopRDD(conf, TableInputFormat.class, 22 ImmutableBytesWritable.class, Result.class); 23 24 javardd = hbRDD.values().map(new Function<Result, String>(){ 25 26 /** 27 * 28 */ 29 private static final long serialVersionUID = 1L; 30 31 @Override 32 public String call(Result r) throws Exception { 33 // TODO 自動生成的方法存根 34 String s = ""; 35 for (Cell cell : r.rawCells()) { 36 s += "Rowkey:" 37 + Bytes.toString(CellUtil.cloneRow(cell)) 38 + ",column=" 39 + Bytes.toString(CellUtil.cloneFamily(cell)) 40 + ":" 41 + Bytes.toString( 42 CellUtil.cloneQualifier(cell)) 43 .replaceAll("Quilifier:", "") 44 + ",timestamp=" + cell.getTimestamp() 45 + ",value:" 46 + Bytes.toString(CellUtil.cloneValue(cell)); 47 } 48 return s; 49 } 50 51 }); 52 SparkLog.debug("hbase table records num = " + javardd.count()); 53 54 } catch (Throwable e) { 55 return TCResult.newFailureResult("SPARK_ERROR",e); 56 }

 

//hbase讀取方式2(saveAsNewAPIHadoopDataset && put方式)
String hbasetable = dict.getStringItem("table", "");
                String hbase_site_path = dict.getStringItem("path_site", "");
                //String hfile_path = dict.getStringItem("hfile_path", "");
                Configuration  conf = ss.sparkContext().hadoopConfiguration();
                
                
                if (StringUtils.isBlank(hbase_site_path)) {
                    SparkLog.warn("參數配置錯誤,未配置hbase-site信息!");
                }
                if (StringUtils.isNotBlank(hbase_site_path)) {
                    hbase_site_path = hbase_site_path + (hbase_site_path.contains("hbase-site.xml") ? "" : "/hbase-site.xml");
                    conf.addResource(new Path(hbase_site_path));
                }
                
                SparkLog.debug("讀取hbase信息完成");
                conf.set(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, "60000");
                conf.set(TableOutputFormat.OUTPUT_TABLE, hbasetable);


                Job job = Job.getInstance(conf);
                // 設置job名稱
                job.setJobName("DumpFile");
                // 此處最重要,需要設置文件輸出的key,因為我們要生成HFil,所以outkey要用ImmutableBytesWritable
                job.setMapOutputKeyClass(ImmutableBytesWritable.class);
                // 輸出文件的內容Result
                job.setMapOutputValueClass(Put.class);
                
                job.setOutputFormatClass((TableOutputFormat.class));
                 
                data.mapToPair(new PairFunction<Row, ImmutableBytesWritable, Put>() {

                private static final long serialVersionUID = -8033772725296906227L;
    
                @Override
                public Tuple2<ImmutableBytesWritable, Put> call(Row s) throws Exception {
                        byte[] rowkey = Bytes.toBytes((new SimpleDateFormat("yyyyMMddHHmmss.sss")).format(System.currentTimeMillis())+Math.random()*100); 
                    
                        byte[] family = Bytes.toBytes("fm");
                        
                        String hostname =s.get(0)==null ? "":s.getString(0);
                        String request_date =s.get(1)==null ? "":s.getString(1);
                        String post_id=s.get(2)==null ? "":Integer.toString(s.getInt(2));
                        String title=s.get(3)==null ? "":s.getString(3);
                        String author=s.get(4)==null ? "":s.getString(4);
                        String country=s.get(5)==null ? "":s.getString(5);
                        String category=s.get(6)==null ? "":s.getString(6);
                        

                        Put put = new Put(rowkey);
                
                        put.addImmutable(family,Bytes.toBytes("hostname"),Bytes.toBytes(hostname));
                        put.addImmutable(family,Bytes.toBytes("request_date"),Bytes.toBytes(request_date));
                        put.addImmutable(family,Bytes.toBytes("post_id"),Bytes.toBytes(post_id));
                        put.addImmutable(family,Bytes.toBytes("title"),Bytes.toBytes(title));
                        put.addImmutable(family,Bytes.toBytes("author"),Bytes.toBytes(author));
                        put.addImmutable(family,Bytes.toBytes("country"),Bytes.toBytes(country));
                        put.addImmutable(family,Bytes.toBytes("category"),Bytes.toBytes(category));
                         
                       return new Tuple2<ImmutableBytesWritable, Put>(new ImmutableBytesWritable(rowkey),put);

                      
                } 
            }).saveAsNewAPIHadoopDataset(job.getConfiguration());

 

 1            //創建hbase表結構
 2             org.apache.hadoop.hbase.client.Connection conn = null;
 3         try {
 4 
 5             //創建hbase的鏈接,利用默認的配置文件,實際上讀取的hbase的master地址
 6             conn = ConnectionFactory.createConnection(conf);
 7             // 根據表名獲取表
 8             HBaseAdmin admin =(HBaseAdmin) conn.getAdmin();
 9 
10             if (!admin.tableExists(tablename)) {  
11                   SparkLog.info("Table Not Exists! Create Table"); 
12                   HTableDescriptor tableDesc = new HTableDescriptor(TableName.valueOf(tablename));
13                   String fms[] =Col_family.split(",");
14                   for(int i=0;i<fms.length;i++){
15                       tableDesc.addFamily(new HColumnDescriptor(fms[i].getBytes()));
16                   }
17                   admin.createTable(tableDesc); 
18                 }else{
19                      SparkLog.info("Table  Exists!  Do not Create Table"); 
20                 }  
21         }catch (Throwable e) {
22             return TCResult.newFailureResult("SPARK_ERROR", e);
23         } finally {
24             try {
25                 conn.close();
26             } catch (Throwable e) {
27                 return TCResult.newFailureResult("SPARK_ERROR", e);
28             }
29         }
 1 // 添加一條數據  
 2 public static void addRow(String tableName, String rowKey, String columnFamily, String column, String value)   
 3         throws IOException {  
 4     // 建立一個數據庫的連接  
 5     Connection conn = ConnectionFactory.createConnection(conf);  
 6     // 獲取表  
 7     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 8     // 通過rowkey創建一個put對象  
 9     Put put = new Put(Bytes.toBytes(rowKey));  
10     // 在put對象中設置列族、列、值  
11     put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));  
12     // 插入數據,可通過put(List<Put>)批量插入  
13     table.put(put);  
14     // 關閉資源  
15     table.close();  
16     conn.close();  
17 }
// 通過rowkey獲取一條數據  
public static void getRow(String tableName, String rowKey) throws IOException {  
    // 建立一個數據庫的連接  
    Connection conn = ConnectionFactory.createConnection(conf);  
    // 獲取表  
    HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
    // 通過rowkey創建一個get對象  
    Get get = new Get(Bytes.toBytes(rowKey));  
    // 輸出結果  
    Result result = table.get(get);  
    for (Cell cell : result.rawCells()) {  
        System.out.println(  
                "行鍵:" + new String(CellUtil.cloneRow(cell)) + "\t" +  
                "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +   
                "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +   
                "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +  
                "時間戳:" + cell.getTimestamp());  
    }  
    // 關閉資源  
    table.close();  
    conn.close();  
}
 1 // 全表掃描  
 2     public static void scanTable(String tableName) throws IOException {  
 3         // 建立一個數據庫的連接  
 4         Connection conn = ConnectionFactory.createConnection(conf);  
 5         // 獲取表  
 6         HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 7         // 創建一個掃描對象  
 8         Scan scan = new Scan();  
 9         // 掃描全表輸出結果  
10         ResultScanner results = table.getScanner(scan);  
11         for (Result result : results) {  
12             for (Cell cell : result.rawCells()) {  
13                 System.out.println(  
14                         "行鍵:" + new String(CellUtil.cloneRow(cell)) + "\t" +  
15                         "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +   
16                         "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +   
17                         "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +  
18                         "時間戳:" + cell.getTimestamp());  
19             }  
20         }  
21         // 關閉資源  
22         results.close();  
23         table.close();  
24         conn.close();  
25 }
 1 // 刪除一條數據  
 2 public static void delRow(String tableName, String rowKey) throws IOException {  
 3     // 建立一個數據庫的連接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 獲取表  
 6     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 7     // 刪除數據  
 8     Delete delete = new Delete(Bytes.toBytes(rowKey));  
 9     table.delete(delete);  
10     // 關閉資源  
11     table.close();  
12     conn.close();  
13 }
 1 // 刪除多條數據  
 2 public static void delRows(String tableName, String[] rowkeys) throws IOException {  
 3     // 建立一個數據庫的連接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 獲取表  
 6     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 7     // 刪除多條數據  
 8     List<Delete> list = new ArrayList<Delete>();  
 9     for (String row : rowkeys) {  
10         Delete delete = new Delete(Bytes.toBytes(row));  
11         list.add(delete);  
12     }  
13     table.delete(list);  
14     // 關閉資源  
15     table.close();  
16     conn.close();  
17 }
 1 // 刪除列族  
 2 public static void delColumnFamily(String tableName, String columnFamily) throws IOException {  
 3     // 建立一個數據庫的連接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 創建一個數據庫管理員  
 6     HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();  
 7     // 刪除一個表的指定列族  
 8     hAdmin.deleteColumn(tableName, columnFamily);  
 9     // 關閉資源  
10     conn.close();  
11 }
 1 // 刪除數據庫表  
 2 public static void deleteTable(String tableName) throws IOException {  
 3     // 建立一個數據庫的連接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 創建一個數據庫管理員  
 6     HBaseAdmin hAdmin = (HBaseAdmin) conn.getAdmin();  
 7     if (hAdmin.tableExists(tableName)) {  
 8         // 失效表  
 9         hAdmin.disableTable(tableName);  
10         // 刪除表  
11         hAdmin.deleteTable(tableName);  
12         System.out.println("刪除" + tableName + "表成功");  
13         conn.close();  
14     } else {  
15         System.out.println("需要刪除的" + tableName + "表不存在");  
16         conn.close();  
17         System.exit(0);  
18     }  
19 }
 1 // 追加插入(將原有value的后面追加新的value,如原有value=a追加value=bc則最后的value=abc)  
 2 public static void appendData(String tableName, String rowKey, String columnFamily, String column, String value)   
 3         throws IOException {  
 4     // 建立一個數據庫的連接  
 5     Connection conn = ConnectionFactory.createConnection(conf);  
 6     // 獲取表  
 7     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 8     // 通過rowkey創建一個append對象  
 9     Append append = new Append(Bytes.toBytes(rowKey));  
10     // 在append對象中設置列族、列、值  
11     append.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));  
12     // 追加數據  
13     table.append(append);  
14     // 關閉資源  
15     table.close();  
16     conn.close();  
17 }
 1 // 符合條件后添加數據(只能針對某一個rowkey進行原子操作)  
 2 public static boolean checkAndPut(String tableName, String rowKey, String columnFamilyCheck, String columnCheck, String valueCheck, String columnFamily, String column, String value) throws IOException {  
 3     // 建立一個數據庫的連接  
 4     Connection conn = ConnectionFactory.createConnection(conf);  
 5     // 獲取表  
 6     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 7     // 設置需要添加的數據  
 8     Put put = new Put(Bytes.toBytes(rowKey));  
 9     put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));  
10     // 當判斷條件為真時添加數據  
11     boolean result = table.checkAndPut(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck),   
12             Bytes.toBytes(columnCheck), Bytes.toBytes(valueCheck), put);  
13     // 關閉資源  
14     table.close();  
15     conn.close();  
16       
17     return result;  
18 }
 1 // 符合條件后刪除數據(只能針對某一個rowkey進行原子操作)  
 2 public static boolean checkAndDelete(String tableName, String rowKey, String columnFamilyCheck, String columnCheck,   
 3         String valueCheck, String columnFamily, String column) throws IOException {  
 4     // 建立一個數據庫的連接  
 5     Connection conn = ConnectionFactory.createConnection(conf);  
 6     // 獲取表  
 7     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 8     // 設置需要刪除的delete對象  
 9     Delete delete = new Delete(Bytes.toBytes(rowKey));  
10     delete.addColumn(Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck));  
11     // 當判斷條件為真時添加數據  
12     boolean result = table.checkAndDelete(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamilyCheck), Bytes.toBytes(columnCheck),   
13             Bytes.toBytes(valueCheck), delete);  
14     // 關閉資源  
15     table.close();  
16     conn.close();  
17   
18     return result;  
19 }
 1 // 計數器(amount為正數則計數器加,為負數則計數器減,為0則獲取當前計數器的值)  
 2 public static long incrementColumnValue(String tableName, String rowKey, String columnFamily, String column, long amount)   
 3         throws IOException {  
 4     // 建立一個數據庫的連接  
 5     Connection conn = ConnectionFactory.createConnection(conf);  
 6     // 獲取表  
 7     HTable table = (HTable) conn.getTable(TableName.valueOf(tableName));  
 8     // 計數器  
 9     long result = table.incrementColumnValue(Bytes.toBytes(rowKey), Bytes.toBytes(columnFamily), Bytes.toBytes(column), amount);  
10     // 關閉資源  
11     table.close();  
12     conn.close();  
13       
14     return result;  
15 }

 

內置過濾器的使用

HBase為篩選數據提供了一組過濾器,通過這個過濾器可以在HBase中數據的多個維度(行、列、數據版本)上進行對數據的篩選操作,也就是說過濾器最終能夠篩選的數據能夠細化到具體的一個存儲單元格上(由行鍵、列名、時間戳定位)。通常來說,通過行鍵、值來篩選數據的應用場景較多。需要說明的是,過濾器會極大地影響查詢效率。所以,在數據量較大的數據表中,應盡量避免使用過濾器。

下面介紹一些常用的HBase內置過濾器的用法:

1、RowFilter:篩選出匹配的所有的行。使用BinaryComparator可以篩選出具有某個行鍵的行,或者通過改變比較運算符(下面的例子中是CompareFilter.CompareOp.EQUAL)來篩選出符合某一條件的多條數據,如下示例就是篩選出行鍵為row1的一行數據。

1 // 篩選出匹配的所有的行  
2 Filter rf = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("row1")));   

2、PrefixFilter:篩選出具有特定前綴的行鍵的數據。這個過濾器所實現的功能其實也可以由RowFilter結合RegexComparator來實現,不過這里提供了一種簡便的使用方法,如下示例就是篩選出行鍵以row為前綴的所有的行。

// 篩選匹配行鍵的前綴成功的行  
Filter pf = new PrefixFilter(Bytes.toBytes("row"));  

3、KeyOnlyFilter:這個過濾器唯一的功能就是只返回每行的行鍵,值全部為空,這對於只關注於行鍵的應用場景來說非常合適,這樣忽略掉其值就可以減少傳遞到客戶端的數據量,能起到一定的優化作用。

// 返回所有的行鍵,但值全是空  
Filter kof = new KeyOnlyFilter();  

4、RandomRowFilter:按照一定的幾率(<=0會過濾掉所有的行,>=1會包含所有的行)來返回隨機的結果集,對於同樣的數據集,多次使用同一個RandomRowFilter會返回不同的結果集,對於需要隨機抽取一部分數據的應用場景,可以使用此過濾器。

// 隨機選出一部分的行  
Filter rrf = new RandomRowFilter((float) 0.8);     

5、InclusiveStopFilter:掃描的時候,我們可以設置一個開始行鍵和一個終止行鍵,默認情況下,這個行鍵的返回是前閉后開區間,即包含起始行,但不包含終止行。如果我們想要同時包含起始行和終止行,那么可以使用此過濾器。

// 包含了掃描的上限在結果之內  
Filter isf = new InclusiveStopFilter(Bytes.toBytes("row1"));   

6、FirstKeyOnlyFilter:如果想要返回的結果集中只包含第一列的數據,那么這個過濾器能夠滿足要求。它在找到每行的第一列之后會停止掃描,從而使掃描的性能也得到了一定的提升。

// 篩選出每行的第一個單元格  
Filter fkof = new FirstKeyOnlyFilter();     

7、ColumnPrefixFilter:它按照列名的前綴來篩選單元格,如果我們想要對返回的列的前綴加以限制的話,可以使用這個過濾器。

// 篩選出前綴匹配的列  
Filter cpf = new ColumnPrefixFilter(Bytes.toBytes("qual1"));    

8、ValueFilter:按照具體的值來篩選單元格的過濾器,這會把一行中值不能滿足的單元格過濾掉,如下面的構造器,對於每一行的一個列,如果其對應的值不包含ROW2_QUAL1,那么這個列就不會返回給客戶端。

 

// 篩選某個(值的條件滿足的)特定的單元格  
Filter vf = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("ROW2_QUAL1")); 

9、ColumnCountGetFilter:這個過濾器在遇到一行的列數超過我們所設置的限制值的時候,結束掃描操作。

// 如果突然發現一行中的列數超過設定的最大值時,整個掃描操作會停止  
Filter ccf = new ColumnCountGetFilter(2);    

10、SingleColumnValueFilter:用一列的值決定這一行的數據是否被過濾,可對它的對象調用setFilterIfMissing方法,默認的參數是false。其作用是,對於咱們要使用作為條件的列,如果參數為true,這樣的行將會被過濾掉,如果參數為false,這樣的行會包含在結果集中。

復制代碼
// 將滿足條件的列所在的行過濾掉  
SingleColumnValueFilter scvf = new SingleColumnValueFilter(    
•          Bytes.toBytes("colfam1"),     
•          Bytes.toBytes("qual2"),     
•          CompareFilter.CompareOp.NOT_EQUAL,     
•          new SubstringComparator("BOGUS"));    
scvf.setFilterIfMissing(true);  
復制代碼

11、SingleColumnValueExcludeFilter:這個過濾器與第10種過濾器唯一的區別就是,作為篩選條件的列,其行不會包含在返回的結果中。

12、SkipFilter:這是一種附加過濾器,其與ValueFilter結合使用,如果發現一行中的某一列不符合條件,那么整行就會被過濾掉。

// 發現某一行中的一列需要過濾時,整個行就會被過濾掉  
Filter skf = new SkipFilter(vf);  

13、WhileMatchFilter:使用這個過濾器,當遇到不符合設定條件的數據的時候,整個掃描結束。

// 當遇到不符合過濾器rf設置的條件時,整個掃描結束  
Filter wmf = new WhileMatchFilter(rf);  

14. FilterList:可以用於綜合使用多個過濾器。其有兩種關系: Operator.MUST_PASS_ONE表示關系AND,Operator.MUST_PASS_ALL表示關系OR,並且FilterList可以嵌套使用,使得我們能夠表達更多的需求。

// 綜合使用多個過濾器,AND和OR兩種關系  
List<Filter> filters = new ArrayList<Filter>();    
filters.add(rf);    
filters.add(vf);    
FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL,filters);  

下面給出一個使用RowFilter過濾器的完整示例:

復制代碼
public class HBaseFilter {  
      
    private static final String TABLE_NAME = "table1";  
  
    public static void main(String[] args) throws IOException {  
        // 設置配置  
        Configuration conf = HBaseConfiguration.create();  
        conf.set("hbase.zookeeper.quorum", "localhost");  
        conf.set("hbase.zookeeper.property.clientPort", "2181");  
        // 建立一個數據庫的連接  
        Connection conn = ConnectionFactory.createConnection(conf);  
        // 獲取表  
        HTable table = (HTable) conn.getTable(TableName.valueOf(TABLE_NAME));  
        // 創建一個掃描對象  
        Scan scan = new Scan();  
        // 創建一個RowFilter過濾器  
        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("abc")));  
        // 將過濾器加入掃描對象  
        scan.setFilter(filter);  
        // 輸出結果  
        ResultScanner results = table.getScanner(scan);  
        for (Result result : results) {  
            for (Cell cell : result.rawCells()) {  
                System.out.println(  
                        "行鍵:" + new String(CellUtil.cloneRow(cell)) + "\t" +  
                        "列族:" + new String(CellUtil.cloneFamily(cell)) + "\t" +   
                        "列名:" + new String(CellUtil.cloneQualifier(cell)) + "\t" +   
                        "值:" + new String(CellUtil.cloneValue(cell)) + "\t" +  
                        "時間戳:" + cell.getTimestamp());  
            }  
        }  
        // 關閉資源  
        results.close();  
        table.close();  
        conn.close();  
          
    }  
  
}   
復制代碼

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM