關於HBase的sink的所有內容均在org.apache.flume.sink.hbase包下。
每個sink包括自己定制的,都extends AbstractSink implements Configurable。
一、首先是configure(Context context)方法。該方法是對HBaseSink的參數初始化。主要包括以下幾個:
tableName:要寫入的HBase數據表名,不能為空;
columnFamily:數據表對應的列簇名,這個sink目前只支持一個列簇,不能為空;
batchSize:每次事務可以處理的最大Event數量,默認是100;
eventSerializerType:用來將event寫入HBase,即將event轉化為put。默認是org.apache.flume.sink.hbase.SimpleHbaseEventSerializer,還有一個是RegexHbaseEventSerializer,即適合HBaseSink的Serializer只有這倆,否則自己定制;
serializerContext:是eventSerializerType的配置信息,就是配置文件中包含“serializer.”的項;
kerberosKeytab和kerberosPrincipal是用來做訪問控制的,默認都為空,即不設置。
並生成eventSerializerType對應的實例並加以配置,兩個Serializer各有不同的用途主要是一個只能寫一列,一個可以寫多列:
1 Class<? extends HbaseEventSerializer> clazz = 2 (Class<? extends HbaseEventSerializer>) 3 Class.forName(eventSerializerType); 4 serializer = clazz.newInstance(); 5 serializer.configure(serializerContext); //配置序列化組件,先配置。默認是SimpleHbaseEventSerializer
1、SimpleHbaseEventSerializer.configure(Context context):此Serializer只能將數據寫入一列
1 public void configure(Context context) { 2 rowPrefix = context.getString("rowPrefix", "default"); //獲取RowKey的前綴,固定的部分,默認前綴是default 3 incrementRow = 4 context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);//獲取計數器對應的行鍵 5 String suffix = context.getString("suffix", "uuid"); //rowkey的類型(可以指定的有四種uuid/random/timestamp/nano),默認是uuid 6 7 String payloadColumn = context.getString("payloadColumn"); //要寫入HBase的列名 8 String incColumn = context.getString("incrementColumn"); //計數器對應的列 9 if(payloadColumn != null && !payloadColumn.isEmpty()) { //根據suffix決定rowkey類型 10 if(suffix.equals("timestamp")){ 11 keyType = KeyType.TS; 12 } else if (suffix.equals("random")) { 13 keyType = KeyType.RANDOM; 14 } else if(suffix.equals("nano")){ 15 keyType = KeyType.TSNANO; 16 } else { 17 keyType = KeyType.UUID; 18 } 19 plCol = payloadColumn.getBytes(Charsets.UTF_8); //列名 20 } 21 if(incColumn != null && !incColumn.isEmpty()) { //存在計數器列 22 incCol = incColumn.getBytes(Charsets.UTF_8); 23 } 24 }
2、RegexHbaseEventSerializer.configure(Context context):此Serializer根據正則可以寫入多列
public void configure(Context context) { String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT); //獲取配置文件中的正則表達式,默認是“(.*)” regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, INGORE_CASE_DEFAULT); //是否忽略大小寫 inputPattern = Pattern.compile(regex, Pattern.DOTALL + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0)); //將給定的正則表達式編譯到具有給定標志的模式中 String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT); //獲取配置文件中的列名s String[] columnNames = colNameStr.split(","); //分割列名獲得列名數組 for (String s: columnNames) { colNames.add(s.getBytes(Charsets.UTF_8)); } }
二、start()方法。該方法首先會構造一個HTable對象,並table.setAutoFlush(false)來激活緩沖區(默認大小時2MB),隨后的是一些檢查。
三、然后是process()方法用來從channel中take數據,serializer之后寫入HBase。
1 public Status process() throws EventDeliveryException { 2 Status status = Status.READY; 3 Channel channel = getChannel(); 4 Transaction txn = channel.getTransaction(); 5 List<Row> actions = new LinkedList<Row>(); 6 List<Increment> incs = new LinkedList<Increment>(); 7 txn.begin(); 8 for(long i = 0; i < batchSize; i++) { 9 Event event = channel.take(); 10 if(event == null){ 11 status = Status.BACKOFF; 12 counterGroup.incrementAndGet("channel.underflow"); 13 break; 14 } else { 15 serializer.initialize(event, columnFamily); 16 actions.addAll(serializer.getActions()); 17 incs.addAll(serializer.getIncrements()); 18 } 19 } 20 putEventsAndCommit(actions, incs, txn); 21 return status; 22 }
1、actions和incs是要寫入HBase的數據,actions對應的是數據;incs對應的是計數器。
2、serializer.initialize(event, columnFamily),兩個Serializer的initialize目的一樣:
1 public void initialize(Event event, byte[] columnFamily) { 2 this.payload = event.getBody(); //獲取要處理的數據 3 this.cf = columnFamily; //獲取要寫入的列簇 4 }
3、serializer.getActions()
SimpleHbaseEventSerializer.getActions()方法會根據configure(Context context)中設置的RowKey類型先獲取rowkey,可以是毫秒時間戳、隨機數、納秒時間戳以及UUID128位數四種類型。然后構造一個Put對象,將(列簇,列名,數據)添加進這個Put,返回List<Row> actions。
RegexHbaseEventSerializer.getActions()方法,首先會做一些判斷匹配成功否?匹配出的個數和指定的列數相同否?,然后是獲取rowkey,這里的rowkey是[time in millis]-[random key]-[nonce]三部分組成的字符串。剩下的是依次匹配列組成Put,返回List<Row> actions。
4、serializer.getIncrements()
SimpleHbaseEventSerializer.getIncrements()如果配置文件中配置了incrementColumn,就添加相應的計數器,否則返回一個沒有數據的List<Increment>。
RegexHbaseEventSerializer.getIncrements()直接返回一個沒有數據的List<Increment>,即不設置計數器。
5、putEventsAndCommit(actions, incs, txn)方法。首先會table.batch(actions)提交List<Put>;然后是計數器table.increment(i);txn.commit()提交事務;如有異常txn.rollback()回滾;txn.close()事務關閉。
四、stop()方法。table.close();table = null;
有兩個問題撒:
1、我們在開發HBase程序的時候總是要指定“hbase.zookeeper.quorum”對應的zookeeper地址的,但是看完HBaseSink也沒發現設置的地方,是不是在HBase集群中的任意節點都不需要設置,除非在集群外節點才設置?
2、還有在使用時發現放在安裝有zookeeper的節點上運行flume報錯,刪除zookeeper后運行正常,沒安裝zookeeper的節點上運行正常,這是為什么??
希望知道的可以解答哈。。。HBaseSink也比較簡單。。。后續還有更多源碼解讀!敬請期待!!