Ways to Read and Write HBase (Part 3): Flink


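1. Overview of HBase access methods

There are four main approaches:

  1. Reading and writing HBase with the pure Java API;
  2. Reading and writing HBase from Spark;
  3. Reading and writing HBase from Flink;
  4. Reading and writing HBase through Phoenix.

The first is the native API that HBase itself provides: fairly low-level, but efficient. The second and third are the Spark and Flink integrations with HBase. The last is the JDBC access provided by the third-party Phoenix plugin; Phoenix's JDBC interface can also be called from Spark and Flink.

Note:

This article uses HBase 2.1.2, Flink 1.7.2, and Scala 2.12.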

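2. Reading and writing HBase with Flink Streaming and Flink DataSet

There are two ways to read HBase data in Flink:

  • Extend RichSourceFunction and override its methods (Flink Streaming)
  • Implement a custom TableInputFormat (Flink Streaming and Flink DataSet)

There are likewise two ways to write data to HBase from Flink:

  • Extend RichSinkFunction and override its methods (Flink Streaming)
  • Implement the OutputFormat interface (Flink Streaming and Flink DataSet)

Note:

① Flink Streaming supports both approaches in each direction. Flink DataSet batch processing, however, can read only by implementing TableInputFormat and can write only by implementing OutputFormat.

② TableInputFormat ships in the flink-hbase_2.12-1.7.2 jar, which is built against HBase 1.4.3. Because this project uses HBase 2.1.2, TableInputFormat has to be rewritten.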

   

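2.1 Two ways to read HBase from Flink

Note: before reading from HBase, you can first run the Flink DataSet batch write from section 2.2.2 (implementing the OutputFormat interface) so that the HBase test table contains data.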

  

2.1.1 Extending RichSourceFunction and overriding its methods:

package cn.swordfall.hbaseOnFlink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._
/**
  * @Author: Yang JianQiu
  * @Date: 2019/2/28 18:05
  *
  * Uses HBase as the data source:
  * reads rows from HBase and emits them as a stream.
  *
  * Reading from HBase
  * Option 1: extend RichSourceFunction and override its methods
  */
class HBaseReader extends RichSourceFunction[(String, String)]{

  private var conn: Connection = null
  private var table: Table = null
  private var scan: Scan = null

  /**
    * Open the HBase client connection in open()
    * @param parameters
    */
  override def open(parameters: Configuration): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()

    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

    val tableName: TableName = TableName.valueOf("test")
    val cf1: String = "cf1"
    conn = ConnectionFactory.createConnection(config)
    table = conn.getTable(tableName)
    scan = new Scan()
    scan.withStartRow(Bytes.toBytes("100"))
    scan.withStopRow(Bytes.toBytes("107"))
    scan.addFamily(Bytes.toBytes(cf1))
  }

  // Cleared by cancel() so that run() can stop scanning early
  @volatile private var isRunning = true

  /**
    * run comes from the Java interface SourceFunction. IDEA's Ctrl + O does not list it conveniently, but typing the override by hand brings up a completion hint
    * @param sourceContext
    */
  override def run(sourceContext: SourceContext[(String, String)]): Unit = {
    val rs = table.getScanner(scan)
    try {
      val iterator = rs.iterator()
      while (isRunning && iterator.hasNext){
        val result = iterator.next()
        val rowKey = Bytes.toString(result.getRow)
        // Join all cell values of the row with "_"
        val valueString = result.listCells().asScala
          .map(cell => Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength))
          .mkString("_")
        sourceContext.collect((rowKey, valueString))
      }
    } finally {
      // Release the scanner even if the scan is cancelled or fails
      rs.close()
    }
  }

  /**
    * Required: called when the job is cancelled
    */
  override def cancel(): Unit = {
    isRunning = false
  }

  /**
    * Close the HBase table and the connection
    */
  override def close(): Unit = {
    try {
      if (table != null) {
        table.close()
      }
      if (conn != null) {
        conn.close()
      }
    } catch {
      case e:Exception => println(e.getMessage)
    }
  }
}
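
A source's run() loop should stop promptly once cancel() is called. A common way to do that is a volatile flag that the loop checks on every iteration. The following is a minimal Java sketch of that pattern with no Flink dependency; CancellableScanLoop and emitUntil are hypothetical names that only mirror the run()/cancel() pair of SourceFunction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a source: run() emits rows until the input ends or cancel() is called.
final class CancellableScanLoop {
    // volatile so that a cancel() issued from another thread is seen by the thread executing run()
    private volatile boolean running = true;

    void run(Iterator<String> rows, Consumer<String> out) {
        while (running && rows.hasNext()) {
            out.accept(rows.next());
        }
    }

    void cancel() {
        running = false;
    }

    // Emits the given rows but cancels the loop once `limit` rows have been collected.
    static List<String> emitUntil(List<String> rows, int limit) {
        CancellableScanLoop loop = new CancellableScanLoop();
        List<String> emitted = new ArrayList<>();
        loop.run(rows.iterator(), row -> {
            emitted.add(row);
            if (emitted.size() == limit) {
                loop.cancel();
            }
        });
        return emitted;
    }

    public static void main(String[] args) {
        // prints [100, 101]
        System.out.println(emitUntil(Arrays.asList("100", "101", "102", "103"), 2));
    }
}
```

In a real source the flag would be cleared by cancel() on a different thread from the one running the scan, which is why it is declared volatile.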

Calling the HBaseReader class (which extends RichSourceFunction) from Flink Streaming:

/**
  * Reading from HBase
  * Option 1: extend RichSourceFunction and override its methods
  */
 def readFromHBaseWithRichSourceFunction(): Unit ={
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.enableCheckpointing(5000)
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   val dataStream: DataStream[(String, String)] = env.addSource(new HBaseReader)
   // Print each (rowKey, value) pair as "rowKey value"
   dataStream.map(x => x._1 + " " + x._2).print()
   env.execute()
 }

2.1.2 Implementing a custom TableInputFormat:

Because of the version mismatch, three classes from flink-hbase_2.12-1.7.2 have to be rewritten: TableInputSplit, AbstractTableInputFormat, and TableInputFormat.

TableInputSplit rewritten as CustomTableInputSplit:

package cn.swordfall.hbaseOnFlink.flink172_hbase212;

import org.apache.flink.core.io.LocatableInputSplit;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/19 11:50
 */
public class CustomTableInputSplit extends LocatableInputSplit {
    private static final long serialVersionUID = 1L;

    /** The name of the table to retrieve data from. */
    private final byte[] tableName;

    /** The start row of the split. */
    private final byte[] startRow;

    /** The end row of the split. */
    private final byte[] endRow;

    /**
     * Creates a new table input split.
     *
     * @param splitNumber
     *        the number of the input split
     * @param hostnames
     *        the names of the hosts storing the data the input split refers to
     * @param tableName
     *        the name of the table to retrieve data from
     * @param startRow
     *        the start row of the split
     * @param endRow
     *        the end row of the split
     */
    CustomTableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName, final byte[] startRow,
                    final byte[] endRow) {
        super(splitNumber, hostnames);

        this.tableName = tableName;
        this.startRow = startRow;
        this.endRow = endRow;
    }

    /**
     * Returns the table name.
     *
     * @return The table name.
     */
    public byte[] getTableName() {
        return this.tableName;
    }

    /**
     * Returns the start row.
     *
     * @return The start row.
     */
    public byte[] getStartRow() {
        return this.startRow;
    }

    /**
     * Returns the end row.
     *
     * @return The end row.
     */
    public byte[] getEndRow() {
        return this.endRow;
    }
}

AbstractTableInputFormat rewritten as CustomAbstractTableInputFormat:

package cn.swordfall.hbaseOnFlink.flink172_hbase212;

import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/19 11:16
 *
 * The flink-hbase_2.12-1.7.2 jar is built against HBase 1.4.3, but this project uses HBase 2.1.2, so the versions do not match.
 * AbstractTableInputFormat from flink-hbase_2.12-1.7.2 therefore has to be rewritten, mainly because it calls HBase 1.4.3 APIs,
 * some of which have been removed in HBase 2.1.2.
 */
public abstract class CustomAbstractTableInputFormat<T> extends RichInputFormat<T, CustomTableInputSplit> {
    // Log under this class rather than the original AbstractTableInputFormat from the flink-hbase jar
    protected static final Logger LOG = LoggerFactory.getLogger(CustomAbstractTableInputFormat.class);

    // helper variable to decide whether the input is exhausted or not
    protected boolean endReached = false;

    protected transient HTable table = null;
    protected transient Scan scan = null;

    /** HBase iterator wrapper. */
    protected ResultScanner resultScanner = null;

    protected byte[] currentRow;
    protected long scannedRows;

    /**
     * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
     *
     * @return The appropriate instance of Scan for this use case.
     */
    protected abstract Scan getScanner();

    /**
     * What table is to be read.
     *
     * <p>Per instance of a TableInputFormat derivative only a single table name is possible.
     *
     * @return The name of the table
     */
    protected abstract String getTableName();

    /**
     * HBase returns an instance of {@link Result}.
     *
     * <p>This method maps the returned {@link Result} instance into the output type {@link T}.
     *
     * @param r The Result instance from HBase that needs to be converted
     * @return The appropriate instance of {@link T} that contains the data of Result.
     */
    protected abstract T mapResultToOutType(Result r);

    /**
     * Creates a {@link Scan} object and opens the {@link HTable} connection.
     *
     * <p>These are opened here because they are needed in the createInputSplits
     * which is called before the openInputFormat method.
     *
     * <p>The connection is opened in this method and closed in {@link #closeInputFormat()}.
     *
     * @param parameters The configuration that is to be used
     * @see Configuration
     */
    @Override
    public abstract void configure(Configuration parameters);

    @Override
    public void open(CustomTableInputSplit split) throws IOException {
        if (table == null) {
            throw new IOException("The HBase table has not been opened! " +
                    "This needs to be done in configure().");
        }
        if (scan == null) {
            throw new IOException("Scan has not been initialized! " +
                    "This needs to be done in configure().");
        }
        if (split == null) {
            throw new IOException("Input split is null!");
        }

        logSplitInfo("opening", split);

        // set scan range
        currentRow = split.getStartRow();
       /* scan.setStartRow(currentRow);
        scan.setStopRow(split.getEndRow());*/
        scan.withStartRow(currentRow);
        scan.withStopRow(split.getEndRow());

        resultScanner = table.getScanner(scan);
        endReached = false;
        scannedRows = 0;
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        try {
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        } catch (Exception e) {
            resultScanner.close();
            //workaround for timeout on scan
            LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
            /*scan.setStartRow(currentRow);*/
            scan.withStartRow(currentRow);
            resultScanner = table.getScanner(scan);
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        }

        endReached = true;
        return null;
    }

    private void logSplitInfo(String action, CustomTableInputSplit split) {
        int splitId = split.getSplitNumber();
        String splitStart = Bytes.toString(split.getStartRow());
        String splitEnd = Bytes.toString(split.getEndRow());
        String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
        String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
        String[] hostnames = split.getHostnames();
        LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return endReached;
    }

    @Override
    public void close() throws IOException {
        LOG.info("Closing split (scanned {} rows)", scannedRows);
        currentRow = null;
        try {
            if (resultScanner != null) {
                resultScanner.close();
            }
        } finally {
            resultScanner = null;
        }
    }

    @Override
    public void closeInputFormat() throws IOException {
        try {
            if (table != null) {
                table.close();
            }
        } finally {
            table = null;
        }
    }

    @Override
    public CustomTableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
        if (table == null) {
            throw new IOException("The HBase table has not been opened! " +
                    "This needs to be done in configure().");
        }
        if (scan == null) {
            throw new IOException("Scan has not been initialized! " +
                    "This needs to be done in configure().");
        }

        // Get the starting and ending row keys for every region in the currently open table
        final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
        if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
            throw new IOException("Expecting at least one region.");
        }
        final byte[] startRow = scan.getStartRow();
        final byte[] stopRow = scan.getStopRow();
        final boolean scanWithNoLowerBound = startRow.length == 0;
        final boolean scanWithNoUpperBound = stopRow.length == 0;

        final List<CustomTableInputSplit> splits = new ArrayList<CustomTableInputSplit>(minNumSplits);
        for (int i = 0; i < keys.getFirst().length; i++) {
            final byte[] startKey = keys.getFirst()[i];
            final byte[] endKey = keys.getSecond()[i];
            final String regionLocation = table.getRegionLocator().getRegionLocation(startKey, false).getHostnamePort();
            // Test if the given region is to be included in the InputSplit while splitting the regions of a table
            if (!includeRegionInScan(startKey, endKey)) {
                continue;
            }
            // Find the region on which the given row is being served
            final String[] hosts = new String[]{regionLocation};

            // Determine if regions contains keys used by the scan
            boolean isLastRegion = endKey.length == 0;
            if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0) &&
                    (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {

                final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
                final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
                        && !isLastRegion ? endKey : stopRow;
                int id = splits.size();
                final CustomTableInputSplit split = new CustomTableInputSplit(id, hosts, table.getName().getName(), splitStart, splitStop);
                splits.add(split);
            }
        }
        LOG.info("Created " + splits.size() + " splits");
        for (CustomTableInputSplit split : splits) {
            logSplitInfo("created", split);
        }
        return splits.toArray(new CustomTableInputSplit[splits.size()]);
    }

    /**
     * Test if the given region is to be included in the scan while splitting the regions of a table.
     *
     * @param startKey Start key of the region
     * @param endKey   End key of the region
     * @return true, if this region needs to be included as part of the input (default).
     */
    protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
        return true;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(CustomTableInputSplit[] inputSplits) {
        return new LocatableInputSplitAssigner(inputSplits);
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return null;
    }
}
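
createInputSplits above intersects each region's key range with the scan's [startRow, stopRow) range, comparing keys as unsigned bytes in lexicographic order. The following standalone Java sketch reproduces only that intersection step, without any HBase dependency. SplitRangeSketch, compare, intersect, and describe are hypothetical names, and an empty key stands for an open bound, as in the code above.

```java
import java.nio.charset.StandardCharsets;

final class SplitRangeSketch {
    // Lexicographic comparison of unsigned bytes, the ordering used for row keys.
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    // Returns {splitStart, splitStop} for the overlap of region [startKey, endKey)
    // and scan [startRow, stopRow), or null when the region lies outside the scan.
    static byte[][] intersect(byte[] startKey, byte[] endKey, byte[] startRow, byte[] stopRow) {
        boolean noLower = startRow.length == 0;
        boolean noUpper = stopRow.length == 0;
        boolean lastRegion = endKey.length == 0;
        boolean overlaps = (noLower || lastRegion || compare(startRow, endKey) < 0)
                && (noUpper || compare(stopRow, startKey) > 0);
        if (!overlaps) {
            return null;
        }
        // The split starts at whichever lower bound is larger and stops at whichever upper bound is smaller.
        byte[] splitStart = (noLower || compare(startKey, startRow) >= 0) ? startKey : startRow;
        byte[] splitStop = ((noUpper || compare(endKey, stopRow) <= 0) && !lastRegion) ? endKey : stopRow;
        return new byte[][] {splitStart, splitStop};
    }

    // Formats the overlap as "start..stop", or "none" when the region is skipped.
    static String describe(String startKey, String endKey, String startRow, String stopRow) {
        byte[][] r = intersect(bytes(startKey), bytes(endKey), bytes(startRow), bytes(stopRow));
        return r == null ? "none" : text(r[0]) + ".." + text(r[1]);
    }

    private static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    private static String text(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Three made-up regions intersected with the scan range 001..201
        System.out.println(describe("", "100", "001", "201"));    // prints 001..100
        System.out.println(describe("100", "150", "001", "201")); // prints 100..150
        System.out.println(describe("150", "", "001", "201"));    // prints 150..201
    }
}
```

The region boundaries in main are invented for illustration; in the real format they come from the table's region locator.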

TableInputFormat rewritten as CustomTableInputFormat:

package cn.swordfall.hbaseOnFlink.flink172_hbase212;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/19 11:15
 * The flink-hbase_2.12-1.7.2 jar is built against HBase 1.4.3, but this project uses HBase 2.1.2, so the versions do not match.
 * TableInputFormat from flink-hbase_2.12-1.7.2 therefore has to be rewritten.
 */
public abstract class CustomTableInputFormat<T extends Tuple> extends CustomAbstractTableInputFormat<T> {

    private static final long serialVersionUID = 1L;

    /**
     * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
     * @return The appropriate instance of Scan for this usecase.
     */
    @Override
    protected abstract Scan getScanner();

    /**
     * What table is to be read.
     * Per instance of a TableInputFormat derivative only a single tablename is possible.
     * @return The name of the table
     */
    @Override
    protected abstract String getTableName();

    /**
     * The output from HBase is always an instance of {@link Result}.
     * This method is to copy the data in the Result instance into the required {@link Tuple}
     * @param r The Result instance from HBase that needs to be converted
     * @return The appropriate instance of {@link Tuple} that contains the needed information.
     */
    protected abstract T mapResultToTuple(Result r);

    /**
     * Creates a {@link Scan} object and opens the {@link HTable} connection.
     * These are opened here because they are needed in the createInputSplits
     * which is called before the openInputFormat method.
     * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
     *
     * @param parameters The configuration that is to be used
     * @see Configuration
     */
    @Override
    public void configure(Configuration parameters) {
        table = createTable();
        if (table != null) {
            scan = getScanner();
        }
    }

    /**
     * Create an {@link HTable} instance and set it into this format.
     *
     * <p>In this rewrite the method is only a stub that always returns null. Subclasses are expected to
     * override {@link #configure(Configuration)} and open the table through a Connection themselves.
     */
    private HTable createTable() {
        LOG.info("Initializing HBaseConfiguration");
        // Nothing is created here; the subclass opens the table in its own configure()
        return null;
    }

    @Override
    protected T mapResultToOutType(Result r) {
        return mapResultToTuple(r);
    }
}

Extend the custom CustomTableInputFormat to connect to HBase and read from it:

package cn.swordfall.hbaseOnFlink

import java.io.IOException

import cn.swordfall.hbaseOnFlink.flink172_hbase212.CustomTableInputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._
/**
  * @Author: Yang JianQiu
  * @Date: 2019/3/1 1:14
  *
  * Reading from HBase
  * Option 2: implement a custom TableInputFormat
  */
class HBaseInputFormat extends CustomTableInputFormat[Tuple2[String, String]]{

  // Result tuple, reused for every row
  val tuple2 = new Tuple2[String, String]

  // Kept as a field so that closeInputFormat() can release it
  @transient private var conn: Connection = null

  /**
    * Open the HBase connection
    * @param parameters
    */
  override def configure(parameters: Configuration): Unit = {
    val tableName: TableName = TableName.valueOf("test")
    val cf1 = "cf1"
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create

    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

    try {
      conn = ConnectionFactory.createConnection(config)
      table = conn.getTable(tableName).asInstanceOf[HTable]
      scan = new Scan()
      scan.withStartRow(Bytes.toBytes("001"))
      scan.withStopRow(Bytes.toBytes("201"))
      scan.addFamily(Bytes.toBytes(cf1))
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
  }

  /**
    * Close the table first, then the connection it was obtained from
    */
  override def closeInputFormat(): Unit = {
    try {
      super.closeInputFormat()
    } finally {
      if (conn != null) {
        conn.close()
        conn = null
      }
    }
  }

  /**
    * Convert a scanned row into the output tuple
    * @param result
    * @return
    */
  override def mapResultToTuple(result: Result): Tuple2[String, String] = {
    val rowKey = Bytes.toString(result.getRow)
    val sb = new StringBuffer()
    for (cell: Cell <- result.listCells().asScala){
      val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
      sb.append(value).append("_")
    }
    val value = sb.replace(sb.length() - 1, sb.length(), "").toString
    tuple2.setField(rowKey, 0)
    tuple2.setField(value, 1)
    tuple2
  }

  /**
    * tableName
    * @return
    */
  override def getTableName: String = "test"


  /**
    * Return the Scan
    * @return
    */
  override def getScanner: Scan = {
    scan
  }

}
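
mapResultToTuple joins the cell values with "_" and then trims the trailing separator with sb.replace(sb.length() - 1, ...), which would throw if a row ever produced no values. A simpler way to build the same string is String.join, shown in this minimal Java sketch. ValueJoiner is a hypothetical helper, and plain strings stand in for the decoded cell values.

```java
import java.util.Arrays;
import java.util.List;

final class ValueJoiner {
    // Joins values with "_" without leaving a trailing separator; an empty list yields "".
    static String joinValues(List<String> values) {
        return String.join("_", values);
    }

    public static void main(String[] args) {
        // prints 20_hello
        System.out.println(joinValues(Arrays.asList("20", "hello")));
    }
}
```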

Calling HBaseInputFormat (which extends CustomTableInputFormat) from Flink Streaming:

/**
 * Reading from HBase
 * Option 2: implement a custom TableInputFormat
 */
 def readFromHBaseWithTableInputFormat(): Unit ={
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.enableCheckpointing(5000)
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

   val dataStream = env.createInput(new HBaseInputFormat)
   dataStream.filter(_.f0.startsWith("10")).print()
   env.execute()
 }

With Flink DataSet batch processing:

/**
 * Reading from HBase: implement a custom TableInputFormat
 */
 def readFromHBaseWithTableInputFormat(): Unit ={
   val env = ExecutionEnvironment.getExecutionEnvironment

   val dataSet = env.createInput(new HBaseInputFormat)
   // In the DataSet API, print() triggers execution, so env.execute() is not needed here
   dataSet.filter(_.f1.startsWith("20")).print()
 }

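2.2 Two ways to write to HBase from Flink

In the Flink Streaming examples, the data written to HBase comes from Kafka. You can start a single-node Kafka and use kafka-console-producer.sh to write the following records to the topic "test":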

100,hello,20
101,nice,24
102,beautiful,26
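
Both sinks in this section split each record on commas and use the fields as row key, name, and age without checking how many fields there are, so a malformed line would fail with an index error. A minimal Java sketch of a more defensive parser follows. UserRecord and parse are hypothetical names that are not part of the original code.

```java
import java.util.Optional;

final class UserRecord {
    final String rowKey;
    final String name;
    final String age;

    private UserRecord(String rowKey, String name, String age) {
        this.rowKey = rowKey;
        this.name = name;
        this.age = age;
    }

    // Parses "rowKey,name,age"; returns empty unless the line has exactly three non-blank fields.
    static Optional<UserRecord> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // The -1 limit keeps trailing empty fields, so "100,hello," is seen as three fields
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            return Optional.empty();
        }
        for (String part : parts) {
            if (part.trim().isEmpty()) {
                return Optional.empty();
            }
        }
        return Optional.of(new UserRecord(parts[0].trim(), parts[1].trim(), parts[2].trim()));
    }

    public static void main(String[] args) {
        System.out.println(parse("100,hello,20").isPresent()); // prints true
        System.out.println(parse("100,hello").isPresent());    // prints false
    }
}
```

A sink could skip or log the records for which parse returns empty instead of failing the whole job.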

2.2.1 Extending RichSinkFunction and overriding its methods:

package cn.swordfall.hbaseOnFlink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

/**
  * @Author: Yang JianQiu
  * @Date: 2019/3/1 1:34
  *
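  * Writing to HBase
  * Option 1: extend RichSinkFunction and override its methods
  *
  * Note: Flink processes records one at a time, so do not flush on every record when writing to HBase.
  * Doing so puts heavy pressure on HBase and spawns many threads, which can bring the cluster down, so production jobs must limit how often they flush.
  *
  * Solution: keep a counter that is initialized in open() and flush, for example, once every 500 records, or collect records in a list and flush when its size reaches a threshold.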
  */
class HBaseWriter extends RichSinkFunction[String]{

  var conn: Connection = null
  var mutator: BufferedMutator = null
  var count = 0

  /**
    * Open the HBase connection
    * @param parameters
    */
  override def open(parameters: Configuration): Unit = {
    val config:org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)

    val tableName: TableName = TableName.valueOf("test")
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // Use a 1 MB write buffer; once it fills up, the data is flushed to HBase automatically
    params.writeBufferSize(1024 * 1024) // buffer size in bytes
    mutator = conn.getBufferedMutator(params)
    count = 0
  }

  /**
    * Convert one incoming record into a Put and buffer it for HBase
    * @param value
    * @param context
    */
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    val cf1 = "cf1"
    val array: Array[String] = value.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    mutator.mutate(put)
    // Count the record first so that the flush happens on exactly every 2000th record
    count = count + 1
    if (count >= 2000){
      mutator.flush()
      count = 0
    }
  }

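  /**
    * Flush any buffered records, then close the mutator and the connection
    */
  override def close(): Unit = {
    try {
      // BufferedMutator.close() flushes pending mutations, so a final partial batch is not lost
      if (mutator != null) mutator.close()
    } finally {
      if (conn != null) conn.close()
    }
  }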
}
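
The class comment suggests buffering records and flushing once every N records. Whatever threshold is used, the records still in the buffer when the sink shuts down must be flushed as well, or they are lost. Here is a minimal Java sketch of count-based batching with a final flush, independent of HBase. BatchingBuffer and batchSizes are hypothetical names, and the Consumer stands in for the call that sends a batch to HBase.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class BatchingBuffer implements AutoCloseable {
    private final int threshold;
    private final Consumer<List<String>> sink;
    private final List<String> pending = new ArrayList<>();

    BatchingBuffer(int threshold, Consumer<List<String>> sink) {
        this.threshold = threshold;
        this.sink = sink;
    }

    // Buffers one record and flushes as soon as `threshold` records are pending.
    void add(String record) {
        pending.add(record);
        if (pending.size() >= threshold) {
            flush();
        }
    }

    // Hands the pending records to the sink as one batch; does nothing when the buffer is empty.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending));
        pending.clear();
    }

    // Flushes the remainder so that a final partial batch is not lost.
    @Override
    public void close() {
        flush();
    }

    // Writes all records through a buffer and returns the size of each batch that reached the sink.
    static List<Integer> batchSizes(int threshold, List<String> records) {
        List<Integer> sizes = new ArrayList<>();
        try (BatchingBuffer buffer = new BatchingBuffer(threshold, batch -> sizes.add(batch.size()))) {
            for (String record : records) {
                buffer.add(record);
            }
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Six records with a threshold of four: one full batch, then the remainder on close
        System.out.println(batchSizes(4, Arrays.asList("a", "b", "c", "d", "e", "f"))); // prints [4, 2]
    }
}
```

Without the flush in close(), the last two records in this example would never reach the sink.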

 

Calling the HBaseWriter class (which extends RichSinkFunction) from Flink Streaming:

/**
  * Writing to HBase
  * Option 1: extend RichSinkFunction and override its methods
  */
 def write2HBaseWithRichSinkFunction(): Unit = {
   val topic = "test"
   val props = new Properties
   props.put("bootstrap.servers", "192.168.187.201:9092")
   props.put("group.id", "kv_flink")
   props.put("enable.auto.commit", "true")
   props.put("auto.commit.interval.ms", "1000")
   props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
   props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.enableCheckpointing(5000)
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
   val dataStream: DataStream[String] = env.addSource(myConsumer)
   // Write to HBase
   dataStream.addSink(new HBaseWriter)
   env.execute()
 }

2.2.2 Implementing the OutputFormat interface:

package cn.swordfall.hbaseOnFlink

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

/**
  * @Author: Yang JianQiu
  * @Date: 2019/3/1 1:40
  *
  * There are two ways to write to HBase
  * Option 2: implement the OutputFormat interface
  */
class HBaseOutputFormat extends OutputFormat[String]{

  val zkServer = "192.168.187.201"
  val port = "2181"
  var conn: Connection = null
  var mutator: BufferedMutator = null
  var count = 0

  /**
    * Configures the output format. This method is always called first on a newly instantiated output format
    *
    * @param configuration
    */
  override def configure(configuration: Configuration): Unit = {

  }

  /**
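    * Opens one parallel instance of the output format, so the HBase connection and its configuration are set up here.
    *
    * @param i  the number of this parallel instance (taskNumber)
    * @param i1 the total number of parallel instances (numTasks)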
    */
  override def open(i: Int, i1: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)

    val tableName: TableName = TableName.valueOf("test")

    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // Use a 1 MB write buffer; once it fills up, the data is flushed to HBase automatically
    params.writeBufferSize(1024 * 1024) // buffer size in bytes
    mutator = conn.getBufferedMutator(params)
    count = 0
  }

  /**
    * Writes one record to the target store, so this is where the HBase write API is called
    *
    * @param it
    */
  override def writeRecord(it: String): Unit = {

    val cf1 = "cf1"
    val array: Array[String] = it.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    mutator.mutate(put)
    count = count + 1
    // Flush once every 4 records; whatever is still buffered at the end is flushed by mutator.close() in close()
    if (count >= 4){
      mutator.flush()
      count = 0
    }
  }

  /**
    * Flush any buffered records, then close the mutator and the connection
    */
  override def close(): Unit = {
    try {
      // BufferedMutator.close() flushes pending mutations, so a final partial batch is not lost
      if (mutator != null) mutator.close()
    } catch {
      case e: Exception => println(e.getMessage)
    } finally {
      if (conn != null) conn.close()
    }
  }
}

Calling the HBaseOutputFormat class (which implements OutputFormat) from Flink Streaming:

/**
  * Writing to HBase
  * Option 2: implement the OutputFormat interface
  */
 def write2HBaseWithOutputFormat(): Unit = {
   val topic = "test"
   val props = new Properties
   props.put("bootstrap.servers", "192.168.187.201:9092")
   props.put("group.id", "kv_flink")
   props.put("enable.auto.commit", "true")
   props.put("auto.commit.interval.ms", "1000")
   props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
   props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
   val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
   env.enableCheckpointing(5000)
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
   val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
   val dataStream: DataStream[String] = env.addSource(myConsumer)
   dataStream.writeUsingOutputFormat(new HBaseOutputFormat)
   env.execute()
 }

With Flink DataSet batch processing:

/**
  * Writing to HBase: implement the OutputFormat interface
  */
 def write2HBaseWithOutputFormat(): Unit = {
   val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

   // Define the input data
   val dataSet: DataSet[String] = env.fromElements("103,zhangsan,20", "104,lisi,21", "105,wangwu,22", "106,zhaolilu,23")
   dataSet.output(new HBaseOutputFormat)
   // Nothing runs until env.execute() is called; this call is what triggers the data sinks
   env.execute()
 }

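Note:

  In batch mode, the records buffered after the last explicit flush in HBaseOutputFormat.writeRecord reach HBase only when the BufferedMutator is flushed or closed. If close() closes only the connection, the number of records per flush must not exceed the total number of records in the batch, otherwise the data is never written to HBase. Calling mutator.close() in close(), which flushes pending mutations, removes that restriction.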

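3. Summary

[Related articles]

Ways to Connect to HBase (Part 1): Java, covering reading and writing HBase with the pure Java API

Ways to Connect to HBase (Part 2): Spark, covering reading and writing HBase from Spark

GitHub repository:

https://github.com/SwordfallYeung/HBaseDemo (the Flink code for reading and writing HBase is provided in both Java and Scala)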


 

