2、通過HBase API進行開發


一、將HBase的jar包及hbase-site.xml添加到IDE

1、到安裝HBase集群的任意一台機器上找到HBase的安裝目錄,到lib目錄下下載HBase需要的jar包,然后再到conf目錄下下載hbase-site.xml

2、在ide中新建一個java項目,然后再右擊"項目名",新建2個文件夾,分別是"lib""conf"

3、將1步驟中下載的jar包放到2步驟中的lib目錄下,並且將hbase-site.xml放到conf目錄下,並將2個文件夾添加到classpath下。

二、使用HBase的基本API操作HBase

通過編碼(java)的形式對HBase進行一系列的管理涉及到對表的管理、數據的操作等。

下面這段是公共代碼片段:

private static Configuration conf = null;
static {
  Configuration HBASE_CONFIG = new Configuration();
  // 與hbase/conf/hbase-site.xml 中 hbase.zookeeper.quorum 配置的值相同
  HBASE_CONFIG.set("hbase.zookeeper.quorum", "c7004,c7003,c7005");
  // 與hbase/conf/hbase-site.xml 中 hbase.zookeeper.property.clientPort 配置的值相同
  HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
  conf = HBaseConfiguration.create(HBASE_CONFIG);
}

 

1、對表的創建、刪除、顯示以及修改等,可以用Admin, 一旦創建了表,那么可以通過HTable的 實例來訪問表,每次可以往表里增加數據。

/**
* 創建表
* @param tableName 表名稱
* @param familys   列族
* @param force     是否強制建表
* @throws Exception
*/
public static void creatTable(String tableName, String[] familys,boolean force) throws Exception {
  //建立連接
  Connection conn = ConnectionFactory.createConnection(conf);
  //表管理類
  Admin admin = conn.getAdmin();
  if (admin.tableExists(TableName.valueOf(tableName))) {
    if(force){
      //禁用表
      admin.disableTable(TableName.valueOf(tableName));
      //刪除表
      admin.deleteTable(TableName.valueOf(tableName));
      System.out.println("開始創建表!");
    }else{
      System.out.println("表已存在!");
    }
  } else {
    //定義表名
    HTableDescriptor tblDesc = new HTableDescriptor(TableName.valueOf(tableName));
    for (int i = 0; i < familys.length; i++) {
      //定義列族
      HColumnDescriptor clmDesc = new HColumnDescriptor(familys[i]);
      //將列族添加到表中
      tblDesc.addFamily(clmDesc);
    }
    //執行建表
    admin.createTable(tblDesc);
    System.out.println("創建表" + tableName + "成功!");
  }
  //關閉表管理
  admin.close();
  //關閉連接
  conn.close();
}

2、插入數據

創建一個Put對象,在這個Put對象里可以指定要給哪個列增加數據,以及當前的時間戳等值,然后通過調用HTable.put(Put)來 提交操作,子猴在這里提請注意的是:在創建Put對象的時候,你必須指定一個行(Row)值,在構造Put對象的時候作為參數傳入。

/**
* 向表中插入數據
* @param tableName
* @param rowKey
* @param family
* @param qualifier
* @param value
* @throws IOException 
*/
public static void insertData2Tbl(String tableName,String rowKey,String family,String qualifier,String value) throws IOException{
  //建立連接
  Connection conn = ConnectionFactory.createConnection(conf);
  //建立表連接
  Table tbl=conn.getTable(TableName.valueOf(tableName));
  //用行鍵實例化Put
  Put put=new Put(rowKey.getBytes());
  //指定列族名、列名和值
  put.addColumn(family.getBytes(), qualifier.getBytes(), value.getBytes());
  //執行put
  tbl.put(put);
  //關閉表
  tbl.close();
  //關閉連接
  conn.close();
}

3、獲取數據

要獲取數據,使用Get對 象,Get對象同Put對象一樣有好幾個構造函數,通常在構造的時候傳入行值,表示取第幾行的數據,通過HTable.get(Get)來 調用。

/**
* 從表中取值
* @param tableName  表名
* @param rowKey     行鍵
* @param family     列族
* @param qualifier  列
* @return
* @throws IOException
*/
public static String readData4Tbl(String tableName,String rowKey,String family,String qualifier) throws IOException{
  //建立連接
  Connection conn = ConnectionFactory.createConnection(conf);
  //建立表連接
  Table tbl=conn.getTable(TableName.valueOf(tableName));
  //用行鍵實例化Get
  Get get=new Get(rowKey.getBytes());
  //增加列族名和列名條件
  get.addColumn(family.getBytes(),qualifier.getBytes() );
  //執行,返回結果
  Result result=tbl.get(get);
  //取出結果
  String valStr=Bytes.toString(result.getValue(family.getBytes(), qualifier.getBytes()));
  //關閉表
  tbl.close();
  //關閉連接
  conn.close();
  return valStr;
}

4、掃描

通過Scan可以對表中的行鍵范圍進行瀏覽,得到每一行的信息,比如列名,時間戳等,Scan 相當於一個游標,通過next()來瀏覽下一個,通過調用HTable.getScanner(Scan) 來返回一個ResultScanner對象。HTable.get(Get)HTable.getScanner(Scan) 都是返回一個Result Result是一個KeyValue的鏈表。

/**
* 掃描行鍵范圍取值
* @param tableName   表名
* @param startRow    起始行鍵
* @param stopRow     結束行鍵
* @param family      列族
* @param qualifier   列
* @throws IOException
*/
public static void scanRowRange4Tbl(String tableName,String startRow,String stopRow,String family,String qualifier) throws IOException{
  //建立連接
  Connection conn = ConnectionFactory.createConnection(conf);
  //建立表連接
  Table tbl=conn.getTable(TableName.valueOf(tableName));
  //初始化Scan實例
  Scan scan=new Scan();
  //指定開始行鍵
  scan.setStartRow(startRow.getBytes());
  //指定結束行鍵
  scan.setStopRow(stopRow.getBytes());
  //增加過濾條件
  scan.addColumn(family.getBytes(), qualifier.getBytes());
  //返回結果 
  ResultScanner rss=tbl.getScanner(scan);
  //迭代並取出結果
  for(Result rs:rss){
    String valStr=Bytes.toString(rs.getValue(family.getBytes(), qualifier.getBytes()));
    System.out.println(valStr);
  }
  //關閉表
  tbl.close();
  //關閉連接
  conn.close();
}

5、刪除

使用Delete來 刪除記錄,通過調用HTable.delete(Delete)來 執行刪除操作。(注:刪除這里有些特別,也就是刪除並不是馬上將數據從表中刪除。)

/**
* 刪除行鍵
* @param tableName
* @param rowKey
* @throws IOException
*/
public static void delRowKey4Tbl(String tableName, String rowKey) throws IOException {
  // 建立連接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 建立表連接
  Table tbl = conn.getTable(TableName.valueOf(tableName));
  // 用行鍵來實例化Delete實例
  Delete del = new Delete(rowKey.getBytes());
  // 執行刪除
  tbl.delete(del);
  // 關閉表
  tbl.close();
  // 關閉連接
  conn.close();
}

/**
* 刪除單元格(即列)
* @param tableName
* @param rowKey
* @param family
* @param qualifier
* @throws IOException
*/
public static void delCell4Tbl(String tableName, String rowKey, String family, String qualifier) throws IOException {
  // 建立連接
  Connection conn = ConnectionFactory.createConnection(conf);
  // 建立表連接
  Table tbl = conn.getTable(TableName.valueOf(tableName));
  // 用行鍵來實例化Delete實例
  Delete del = new Delete(rowKey.getBytes());
  // 增加過濾條件
  del.addColumn(family.getBytes(), qualifier.getBytes());
  // 執行刪除
  tbl.delete(del);
  // 關閉表
  tbl.close();
  // 關閉連接
  conn.close();
}

6、鎖

7、新增、獲取、刪除在操作過程中會對所操作的行加一個鎖,而瀏覽卻不會。

8、簇(cluster)的訪問

客戶端代碼通過ZooKeeper來訪問找到簇,也就是說ZooKeeper quorum將被使用,那么相關的類(包)應該在客戶端的類(classes)目錄下,即客戶端一定要找到文件hbase-site.xml

三、Hbase的高級API

HBase高級API主要分為三類:過濾器、計數器和協處理器。

1、過濾器

在設置Scal、Get的時候有一個setFilter(filter),可以在查詢時添加更多的限制條件,如正則匹配、根據列值進行匹配等。HBase內置了一些常用過濾器,用戶也可以通過實現Filter接口,編寫自己的過濾器。

