Generating HFiles with Spark and Bulk-Loading Them into HBase


import java.util.Date

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{HTable, Table, _}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}


object App7 {
  def main(args: Array[String]): Unit = {
    // Create the SparkContext with the default configuration
    val sc = new SparkContext(new SparkConf())
    // HBase column family
    val columnFamily1 = "f1"
    // Default HBase configuration (picks up hbase-site.xml from the classpath)
    val conf = HBaseConfiguration.create()
    // Current time, used as the cell timestamp
    val date = new Date().getTime
    // Build the source RDD with sc.parallelize
    val sourceRDD = sc.parallelize(Array(
      (Bytes.toBytes("41"), // "41" is the rowkey
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo1"))), // column family, column qualifier, and value
      (Bytes.toBytes("41"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo2.b"))),
      (Bytes.toBytes("42"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo2.a"))),
      (Bytes.toBytes("42"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("c"), Bytes.toBytes("foo2.c"))),
      (Bytes.toBytes("43"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo3"))),
      (Bytes.toBytes("44"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("foo.1"))),
      (Bytes.toBytes("44"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("b"), Bytes.toBytes("foo.2"))),
      (Bytes.toBytes("45"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("a"), Bytes.toBytes("bar.1"))),
      (Bytes.toBytes("45"),
        (Bytes.toBytes(columnFamily1), Bytes.toBytes("d"), Bytes.toBytes("bar.2")))))

    val rdd = sourceRDD.map(x => {
      // Convert the RDD into the shape HFileOutputFormat2 expects:
      // the key must be an ImmutableBytesWritable wrapping the rowkey, and the value a KeyValue
      val rowKey = x._1
      val family = x._2._1
      val column = x._2._2
      val value = x._2._3
      (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, column, date, value))
    })
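    // Note: HFileOutputFormat2 requires cells to be written in ascending rowkey/qualifier order.
    // The small sample above happens to be sorted already; for real datasets you would normally
    // sort first, e.g. (an illustrative sketch, not part of the original example):
    //   sourceRDD.sortBy(x => (Bytes.toString(x._1), Bytes.toString(x._2._2)))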

    // Temporary HDFS directory where the generated HFiles are written
    val stagingFolder = "/user/hbase/spark/"
    // Write the RDD out as HFiles using HFileOutputFormat2
    rdd.saveAsNewAPIHadoopFile(stagingFolder,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      conf)
    // Once this step finishes, the generated HFiles are under stagingFolder
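    // Note: as with any Hadoop FileOutputFormat, the write fails if stagingFolder already exists,
    // and the bulk load below moves the HFiles out of stagingFolder into HBase's own storage.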


    // Bulk-load the generated HFiles into HBase; everything below uses the HBase client API
    val load = new LoadIncrementalHFiles(conf)
    // Name of the target HBase table
    val tableName = "output_table"
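    // The target table must already exist with column family "f1"
    // (for example, created beforehand in the HBase shell: create 'output_table', 'f1')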
    // Open a connection from the default configuration (this is where the cluster's ZooKeeper quorum is read from)
    val conn = ConnectionFactory.createConnection(conf)
    // Get a handle to the table by name
    val table: Table = conn.getTable(TableName.valueOf(tableName))
    try {
      // Get the region distribution of the target table
      val regionLocator = conn.getRegionLocator(TableName.valueOf(tableName))
      // Create a Hadoop MapReduce Job (used here only to carry the output configuration)
      val job = Job.getInstance(conf)
      // Set the job name
      job.setJobName("DumpFile")
      // Important: since we are producing HFiles, the map output key class must be ImmutableBytesWritable
      job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
      // ... and the map output value class must be KeyValue
      job.setMapOutputValueClass(classOf[KeyValue])
      // Configure HFileOutputFormat2 (compression, bloom filters, region-aware partitioning) to match the target table
      HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)
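      // Note: in this example the HFiles were already written by saveAsNewAPIHadoopFile above, so
      // this job setup mainly documents the expected key/value types for the load step. A more
      // common pattern (a sketch, not the original author's flow) is to call
      // configureIncrementalLoad first and then write with
      // rdd.saveAsNewAPIHadoopFile(..., job.getConfiguration).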

      // Run the bulk load
      load.doBulkLoad(new Path(stagingFolder), table.asInstanceOf[HTable])
    } finally {
      table.close()
      conn.close()
    }
  }
}
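
After the job completes, a quick way to confirm the load is to read one of the rows back with the HBase client API. The snippet below is a minimal sketch, assuming the same configuration and table name as above (it is not part of the original example):

import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}

object VerifyBulkLoad {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("output_table"))
    try {
      // Fetch the row with rowkey "41" that the bulk load wrote
      val result = table.get(new Get(Bytes.toBytes("41")))
      val value = result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("a"))
      println(Bytes.toString(value)) // expected output: foo1
    } finally {
      table.close()
      conn.close()
    }
  }
}

The same check can also be done from the HBase shell with: get 'output_table', '41'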

 

