Flume-NG源碼閱讀之HBaseSink


  關於HBase的sink的所有內容均在org.apache.flume.sink.hbase包下。

  每個sink包括自己定制的,都extends AbstractSink implements Configurable。

  一、首先是configure(Context context)方法。該方法是對HBaseSink的參數初始化。主要包括以下幾個:

  tableName:要寫入的HBase數據表名,不能為空;

  columnFamily:數據表對應的列簇名,這個sink目前只支持一個列簇,不能為空;

  batchSize:每次事務可以處理的最大Event數量,默認是100;

  eventSerializerType:用來將event寫入HBase,即將event轉化為put。默認是org.apache.flume.sink.hbase.SimpleHbaseEventSerializer,還有一個是RegexHbaseEventSerializer,即適合HBaseSink的Serializer只有這倆,否則自己定制;

  serializerContext:是eventSerializerType的配置信息,就是配置文件中包含“serializer.”的項;

  kerberosKeytab和kerberosPrincipal是用來做訪問控制的,默認都為空,即不設置。

  並生成eventSerializerType對應的實例並加以配置,兩個Serializer各有不同的用途主要是一個只能寫一列,一個可以寫多列: 

1 Class<? extends HbaseEventSerializer> clazz =
2           (Class<? extends HbaseEventSerializer>)
3           Class.forName(eventSerializerType);
4       serializer = clazz.newInstance();
5       serializer.configure(serializerContext);        //配置序列化組件,先配置。默認是SimpleHbaseEventSerializer

  1、SimpleHbaseEventSerializer.configure(Context context):此Serializer只能將數據寫入一列

 1   public void configure(Context context) {
 2     rowPrefix = context.getString("rowPrefix", "default");  //獲取RowKey的前綴,固定的部分,默認前綴是default
 3     incrementRow =
 4         context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);//獲取計數器對應的行鍵
 5     String suffix = context.getString("suffix", "uuid");  //rowkey的類型(可以指定的有四種uuid/random/timestamp/nano),默認是uuid
 6 
 7     String payloadColumn = context.getString("payloadColumn");  //要寫入HBase的列名
 8     String incColumn = context.getString("incrementColumn");  //計數器對應的列
 9     if(payloadColumn != null && !payloadColumn.isEmpty()) {  //根據suffix決定rowkey類型
10       if(suffix.equals("timestamp")){
11         keyType = KeyType.TS;
12       } else if (suffix.equals("random")) {
13         keyType = KeyType.RANDOM;
14       } else if(suffix.equals("nano")){
15         keyType = KeyType.TSNANO;
16       } else {
17         keyType = KeyType.UUID;
18       }
19       plCol = payloadColumn.getBytes(Charsets.UTF_8);  //列名
20     }
21     if(incColumn != null && !incColumn.isEmpty()) {  //存在計數器列
22       incCol = incColumn.getBytes(Charsets.UTF_8);
23     }
24   }

  2、RegexHbaseEventSerializer.configure(Context context):此Serializer根據正則可以寫入多列

  public void configure(Context context) {
    String regex = context.getString(REGEX_CONFIG, REGEX_DEFAULT);  //獲取配置文件中的正則表達式,默認是“(.*)”
    regexIgnoreCase = context.getBoolean(IGNORE_CASE_CONFIG, 
        INGORE_CASE_DEFAULT);  //是否忽略大小寫
    inputPattern = Pattern.compile(regex, Pattern.DOTALL
        + (regexIgnoreCase ? Pattern.CASE_INSENSITIVE : 0));  //將給定的正則表達式編譯到具有給定標志的模式中
    
    String colNameStr = context.getString(COL_NAME_CONFIG, COLUMN_NAME_DEFAULT);  //獲取配置文件中的列名s
    String[] columnNames = colNameStr.split(",");  //分割列名獲得列名數組
    for (String s: columnNames) { 
      colNames.add(s.getBytes(Charsets.UTF_8));
    }
  }

  二、start()方法。該方法首先會構造一個HTable對象,並table.setAutoFlush(false)來激活緩沖區(默認大小時2MB),隨后的是一些檢查。

  三、然后是process()方法用來從channel中take數據,serializer之后寫入HBase。

 1 public Status process() throws EventDeliveryException {
 2     Status status = Status.READY;
 3     Channel channel = getChannel();
 4     Transaction txn = channel.getTransaction();
 5     List<Row> actions = new LinkedList<Row>();
 6     List<Increment> incs = new LinkedList<Increment>();
 7     txn.begin();
 8     for(long i = 0; i < batchSize; i++) {
 9       Event event = channel.take();
10       if(event == null){
11         status = Status.BACKOFF;
12         counterGroup.incrementAndGet("channel.underflow");
13         break;
14       } else {
15         serializer.initialize(event, columnFamily);
16         actions.addAll(serializer.getActions());
17         incs.addAll(serializer.getIncrements());
18       }
19     }
20     putEventsAndCommit(actions, incs, txn);
21     return status;
22   }

  1、actions和incs是要寫入HBase的數據,actions對應的是數據;incs對應的是計數器。

  2、serializer.initialize(event, columnFamily),兩個Serializer的initialize目的一樣:

1 public void initialize(Event event, byte[] columnFamily) {
2     this.payload = event.getBody();  //獲取要處理的數據
3     this.cf = columnFamily;    //獲取要寫入的列簇
4   }

  3、serializer.getActions()

  SimpleHbaseEventSerializer.getActions()方法會根據configure(Context context)中設置的RowKey類型先獲取rowkey,可以是毫秒時間戳、隨機數、納秒時間戳以及UUID128位數四種類型。然后構造一個Put對象,將(列簇,列名,數據)添加進這個Put,返回List<Row> actions。

  RegexHbaseEventSerializer.getActions()方法,首先會做一些判斷匹配成功否?匹配出的個數和指定的列數相同否?,然后是獲取rowkey,這里的rowkey是[time in millis]-[random key]-[nonce]三部分組成的字符串。剩下的是依次匹配列組成Put,返回List<Row> actions。

  4、serializer.getIncrements()

  SimpleHbaseEventSerializer.getIncrements()如果配置文件中配置了incrementColumn,就添加相應的計數器,否則返回一個沒有數據的List<Increment>。

  RegexHbaseEventSerializer.getIncrements()直接返回一個沒有數據的List<Increment>,即不設置計數器。

  5、putEventsAndCommit(actions, incs, txn)方法。首先會table.batch(actions)提交List<Put>;然后是計數器table.increment(i);txn.commit()提交事務;如有異常txn.rollback()回滾;txn.close()事務關閉。

  四、stop()方法。table.close();table = null;

 

  有兩個問題撒:

  1、我們在開發HBase程序的時候總是要指定“hbase.zookeeper.quorum”對應的zookeeper地址的,但是看完HBaseSink也沒發現設置的地方,是不是在HBase集群中的任意節點都不需要設置,除非在集群外節點才設置?

  2、還有在使用時發現放在安裝有zookeeper的節點上運行flume報錯,刪除zookeeper后運行正常,沒安裝zookeeper的節點上運行正常,這是為什么??

  希望知道的可以解答哈。。。HBaseSink也比較簡單。。。后續還有更多源碼解讀!敬請期待!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM