Reading HBase 1.2.3 from Spark 1.6.2
These HBase client jars (and their transitive helpers) must be on the Spark classpath:
// hbase-common-1.2.3.jar
// hbase-protocol-1.2.3.jar
// hbase-server-1.2.3.jar
// htrace-core-3.1.0-incubating.jar
// metrics-core-2.2.0.jar
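If the job is built with sbt, the same dependencies can be declared in build.sbt instead of shipping raw jars. The snippet below is a minimal sketch assuming a Spark 1.6.2 / HBase 1.2.3 build; htrace-core and metrics-core normally arrive transitively via hbase-server:

// build.sbt (sketch; verify versions against your cluster)
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.2" % "provided",
  "org.apache.spark" %% "spark-sql"  % "1.6.2" % "provided",
  "org.apache.hbase" %  "hbase-common"   % "1.2.3",
  "org.apache.hbase" %  "hbase-protocol" % "1.2.3",
  "org.apache.hbase" %  "hbase-server"   % "1.2.3"
  // htrace-core-3.1.0-incubating and metrics-core-2.2.0 are expected as
  // transitive dependencies of hbase-server; add them explicitly only if
  // your build reports them missing.
)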
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val sparkConf = new SparkConf().setAppName("User")

// Create the Spark context and SQL context
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// Create the HBase configuration
val hBaseConf = HBaseConfiguration.create()
hBaseConf.set("hbase.zookeeper.quorum", "192.168.1.1,192.168.1.2,192.168.1.3")
hBaseConf.set("hbase.zookeeper.property.clientPort", "2182") // ZooKeeper client port

// Set the table to read
hBaseConf.set(TableInputFormat.INPUT_TABLE, "knowledge")

// Read the table with newAPIHadoopRDD; this returns a NewHadoopRDD
// of (row key, Result) pairs
val hbaseRDD = sc.newAPIHadoopRDD(
  hBaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

// Keep only the Result half of each pair (a MapPartitionsRDD)
val resRDD = hbaseRDD.map(tuple => tuple._2)

// Map each Result to a tuple of strings, then convert the RDD to a DataFrame
val user_knowledge = resRDD.map(r => (
  Bytes.toString(r.getRow),
  Bytes.toString(r.getValue(Bytes.toBytes("behavior"), Bytes.toBytes("reg_id"))),
  Bytes.toString(r.getValue(Bytes.toBytes("behavior"), Bytes.toBytes("create_user_id"))),
  Bytes.toString(r.getValue(Bytes.toBytes("behavior"), Bytes.toBytes("knowledge_id"))),
  Bytes.toString(r.getValue(Bytes.toBytes("behavior"), Bytes.toBytes("create_time")))
)).toDF("row", "reg_id", "create_user_id", "knowledge_id", "create_time")

// Register the DataFrame as a temp table and query it as a test
user_knowledge.registerTempTable("user_knowledge")
val df2 = sqlContext.sql("SELECT * FROM user_knowledge")
df2.collect.foreach(println)

sc.stop()
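As written, the scan ships entire rows to Spark even though only four columns of the behavior family are used. A hedged refinement: TableInputFormat also honors SCAN_COLUMNS, so the scan can be narrowed on hBaseConf before newAPIHadoopRDD is called. The column names below simply mirror the ones read above:

// Optional narrowing; set this before sc.newAPIHadoopRDD(...).
// SCAN_COLUMNS takes a space-separated list of family:qualifier pairs,
// so each Result carries only these cells instead of the full row.
hBaseConf.set(TableInputFormat.SCAN_COLUMNS,
  "behavior:reg_id behavior:create_user_id behavior:knowledge_id behavior:create_time")

Rows missing one of these cells yield null from getValue, which Bytes.toString passes through, so the corresponding DataFrame column is simply null.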