import java.util.Date

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

object App7 {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext with the default configuration.
    val sc = new SparkContext(new SparkConf())
    // HBase column family.
    val columnFamily1 = "f1"
    // Default HBase configuration (reads hbase-site.xml from the classpath).
    val conf = HBaseConfiguration.create()
    // Timestamp used for every cell we write.
    val date = new Date().getTime

    // Build the source RDD with sc.parallelize.
    // Each element is (rowkey, (family, qualifier, value)).
    val sourceRDD = sc.parallelize(Array(
      (Bytes.toBytes("41"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))),
      (Bytes.toBytes("41"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))),
      (Bytes.toBytes("42"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a"))),
      (Bytes.toBytes("42"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c"))),
      (Bytes.toBytes("43"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
      (Bytes.toBytes("44"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
      (Bytes.toBytes("44"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
      (Bytes.toBytes("45"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
      (Bytes.toBytes("45"), (Bytes.toBytes(columnFamily1), Bytes.toBytes("d"), Bytes.toBytes("bar.2")))))

    // Convert the RDD into the shape HFileOutputFormat2 expects: the key must be an
    // ImmutableBytesWritable and the value a KeyValue. Note that the records must be
    // sorted by row key (and by family/qualifier within a row); the sample data above
    // is already in that order.
    val rdd = sourceRDD.map(x => {
      val rowKey = x._1
      val family = x._2._1
      val column = x._2._2
      val value = x._2._3
      (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, column, date, value))
    })

    // Temporary HDFS directory where the generated HFiles are written.
    val stagingFolder = "/user/hbase/spark/"
    // Target HBase table.
    val tableName = "output_table"

    // Create the HBase connection from the default configuration
    // (this is where the ZooKeeper quorum / master address comes from).
    val conn = ConnectionFactory.createConnection(conf)
    // Look up the table and its region distribution.
    val table: Table = conn.getTable(TableName.valueOf(tableName))
    try {
      val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))

      // Create a Hadoop MapReduce job and configure it for HFile output.
      val job = Job.getInstance(conf)
      job.setJobName("DumpFile")
      // This part matters most: because we are producing HFiles, the output key
      // must be ImmutableBytesWritable and the output value must be KeyValue.
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      job.setMapOutputValueClass(classOf[KeyValue])
      // Let HFileOutputFormat2 pick up the table's layout (compression, bloom filters,
      // region boundaries) so the generated HFiles match the target table.
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

      // Write the HFiles to the staging directory using the job's configuration.
      rdd.saveAsNewAPIHadoopFile(
        stagingFolder,
        classOf[ImmutableBytesWritable],
        classOf[KeyValue],
        classOf[HFileOutputFormat2],
        job.getConfiguration)
      // At this point the generated HFiles are sitting under stagingFolder.

      // Bulk-load the HFiles into HBase. doBulkLoad(Path, HTable) is the older API;
      // in HBase 1.x the Table returned by Connection.getTable is an HTable,
      // so the cast is valid there.
      val load = new LoadIncrementalHFiles(conf)
      load.doBulkLoad(new Path(stagingFolder), table.asInstanceOf[HTable])
    } finally {
      table.close()
      conn.close()
    }
  }
}
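To confirm the rows actually landed, a quick scan of the table is enough. Below is a minimal sketch, assuming the same "output_table" name and a default HBase configuration on the classpath; the App7Verify object name is only for illustration.

import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}

object App7Verify {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    val conn = ConnectionFactory.createConnection(conf)
    // Assumption: the table name matches the one used in the bulk-load job above.
    val table = conn.getTable(TableName.valueOf("output_table"))
    try {
      // Full-table scan; fine for this small sample data set.
      val scanner = table.getScanner(new Scan())
      try {
        var result = scanner.next()
        while (result != null) {
          result.rawCells().foreach { cell =>
            val row = Bytes.toString(CellUtil.cloneRow(cell))
            val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
            val value = Bytes.toString(CellUtil.cloneValue(cell))
            println(s"$row $qualifier=$value")
          }
          result = scanner.next()
        }
      } finally {
        scanner.close()
      }
    } finally {
      table.close()
      conn.close()
    }
  }
}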
