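My HDP 2.4 installation series covered creating an HBase cluster through Ambari. At work, however, we have always used a .NET technology stack, so how do we access HBase, which is built on Java? HBase offers a native Java API and additionally exposes web-accessible APIs through Thrift and REST. So I decided to develop a .NET SDK that accesses HBase through its REST web API, using the protobuf protocol for the data exchanged between C# and Java.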
Contents:
- References
- Basic principles
- C#/Java data exchange
- HBase filter entities
- WebRequester
- HBaseClient
References:
- HBase reference guide (http://hbase.apache.org/book.html#hbase_apis)
- HBase source code (download from GitHub)
- Microsoft.HBase.Client source code (download from GitHub)
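Basic principles:
- HBase REST is a web service built on top of the HBase Java client, as shown in the diagram below:
- The HBase REST server can be started or stopped with start/stop commands, as follows:
- Command: hbase rest start (starts the REST service in the foreground on the default port 8080)
- Command: hbase rest start -p 9000 (starts it on port 9000)
- Command: hbase-daemon.sh start rest -p 9000 (starts it as a background daemon on port 9000; hbase-daemon.sh stop rest stops it)
- When the service starts, the embedded Jetty servlet container starts up and deploys the servlet. The service listens on port 8080 by default; a different port can be set in the HBase configuration file (the hbase.rest.port property in hbase-site.xml).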
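- The requirement in brief: wrap in C# the REST URLs, and the parameters they carry, that are made up of the following parts:
- http://192.168.2.21: the IP address of the HBase master (which also runs the REST server here)
- 8080: the port of the HBase REST server
- yourTable: the name of the HBase table being operated on
- schema/regions/scanner: reserved keywords in the URL path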
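The URLs above follow one fixed pattern, host and port, then table name, then keyword. As a minimal sketch of how the SDK could compose them (the helper class `HBaseRestUrls` and its methods are hypothetical, not part of any library):

```csharp
using System;

// Hypothetical helper: builds HBase REST URLs of the form
// http://<host>:<port>/<table>/<keyword>, as described above.
public static class HBaseRestUrls
{
    // Base address of the REST server; 8080 is the REST server's default port.
    public static string BaseUrl(string host, int port = 8080)
    {
        return string.Format("http://{0}:{1}", host, port);
    }

    // Table-level endpoint; keyword is a reserved word such as "schema", "regions" or "scanner".
    public static string TableEndpoint(string baseUrl, string table, string keyword)
    {
        if (string.IsNullOrEmpty(table))
            throw new ArgumentException("table must not be empty", nameof(table));
        // Escape the table name so unusual characters cannot break the path.
        return string.Format("{0}/{1}/{2}", baseUrl.TrimEnd('/'), Uri.EscapeDataString(table), keyword);
    }
}
```

For example, `HBaseRestUrls.TableEndpoint(HBaseRestUrls.BaseUrl("192.168.2.21"), "yourTable", "schema")` gives `http://192.168.2.21:8080/yourTable/schema`.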
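Data exchange between C# and Java via protobuf:
- For exchanging data between Java and other languages through protobuf, HBase defines a specific set of data structures (see the REST Protobufs Schema description on the HBase website). Plenty of tools can generate Java and C# source code from these protobuf schema files. Both sides follow the same schema, which is what makes cross-platform data exchange and sharing possible: the generated code converts between a platform-neutral binary format and platform- and language-specific data objects, on the same principle as many XML parsers.
- Protocol files have the .proto extension; their format looks like the following example: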
```proto
package org.apache.hadoop.hbase.rest.protobuf.generated;

message TableInfo {
  required string name = 1;
  message Region {
    required string name = 1;
    optional bytes startKey = 2;
    optional bytes endKey = 3;
    optional int64 id = 4;
    optional string location = 5;
  }
  repeated Region regions = 2;
}
```
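- package: in Java, the package of the generated classes; in C#, their namespace
- message: a class (a message nested inside another, such as Region, becomes a nested type)
- required: the field must be set
- optional: the field may be omitted, and a default value can be declared for it
- repeated: the field is a list of zero or more values (such as regions above)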
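To see what the generated classes actually put on the wire, here is a hand-written illustration (not a replacement for the generated code). Protobuf writes each field as a tag, computed as (field number << 3) | wire type, followed by the payload; strings use wire type 2 (length-delimited), so the payload is a varint length plus the UTF-8 bytes. `ProtoWireDemo` is a hypothetical name, and it encodes only the `name` field of TableInfo:

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Illustration only: encodes a single string field the way any protobuf
// serializer does on the wire (tag, length, UTF-8 bytes).
public static class ProtoWireDemo
{
    // Unsigned varint: 7 bits per byte, high bit set means "more bytes follow".
    public static void WriteVarint(List<byte> buffer, ulong value)
    {
        while (value >= 0x80)
        {
            buffer.Add((byte)(value | 0x80));
            value >>= 7;
        }
        buffer.Add((byte)value);
    }

    // Length-delimited string field: tag varint, byte-length varint, then the bytes.
    public static byte[] EncodeStringField(int fieldNumber, string value)
    {
        const int WireTypeLengthDelimited = 2;
        var buffer = new List<byte>();
        WriteVarint(buffer, (ulong)((fieldNumber << 3) | WireTypeLengthDelimited));
        byte[] utf8 = Encoding.UTF8.GetBytes(value);
        WriteVarint(buffer, (ulong)utf8.Length);
        buffer.AddRange(utf8);
        return buffer.ToArray();
    }
}
```

For example, `EncodeStringField(1, "t1")` yields the bytes 0x0A 0x02 0x74 0x31, which is exactly how a TableInfo named "t1" with no regions is serialized.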
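- Download the Windows build of the conversion tool from GitHub, and copy ProtoGen.exe.config, protoc.exe, ProtoGen.exe and Google.ProtocolBuffers.dll from the extracted package into a new folder (e.g. c:\zhu)
- Copy the protocol files defined by HBase into the same folder (the files under \hbase\hbase-rest\src\main\resources\org\apache\hadoop\hbase\rest\protobuf in the HBase source tree)
- Taking TableInfoMessage.proto as an example: on Windows, open a command prompt and change to the c:\zhu folder
- Run: protoc --descriptor_set_out=TableInfoMessage.protobin --include_imports TableInfoMessage.proto
- This creates a TableInfoMessage.protobin file in c:\zhu
- Run: protogen TableInfoMessage.protobin (this creates TableInfoMessage.cs in the folder, which is the generated C# source)
- You can of course put these commands in a batch script; afterwards, add the 9 generated files to your Visual Studio project and they are ready to use.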
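The batch step mentioned above could look like the following C# sketch. It assumes protoc.exe and ProtoGen.exe sit in the same folder as the .proto files (c:\zhu in this example) and simply replays the two commands shown above for every file; `ProtoBatch`, `CommandsFor` and `RunAll` are hypothetical names:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;

// Hypothetical batch helper replaying, for each .proto file, the two manual steps:
//   protoc --descriptor_set_out=X.protobin --include_imports X.proto
//   protogen X.protobin
public static class ProtoBatch
{
    // Returns (executable, arguments) pairs for one .proto file name.
    public static IList<Tuple<string, string>> CommandsFor(string protoFileName)
    {
        string stem = Path.GetFileNameWithoutExtension(protoFileName);
        return new List<Tuple<string, string>>
        {
            Tuple.Create("protoc.exe", string.Format("--descriptor_set_out={0}.protobin --include_imports {1}", stem, protoFileName)),
            Tuple.Create("ProtoGen.exe", stem + ".protobin"),
        };
    }

    // Runs both commands for every .proto in workDir; stops at the first failure.
    public static void RunAll(string workDir)
    {
        foreach (string proto in Directory.GetFiles(workDir, "*.proto"))
        {
            foreach (var cmd in CommandsFor(Path.GetFileName(proto)))
            {
                // Use the full path: with UseShellExecute = false, WorkingDirectory is not searched for the executable.
                var psi = new ProcessStartInfo(Path.Combine(workDir, cmd.Item1), cmd.Item2)
                {
                    WorkingDirectory = workDir,
                    UseShellExecute = false,
                };
                using (Process p = Process.Start(psi))
                {
                    p.WaitForExit();
                    if (p.ExitCode != 0)
                        throw new InvalidOperationException(cmd.Item1 + " " + cmd.Item2 + " failed with exit code " + p.ExitCode);
                }
            }
        }
    }
}
```

Calling `ProtoBatch.RunAll(@"c:\zhu")` would then produce one .cs file per .proto file.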
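HBase filter entities:
- These are the filter parameters set when reading data from HBase. Translate the Java source under hbase\hbase-client\src\main\java\org\apache\hadoop\hbase\filter into C#.
- The finished result is shown in the figure below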
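When a translated filter is sent through the REST gateway, it travels as a JSON string in the `filter` field of the Scanner message, with binary values such as row prefixes Base64-encoded. As a hedged sketch, assuming the `{"type":"PrefixFilter","value":"<base64>"}` shape used by the gateway's filter model (verify it against the ScannerModel/FilterModel source of your HBase version), a C# filter entity could render itself like this; `PrefixFilterSketch` is a hypothetical name:

```csharp
using System;
using System.Text;

// Hypothetical C# counterpart of HBase's PrefixFilter, rendered in the JSON form
// assumed to be expected by the Scanner message's "filter" field.
public sealed class PrefixFilterSketch
{
    private readonly byte[] _prefix;

    public PrefixFilterSketch(byte[] prefix)
    {
        _prefix = prefix ?? throw new ArgumentNullException(nameof(prefix));
    }

    // Convenience overload: row keys given as UTF-8 strings.
    public PrefixFilterSketch(string prefix) : this(Encoding.UTF8.GetBytes(prefix)) { }

    // Binary values are Base64-encoded inside the JSON document.
    public string ToJson()
    {
        return "{\"type\":\"PrefixFilter\",\"value\":\"" + Convert.ToBase64String(_prefix) + "\"}";
    }
}
```

For the row prefix "row1", `ToJson()` returns `{"type":"PrefixFilter","value":"cm93MQ=="}`.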
WebRequester:
- The WebRequester class wraps the HTTP requests
```csharp
using System.IO;
using System.Net;
using System.Threading.Tasks;

public class WebRequester
{
    private readonly string url;

    /// <summary>
    /// Creates a requester for the REST server base URL, e.g. http://192.168.2.21:8080
    /// </summary>
    /// <param name="urlString">Base URL of the HBase REST server.</param>
    public WebRequester(string urlString)
    {
        this.url = urlString;
    }

    /// <summary>
    /// Issues the web request synchronously (blocks the calling thread).
    /// </summary>
    public HttpWebResponse IssueWebRequest(string endpoint, string method, Stream input, RequestOptions options)
    {
        // GetAwaiter().GetResult() rethrows the original exception instead of an AggregateException.
        return IssueWebRequestAsync(endpoint, method, input, options).GetAwaiter().GetResult();
    }

    /// <summary>
    /// Issues the web request asynchronously.
    /// </summary>
    /// <param name="endpoint">Path relative to the base URL, e.g. "yourTable/schema".</param>
    /// <param name="method">HTTP method (GET, PUT, POST, DELETE).</param>
    /// <param name="input">Request body, or null for none.</param>
    /// <param name="options">Request options.</param>
    public async Task<HttpWebResponse> IssueWebRequestAsync(string endpoint, string method, Stream input, RequestOptions options)
    {
        string uri = string.Format("{0}/{1}", this.url, endpoint);
        HttpWebRequest httpWebRequest = WebRequest.CreateHttp(uri);
        // Note: Timeout applies to synchronous calls; it is not enforced for GetResponseAsync.
        httpWebRequest.Timeout = options.TimeoutMillis;
        httpWebRequest.PreAuthenticate = true;
        httpWebRequest.Method = method;
        httpWebRequest.ContentType = options.ContentType;
        // HBase REST picks the response format from the Accept header, so request protobuf back.
        httpWebRequest.Accept = options.ContentType;
        if (options.AdditionalHeaders != null)
        {
            foreach (var kv in options.AdditionalHeaders)
            {
                httpWebRequest.Headers.Add(kv.Key, kv.Value);
            }
        }
        if (input != null)
        {
            using (Stream req = await httpWebRequest.GetRequestStreamAsync())
            {
                await input.CopyToAsync(req);
            }
        }
        try
        {
            return (HttpWebResponse)await httpWebRequest.GetResponseAsync();
        }
        catch (WebException ex) when (ex.Response is HttpWebResponse errorResponse)
        {
            // Non-2xx status codes surface as WebException; return the response so
            // callers can inspect StatusCode and read the error body.
            return errorResponse;
        }
    }
}
```
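`HttpWebRequest` works, but newer .NET code usually goes through `HttpClient`. As a swapped-in alternative (not the approach used in this SDK), the same request shape can be built as an `HttpRequestMessage`; `HBaseRequestFactory` is a hypothetical name. Besides `Content-Type`, it sets the `Accept` header, because the HBase REST gateway chooses the response format from it:

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Net.Http.Headers;

// Hypothetical HttpClient-based counterpart of WebRequester.IssueWebRequestAsync:
// builds "<baseUrl>/<endpoint>" with protobuf content negotiation.
public static class HBaseRequestFactory
{
    public static HttpRequestMessage Build(string baseUrl, string endpoint, HttpMethod method,
                                           Stream body, string contentType = "application/x-protobuf")
    {
        var uri = new Uri(string.Format("{0}/{1}", baseUrl.TrimEnd('/'), endpoint));
        var request = new HttpRequestMessage(method, uri);
        // Ask the gateway to answer in the same format we send.
        request.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(contentType));
        if (body != null)
        {
            request.Content = new StreamContent(body);
            request.Content.Headers.ContentType = new MediaTypeHeaderValue(contentType);
        }
        return request;
    }
}
```

The message would be sent with a shared `HttpClient` instance (`await client.SendAsync(request)`). Unlike `HttpWebRequest`, `HttpClient` does not throw on non-2xx status codes, so the status checks in the client code can run unchanged.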
- RequestOptions, the HTTP request options class
```csharp
using System.Collections.Generic;

public class RequestOptions
{
    public string AlternativeEndpoint { get; set; }
    public bool KeepAlive { get; set; }
    public int TimeoutMillis { get; set; }
    public int SerializationBufferSize { get; set; }
    public int ReceiveBufferSize { get; set; }
    public bool UseNagle { get; set; }
    public int Port { get; set; }
    public Dictionary<string, string> AdditionalHeaders { get; set; }
    public string AlternativeHost { get; set; }
    public string ContentType { get; set; }

    public static RequestOptions GetDefaultOptions()
    {
        return new RequestOptions()
        {
            KeepAlive = true,
            TimeoutMillis = 30000,
            ReceiveBufferSize = 1024 * 1024 * 1,
            SerializationBufferSize = 1024 * 1024 * 1,
            UseNagle = false,
            //AlternativeEndpoint = Constants.RestEndpointBase,
            //Port = 443,
            AlternativeEndpoint = string.Empty,
            Port = 8080,
            AlternativeHost = null,
            ContentType = "application/x-protobuf"
        };
    }
}
```
HBaseClient:
- Define IHBaseClient, an interface for common HBase operations (table operations plus reading and writing data), as follows:
```csharp
using System.Threading.Tasks;
using org.apache.hadoop.hbase.rest.protobuf.generated;

public interface IHBaseClient
{
    /// <summary>Gets the version information of the REST server.</summary>
    Task<org.apache.hadoop.hbase.rest.protobuf.generated.Version> GetVersionAsync(RequestOptions options = null);

    /// <summary>Creates a table from the schema; returns false if the table already exists.</summary>
    Task<bool> CreateTableAsync(TableSchema schema, RequestOptions options = null);

    /// <summary>Deletes the table.</summary>
    Task DeleteTableAsync(string table, RequestOptions options = null);

    /// <summary>Gets the table's region information.</summary>
    Task<TableInfo> GetTableInfoAsync(string table, RequestOptions options = null);

    /// <summary>Gets the table's schema.</summary>
    Task<TableSchema> GetTableSchemaAsync(string table, RequestOptions options = null);

    /// <summary>Lists all tables.</summary>
    Task<TableList> ListTablesAsync(RequestOptions options = null);

    /// <summary>Creates a scanner on the table with the given settings.</summary>
    Task<ScannerInformation> CreateScannerAsync(string tableName, Scanner scannerSettings, RequestOptions options);

    /// <summary>Fetches the next batch of cells from the scanner; returns null when no rows remain.</summary>
    Task<CellSet> ScannerGetNextAsync(ScannerInformation scannerInfo, RequestOptions options);

    /// <summary>Writes the cells to the table; returns false if nothing was modified.</summary>
    Task<bool> StoreCellsAsync(string table, CellSet cells, RequestOptions options = null);
}
```
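`CreateScannerAsync` and `ScannerGetNextAsync` are meant to be used together: create a scanner, then keep fetching batches until HBaseClient's `ScannerGetNextAsync` returns null (it does so whenever the gateway answers with anything other than 200 OK, e.g. once the scanner is exhausted). Since the SDK types are not self-contained here, this hedged sketch abstracts the "get next batch" call as a delegate; `ScannerLoop.DrainAsync` is a hypothetical helper:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: calls a "get next batch" function until it returns null,
// which is how the client signals that the scanner has no more rows.
public static class ScannerLoop
{
    public static async Task<List<T>> DrainAsync<T>(Func<Task<T>> getNext, int maxBatches = int.MaxValue)
        where T : class
    {
        var batches = new List<T>();
        for (int i = 0; i < maxBatches; i++)
        {
            T batch = await getNext().ConfigureAwait(false);
            if (batch == null)
                break; // scanner exhausted
            batches.Add(batch);
        }
        return batches;
    }
}
```

With the real client, the delegate would be `() => client.ScannerGetNextAsync(info, options)`, where `info` comes from `CreateScannerAsync`. The REST gateway also lets you release a scanner early with a DELETE on its Location URL; the interface above does not expose that call.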
- HBaseClient, the class that implements the interface:
```csharp
using System;
using System.IO;
using System.Net;
using System.Threading.Tasks;
using ProtoBuf; // Serializer.Serialize/Deserialize<T>: assumed to be protobuf-net
using org.apache.hadoop.hbase.rest.protobuf.generated;

// EndPointType, WebMethod and ScannerInformation are SDK helper types not shown in this post.
public class HBaseClient : IHBaseClient
{
    private readonly WebRequester _requester;
    private readonly RequestOptions _globalRequestOptions;

    /// <summary>Creates a client for the REST server at the given base URL.</summary>
    /// <param name="url">Base URL of the HBase REST server, e.g. http://192.168.2.21:8080</param>
    /// <param name="globalRequestOptions">Options used when a call passes none; defaults to RequestOptions.GetDefaultOptions().</param>
    public HBaseClient(string url, RequestOptions globalRequestOptions = null)
    {
        _globalRequestOptions = globalRequestOptions ?? RequestOptions.GetDefaultOptions();
        _requester = new WebRequester(url);
    }

    public async Task<org.apache.hadoop.hbase.rest.protobuf.generated.Version> GetVersionAsync(RequestOptions options = null)
    {
        var optionToUse = options ?? _globalRequestOptions;
        return await GetRequestAndDeserializeAsync<org.apache.hadoop.hbase.rest.protobuf.generated.Version>(EndPointType.Version, optionToUse);
    }

    public async Task<bool> CreateTableAsync(TableSchema schema, RequestOptions options = null)
    {
        if (schema == null || string.IsNullOrEmpty(schema.name))
            throw new ArgumentException("schema.name was either null or empty!", "schema");
        var optionToUse = options ?? _globalRequestOptions;
        string endpoint = string.Format("{0}/{1}", schema.name, EndPointType.Schema);
        using (HttpWebResponse webResponse = await PutRequestAsync(endpoint, schema, optionToUse))
        {
            if (webResponse.StatusCode == HttpStatusCode.Created)
                return true;
            // table already exists
            if (webResponse.StatusCode == HttpStatusCode.OK)
                return false;
            // throw the exception otherwise
            using (var output = new StreamReader(webResponse.GetResponseStream()))
            {
                string message = output.ReadToEnd();
                throw new WebException(string.Format(
                    "Couldn't create table {0}! Response code was: {1}, expected either 200 or 201! Response body was: {2}",
                    schema.name, webResponse.StatusCode, message));
            }
        }
    }

    public async Task DeleteTableAsync(string table, RequestOptions options = null)
    {
        var optionToUse = options ?? _globalRequestOptions;
        string endPoint = string.Format("{0}/{1}", table, EndPointType.Schema);
        using (HttpWebResponse webResponse = await ExecuteMethodAsync<object>(WebMethod.Delete, endPoint, null, optionToUse))
        {
            if (webResponse.StatusCode != HttpStatusCode.OK)
            {
                using (var output = new StreamReader(webResponse.GetResponseStream()))
                {
                    string message = output.ReadToEnd();
                    throw new WebException(string.Format(
                        "Couldn't delete table {0}! Response code was: {1}, expected 200! Response body was: {2}",
                        table, webResponse.StatusCode, message));
                }
            }
        }
    }

    public async Task<TableInfo> GetTableInfoAsync(string table, RequestOptions options = null)
    {
        var optionToUse = options ?? _globalRequestOptions;
        string endPoint = string.Format("{0}/{1}", table, EndPointType.Regions);
        return await GetRequestAndDeserializeAsync<TableInfo>(endPoint, optionToUse);
    }

    public async Task<TableSchema> GetTableSchemaAsync(string table, RequestOptions options = null)
    {
        var optionToUse = options ?? _globalRequestOptions;
        string endPoint = string.Format("{0}/{1}", table, EndPointType.Schema);
        return await GetRequestAndDeserializeAsync<TableSchema>(endPoint, optionToUse);
    }

    public async Task<TableList> ListTablesAsync(RequestOptions options = null)
    {
        var optionToUse = options ?? _globalRequestOptions;
        return await GetRequestAndDeserializeAsync<TableList>("", optionToUse);
    }

    public async Task<ScannerInformation> CreateScannerAsync(string tableName, Scanner scannerSettings, RequestOptions options)
    {
        var optionToUse = options ?? _globalRequestOptions;
        string endPoint = string.Format("{0}/{1}", tableName, EndPointType.Scanner);
        using (HttpWebResponse response = await ExecuteMethodAsync(WebMethod.Post, endPoint, scannerSettings, optionToUse))
        {
            if (response.StatusCode != HttpStatusCode.Created)
            {
                using (var output = new StreamReader(response.GetResponseStream()))
                {
                    string message = output.ReadToEnd();
                    throw new WebException(string.Format(
                        "Couldn't create a scanner for table {0}! Response code was: {1}, expected 201! Response body was: {2}",
                        tableName, response.StatusCode, message));
                }
            }
            // The new scanner's URL is returned in the Location header.
            string location = response.Headers.Get("Location");
            if (location == null)
                throw new ArgumentException("Couldn't find header 'Location' in the response!");
            return new ScannerInformation(new Uri(location), tableName, response.Headers);
        }
    }

    public async Task<CellSet> ScannerGetNextAsync(ScannerInformation scannerInfo, RequestOptions options)
    {
        var optionToUse = options ?? _globalRequestOptions;
        string endPoint = string.Format("{0}/{1}/{2}", scannerInfo.TableName, EndPointType.Scanner, scannerInfo.ScannerId);
        using (HttpWebResponse webResponse = await GetRequestAsync(endPoint, optionToUse))
        {
            if (webResponse.StatusCode == HttpStatusCode.OK)
                return Serializer.Deserialize<CellSet>(webResponse.GetResponseStream());
            // No more rows (or an error status): signal the end of the scan.
            return null;
        }
    }

    public async Task<bool> StoreCellsAsync(string table, CellSet cells, RequestOptions options = null)
    {
        var optionToUse = options ?? _globalRequestOptions;
        // Placeholder row segment: the row keys are taken from the CellSet body.
        string path = table + "/somefalsekey";
        using (HttpWebResponse webResponse = await PutRequestAsync(path, cells, optionToUse))
        {
            if (webResponse.StatusCode == HttpStatusCode.NotModified)
                return false;
            if (webResponse.StatusCode != HttpStatusCode.OK)
            {
                using (var output = new StreamReader(webResponse.GetResponseStream()))
                {
                    string message = output.ReadToEnd();
                    throw new WebException(string.Format(
                        "Couldn't insert into table {0}! Response code was: {1}, expected 200! Response body was: {2}",
                        table, webResponse.StatusCode, message));
                }
            }
        }
        return true;
    }

    // GET the endpoint and deserialize the protobuf response body into T.
    private async Task<T> GetRequestAndDeserializeAsync<T>(string endpoint, RequestOptions options)
    {
        using (WebResponse response = await _requester.IssueWebRequestAsync(endpoint, WebMethod.Get, null, options))
        using (Stream responseStream = response.GetResponseStream())
        {
            return Serializer.Deserialize<T>(responseStream);
        }
    }

    // Note: despite its name this helper sends POST, which the REST gateway also accepts for schema and cell updates.
    private async Task<HttpWebResponse> PutRequestAsync<TReq>(string endpoint, TReq request, RequestOptions options) where TReq : class
    {
        return await ExecuteMethodAsync(WebMethod.Post, endpoint, request, options);
    }

    // Serialize the request (if any) to protobuf and send it with the given HTTP method.
    private async Task<HttpWebResponse> ExecuteMethodAsync<TReq>(string method, string endpoint, TReq request, RequestOptions options) where TReq : class
    {
        using (var input = new MemoryStream(options.SerializationBufferSize))
        {
            if (request != null)
            {
                Serializer.Serialize(input, request);
            }
            input.Seek(0, SeekOrigin.Begin);
            return await _requester.IssueWebRequestAsync(endpoint, method, input, options);
        }
    }

    private async Task<HttpWebResponse> GetRequestAsync(string endpoint, RequestOptions options)
    {
        return await _requester.IssueWebRequestAsync(endpoint, WebMethod.Get, null, options);
    }
}
```
- Complete the code above step by step; once it compiles, this part is done. The next post moves on to testing and validating the SDK.