spark 數據寫入到 hbase


1)spark把數據寫入到hbase需要用到:PairRddFunctions的saveAsHadoopDataset方法,這里用到了 implicit conversion,需要我們引入

import org.apache.spark.SparkContext._

2)spark寫入hbase,實質是借用了org.apache.hadoop.hbase.mapreduce.TableInputFormat這個對象,用其內部的recorderWriter將數據寫入hbase

同時,也借用了hadoop的JobConf,配置和寫MR的配置方式一樣

3)請看下面代碼,這里使用sparksql從hive里面讀出數據,經過處理,寫入到hbase

    //創建jobConf
    val conf = HBaseConfiguration.create()
    val jobConf = new JobConf(conf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")

    //創建hiveContext
    val sparkConf = new SparkConf().setAppName("test")
    val sc = new SparkContext(sparkConf)
    @transient  val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    sqlContext.setConf("spark.sql.shuffle.partitions","3")

    //保存到hbase
    val rdd = sqlContext.sql("select C1,C2,C3 from test")
      .map(row => {
       val c1 = row(0).asInstanceOf[String]
       val c2 = row(1).asInstanceOf[String]
       val c3 = row(2).asInstanceOf[String]
       val p = new Put(Bytes.toBytes(c1))
       p.add(Bytes.toBytes("f"),Bytes.toBytes("c2"),Bytes.toBytes(c2))
       p.add(Bytes.toBytes("f"),Bytes.toBytes("c3"),Bytes.toBytes(c3))
       (new ImmutableBytesWritable,p) 
     }).saveAsHadoopDataset(jobConf)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM