0.我們有這樣一個表,表名為Student

1.在Hbase中創建一個表

表明為student,列族為info
2.插入數據

我們這里采用put來插入數據
格式如下 put ‘表命’,‘行鍵’,‘列族:列’,‘值’
我們知道Hbase 四個鍵確定一個值,
一般查詢的時候我們需要提供 表名、行鍵、列族:列名、時間戳才會有一個確定的值。
但是這里插入的時候,時間戳自動被生成,我們並不用額外操作。
我們不用表的時候可以這樣刪除

注意,一定要先disable 再drop,不能像RDMS一樣直接drop
3.配置spark
我們需要把Hbase的lib目錄下的一些jar文件拷貝到Spark中,這些都是編程中需要引進的jar包。
需要拷貝的jar包包括:所有hbase開頭的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar
我們將文件拷貝到Spark目錄下的jars文件中

4.編寫程序
(1)讀取數據

我們程序中需要的jar包如下

我們這里使用Maven來導入相關jar包
我們需要導入hadoop和spark相關的jar包
spark方面需要導入的依賴
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
hadoop方面需要導入的依賴
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
hbase方面需要導入的依賴
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
我們使用的org.apache.hadoop.hbase.mapreduce是通過hbase-server導入的。
具體的程序如下
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
object SparkOperateHbase{
def main(args:Array[String]): Unit ={
//建立Hbase的連接
val conf = HBaseConfiguration.create();
//設置查詢的表名student
conf.set(TableInputFormat.INPUT_TABLE,"student")
//通過SparkContext將student表中數據創建一個rdd
val sc = new SparkContext(new SparkConf());
val stuRdd = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result]);
stuRdd.cache();//持久化
//計算數據條數
val count = stuRdd.count();
println("Student rdd count:"+count);
//遍歷輸出
//當我們建立Rdd的時候,前邊全部是參數信息,后邊的result才是保存數據的數據集
stuRdd.foreach({case (_,result) =>
//通過result.getRow來獲取行鍵
val key = Bytes.toString(result.getRow);
//通過result.getValue("列族","列名")來獲取值
//注意這里需要使用getBytes將字符流轉化成字節流
val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes));
val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes));
val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes));
//打印結果
println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age);
});
}
}
(2)存入數據
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
object HbasePut{
def main(args:Array[String]): Unit = {
//建立sparkcontext
val sparkConf = new SparkConf().setAppName("HbasePut").setMaster("local")
val sc = new SparkContext(sparkConf)
//與hbase的student表建立連接
val tableName = "student"
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,tableName)
//建立任務job
val job = new Job(sc.hadoopConfiguration)
//配置job參數
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
//要插入的數據,這里的makeRDD是parallelize的擴展版
val indataRdd = sc.makeRDD(Array("3,zhang,M,26","4,yue,M,27"))
val rdd = indataRdd.map(_.split(",")).map(arr=>{
val put = new Put(Bytes.toBytes(arr(0))) //行鍵的值
//依次給列族info的列添加值
put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2)))
put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3)))
//必須有這兩個返回值,put為要傳入的數據
(new ImmutableBytesWritable,put)
})
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
}
}
5.Maven打包
我們用命令行打開到項目的根目錄,輸入mvn clean package -DskipTests=true
打包成功后我們到項目目錄下的target文件下就會找到相應的jar包
6.提交任務