過濾器是服務器端的操作。它在客戶端被創建,通過RPC傳送到服務器端,然后在服務器端進行過濾操作。

HBase內置常用過濾器如下:

  • 行過濾器(RowFilter):基於行鍵過濾數據。

Filter filter=new RowFilter(CompareOp.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes("row-100")));

  • 前綴過濾器(PrefixFilter):所有與前綴匹配的行都會返回給客戶端。

Filter filter=new PrefixFilter(Bytes.toBytes("1990"));

  • 首次行鍵過濾器(FirstKeyOnlyFilter):在找到所有行第一列的值時,就會返回數據。

Filter filter=new FirstKeyOnlyFilter();

  • 單列值過濾器(SingleColumnValueFilter):根據某列的值進行過濾。

Filter filter=new SingleColumnValueFilter(Bytes.toBytes("cf"),Bytes.toBytes("qual"),CompareOp.LESS_OR_EQUAL,new BinaryComparator(Bytes.toBytes("value-100")));

2、計數器

由於HBase沒有二級索引,故統計功能相對較弱。為應對此種情況,HBase使用計數器(counter)用於實時統計的業務場景下。

計數器實際時HBase表中某一列的值,當進行寫操作時,使用Table類的API對該列加1即可。計數器有兩種:單計數據器和多計數器。

  • 單計數據器:用戶自己設定計數器的行、列族和列。

// 建立表連接
Table tbl = conn.getTable(TableName.valueOf(tableName));
// 計數器增加1
long a=tbl.incrementColumnValue(
  Bytes.toBytes("row-count"), 
  Bytes.toBytes("info"), 
  Bytes.toBytes("q1"), 1);//比0大,則按給定值增加計數器的數值,若比0小,則按值減少計數器中的值
// 返回計數器當前值
long b=tbl.incrementColumnValue(
  Bytes.toBytes("row-count"), 
  Bytes.toBytes("info"), 
  Bytes.toBytes("q1"), 0);//0表示返回當前計數器的值

  • 多計數器:允許用戶同時更新多個計數器的值,但這些計數器的值都必須處於同一行。

//用行鍵初始化計數器
Increment increment=new Increment(Bytes.toBytes("row-count"));
//添加多個列
increment.addColumn(Bytes.toBytes("info"), Bytes.toBytes("q2"), 1);
increment.addColumn(Bytes.toBytes("info"), Bytes.toBytes("q3"), 1);
Result result=tbl.increment(increment);
// 打印計數器返回的結果
for(KeyValue kv:result.raw()){
  System.out.println("KV:"+kv+" Value:"+Bytes.toLong(kv.getValue()));
}

3、協處理器

協處理器(coprocessor)允許用戶在RegionServer上運行自己的代碼。協處理器框架主要有Observer和Endpoint兩大類,用戶可以繼承這些類實現自己的邏輯。

Observer:類似於RDMS的觸發器,可以重寫一些在特定事件發生時執行的回調函數。RegionObserverMasterObserverWALObserver三種。RegionObserver可以被用來處理數據修改事件,它發生的地點是Region;MasterObserver可以被用來管理表,如新定義表;WALObserver提供了控制WAL的回調函數。

示例:Observer實現二級索引

import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class TestCoprocessor extends BaseRegionObserver {

  @Override
  public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability)throws IOException {
    Configuration conf=new Configuration();
    Connection conn=ConnectionFactory.createConnection(conf);
    //索引表
    Table tbl=conn.getTable(TableName.valueOf("idx_tbl"));
    //取出要插入的數據
    List<Cell> cells=put.get("cf".getBytes(), "info".getBytes());
    Iterator<Cell> kvIt=cells.iterator();
    while(kvIt.hasNext()){
      Cell tmp=kvIt.next();
      //用值作為行鍵
      Put idxPut=new Put(tmp.getValue());
      idxPut.add("cf".getBytes(),tmp.getRow(),Bytes.toBytes(System.currentTimeMillis()));
      //插入索引表
      tbl.put(idxPut);
    }
    tbl.close();
    conn.close();
  }

}

Endpoint:類似於RDMS的存儲過程,允許用戶將自定義操作添加到服務端。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM