Spark 讀取 HBase 形成 RDD,存入 Hive 或者以 Spark SQL 分析


/**
 * Reads the HBase table `event_logs_20190218` into an RDD, maps every row to an
 * `EventLog` and queries the result through Spark SQL.
 */
object SaprkReadHbase {
  // Not read or written anywhere in the visible code; kept so existing references still compile.
  var total: Int = 0

  /**
   * Builds an `EventLog` from one HBase row.
   *
   * The row key and every qualifier of the `info` column family are decoded as strings.
   * NOTE(review): a cell that is absent from the row presumably decodes to `null`
   * (HBase `Bytes.toString(null)`) - confirm before relying on non-null fields.
   *
   * The case class supplies the schema for the later `toDS()` call. The original author
   * notes that tuples and case classes are said to be limited to 22 fields and that a
   * `StructType` (Row + schema) is the usual way around that.
   *
   * @param result the HBase row returned by the scan
   * @return the row converted to an `EventLog`
   */
  private def toEventLog(result: Result): EventLog = {
    // Encode the column family once per row instead of once per column.
    val family = Bytes.toBytes("info")

    def column(qualifier: String): String =
      Bytes.toString(result.getValue(family, Bytes.toBytes(qualifier)))

    new EventLog(
      Bytes.toString(result.getRow), // row key
      column("api_v"),
      column("app_id"),
      column("c_time"),
      column("ch_id"),
      column("city"),
      column("province"),
      column("country"),
      column("en"),
      column("ip"),
      column("net_t"),
      column("pl"),
      column("s_time"),
      column("user_id"),
      column("uuid"),
      column("ver")
    )
  }

  /**
   * Entry point: scans the HBase table, prints the row count, registers the rows as the
   * temporary view `event_logs` and shows one row together with its query plan.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark Read  Hbase ")
      .enableHiveSupport() // required in order to read Hive tables
      .getOrCreate()

    try {
      val sc = spark.sparkContext

      // ZooKeeper settings; ZooKeeper stores the HBase meta information.
      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set(TableInputFormat.INPUT_TABLE, "event_logs_20190218")

      // Read the table and expose it as an RDD of (row key, row result) pairs.
      val hBaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result]
      )
      val count = hBaseRDD.count()
      println("\n\n\n:" + count)

      import spark.implicits._
      val logRDD: RDD[EventLog] = hBaseRDD.map { case (_, result) => toEventLog(result) }

      // Can be converted to a DataFrame/Dataset and stored in Hive as a wide table,
      // or analysed directly with Spark core.
      val logds = logRDD.toDS()
      // createOrReplaceTempView does not fail when the view already exists on a reused session.
      logds.createOrReplaceTempView("event_logs")
      val sq = spark.sql("select * from event_logs limit 1")
      // explain() prints the plan itself and returns Unit, so it must not be wrapped in println.
      sq.explain()
      sq.show()
    } finally {
      // Stopping the session also stops its SparkContext; runs even when the job fails.
      spark.stop()
    }
  }
}


//write hbase
/**
 * Writes one sample row into the HBase table `test` through `TableOutputFormat`.
 *
 * NOTE(review): `main` is an instance method of a class, so it is not a JVM entry point
 * by itself; it has to be invoked on an instance (or the class turned into an `object`).
 *
 * @created by imp ON 2018/2/19
 */
class SparkWriteHbase {

  /**
   * Builds a one-element RDD from the sample line `"1, 3000000000"`, converts it to a
   * `Put` on column `info:total` keyed by the first field, and saves it to HBase.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local")
    val sc = new SparkContext(sparkConf)
    try {
      sc.setLogLevel("ERROR")

      val conf = HBaseConfiguration.create()
      conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set(TableOutputFormat.OUTPUT_TABLE, "test")

      // Job.getInstance replaces the deprecated Job constructor.
      val job = Job.getInstance(conf)
      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      // The RDD below emits Put values, so declare Put rather than Result.
      job.setOutputValueClass(classOf[Put])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

      val arrResult: Array[String] = Array("1, 3000000000")
      //arrResult(0) = "1,100,11"

      val saveRDD = sc
        .makeRDD(arrResult)
        // Trim each field so the blank after the comma is not stored as part of the value.
        .map(_.split(',').map(_.trim))
        .map { arr =>
          val put = new Put(Bytes.toBytes(arr(0)))
          // addColumn replaces Put.add(family, qualifier, value), which is deprecated in HBase 1.x.
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("total"), Bytes.toBytes(arr(1)))
          (new ImmutableBytesWritable, put)
        }

      println("getConfiguration")
      val c = job.getConfiguration
      println("save")
      saveRDD.saveAsNewAPIHadoopDataset(c)
    } finally {
      // Release the context even when the write fails.
      sc.stop()
    }
  }

}
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯繫本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM