Spark 讀取HBase數據


Spark 1.6.2 讀取 HBase 1.2.3

//hbase-common-1.2.3.jar
//hbase-protocol-1.2.3.jar
//hbase-server-1.2.3.jar
//htrace-core-3.1.0-incubating.jar
//metrics-core-2.2.0.jar

  // Spark 1.6.x job: scan the HBase table "knowledge", project the "behavior"
  // column family into a DataFrame, and query it back via Spark SQL.
  val sparkConf = new SparkConf()
      .setAppName("User")

    // Create the Spark context and the SQL context (pre-2.0 SparkSession style).
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Build the HBase configuration: ZooKeeper quorum, client port, target table.
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.quorum", "192.168.1.1,192.168.1.2,192.168.1.3")
    hBaseConf.set("hbase.zookeeper.property.clientPort", "2182") // ZooKeeper client port (non-default; default is 2181)
    // Table to scan.
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "knowledge")

    // Read the HBase table through newAPIHadoopRDD; yields an RDD of
    // (ImmutableBytesWritable rowkey, Result) pairs.
    val hbaseRDD = sc.newAPIHadoopRDD(hBaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    // Keep only the Result values; the row key is still recoverable via Result.getRow.
    val resRDD = hbaseRDD.map(tuple => tuple._2)

    // Map each Result to a typed tuple (row key + four string cells from the
    // "behavior" family) and convert to a DataFrame with named columns.
    val user_knowledge = resRDD.map(r => (Bytes.toString(r.getRow),
      Bytes.toString(r.getValue(Bytes.toBytes("behavior"), Bytes.toBytes("reg_id"))),
      Bytes.toString(r.getValue(Bytes.toBytes("behavior"), Bytes.toBytes("create_user_id"))),
      Bytes.toString(r.getValue(Bytes.toBytes("behavior"), Bytes.toBytes("knowledge_id"))),
      Bytes.toString(r.getValue(Bytes.toBytes("behavior"), Bytes.toBytes("create_time"))))
      ).toDF("row", "reg_id", "create_user_id", "knowledge_id", "create_time")

    // Register as a temp table so it can be queried with SQL.
    user_knowledge.registerTempTable("user_knowledge")

    // Smoke test: query the temp table and print every row to the driver.
    // NOTE(review): collect pulls the full table to the driver — fine for a
    // demo, but use show()/take(n) on large tables.
    val df2 = sqlContext.sql("SELECT * FROM user_knowledge")

    df2.collect.foreach(println)

    // Side-effecting 0-arity method: call with parentheses per Scala convention.
    sc.stop()

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM