1. Overview of the Ways to Connect to HBase
The main options are:
- reading and writing HBase with the plain Java API;
- reading and writing HBase from Spark;
- reading and writing HBase from Flink;
- reading and writing HBase through Phoenix.
The first is the fairly low-level but efficient access path that HBase itself provides; the second and third are the ways Spark and Flink respectively integrate with HBase; the last is the JDBC route offered by the third-party Phoenix layer, and that Phoenix JDBC access can also be called from Spark and Flink.
Note:
This article uses HBase 2.1.2, Flink 1.7.2 and Scala 2.12.
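For reference, a minimal sketch of the build dependencies this setup assumes, in sbt syntax. The exact artifact list is not part of the original article, so treat it as a starting point and adjust it to your own project:
// build.sbt (sketch, assuming Scala 2.12, Flink 1.7.2 and HBase 2.1.2 as stated above)
scalaVersion := "2.12.8"

val flinkVersion = "1.7.2"
val hbaseVersion = "2.1.2"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-hbase"           % flinkVersion,  // brings TableInputFormat (built against HBase 1.4.3)
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,  // used by the write examples below
  "org.apache.hbase"  % "hbase-client"          % hbaseVersion
)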
2. Reading and Writing HBase from Flink Streaming and Flink DataSet
There are two ways to read HBase data in Flink:
- extend RichSourceFunction and override its methods (Flink Streaming);
- implement a custom TableInputFormat interface (Flink Streaming and Flink DataSet).
There are also two ways to write data to HBase from Flink:
- extend RichSinkFunction and override its methods (Flink Streaming);
- implement the OutputFormat interface (Flink Streaming and Flink DataSet).
Note:
① Flink Streaming supports both options listed above; Flink DataSet batch jobs, however, can only read via "implement the TableInputFormat interface" and only write via "implement the OutputFormat interface".
② The TableInputFormat interface lives in flink-hbase_2.12 (1.7.2), and that jar is built against HBase 1.4.3, while this project uses HBase 2.1.2, so TableInputFormat has to be rewritten.
2.1 Two Ways to Read from HBase in Flink
Note: before reading from HBase you can first run the method from section 2.2.2 (implement the OutputFormat interface: writing to HBase from a Flink DataSet batch job) to make sure the HBase "test" table contains data; that batch example inserts the rows 103,zhangsan,20 through 106,zhaolilu,23.
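If you want to double-check that the table is populated before wiring up Flink, here is a minimal standalone sketch using only the HBase client API. It is not from the original article; the object name VerifyTestTable is made up, and the ZooKeeper address is simply the one used in the examples below, so adjust it to your cluster:
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

object VerifyTestTable {
  def main(args: Array[String]): Unit = {
    val config = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(config)
    try {
      val table = conn.getTable(TableName.valueOf("test"))
      // print every row key together with its cf1 column values
      for (result <- table.getScanner(new Scan().addFamily(Bytes.toBytes("cf1"))).asScala) {
        val values = result.listCells().asScala
          .map(c => Bytes.toString(c.getValueArray, c.getValueOffset, c.getValueLength))
        println(Bytes.toString(result.getRow) + " -> " + values.mkString(","))
      }
      table.close()
    } finally {
      conn.close()
    }
  }
}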
2.1.1 Extend RichSourceFunction and override its methods:
package cn.swordfall.hbaseOnFlink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Scan, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

/**
  * @Author: Yang JianQiu
  * @Date: 2019/2/28 18:05
  *
  * Use HBase as a data source:
  * read data from HBase and emit it as a stream.
  *
  * Reading from HBase, approach 1: extend RichSourceFunction and override its methods.
  */
class HBaseReader extends RichSourceFunction[(String, String)] {

  private var conn: Connection = null
  private var table: Table = null
  private var scan: Scan = null

  /**
    * Open the HBase client connection in the open method.
    * @param parameters
    */
  override def open(parameters: Configuration): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

    val tableName: TableName = TableName.valueOf("test")
    val cf1: String = "cf1"
    conn = ConnectionFactory.createConnection(config)
    table = conn.getTable(tableName)
    scan = new Scan()
    scan.withStartRow(Bytes.toBytes("100"))
    scan.withStopRow(Bytes.toBytes("107"))
    scan.addFamily(Bytes.toBytes(cf1))
  }

  /**
    * The run method comes from the Java interface SourceFunction; IDEA's Ctrl + O does not offer it,
    * so it has to be overridden by hand.
    * @param sourceContext
    */
  override def run(sourceContext: SourceContext[(String, String)]): Unit = {
    val rs = table.getScanner(scan)
    val iterator = rs.iterator()
    while (iterator.hasNext) {
      val result = iterator.next()
      val rowKey = Bytes.toString(result.getRow)
      val sb: StringBuffer = new StringBuffer()
      for (cell: Cell <- result.listCells().asScala) {
        val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        sb.append(value).append("_")
      }
      val valueString = sb.replace(sb.length() - 1, sb.length(), "").toString
      sourceContext.collect((rowKey, valueString))
    }
  }

  /**
    * Must be implemented.
    */
  override def cancel(): Unit = {
  }

  /**
    * Close the HBase connection and the table.
    */
  override def close(): Unit = {
    try {
      if (table != null) {
        table.close()
      }
      if (conn != null) {
        conn.close()
      }
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
}
Calling the HBaseReader class that extends RichSourceFunction from a Flink Streaming job:
/**
  * Read from HBase, approach 1: extend RichSourceFunction and override its methods.
  */
def readFromHBaseWithRichSourceFunction(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

  val dataStream: DataStream[(String, String)] = env.addSource(new HBaseReader)
  dataStream.map(x => println(x._1 + " " + x._2))
  env.execute()
}
2.1.2 Implement a custom TableInputFormat interface:
Because of the version mismatch, three files inside flink-hbase_2.12 (1.7.2) have to be rewritten: TableInputSplit, AbstractTableInputFormat and TableInputFormat.
TableInputSplit rewritten as CustomTableInputSplit:
package cn.swordfall.hbaseOnFlink.flink172_hbase212;

import org.apache.flink.core.io.LocatableInputSplit;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/19 11:50
 */
public class CustomTableInputSplit extends LocatableInputSplit {

    private static final long serialVersionUID = 1L;

    /** The name of the table to retrieve data from. */
    private final byte[] tableName;

    /** The start row of the split. */
    private final byte[] startRow;

    /** The end row of the split. */
    private final byte[] endRow;

    /**
     * Creates a new table input split.
     *
     * @param splitNumber the number of the input split
     * @param hostnames the names of the hosts storing the data the input split refers to
     * @param tableName the name of the table to retrieve data from
     * @param startRow the start row of the split
     * @param endRow the end row of the split
     */
    CustomTableInputSplit(final int splitNumber, final String[] hostnames, final byte[] tableName,
                          final byte[] startRow, final byte[] endRow) {
        super(splitNumber, hostnames);
        this.tableName = tableName;
        this.startRow = startRow;
        this.endRow = endRow;
    }

    /**
     * Returns the table name.
     *
     * @return The table name.
     */
    public byte[] getTableName() {
        return this.tableName;
    }

    /**
     * Returns the start row.
     *
     * @return The start row.
     */
    public byte[] getStartRow() {
        return this.startRow;
    }

    /**
     * Returns the end row.
     *
     * @return The end row.
     */
    public byte[] getEndRow() {
        return this.endRow;
    }
}
AbstractTableInputFormat rewritten as CustomAbstractTableInputFormat:
package cn.swordfall.hbaseOnFlink.flink172_hbase212;

import org.apache.flink.addons.hbase.AbstractTableInputFormat;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/19 11:16
 *
 * flink-hbase_2.12 (1.7.2) is built against HBase 1.4.3, while this project uses HBase 2.1.2, so the versions do not match.
 * AbstractTableInputFormat from flink-hbase_2.12 (1.7.2) therefore has to be rewritten: it calls HBase 1.4.3 APIs
 * that have been removed in HBase 2.1.2.
 */
public abstract class CustomAbstractTableInputFormat<T> extends RichInputFormat<T, CustomTableInputSplit> {

    protected static final Logger LOG = LoggerFactory.getLogger(AbstractTableInputFormat.class);

    // helper variable to decide whether the input is exhausted or not
    protected boolean endReached = false;

    protected transient HTable table = null;
    protected transient Scan scan = null;

    /** HBase iterator wrapper. */
    protected ResultScanner resultScanner = null;

    protected byte[] currentRow;
    protected long scannedRows;

    /**
     * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
     *
     * @return The appropriate instance of Scan for this use case.
     */
    protected abstract Scan getScanner();

    /**
     * What table is to be read.
     *
     * <p>Per instance of a TableInputFormat derivative only a single table name is possible.
     *
     * @return The name of the table
     */
    protected abstract String getTableName();

    /**
     * HBase returns an instance of {@link Result}.
     *
     * <p>This method maps the returned {@link Result} instance into the output type {@link T}.
     *
     * @param r The Result instance from HBase that needs to be converted
     * @return The appropriate instance of {@link T} that contains the data of Result.
     */
    protected abstract T mapResultToOutType(Result r);

    /**
     * Creates a {@link Scan} object and opens the {@link HTable} connection.
     *
     * <p>These are opened here because they are needed in the createInputSplits
     * which is called before the openInputFormat method.
     *
     * <p>The connection is opened in this method and closed in {@link #closeInputFormat()}.
     *
     * @param parameters The configuration that is to be used
     * @see Configuration
     */
    @Override
    public abstract void configure(Configuration parameters);

    @Override
    public void open(CustomTableInputSplit split) throws IOException {
        if (table == null) {
            throw new IOException("The HBase table has not been opened! "
                + "This needs to be done in configure().");
        }
        if (scan == null) {
            throw new IOException("Scan has not been initialized! "
                + "This needs to be done in configure().");
        }
        if (split == null) {
            throw new IOException("Input split is null!");
        }

        logSplitInfo("opening", split);

        // set scan range
        currentRow = split.getStartRow();
        /* scan.setStartRow(currentRow);
        scan.setStopRow(split.getEndRow());*/
        scan.withStartRow(currentRow);
        scan.withStopRow(split.getEndRow());

        resultScanner = table.getScanner(scan);
        endReached = false;
        scannedRows = 0;
    }

    @Override
    public T nextRecord(T reuse) throws IOException {
        if (resultScanner == null) {
            throw new IOException("No table result scanner provided!");
        }
        try {
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        } catch (Exception e) {
            resultScanner.close();
            // workaround for timeout on scan
            LOG.warn("Error after scan of " + scannedRows + " rows. Retry with a new scanner...", e);
            /*scan.setStartRow(currentRow);*/
            scan.withStartRow(currentRow);
            resultScanner = table.getScanner(scan);
            Result res = resultScanner.next();
            if (res != null) {
                scannedRows++;
                currentRow = res.getRow();
                return mapResultToOutType(res);
            }
        }

        endReached = true;
        return null;
    }

    private void logSplitInfo(String action, CustomTableInputSplit split) {
        int splitId = split.getSplitNumber();
        String splitStart = Bytes.toString(split.getStartRow());
        String splitEnd = Bytes.toString(split.getEndRow());
        String splitStartKey = splitStart.isEmpty() ? "-" : splitStart;
        String splitStopKey = splitEnd.isEmpty() ? "-" : splitEnd;
        String[] hostnames = split.getHostnames();
        LOG.info("{} split (this={})[{}|{}|{}|{}]", action, this, splitId, hostnames, splitStartKey, splitStopKey);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        return endReached;
    }

    @Override
    public void close() throws IOException {
        LOG.info("Closing split (scanned {} rows)", scannedRows);
        currentRow = null;
        try {
            if (resultScanner != null) {
                resultScanner.close();
            }
        } finally {
            resultScanner = null;
        }
    }

    @Override
    public void closeInputFormat() throws IOException {
        try {
            if (table != null) {
                table.close();
            }
        } finally {
            table = null;
        }
    }

    @Override
    public CustomTableInputSplit[] createInputSplits(final int minNumSplits) throws IOException {
        if (table == null) {
            throw new IOException("The HBase table has not been opened! "
                + "This needs to be done in configure().");
        }
        if (scan == null) {
            throw new IOException("Scan has not been initialized! "
                + "This needs to be done in configure().");
        }

        // Get the starting and ending row keys for every region in the currently open table
        final Pair<byte[][], byte[][]> keys = table.getRegionLocator().getStartEndKeys();
        if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) {
            throw new IOException("Expecting at least one region.");
        }

        final byte[] startRow = scan.getStartRow();
        final byte[] stopRow = scan.getStopRow();
        final boolean scanWithNoLowerBound = startRow.length == 0;
        final boolean scanWithNoUpperBound = stopRow.length == 0;

        final List<CustomTableInputSplit> splits = new ArrayList<CustomTableInputSplit>(minNumSplits);
        for (int i = 0; i < keys.getFirst().length; i++) {
            final byte[] startKey = keys.getFirst()[i];
            final byte[] endKey = keys.getSecond()[i];
            final String regionLocation = table.getRegionLocator().getRegionLocation(startKey, false).getHostnamePort();
            // Test if the given region is to be included in the InputSplit while splitting the regions of a table
            if (!includeRegionInScan(startKey, endKey)) {
                continue;
            }
            // Find the region on which the given row is being served
            final String[] hosts = new String[]{regionLocation};

            // Determine if regions contains keys used by the scan
            boolean isLastRegion = endKey.length == 0;
            if ((scanWithNoLowerBound || isLastRegion || Bytes.compareTo(startRow, endKey) < 0)
                && (scanWithNoUpperBound || Bytes.compareTo(stopRow, startKey) > 0)) {

                final byte[] splitStart = scanWithNoLowerBound || Bytes.compareTo(startKey, startRow) >= 0 ? startKey : startRow;
                final byte[] splitStop = (scanWithNoUpperBound || Bytes.compareTo(endKey, stopRow) <= 0)
                    && !isLastRegion ? endKey : stopRow;
                int id = splits.size();
                final CustomTableInputSplit split = new CustomTableInputSplit(id, hosts, table.getName().getName(), splitStart, splitStop);
                splits.add(split);
            }
        }
        LOG.info("Created " + splits.size() + " splits");
        for (CustomTableInputSplit split : splits) {
            logSplitInfo("created", split);
        }
        return splits.toArray(new CustomTableInputSplit[splits.size()]);
    }

    /**
     * Test if the given region is to be included in the scan while splitting the regions of a table.
     *
     * @param startKey Start key of the region
     * @param endKey End key of the region
     * @return true, if this region needs to be included as part of the input (default).
     */
    protected boolean includeRegionInScan(final byte[] startKey, final byte[] endKey) {
        return true;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(CustomTableInputSplit[] inputSplits) {
        return new LocatableInputSplitAssigner(inputSplits);
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
        return null;
    }
}
TableInputFormat rewritten as CustomTableInputFormat:
package cn.swordfall.hbaseOnFlink.flink172_hbase212;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;

/**
 * @Author: Yang JianQiu
 * @Date: 2019/3/19 11:15
 *
 * flink-hbase_2.12 (1.7.2) is built against HBase 1.4.3, while this project uses HBase 2.1.2, so the versions do not match.
 * TableInputFormat from flink-hbase_2.12 (1.7.2) therefore has to be rewritten.
 */
public abstract class CustomTableInputFormat<T extends Tuple> extends CustomAbstractTableInputFormat<T> {

    private static final long serialVersionUID = 1L;

    /**
     * Returns an instance of Scan that retrieves the required subset of records from the HBase table.
     * @return The appropriate instance of Scan for this use case.
     */
    @Override
    protected abstract Scan getScanner();

    /**
     * What table is to be read.
     * Per instance of a TableInputFormat derivative only a single table name is possible.
     * @return The name of the table
     */
    @Override
    protected abstract String getTableName();

    /**
     * The output from HBase is always an instance of {@link Result}.
     * This method is to copy the data in the Result instance into the required {@link Tuple}
     * @param r The Result instance from HBase that needs to be converted
     * @return The appropriate instance of {@link Tuple} that contains the needed information.
     */
    protected abstract T mapResultToTuple(Result r);

    /**
     * Creates a {@link Scan} object and opens the {@link HTable} connection.
     * These are opened here because they are needed in the createInputSplits
     * which is called before the openInputFormat method.
     * So the connection is opened in {@link #configure(Configuration)} and closed in {@link #closeInputFormat()}.
     *
     * @param parameters The configuration that is to be used
     * @see Configuration
     */
    @Override
    public void configure(Configuration parameters) {
        table = createTable();
        if (table != null) {
            scan = getScanner();
        }
    }

    /**
     * Create an {@link HTable} instance and set it into this format.
     * Left as a stub that returns null here; subclasses such as HBaseInputFormat override configure()
     * and build the table and scan themselves.
     */
    private HTable createTable() {
        LOG.info("Initializing HBaseConfiguration");
        // use files found in the classpath
        org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
        try {
            return null;
        } catch (Exception e) {
            LOG.error("Error instantiating a new HTable instance", e);
        }
        return null;
    }

    @Override
    protected T mapResultToOutType(Result r) {
        return mapResultToTuple(r);
    }
}
Extend the custom CustomTableInputFormat to set up the HBase connection and perform the read:
package cn.swordfall.hbaseOnFlink

import java.io.IOException

import cn.swordfall.hbaseOnFlink.flink172_hbase212.CustomTableInputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

import scala.collection.JavaConverters._

/**
  * @Author: Yang JianQiu
  * @Date: 2019/3/1 1:14
  *
  * Reading from HBase, approach 2: implement the TableInputFormat interface.
  */
class HBaseInputFormat extends CustomTableInputFormat[Tuple2[String, String]] {

  // result tuple
  val tuple2 = new Tuple2[String, String]

  /**
    * Establish the HBase connection.
    * @param parameters
    */
  override def configure(parameters: Configuration): Unit = {
    val tableName: TableName = TableName.valueOf("test")
    val cf1 = "cf1"
    var conn: Connection = null
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create

    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)

    try {
      conn = ConnectionFactory.createConnection(config)
      table = conn.getTable(tableName).asInstanceOf[HTable]
      scan = new Scan()
      scan.withStartRow(Bytes.toBytes("001"))
      scan.withStopRow(Bytes.toBytes("201"))
      scan.addFamily(Bytes.toBytes(cf1))
    } catch {
      case e: IOException => e.printStackTrace()
    }
  }

  /**
    * Convert the fetched HBase Result into the output tuple.
    * @param result
    * @return
    */
  override def mapResultToTuple(result: Result): Tuple2[String, String] = {
    val rowKey = Bytes.toString(result.getRow)
    val sb = new StringBuffer()
    for (cell: Cell <- result.listCells().asScala) {
      val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
      sb.append(value).append("_")
    }
    val value = sb.replace(sb.length() - 1, sb.length(), "").toString
    tuple2.setField(rowKey, 0)
    tuple2.setField(value, 1)
    tuple2
  }

  /**
    * Table name.
    * @return
    */
  override def getTableName: String = "test"

  /**
    * Return the Scan.
    * @return
    */
  override def getScanner: Scan = {
    scan
  }
}
Calling the HBaseInputFormat class that extends CustomTableInputFormat from a Flink Streaming job:
/**
  * Read from HBase, approach 2: implement the TableInputFormat interface.
  */
def readFromHBaseWithTableInputFormat(): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

  val dataStream = env.createInput(new HBaseInputFormat)
  dataStream.filter(_.f0.startsWith("10")).print()
  env.execute()
}
And the Flink DataSet batch version:
/**
  * Read HBase data: implement the TableInputFormat interface.
  */
def readFromHBaseWithTableInputFormat(): Unit = {
  val env = ExecutionEnvironment.getExecutionEnvironment

  val dataStream = env.createInput(new HBaseInputFormat)
  dataStream.filter(_.f1.startsWith("20")).print()
}
2.2 Two Ways to Write to HBase from Flink
For the Flink Streaming write examples the input comes from Kafka. You can start a single-node Kafka and use kafka-console-producer.sh to write the following records into the topic "test":
100,hello,20
101,nice,24
102,beautiful,26
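If you would rather push the sample records from code instead of kafka-console-producer.sh, here is a minimal sketch using the plain Kafka producer API. It is not part of the original article; the object name ProduceTestRecords is made up, and the broker address is simply the one used by the consumer properties further below:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProduceTestRecords {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.187.201:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // the same sample rows as above: rowKey,name,age
    Seq("100,hello,20", "101,nice,24", "102,beautiful,26")
      .foreach(line => producer.send(new ProducerRecord[String, String]("test", line)))
    producer.flush()
    producer.close()
  }
}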
2.2.1 Extend RichSinkFunction and override its methods:
package cn.swordfall.hbaseOnFlink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

/**
  * @Author: Yang JianQiu
  * @Date: 2019/3/1 1:34
  *
  * Writing to HBase, approach 1: extend RichSinkFunction and override its methods.
  *
  * Note: Flink processes records one at a time, so the sink must not flush to HBase on every single record.
  * Doing so puts heavy pressure on HBase and spawns many threads that can bring the cluster down,
  * so a production job has to control the flush frequency.
  *
  * Solution: keep a counter defined in the open method and flush, say, every 500 records,
  * or collect records in a list and flush once the list reaches a certain threshold.
  */
class HBaseWriter extends RichSinkFunction[String] {

  var conn: Connection = null
  val scan: Scan = null
  var mutator: BufferedMutator = null
  var count = 0

  /**
    * Establish the HBase connection.
    * @param parameters
    */
  override def open(parameters: Configuration): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, "192.168.187.201")
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)

    val tableName: TableName = TableName.valueOf("test")
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // set the write buffer to 1 MB; once 1 MB is reached the data is flushed to HBase automatically
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
    count = 0
  }

  /**
    * Convert each incoming record into a Put and buffer it.
    * @param value
    * @param context
    */
  override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {
    val cf1 = "cf1"
    val array: Array[String] = value.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    mutator.mutate(put)
    // flush every 2000 records
    if (count >= 2000) {
      mutator.flush()
      count = 0
    }
    count = count + 1
  }

  /**
    * Close the connection.
    */
  override def close(): Unit = {
    if (conn != null) conn.close()
  }
}
Calling the HBaseWriter class that extends RichSinkFunction from a Flink Streaming job:
/**
  * Write to HBase, approach 1: extend RichSinkFunction and override its methods.
  */
def write2HBaseWithRichSinkFunction(): Unit = {
  val topic = "test"
  val props = new Properties
  props.put("bootstrap.servers", "192.168.187.201:9092")
  props.put("group.id", "kv_flink")
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

  val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
  val dataStream: DataStream[String] = env.addSource(myConsumer)
  // write to HBase
  dataStream.addSink(new HBaseWriter)
  env.execute()
}
2.2.2 Implement the OutputFormat interface:
package cn.swordfall.hbaseOnFlink

import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes

/**
  * @Author: Yang JianQiu
  * @Date: 2019/3/1 1:40
  *
  * Writing to HBase, approach 2: implement the OutputFormat interface.
  */
class HBaseOutputFormat extends OutputFormat[String] {

  val zkServer = "192.168.187.201"
  val port = "2181"
  var conn: Connection = null
  var mutator: BufferedMutator = null
  var count = 0

  /**
    * Configures the output format. This is always the first method called on an instantiated output format.
    * @param configuration
    */
  override def configure(configuration: Configuration): Unit = {
  }

  /**
    * Opens a parallel instance of the output format, so the HBase connection, configuration
    * and table setup happen here.
    * @param i
    * @param i1
    */
  override def open(i: Int, i1: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)

    val tableName: TableName = TableName.valueOf("test")
    val params: BufferedMutatorParams = new BufferedMutatorParams(tableName)
    // set the write buffer to 1 MB; once 1 MB is reached the data is flushed to HBase automatically
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
    count = 0
  }

  /**
    * Writes a record to the data sink, so this is where the HBase write API is called.
    * @param it
    */
  override def writeRecord(it: String): Unit = {
    val cf1 = "cf1"
    val array: Array[String] = it.split(",")
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    mutator.mutate(put)
    // flush every 4 records; when this output format is driven by a batch job, the threshold (4 here)
    // must not be larger than the total number of records in the batch, otherwise the data never reaches HBase
    if (count >= 4) {
      mutator.flush()
      count = 0
    }
    count = count + 1
  }

  /**
    * Close the connection.
    */
  override def close(): Unit = {
    try {
      if (conn != null) conn.close()
    } catch {
      case e: Exception => println(e.getMessage)
    }
  }
}
Calling the HBaseOutputFormat class that implements OutputFormat from a Flink Streaming job:
/**
  * Write to HBase, approach 2: implement the OutputFormat interface.
  */
def write2HBaseWithOutputFormat(): Unit = {
  val topic = "test"
  val props = new Properties
  props.put("bootstrap.servers", "192.168.187.201:9092")
  props.put("group.id", "kv_flink")
  props.put("enable.auto.commit", "true")
  props.put("auto.commit.interval.ms", "1000")
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.enableCheckpointing(5000)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

  val myConsumer = new FlinkKafkaConsumer[String](topic, new SimpleStringSchema, props)
  val dataStream: DataStream[String] = env.addSource(myConsumer)
  dataStream.writeUsingOutputFormat(new HBaseOutputFormat)
  env.execute()
}
And the Flink DataSet batch version:
/**
  * Write to HBase: implement the OutputFormat interface.
  */
def write2HBaseWithOutputFormat(): Unit = {
  val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

  // define the data
  val dataSet: DataSet[String] = env.fromElements("103,zhangsan,20", "104,lisi,21", "105,wangwu,22", "106,zhaolilu,23")
  dataSet.output(new HBaseOutputFormat)
  // the program only runs when execute() is called; this is required because data sinks write the data
  env.execute()
}
Note:
When HBaseOutputFormat is driven by a batch job, make sure the flush threshold in its writeRecord method is not larger than the total number of records in the batch; otherwise the buffered data never gets written to HBase.
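One way around that limitation, not from the original article but a hedged sketch, is to drain the BufferedMutator when the format shuts down, by flushing and closing it in close() (the same idea applies to HBaseWriter above):
// sketch: an alternative close() for the HBaseOutputFormat class above that drains the write buffer
override def close(): Unit = {
  try {
    if (mutator != null) {
      mutator.flush()   // push any records still sitting in the write buffer
      mutator.close()
    }
    if (conn != null) conn.close()
  } catch {
    case e: Exception => println(e.getMessage)
  }
}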
3. Summary
[Related articles]
Ways to connect to HBase (1), the Java article: reading and writing HBase with the plain Java API
Ways to connect to HBase (2), the Spark article: reading and writing HBase from Spark
GitHub:
https://github.com/SwordfallYeung/HBaseDemo (Flink reading and writing HBase, with both Java and Scala versions of the code)