[How to] 使用HBase協處理器---Endpoint服務端的實現


1.簡介

  前篇文章[How to] 使用HBase協處理器---基本概念和regionObserver的簡單實現中提到了兩種不同的協處理器,並且實現了regionObserver。

本文將介紹如何使用EndPoint協處理器類型。

  與Observer類型不同的是,Endpoint協處理器需要與服務區直接通信,服務端是對於Protobuf Service的實現,所以兩者直接會有一個機遇protocl的RPC接口,客戶端和服務端都需要進行基於接口的代碼邏輯實現。

 

2.Endpoint的服務端實現

  如前所述,Endpoint類比於數據庫中的存儲過程,他出發服務端的基於region的同步運行,再將各個結果在客戶端搜集后歸並計算。特點類似於傳統的MR框架,在服務端MAP在客戶端Reduce。相對於Observer來說開發難度大一點。

  

  需求確定:

  和Observer一樣,在完成實現前先來了解需求,本次樣例完成的功能是機遇前篇博客的表

1.可以統計表coprocessor_table下某字段字段的值總和
2.可以統計表coprocessor_table的記錄行數  

  用 Protobuf 編寫和定義 RPC:

  如前所述,客戶端和服務端之間存在RPC通信,所以兩者間需要確定接口,HBase的協處理器是通過Protobuf協議來實現數據交換的,所以需要通過Protobuf來定義接口。

option java_package = "cn.com.newbee.feng.Statistics.protointerface";
 
option java_outer_classname = "MyStatisticsInterface";
option java_generic_services = true;
option optimize_for = SPEED;
 
message getStatisticsRequest{
 required string type = 1;
 optional string famillyName = 2;
 optional string columnName = 3;
}
 
message getStatisticsResponse {
 optional int64 result = 1;
}
 
 
service myStatisticsService {
 rpc getStatisticsResult(getStatisticsRequest)
 returns(getStatisticsResponse);
}

  上述文本中定義了一個protolbuf的服務接口myStatisticsService,這個接口提供一個方法叫做getStatisticsResult,方法的請求被包裝在getStatisticsRequest中,其中type計算類型參數是必須的,而列族好列名是不必須的,因為根據需求到計算類型為計算記錄行數的時候並不需要這兩個參數。返回值被包裝在getStatisticsResponse,根據需求返回整數值即可。

上述文件被保存在以.proto結尾的文本文件中。

  編譯接口文件

  當接口文件確定和產生后需要使用protol編譯器進行編譯,我們的運行環境是jdk,所以將其編譯成java代碼。使用如下命令:

protoc --java_out=/Users/apple/Desktop/workspace/Test-HBase-Endpoint/src statistics.proto

  在編譯時候遇到一個坑:在我的環境中必須要到proto文件所在的目錄才能執行上述語句。

  編譯完成后將會在option java_package下生成option java_outer_classname指定的類。此類不要進行任何的修改。后續都是圍繞這個接口類進行代碼的實現。

  

 

 

  服務端代碼實現

  所謂基於框架的功能實現無非就是兩點:

     1. 按照要求搭建框架代碼

   2. 基於框架代碼填充自己的業務邏輯

 

  Endpoint的基本框架代碼我剝離出來如下:需要領悟的是每一個Endpoint的協處理器是運行在單個region上的,所以每一個結算結果都是對於單個region的數據范圍的計算結果。

public class Endpoint類名稱 extends
        protol類.服務接口名稱 implements Coprocessor,
        CoprocessorService {
  // 單個region的上下文環境信息
    private RegionCoprocessorEnvironment envi;

    // rpc服務,返回本身即可,因為此類實例就是一個服務實現
    @Override
    public Service getService() {
        return this;
    }

    // 協處理器是運行於region中的,每一個region都會加載協處理器
    // 這個方法會在regionserver打開region時候執行(還沒有真正打開)
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {// 需要檢查當前環境是否在region上
        if (env instanceof RegionCoprocessorEnvironment) {
            this.envi = (RegionCoprocessorEnvironment) env;

        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }

    }

    // 這個方法會在regionserver關閉region時候執行(還沒有真正關閉)
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {// nothing to do

    }

    // 服務端(每一個region上)的接口實現方法
    // 第一個參數是固定的,其余的request參數和response參數是proto接口文件中指明的。
    @Override
    public void 服務接口方法(RpcController controller,
            服務接口參數 request,
            RpcCallback<服務接口response> done) {// 單個region上的計算結果值,根據需要實現
        int result = 0;

        // 定義返回response

     Endpoint類名稱.返回response.Builder responseBuilder = Endpoint類名稱.返回response .newBuilder();     
     // 計算結果值邏輯代碼

     // 返回結果值
        responseBuilder.setResult(result);
        done.run(responseBuilder.build());
        return;
    }

}

 

  根據框架代碼,根據需求的、來填充完善業務邏輯,就本次需求的具體實現代碼如下:

/**
 * 統計行數和統計指定列值加和的Endpoint實現
 * 
 * @author newbee-feng
 *
 */
public class MyStatisticsEndpoint extends
        MyStatisticsInterface.myStatisticsService implements Coprocessor,
        CoprocessorService {
    private static final Log LOG = LogFactory.getLog(MyStatisticsEndpoint.class);

    private static final String STATISTICS_COUNT = "COUNT";

    private static final String STATISTICS_SUM = "SUM";

    // 單個region的上下文環境信息
    private RegionCoprocessorEnvironment envi;

    // rpc服務,返回本身即可,因為此類實例就是一個服務實現
    @Override
    public Service getService() {
        return this;
    }

    // 協處理器是運行於region中的,每一個region都會加載協處理器
    // 這個方法會在regionserver打開region時候執行(還沒有真正打開)
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        LOG.info("MyStatisticsEndpoint start");
        // 需要檢查當前環境是否在region上
        if (env instanceof RegionCoprocessorEnvironment) {
            this.envi = (RegionCoprocessorEnvironment) env;

        } else {
            throw new CoprocessorException("Must be loaded on a table region!");
        }

    }

    // 這個方法會在regionserver關閉region時候執行(還沒有真正關閉)
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        LOG.info("MyStatisticsEndpoint stop");
        // nothing to do

    }

    // 服務端(每一個region上)的接口實現方法
    // 第一個參數是固定的,其余的request參數和response參數是proto接口文件中指明的。
    @Override
    public void getStatisticsResult(RpcController controller,
            getStatisticsRequest request,
            RpcCallback<getStatisticsResponse> done) {
        LOG.info("MyStatisticsEndpoint##getStatisticsResult call");
        // 單個region上的計算結果值
        int result = 0;

        // 定義返回response
        MyStatisticsInterface.getStatisticsResponse.Builder responseBuilder = MyStatisticsInterface.getStatisticsResponse
                .newBuilder();

        // type就是在proto中定義參數字段,如果有多個參數字段可以都可以使用request.getXxx()來獲取
        String type = request.getType();

        // 查看當前需要做和計算
        if (null == type) {
            LOG.error("the type is null");
            responseBuilder.setResult(result);
            done.run(responseBuilder.build());
            return;
        }

        // 當進行行數統計的時
        if (STATISTICS_COUNT.equals(type)) {
            LOG.info("MyStatisticsEndpoint##getStatisticsResult call " + STATISTICS_COUNT);
            InternalScanner scanner = null;
            try {
                Scan scan = new Scan();
                scanner = this.envi.getRegion().getScanner(scan);
                List<Cell> results = new ArrayList<Cell>();
                boolean hasMore = false;

                do {
                    hasMore = scanner.next(results);
                    result++;
                } while (hasMore);
            } catch (IOException ioe) {
                LOG.error("error happend when count in "
                        + this.envi.getRegion().getRegionNameAsString()
                        + " error is " + ioe);
            } finally {
                if (scanner != null) {
                    try {
                        scanner.close();
                    } catch (IOException ignored) {
                        // nothing to do 
                    }
                }
            }
            
        }
        // 當進行指定列值統計的時候
        else if (STATISTICS_SUM.equals(type)) {
            LOG.info("MyStatisticsEndpoint##getStatisticsResult call " + STATISTICS_SUM);
            // 此時需要去檢查客戶端是否指定了列族和列名
            String famillyName = request.getFamillyName();
            String columnName = request.getColumnName();

            // 此條件下列族和列名是必須的
            if (!StringUtils.isBlank(famillyName)
                    && !StringUtils.isBlank(columnName)) {
                
                InternalScanner scanner = null;
                try {
                    Scan scan = new Scan();
                    scan.addColumn(Bytes.toBytes(famillyName), Bytes.toBytes(columnName));
                    scanner = this.envi.getRegion().getScanner(scan);
                    List<Cell> results = new ArrayList<Cell>();
                    boolean hasMore = false;
                    do {
                        hasMore = scanner.next(results);
                        if (results.size() == 1)
                        {
                            // 按行讀取數據,並進行加和操作
                            result = result + Integer.valueOf(Bytes.toString(CellUtil.cloneValue(results.get(0))));
                        }
                        
                        results.clear();
                    } while (hasMore);
                    
                } catch (Exception e) {
                    LOG.error("error happend when count in "
                            + this.envi.getRegion().getRegionNameAsString()
                            + " error is " + e);
                } finally {
                    if (scanner != null) {
                        try {
                            scanner.close();
                        } catch (IOException ignored) {
                            // nothing to do 
                        }
                    }
                }
                

            } else {
                LOG.error("did not specify the famillyName or columnName!");
            }

        }
        // 如果不在此范圍內則不計算
        else {
            LOG.error("the type is not match!");
        }
        
        LOG.info("MyStatisticsEndpoint##getStatisticsResult call end");

        responseBuilder.setResult(result);
        done.run(responseBuilder.build());
        return;

    }

}

   以上代碼的邏輯實現請參考代碼內注釋。需要注意的是我們可以使用log4j進行日志的打印,這些日志會被服務端程序打印,便於定位問題。

 

  部署Endpoint協處理器

  Endpoint的部署和Observer部署一樣,在這里我還是使用shell命令行的方式進行部署。

  首先需要將Endpoint實現打成jar,然后上傳至HBase所在的環境的hdfs中,在hbase shell中執行如下命令:

 

hbase(main):033:0> alter 'coprocessor_table' , METHOD =>'table_att','coprocessor'=>'hdfs://ns1/testdata/Test-HBase-Endpoint.jar|cn.com.newbee.feng.MyStatisticsEndpoint|1002'
Updating all regions with the new schema...
0/1 regions updated.
1/1 regions updated.
Done.
0 row(s) in 2.2770 seconds

  檢查是否加載成功:現在表上連同之前的Observer協處理器,一共有兩個協處理器存在。

hbase(main):034:0> describe
describe             describe_namespace
hbase(main):034:0> describe 'coprocessor_table'
Table coprocessor_table is ENABLED                                               
coprocessor_table, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs://ns1/testdata/Te
st-HBase-Observer.jar|cn.com.newbee.feng.MyRegionObserver|1001', coprocessor$2 =>
 'hdfs://ns1/testdata/Test-HBase-Endpoint.jar|cn.com.newbee.feng.MyStatisticsEndp
oint|1002'}                                                                      
COLUMN FAMILIES DESCRIPTION                                                      
{NAME => 'F', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SC
OPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '
FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'fals
e', BLOCKCACHE => 'true'}                                                        
1 row(s) in 0.0210 seconds

   

 3. 總結

  服Endpoint協處理器如前所述,是運行於region上的,換句話你說每一個加載了Endpoint協處理器的region即是一個RPC的服務端。Endpoint的業務接口中的數據只限於計算所在region的數據,

客戶端可以向不同region獲取服務,計算結果並在服務端將結果合並。當然向哪些region請求服務也是可以控制的,后續在介紹Endpoint客戶端代碼實現的時候介紹。

  

參考:

  http://www.ibm.com/developerworks/cn/opensource/os-cn-hbase-coprocessor1/index.html

代碼下載:

  https://github.com/xufeng79x/Test-HBase-Endpoint

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM