flume 自定義 hbase sink 類


flume 1.5 的配置文件示例

#Name the  components on this agent  
a1.sources  = r1  
a1.sinks =  k1  
a1.channels  = c1  
  
#  Describe/configure the source  
a1.sources.r1.type  = spooldir  
a1.sources.r1.spoolDir  = /home/scut/Downloads/testFlume  
  
# Describe  the sink  
a1.sinks.k1.type  = org.apache.flume.sink.hbase.AsyncHBaseSink  
# 注意:Flume 配置文件按 Java properties 格式解析,不支持行尾註釋,註釋必須單獨成行
# 設置hbase的表名
a1.sinks.k1.table = Router
# 設置hbase中的columnFamily
a1.sinks.k1.columnFamily = log
# 設置hbase的column
a1.sinks.k1.serializer.payloadColumn=serviceTime,browerOS,clientTime,screenHeight,screenWidth,url,userAgent,mobileDevice,gwId,mac
# 設置serializer的處理類
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.BaimiAsyncHbaseEventSerializer
  
# Use a  channel which buffers events in memory  
a1.channels.c1.type  = memory  
a1.channels.c1.capacity  = 1000  
a1.channels.c1.transactionCapacity  = 100  
  
# Bind the  source and sink to the channel  
a1.sources.r1.channels  = c1  
a1.sinks.k1.channel  = c1  

  

重點說明幾個屬性
  • a1.sinks.k1.serializer.payloadColumn 中列出了所有的列名。
  • a1.sinks.k1.serializer設置了flume serializer的處理類。BaimiAsyncHbaseEventSerializer類中會獲取payloadColumn的內容,將它以逗號分隔,從而得出所有的列名。

BaimiAsyncHbaseEventSerializer類

/* 
 * Licensed to the Apache Software Foundation (ASF) under one 
 * or more contributor license agreements.  See the NOTICE file 
 * distributed with this work for additional information 
 * regarding copyright ownership.  The ASF licenses this file 
 * to you under the Apache License, Version 2.0 (the 
 * "License"); you may not use this file except in compliance 
 * with the License.  You may obtain a copy of the License at 
 * 
 * http://www.apache.org/licenses/LICENSE-2.0 
 * 
 * Unless required by applicable law or agreed to in writing, 
 * software distributed under the License is distributed on an 
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
 * KIND, either express or implied.  See the License for the 
 * specific language governing permissions and limitations 
 * under the License. 
 */  
  
package org.apache.flume.sink.hbase;  
  
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

import com.google.common.base.Charsets;
  
/**
 * Flume AsyncHBase serializer that splits one event body into several HBase columns.
 *
 * <p>Configuration keys read by {@link #configure(Context)}:
 * <ul>
 *   <li>{@code payloadColumn} - comma separated column qualifiers (spaces removed, lower-cased);
 *       default {@code pCol}.</li>
 *   <li>{@code rowPrefixCol} - column whose value prefixes the row key; default {@code mac}.</li>
 *   <li>{@code suffix} - row-key suffix type: {@code timestamp}, {@code random}, {@code nano};
 *       any other value means UUID; default {@code uuid}.</li>
 *   <li>{@code incrementColumn} / {@code incrementRow} - counter cell returned by
 *       {@link #getIncrements()}; defaults {@code iCol} / {@code incRow}.</li>
 * </ul>
 *
 * <p>Each event body is split with the regex {@code \^A}. Only events whose field count equals
 * the number of configured columns produce puts, and all puts of one event share one row key.
 */
public class BaimiAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer {

  /**
   * Field separator for event bodies. The regex {@code \^A} matches the literal two characters
   * caret and {@code A}. NOTE(review): if producers use the Ctrl-A control character (U+0001)
   * rather than the printed text "^A", this pattern never matches - confirm against the data.
   */
  private static final Pattern PAYLOAD_SPLIT = Pattern.compile("\\^A");

  private byte[] table;
  private byte[] cf;
  /** Cell values of the current event, parallel to payloadColumn; null if the event was skipped. */
  private byte[][] payload;
  /** Lower-cased column qualifiers from "payloadColumn"; null until configured. */
  private byte[][] payloadColumn;
  private byte[] incrementColumn;
  private byte[] incrementRow;
  private KeyType keyType;
  /** Index in payloadColumn of the row-prefix column, or -1 if it is not among the columns. */
  private int rowPrefixIndex = -1;
  /** Row-key prefix for the current event: the row-prefix column's value, or "" if absent. */
  private String rowPrefix = "";

  @Override
  public void initialize(byte[] table, byte[] cf) {
    this.table = table;
    this.cf = cf;
  }

  /**
   * Builds one put per configured column for the current event.
   *
   * @return the puts for the current event; empty when no columns are configured or when the
   *     event was skipped by {@link #setEvent(Event)}
   * @throws FlumeException if building the row key or the put requests throws
   */
  @Override
  public List<PutRequest> getActions() {
    List<PutRequest> actions = new ArrayList<PutRequest>();
    if (payloadColumn == null || payload == null) {
      return actions;
    }
    try {
      byte[] rowKey;
      switch (keyType) {
        case TS:
          rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix);
          break;
        case TSNANO:
          rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix);
          break;
        case RANDOM:
          rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix);
          break;
        default:
          rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix);
          break;
      }
      // Every cell of one event is written under the same row key.
      for (int i = 0; i < payload.length; i++) {
        actions.add(new PutRequest(table, rowKey, cf, payloadColumn[i], payload[i]));
      }
    } catch (Exception e) {
      throw new FlumeException("Could not get row key!", e);
    }
    return actions;
  }

  /**
   * Returns the counter increment for the configured increment cell.
   *
   * @return a single increment request, or an empty list when no increment column is configured
   */
  @Override
  public List<AtomicIncrementRequest> getIncrements() {
    List<AtomicIncrementRequest> actions = new ArrayList<AtomicIncrementRequest>();
    if (incrementColumn != null) {
      actions.add(new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn));
    }
    return actions;
  }

  /** Nothing to release: this serializer holds only in-memory arrays and strings. */
  @Override
  public void cleanUp() {
  }

  /**
   * Reads column names, row-key settings and the increment cell from the Flume context.
   *
   * @param context the serializer's sub-context of the sink configuration
   */
  @Override
  public void configure(Context context) {
    String pCol = context.getString("payloadColumn", "pCol");
    String iCol = context.getString("incrementColumn", "iCol");
    String prefixCol = context.getString("rowPrefixCol", "mac");
    String suffix = context.getString("suffix", "uuid");
    if (pCol != null && !pCol.isEmpty()) {
      keyType = parseKeyType(suffix);
      // Column names are comma separated, with spaces removed and lower-cased. Locale.ROOT keeps
      // the result independent of the JVM default locale (e.g. "gwId" under a Turkish locale).
      String[] names = pCol.replace(" ", "").split(",");
      String wantedPrefixCol = prefixCol == null ? null : prefixCol.toLowerCase(Locale.ROOT);
      payloadColumn = new byte[names.length][];
      rowPrefixIndex = -1;
      for (int i = 0; i < names.length; i++) {
        String name = names[i].toLowerCase(Locale.ROOT);
        payloadColumn[i] = name.getBytes(Charsets.UTF_8);
        // If the prefix column name repeats, the last occurrence wins.
        if (name.equals(wantedPrefixCol)) {
          rowPrefixIndex = i;
        }
      }
    }
    if (iCol != null && !iCol.isEmpty()) {
      incrementColumn = iCol.getBytes(Charsets.UTF_8);
    }
    incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8);
  }

  /** Maps the "suffix" setting to a row-key type; unrecognized values mean UUID. */
  private static KeyType parseKeyType(String suffix) {
    if ("timestamp".equals(suffix)) {
      return KeyType.TS;
    }
    if ("random".equals(suffix)) {
      return KeyType.RANDOM;
    }
    if ("nano".equals(suffix)) {
      return KeyType.TSNANO;
    }
    return KeyType.UUID;
  }

  /**
   * Splits the event body into column values for the next {@link #getActions()} call.
   *
   * <p>The body is decoded as UTF-8 and split on {@link #PAYLOAD_SPLIT}. If the field count
   * differs from the number of configured columns, the event is skipped. No values are kept, so
   * {@code getActions()} returns no puts instead of re-writing the previous event's data.
   *
   * @param event the event to serialize
   */
  @Override
  public void setEvent(Event event) {
    // Reset per-event state first so nothing leaks from the previous event.
    payload = null;
    rowPrefix = "";
    if (payloadColumn == null) {
      return;
    }
    String[] fields = PAYLOAD_SPLIT.split(new String(event.getBody(), Charsets.UTF_8));
    if (fields.length != payloadColumn.length) {
      return;
    }
    byte[][] values = new byte[fields.length][];
    for (int i = 0; i < fields.length; i++) {
      values[i] = fields[i].getBytes(Charsets.UTF_8);
    }
    if (rowPrefixIndex >= 0) {
      // The row key is prefixed with this column's value (by default the "mac" column).
      rowPrefix = fields[rowPrefixIndex];
    }
    payload = values;
  }

  /** Intentionally a no-op; all settings are read in {@link #configure(Context)}. */
  @Override
  public void configure(ComponentConfiguration conf) {
  }
}

 

重點可以查看setEvent,configure,getActions函數。
  • configure函數:讀取flume配置文件內容,包括列名、用作rowkey前綴的列(rowPrefixCol)以及rowkey後綴類型(suffix)等信息
  • setEvent函數:獲取flume event 內容,將其保存到payload數組中。
  • getActions函數:創建PutRequest實例,將rowkey,columnfamily,column,value等信息寫入putrequest實例中。

源碼編譯和執行

     編寫好自定義的BaimiAsyncHbaseEventSerializer類後,接下來需要編譯源碼,生成flume-ng-hbase-sink-*.jar包,替換flume中原來的flume-ng-hbase-sink-*.jar包。
  • 下載flume 1.5 源碼,解壓后進入目錄flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/
  • 複製上面的BaimiAsyncHbaseEventSerializer類到上面的目錄中。
  • 進入flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/,運行mvn編譯命令【mvn install -Dmaven.test.skip=true】
  • mvn編譯后會在flume-1.5.0-src/flume-ng-sinks/flume-ng-hbase-sink/target目錄下生成flume-ng-hbase-sink-1.5.0.jar,將這個jar包替換$FLUME_HOME/lib下的jar包
  • 運行flume執行命令【flume-ng agent -c . -f conf/spoolDir.conf -n a1  -Dflume.root.logger=INFO,console】


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM