This post records the basics of reading HBase data with Spark, together with an example of reading multi-version HBase data.
The sample HBase data is as follows:
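The data consists of three user rows in a test_info table with a single info column family, where the birthday cell of rowkey 0001 carries two versions. As a point of reference, here is a minimal sketch, assuming the HBase 1.x client API, of how such a table could be created and populated; the PrepareTestInfo class name and the exact puts are illustrative and not taken from the original post.

package org.HbaseLearn.Util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper class, not part of the original post.
public class PrepareTestInfo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node35,node34,node26");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
            // The "info" column family keeps up to 3 versions per cell,
            // which is what scan.setMaxVersions(3) relies on later.
            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("test_info"));
            HColumnDescriptor info = new HColumnDescriptor("info");
            info.setMaxVersions(3);
            desc.addFamily(info);
            admin.createTable(desc);

            try (Table table = conn.getTable(TableName.valueOf("test_info"))) {
                byte[] family = Bytes.toBytes("info");
                // First write of birthday for rowkey 0001
                Put first = new Put(Bytes.toBytes("0001"));
                first.addColumn(family, Bytes.toBytes("birthday"), Bytes.toBytes("2020-01-01"));
                first.addColumn(family, Bytes.toBytes("gender"), Bytes.toBytes("female"));
                first.addColumn(family, Bytes.toBytes("user_type"), Bytes.toBytes("1"));
                table.put(first);
                // Writing birthday again creates a second version of the same cell
                Put second = new Put(Bytes.toBytes("0001"));
                second.addColumn(family, Bytes.toBytes("birthday"), Bytes.toBytes("2018-05-01"));
                table.put(second);
                // Rowkeys 0002 and 0003 would be written the same way.
            }
        }
    }
}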
The example code is as follows:
package org.HbaseLearn.Util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class ReadSpark {

    private static JavaSparkContext jsc;

    public static void main(String[] args) throws InterruptedException {
        Logger.getLogger("org").setLevel(Level.ERROR);

        SparkSession spark = SparkSession.builder()
                .master("local") // use "yarn" instead when submitting to a cluster
                .appName("hello-world")
                // .config("spark.some.config.option", "some-value")
                .getOrCreate();
        jsc = new JavaSparkContext(spark.sparkContext());

        Configuration conf = HBaseConfiguration.create();
        Scan scan = new Scan();

        // Specify the columns to read; omitting this is equivalent to SELECT * FROM the table
        byte[] family_bytes = "info".getBytes();
        byte[] birthday_bytes = "birthday".getBytes();
        byte[] gender_bytes = "gender".getBytes();
        byte[] user_type_bytes = "user_type".getBytes();
        scan.addFamily(family_bytes);
        scan.addColumn(family_bytes, birthday_bytes);
        scan.addColumn(family_bytes, gender_bytes);
        scan.addColumn(family_bytes, user_type_bytes);
        // Maximum number of versions to read
        scan.setMaxVersions(3);

        try {
            // Encode the Scan so it can be passed through the Hadoop configuration
            ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
            String scanToString = Base64.encodeBytes(proto.toByteArray());

            // Table name
            String tableName = "test_info";
            conf.set(TableInputFormat.INPUT_TABLE, tableName);
            conf.set(TableInputFormat.SCAN, scanToString);
            // ZooKeeper quorum
            conf.set("hbase.zookeeper.quorum", "node35,node34,node26");
            conf.set("hbase.zookeeper.property.clientPort", "2181");
            conf.set("hbase.master", "master");

            // Load the HBase table as an RDD
            JavaPairRDD<ImmutableBytesWritable, Result> HBaseRdd = jsc.newAPIHadoopRDD(conf,
                    TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

            // Convert the result into an RDD of Row
            JavaRDD<Row> HBaseRow = HBaseRdd.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {
                private static final long serialVersionUID = 1L;

                @Override
                public Row call(Tuple2<ImmutableBytesWritable, Result> tuple2) throws IOException {
                    Result result = tuple2._2;
                    String rowKey = Bytes.toString(result.getRow());
                    String birthday = Bytes.toString(result.getValue(family_bytes, birthday_bytes));
                    String gender = Bytes.toString(result.getValue(family_bytes, gender_bytes));
                    String user_type = Bytes.toString(result.getValue(family_bytes, user_type_bytes));
                    return RowFactory.create(rowKey, birthday, gender, user_type);
                }
            });

            // The field order must match the order used when building the Row RDD
            List<StructField> structFields = Arrays.asList(
                    DataTypes.createStructField("user_id", DataTypes.StringType, true),
                    DataTypes.createStructField("birthday", DataTypes.StringType, true),
                    DataTypes.createStructField("gender", DataTypes.StringType, true),
                    DataTypes.createStructField("user_type", DataTypes.StringType, true));
            // Build the schema
            StructType schema = DataTypes.createStructType(structFields);
            // Create the DataFrame
            Dataset<Row> HBaseDF = spark.createDataFrame(HBaseRow, schema);
            HBaseDF.show();

            // Read the multi-version data of the birthday column
            JavaRDD<Row> multiVersionHBaseRow = HBaseRdd
                    .mapPartitions(new FlatMapFunction<Iterator<Tuple2<ImmutableBytesWritable, Result>>, Row>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Iterator<Row> call(Iterator<Tuple2<ImmutableBytesWritable, Result>> t) throws Exception {
                            List<Row> rows = new ArrayList<Row>();
                            while (t.hasNext()) {
                                Result result = t.next()._2();
                                String rowKey = Bytes.toString(result.getRow());
                                // All cells of the birthday column in the info family for the current rowKey
                                List<Cell> cells = result.getColumnCells(family_bytes, birthday_bytes);
                                for (Cell cell : cells) {
                                    String birthday = Bytes.toString(CellUtil.cloneValue(cell));
                                    rows.add(RowFactory.create(rowKey, birthday, cell.getTimestamp()));
                                }
                            }
                            return rows.iterator();
                        }
                    });

            List<StructField> multiVersionStructFields = Arrays.asList(
                    DataTypes.createStructField("rowKey", DataTypes.StringType, true),
                    DataTypes.createStructField("birthday", DataTypes.StringType, true),
                    DataTypes.createStructField("timestamp", DataTypes.LongType, true));
            // Build the schema
            StructType multiVersionSchema = DataTypes.createStructType(multiVersionStructFields);
            // Create the DataFrame
            Dataset<Row> multiVersionHBaseDF = spark.createDataFrame(multiVersionHBaseRow, multiVersionSchema);
            multiVersionHBaseDF.show();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (spark != null) {
                spark.close();
            }
        }
    }
}
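As a side note before the output: in addition to column projection and setMaxVersions, the Scan can also be restricted to a rowkey range before it is serialized into TableInputFormat.SCAN. A minimal sketch, assuming the same HBase 1.x Scan API as above; this snippet is not part of the original example.

// Hedged sketch: limit the scan to a rowkey range before encoding it.
// setStartRow/setStopRow are the HBase 1.x methods (start inclusive, stop exclusive).
Scan rangeScan = new Scan();
rangeScan.addFamily(Bytes.toBytes("info"));
rangeScan.setStartRow(Bytes.toBytes("0001"));
rangeScan.setStopRow(Bytes.toBytes("0003"));
rangeScan.setMaxVersions(3);
String rangeScanToString = Base64.encodeBytes(ProtobufUtil.toScan(rangeScan).toByteArray());
conf.set(TableInputFormat.SCAN, rangeScanToString);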
Running the original example produces the following output:
+-------+----------+------+---------+
|user_id| birthday|gender|user_type|
+-------+----------+------+---------+
| 0001|2018-05-01|female| 1|
| 0002|2019-01-01| male| 2|
| 0003|2018-01-01| male| 2|
+-------+----------+------+---------+
+------+----------+-------------+
|rowKey| birthday| timestamp|
+------+----------+-------------+
| 0001|2018-05-01|1589014338166|
| 0001|2020-01-01|1589008384004|
| 0002|2019-01-01|1589008489848|
| 0003|2018-01-01|1589008489949|
+------+----------+-------------+
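The second table returns every version kept for the birthday cell, up to the 3 allowed by setMaxVersions. If only the newest version per rowKey is wanted downstream, the multi-version DataFrame can be deduplicated with a window over timestamp. A minimal sketch against the multiVersionHBaseDF built above, not part of the original code; it assumes the usual Spark SQL imports.

// Requires: import org.apache.spark.sql.expressions.Window;
//           import org.apache.spark.sql.expressions.WindowSpec;
//           import static org.apache.spark.sql.functions.*;
// Keep only the most recent birthday version for each rowKey.
WindowSpec newestFirst = Window.partitionBy("rowKey").orderBy(col("timestamp").desc());
Dataset<Row> latestBirthday = multiVersionHBaseDF
        .withColumn("rn", row_number().over(newestFirst))
        .filter(col("rn").equalTo(1))
        .drop("rn");
latestBirthday.show();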
The pom dependencies are as follows:
<dependency> <!-- Spark dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>1.0.0</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.2</version>
</dependency>