Spark and HBase Integration


Example code for writing to an HBase table:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

object WriteDriver {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("writeHbase")
    val sc = new SparkContext(conf)

    //--Point the job at the HBase cluster's ZooKeeper quorum and the target table
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, "tabx")

    val job = Job.getInstance(sc.hadoopConfiguration)

    //--The output key/value classes must match the (ImmutableBytesWritable, Put) pairs produced below
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val data = sc.makeRDD(Array("rk1,tom,23", "rk2,rose,25", "rk3,jary,30"))

    //--Turn each CSV line into an (ImmutableBytesWritable, Put) pair
    val hbaseRDD = data.map { line =>
      val infos = line.split(",")
      val rowKey = infos(0)
      val name = infos(1)
      val age = infos(2)

      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("name"), Bytes.toBytes(name))
      put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("age"), Bytes.toBytes(age))

      (new ImmutableBytesWritable, put)
    }

    //--Save the RDD into HBase
    hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
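TableOutputFormat writes into an existing table, so tabx with column family cf1 must exist before the driver runs (in the HBase shell: create 'tabx','cf1'). Below is a minimal sketch of creating it from code with the HBase 1.x Admin API; the connection settings simply mirror the driver above and should be adjusted to your cluster.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateTable {
  def main(args: Array[String]): Unit = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")

    val connection = ConnectionFactory.createConnection(hbaseConf)
    val admin = connection.getAdmin
    val table = TableName.valueOf("tabx")
    //--Create 'tabx' with column family 'cf1' only if it does not already exist
    if (!admin.tableExists(table)) {
      val desc = new HTableDescriptor(table)
      desc.addFamily(new HColumnDescriptor("cf1"))
      admin.createTable(desc)
    }
    admin.close()
    connection.close()
  }
}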

   

Example code for reading from an HBase table:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

object ReadDriver {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("readHbase")
    val sc = new SparkContext(conf)

    //--Create the HBase configuration
    val hbaseConf = HBaseConfiguration.create()

    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tabx")

    val resultRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    resultRDD.foreach { x =>
      //--Each record is an (ImmutableBytesWritable, Result) pair; the data is in the second element
      val result = x._2
      //--Get the row key
      val rowKey = Bytes.toString(result.getRow)

      val name = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))

      println(rowKey + ":" + name + ":" + age)
    }
  }
}
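The pairs returned by newAPIHadoopRDD form an ordinary RDD, so any Spark transformation can follow instead of a bare foreach. A minimal sketch, reusing resultRDD and the imports from ReadDriver above (the Person case class is purely illustrative):

case class Person(rowKey: String, name: String, age: Int)

//--Parse each Result into a typed record before further processing
val personRDD = resultRDD.map { case (_, result) =>
  val rowKey = Bytes.toString(result.getRow)
  val name = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
  val age = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age"))).toInt
  Person(rowKey, name, age)
}

//--Normal Spark operations now apply, e.g. counting people older than 24
println(personRDD.filter(_.age > 24).count())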

   

Example code using a filter:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.RandomRowFilter
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

object ReadDriver2 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("readHbaseFilter")
    val sc = new SparkContext(conf)

    val hbaseConf = HBaseConfiguration.create()

    hbaseConf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03")
    hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "tabx")

    //--RandomRowFilter keeps each row with the given probability (here 50%)
    val scan = new Scan()
    scan.setFilter(new RandomRowFilter(0.5f))
    //--Serialize the Scan into the configuration so the filter takes effect
    hbaseConf.set(TableInputFormat.SCAN,
      Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

    val resultRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    resultRDD.foreach { x =>
      //--The data is in the second element of the (ImmutableBytesWritable, Result) pair
      val result = x._2
      //--Get the row key
      val rowKey = Bytes.toString(result.getRow)

      val name = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("name")))
      val age = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("age")))

      println(rowKey + ":" + name + ":" + age)
    }
  }
}
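The Scan-serialization trick is not specific to RandomRowFilter; any HBase filter, or several combined in a FilterList, can be pushed down the same way. A minimal sketch, reusing hbaseConf, Scan, Base64, ProtobufUtil, and TableInputFormat from the example above, that keeps only rows whose key starts with rk and whose cf1:name equals tom:

import org.apache.hadoop.hbase.filter.{FilterList, PrefixFilter, SingleColumnValueFilter}
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp

val scan2 = new Scan()
//--MUST_PASS_ALL means a row must satisfy every filter in the list (a logical AND)
scan2.setFilter(new FilterList(FilterList.Operator.MUST_PASS_ALL,
  new PrefixFilter(Bytes.toBytes("rk")),
  new SingleColumnValueFilter(Bytes.toBytes("cf1"), Bytes.toBytes("name"),
    CompareOp.EQUAL, Bytes.toBytes("tom"))))

hbaseConf.set(TableInputFormat.SCAN,
  Base64.encodeBytes(ProtobufUtil.toScan(scan2).toByteArray))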

   

   

