協處理器(coprocessor)
把一部分計算移動到數據的存放端。
簡介
協處理器允許用戶在region服務器上運行自己的代碼,允許用戶執行region級別的操作,並且可以使用與RDBMS中觸發器(trigger)類似的功能。在客戶端,用戶不用關心操作具體在哪里執行,HBase的分布式框架會幫助用戶把這些工作變得透明。
協處理器框架提供了一些類,用戶可以通過繼承這些類來擴展自己的功能。主要分為以下兩大類
observer
這一類協處理器與觸發器(trigger)類似:回調函數(也被稱作鈎子函數,hook)在一些特定事件發生時被執行。這些事件包括一些用戶產生的事件,也包括服務器端內部自動產生的事件。
協處理器框架提供的接口如下
RegionObserver:用戶可以用這種的處理器處理數據修改事件,它們與表的region聯系緊密。
MasterObserver:可以被用作管理或DDL類型的操作,這些是集群級事件。
WALObserver:提供控制WAL的鈎子函數
Observer提供了一些設計好的回調函數,每個操作在集群服務器端都可以被調用。
endpoint
除了事件處理之外還需要將用戶自定義操作添加到服務器端。用戶代碼可以被部署到管理數據的服務器端,例如,做一些服務器端計算的工作。
Endpoint通過添加一下遠程過程調用來動態擴展RPC協議。可以把它們理解為與RDBMS中類似的存儲過程。Endpoint可以與observer的實現組合起來直接作用於服務器端的狀態。
Coprocessor類
所有協處理器的類都必須實現這個接口。它定義了協處理器的基本約定,並使得框架本身的管理變得容易。
Coprocessor.Priority枚舉類定義的優先級
SYSTEM:高優先級,定義最先被執行的協處理器
USER:定義其他的協處理器,按順序執行
協處理器的優先級決定了執行的順序:系統(SYSTEM)級協處理器在用戶(USER)級協處理器之前執行。
在同一個優先級中還有一個序號(sequence number)的概念,用來維護協處理器的加載順序。序號從0開始依次增加。
這個數字的作用並不大,但用戶可以依靠它們來為同一優先級的協處理器排序:在同一優先級下,它們按照其序號遞增的順序執行,即定義了執行順序。
在協處理器生命周期中,它們由框架管理。Coprocessor接口提供了以下兩個方法
void start(CoprocessorEnvironment env) throws IOException; void stop(CoprocessorEnvironment env) throws IOException;
這兩個方法在協處理器開始和結束時被調用。CoprocessorEnvironment用來在協處理器的生命周期中保持其狀態。協處理器實例一直被保存在提供的環境中。
CoprocessorEnvironment類提供的方法
int getVersion(); |
返回Coprocessor版本 |
String getHBaseVersion(); |
以字符串的格式返回HBase版本 |
Coprocessor getInstance(); |
返回加載的協處理器實例 |
int getPriority(); |
返回協處理器的優先級 |
int getLoadSequence(); |
協處理器的序號,當協處理器加載時被設置,這反映了它的執行順序 |
協處理器應當只與提供給它們的環境進行交互。這樣的好處是可以保證沒有會被惡意代碼用來破壞數據的后門。
注意:協處理器應當使用getTable方法訪問表數據。注意這個方法實際上在默認的HTable類上添加了特定的安全措施。例如,協處理器不可以對一行數據加鎖。
在協處理器實例的生命周期中,Coprocessor接口的start和stop方法會被框架隱式調用,處理過程中的每一步都有一個狀態。
Coprocessor.State枚舉類定義的狀態
UNINSTALLED |
協處理器最初的狀態,沒有環境,也沒有被初始化 |
INSTALLED |
實例裝載了它的環境參數 |
STARTING |
協處理器將要開始工作,也就是說start方法將要被調用 |
ACTIVE |
一旦start方法被調用,當前狀態設置為active |
STOPPING |
Stop方法被調用之前的狀態 |
STOPPED |
一旦stop方法將控制權 交給框架,協處理器會被設置為狀態stopped |
CoprocessorHost類,它維護所有協處理器實例和它們專用的環境。它有一些子類,這些子類應用在不同的使用環境,例如,master和region服務器等環境
Coprocessor、CoprocessorEnvironment和CoprocessorHost這3個類形成了協處理器類的基礎,基於這三個類能夠實現更高級的功能。它們支持協處理器的生命周期,管理協處理器的狀態,同時提供了執行時的環境參數,以保證協處理器正確執行。此外,這些類也提供了一個抽象層方便用戶更簡單的構建自己的實現。
客戶端的調用與一系列的協處理器進行交互
協處理器加載
從配置文件中加載
需要將協處理器jar包放入HBASE_CLASSPATH指定的路徑下
在hbase-site.xml中配置,會對所有的表都加上
<property> <name>hbase.coprocessor.region.classes</name> <value>class1,class2</value> </property> <property> <name>hbase.coprocessor.master.classes</name> <value>class1,class2</value> </property> <property> <name>hbase.coprocessor.wal.classes</name> <value>class1,class2</value> </property>
配置完成后重啟HBase.
配置文件中配置項的順序決定了執行順序。所有協處理器都是以系統級優先級進行加載的。
配置文件首先在HBase啟動時被檢查。雖然還可以在其他地方增加系統級優先級的協處理器,但是在配置文件中配置的協處理器是被最先執行的。
從表描述符中加載
針對特定表,所以加載的協處理器只針對這個表的region,同時也只被這些region的region服務器使用。換句話說,用戶只能在與region相關的協處理器上使用這種方法,而不能在master或WAL相關的協處理器上使用。
由於它們是使用表的上下文加載的,所以與配置文件中加載的協處理器影響所有表相比,這種方法加載的協處理器更具有針對性。用戶需要在表描述符中利用HTableDescription.setValue方法定義它們。鍵必須以COPROCESSOR開頭,值必須符合以下格式
<path-to-jar>|<classname>|<priority>
如下,一個使用系統優先級,一個使用用戶優先級
alter 'socialSecurityTest',METHOD=>'table_att','coprocessor'=>'hdfs://cluster1/user/solr/hbase/observer/HBaseCoprocessor.jar|com.hbase.coprocessor.HbaseDataSyncSolrObserver|SYSTEM' alter 'socialSecurityTest',METHOD=>'table_att','coprocessor'=>' /user/solr/hbase/observer/HBaseCoprocessor.jar|com.hbase.coprocessor.HbaseDataSyncSolrObserver|USER
path-to-jar可以是一個完整的HDFS地址或其他Hadoop FileSystem類支持的地址。第二個協處理器使用了本地路徑。
Classname定義了具體的實現類。由於JAR可能包含許多協處理器類,但只能給一張表設定一個協處理器。用戶應該使用標准的java包命名規則來命名指定類。
Priority只能是SYSTEM或USER
注意:不要在協處理器定義中添加空格。解釋十分嚴格,添加頭尾或間隔字符會使整個配置條目無效。
使用${number}后綴可以改變定義中的順序,即協處理器的加載順序。雖然只有COPROCESSOR的前綴會被檢查,但是還是推薦使用數字后綴定義順序。
package com.hbase.coprocessor; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.log4j.Logger; /** * @author:FengZhen * @create:2018年8月30日 * 協處理器 */ public class CoprocessorCreateTest { private static String addr="HDP233,HDP232,HDP231"; private static String port="2181"; Logger logger = Logger.getLogger(getClass()); private static Connection connection; public static void getConnection(){ Configuration conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum",addr); conf.set("hbase.zookeeper.property.clientPort", port); try { connection = ConnectionFactory.createConnection(conf); } catch (IOException e) { e.printStackTrace(); } } /* * 關閉連接 * */ public static void close() { /** * close connection **/ if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) { getConnection(); try { HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("test_hbase")); Path path = new Path("/user/hdfs/coprocessor/test.jar"); hTableDescriptor.setValue("COPROCESSOR", path.toString() + "|" + "class" + "|" + Coprocessor.PRIORITY_SYSTEM); Admin admin = connection.getAdmin(); admin.createTable(hTableDescriptor); } catch (TableNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } close(); } }
一旦表被啟用,且region被打開,框架會首先加載配置文件中定義的協處理器,然后再加載表描述符中的協處理器。
RegionObserver類
屬於observer協處理器:當一個特定的region級別的操作發生時,它們的鈎子函數會被觸發。
這些操作可以被分為兩類:region生命周期變化和客戶端API調用。
1.處理region生命周期事件
這些observer可以與pending open、open和pending close狀態通過鈎子鏈接。每一個鈎子都被框架隱式地調用。
狀態:pending open。Region將要被打開時會處於這個狀態。監聽的協處理器可以搭載這個過程或阻止這個過程。
void preOpen(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException; void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c);
這些方法會在region被打開前或剛剛打開后被調用。用戶可以在自己的協處理器實現中使用這兩個方法,例如,使用preOpen方法告知框架這次打開操作應當被放棄,或勾住postOpen方法來觸發一次緩存預熱或其它一些操作。
Region經過pending open,且在打開狀態之前,region服務器可能需要從WAL(Write-Ahead-logging,預寫系統日志)中應用一些記錄到region中,這時會觸發以下方法。
void preWALRestore(final ObserverContext<? extends RegionCoprocessorEnvironment> ctx, HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException; void postWALRestore(final ObserverContext<? extends RegionCoprocessorEnvironment> ctx, HRegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException;
這些方法讓用戶可以細粒度地控制在WAL重做時那些修改需要被實施。用戶可以訪問修改記錄,因此用戶就可以監督那些記錄被實施了。
狀態:open。當一個region被部署到一個region服務器中,並可以正常工作時,這個region會被認為處於open狀態。
可用的鈎子函數如下
InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, final InternalScanner scanner) throws IOException; void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, final StoreFile resultFile) throws IOException; InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, final InternalScanner scanner, final ScanType scanType, CompactionRequest request) throws IOException; void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store, StoreFile resultFile, CompactionRequest request) throws IOException; void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow) throws IOException; void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow) throws IOException;
pre方法在事件執行前被調用,post方法在事件執行后被調用。例如:使用preSplit鈎子函數可以有效地禁止region拆分,然后手動執行這些操作。
狀態:pending close。最后一組region監聽器的鈎子函數可以監聽pending close狀態。這個狀態在region狀態從open到closed轉變時發生。在region被關閉之前和之后,一下鈎子函數將被執行。
void preClose(final ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) throws IOException; void postClose(final ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested);
abortRequested參數包含了region被關閉的原因。通常情況下,region會在正常操作中被關閉,例如,region由於負載均衡被移動到其它region服務器時被關閉。也有可能是由於region服務器被撤銷,且需避免一些副作用。當這些情況發生時,所有它管理的region都會被撤銷,同時用戶從這個參數中可以看到是否符合這種情況。
2.處理客戶端API事件
與生命周期事件相比,所有的客戶端API調用都顯示地從客戶端應用中傳輸到region服務器。用戶可以在這些調用執行前或剛剛執行后攔截它們。
void preGet() void postGet() |
在客戶端Table.get請求之前和之后調用 |
void prePut() void postPut() |
在客戶端Table.put請求之前和之后調用 |
void preDelete() void postDelete () |
在客戶端Table.delete請求之前和之后調用 |
boolean preCheckAndPut() boolean postCheckAndPut () |
在客戶端Table. checkAndPut請求之前和之后調用 |
boolean preCheckAndDelete() boolean postCheckAndDelete () |
在客戶端Table. checkAndDelete請求之前和之后調用 |
void preGetClosestRowBefore() void postGetClosestRowBefore () |
在客戶端Table.getClosestRowBefore請求之前和之后調用 |
boolean preExists() boolean postExists () |
在客戶端Table.exists請求之前和之后調用 |
long preIncrementColumnValue() long postIncrementColumnValue () |
在客戶端Table.incrementColumnValue請求之前和之后調用 |
void preIncrement() void postIncrement () |
在客戶端Table. increment請求之前和之后調用 |
InternalScanner preScannerOpen() InternalScanner postScannerOpen () |
在客戶端Table.getScannerOpen請求之前和之后調用 |
boolean preScannerNext() void postScannerNext () |
在客戶端ResultScanner.next請求之前和之后調用 |
void preScannerClose() void postScannerClose () |
在客戶端ResultScanner.close請求之前和之后調用 |
3.RegionCoprocessorEnvironment
實現RegionObserver類的協處理器環境的實例是基於RegionCoprocessorEnvironment類的,RegionCoprocessorEnvironment實現了CoprocessorEnvironment接口。
RegionCoprocessorEnvironment類提供的方法及子類方法
Region getRegion(); |
返回監聽器監聽的region的應用 |
RegionServerServices getRegionServerServices(); |
返回共享的RegionServerServices實例 |
getRegion方法可以用於得到目前正在管理的HRegion實例,同時可以執行這個類提供的方法。
RegionServerServices類提供的方法
boolean isStopping(); |
當region服務正在停止服務時,返回true |
WAL getWAL(HRegionInfo regionInfo) throws IOException; |
提供訪問WAL實例的功能 |
CompactionRequestor getCompactionRequester(); |
提供訪問共享的CompactionRequestor實例的功能,可以在協處理器內部發起合並 |
FlushRequester getFlushRequester(); |
提供訪問共享的FlushRequester,可以用於發起memstore刷寫 |
RegionServerAccounting getRegionServerAccounting(); |
提供訪問共享RegionServerAccounting實例的功能。用戶可以利用它得到當前服務進程資源的占用狀態,例如當前memstore的大小 |
void postOpenDeployTasks(final PostOpenDeployContext context) throws KeeperException, IOException; |
內部調用,在region服務器內部使用 |
RpcServerInterface getRpcServer(); |
提供訪問共享RpcServerInterface實例的功能,包含當前服務端RPC統計信息 |
4.ObserverContext類
RegionObserver類提供的所有回調函數都需要一個特殊的上下文作為共同的參數: ObserverContext類,它不僅提供了訪問當前系統環境的入口,同時也添加了一些關鍵功能用以通知協處理器框架在回調函數完成時需要做什么。
所有的協處理器在執行時共用一個上下文實例,並會隨着環境一起變化。
ObserverContext類提供的方法
E getEnvironment() |
返回當前協處理器環境的應用 |
void bypass() |
當用戶代碼調用此方法時,框架將使用用戶提供的值,而不使用框架通常使用的值 |
void complete() |
通知框架后續的處理可以被跳過,剩下沒有被執行的協處理器也會被跳過。這意味着當前協處理器的響應是最后的一個協處理器 |
boolean shouldBypass() |
框架內部用來檢查標志位 |
boolean shouldComplete() |
框架內部用來檢查標志位 |
void prepare(E env) |
使用特定的環境准備上下文。這個方法只供內部使用。它被靜態方法createAndPrepare使用 |
static <T extends CoprocessorEnvironment> ObserverContext<T> createAndPrepare( |
初始化上下文的靜態方法。當提供的context參數是null時,它會創建一個新實例 |
兩個重要的方法是bypass和complete。它們會為用戶的協處理器實現提供了選擇,以控制框架后續行為。Complete的調用會影響后面執行的協處理器,同時bypass方法可以停止當前服務進程的處理過程。例如,可以使用e.bypass停止region的自動拆分。
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException { e.bypass(); }
5.BaseRegionObserver類
這個類可以作為所有用戶實現監聽類型協處理器的基類。它實現了所有RegionObserver接口的空方法,所以在默認情況下繼承這個類的協處理器沒有任何功能。
例子:處理特殊行鍵
示例,協處理器代碼去下
package com.hbase.coprocessor; import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; /** * @author:FengZhen * @create:2018年8月31日 */ public class RegionObserverExample extends BaseRegionObserver{ public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@"); @Override public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException { //檢查請求的行鍵是否匹配 if (Bytes.equals(get.getRow(), FIXED_ROW)) { //創建一個特殊的keyvalue,只包含當前的服務器時間。 KeyValue keyValue = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW, Bytes.toBytes(System.currentTimeMillis())); results.add(keyValue); } } }
1.新建HBase表
create 'test_coprocessor', 'info'
2.協處理器打包上傳至HDFS
上傳至HDFS
/data/fz/hbase/coprocessor/RegionObserverExample.jar
3.裝載協處理器
hbase(main):004:0> disable 'test_coprocessor' hbase(main):006:0> alter 'test_coprocessor',METHOD=>'table_att','COPROCESSOR'=>'hdfs://cluster1/data/fz/hbase/coprocessor/RegionObserverExample.jar|com.hbase.coprocessor.RegionObserverExample|1001' hbase(main):007:0> enable 'test_coprocessor'
4.查看掛載情況
hbase(main):009:0> desc 'test_coprocessor' Table test_coprocessor is ENABLED test_coprocessor, {TABLE_ATTRIBUTES => {METADATA => {'COPROCESSOR$1' => 'hdfs://cluster1/data/fz/hbase/coprocessor/RegionObserverExample.jar|com.hbase.coprocess or.RegionObserverExample|1001'}} COLUMN FAMILIES DESCRIPTION {NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
5.協處理器測試
行鍵@@@GETTIME@@@被observer的preGet捕獲,然后添加當前服務器時間。
如下
hbase(main):054:0> get 'test_coprocessor','@@@GETTIME@@@' COLUMN CELL @@@GETTIME@@@:@@@GETTIME@@@ timestamp=9223372036854775807, value=\x00\x00\x01e\x8E\xC6ad 1 row(s) in 0.0450 seconds
插入一條行鍵為@@@GETTIME@@@的數據
hbase(main):055:0> put 'test_coprocessor','@@@GETTIME@@@','info:name','nimei' 0 row(s) in 0.0540 seconds
此時再次使用get
hbase(main):057:0> get 'test_coprocessor','@@@GETTIME@@@' COLUMN CELL @@@GETTIME@@@:@@@GETTIME@@@ timestamp=9223372036854775807, value=\x00\x00\x01e\x8E\xC7\xD3\x9B info:name timestamp=1535698764962, value=nimei 2 row(s) in 0.0130 seconds
如果捕獲的該行鍵恰巧在表中同時存在,就會出現上面的情況,為了避免這種情況,可以使用e.bypass。更新步驟如下
6.卸載已有協處理器
alter 'test_coprocessor',METHOD => 'table_att_unset',NAME =>'COPROCESSOR$1'
7.重新打包RegionObserverWithBypassExample上傳至HDFS
代碼如下
package com.hbase.coprocessor; import java.io.IOException; import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.util.Bytes; /** * @author:FengZhen * @create:2018年8月31日 */ public class RegionObserverWithBypassExample extends BaseRegionObserver{ public static final byte[] FIXED_ROW = Bytes.toBytes("@@@GETTIME@@@"); @Override public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException { //檢查請求的行鍵是否匹配 if (Bytes.equals(get.getRow(), FIXED_ROW)) { //創建一個特殊的keyvalue,只包含當前的服務器時間。 KeyValue keyValue = new KeyValue(get.getRow(), FIXED_ROW, FIXED_ROW, Bytes.toBytes(System.currentTimeMillis())); results.add(keyValue); //一旦特殊的keyvalue被添加,之后的操作都會被跳過 e.bypass(); } } }
8.掛載新的協處理器
disable 'test_coprocessor' hbase(main):034:0> alter 'test_coprocessor',METHOD=>'table_att','COPROCESSOR'=>'hdfs://cluster1/data/fz/hbase/coprocessor/RegionObserverWithBypassExample.jar|com.hbase.coprocessor.RegionObserverWithBypassExample|1001' enable 'test_coprocessor'
9.查看掛載情況
hbase(main):036:0> desc 'test_coprocessor' Table test_coprocessor is ENABLED test_coprocessor, {TABLE_ATTRIBUTES => {METADATA => {'COPROCESSOR$1' => 'hdfs://cluster1/data/fz/hbase/coprocessor/RegionObserverWithBypassExample.jar|com.hbase.c oprocessor.RegionObserverWithBypassExample|1001'}} COLUMN FAMILIES DESCRIPTION {NAME => 'info', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSION => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'} 1 row(s) in 0.0160 seconds
10.測試
hbase(main):037:0> get 'test_coprocessor','@@@GETTIME@@@' COLUMN CELL @@@GETTIME@@@:@@@GETTIME@@@ timestamp=9223372036854775807, value=\x00\x00\x01e\x8E\xDA\xDB\xC5 1 row(s) in 0.0210 seconds
成功。
由於默認的get操作被跳過,只有人工添加的一列被返回,並且是返回的唯一一列數據。可以注意到返回列的時間戳是9223372036854775807,這個只是Long.MAX_VALUE預計得到的值。因為示例代碼創建keyvalue時並沒有指定時間戳,所以被默認設為HConstants.LATEST_TIMESTAMP,即Long.MAX_VALUE..
MasterObserver類
為了處理master服務器的所有回調函數。與關系型數據庫中DDL類似,它們可以被歸類到數據處理操作中。
1.MasterCoprocessorEnvironment類
MasterCoprocessorEnvironment封裝了一個MasterObserver實例,它同樣實現了CoprocessorEnvironment接口,因此它能提供getTable之類的方法幫助用戶在自己的實現中訪問數據。
MasterCoprocessorEnvironment類提供的非繼承方法
MasterServices getMasterServices();提供可訪問的共享MasterServices實例
MasterServices類提供的方法
AssignmentManager getAssignmentManager(); |
使用戶可以訪問AssignmentManager實例,它負責為所有的region分操作,例如分配、卸載和負載均衡等 |
MasterFileSystem getMasterFileSystem(); |
提供一個與master操作相關的文件系統抽象層,例如,創建表或日志文件的目錄 |
ServerManager getServerManager(); |
返回ServerManager實例。它可以訪問所有的服務器進程,無論進程處於存活、死亡或其它狀態 |
ExecutorService getExecutorService(); |
執行服務被master用來調度系統級事件 |
void checkTableModifiable(final TableName tableName) |
檢查表是否已經存在以及是否已經離線,如果是就可以修改它 |
2.BaseMasterObserver類
用戶可以直接實現MasterObserver接口,或擴展BaseMasterObserver類來實現自己的功能。BaseMasterObserver為接口的每個方法完成一個空的實現。不做任何改變直接使用這個類不會有任何反饋。
例子:創建新表時創建一個單獨的目錄
1.打成jar包MasterObserverExample.jar
2.把該jar包添加到hbase-env.sh的HBASE_CLASSPATH(/usr/hdp/2.6.1.0-129/hadoop/lib)中,region服務器在JRE中可以加載到這個類。Hbase-site.xml中添加以下配置
<property> <name>hbase.coprocessor.master.classer</name> <value>com.hbase.coprocessor.MasterObserverExample</value> </property>
3.重啟HBase
4.測試:
新建一張表
hbase(main):011:0> create 'test_master_observer','f1' 0 row(s) in 2.6390 seconds => Hbase::Table - test_master_observer
查看HDFS路徑下是否有其對應的文件夾
[root@HDP1-231 lib]# hadoop fs -ls /user/hdfs/hbase/processor/ SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/usr/hdp/2.6.1.0-129/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/usr/hdp/2.6.1.0-129/hadoop-yarn/ProcessLog-0.0.1-SNAPSHOT-jar-with-dependencies.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] Found 1 items drwxr-xr-x - hbase hdfs 0 2018-09-03 09:52 /user/hdfs/hbase/processor/test_master_observer-blobs
成功。