本文主要來自於 http://dblab.xmu.edu.cn/blog/1316-2/ 謝謝原作者
准備工作一:創建一個HBase表
這里依然是以student表為例進行演示。這里假設你已經成功安裝了HBase數據庫,如果你還沒有安裝,可以參考大數據-04-Hbase入門,進行安裝,安裝好以后,不要創建數據庫和表,只要跟着本節后面的內容操作即可。
因為hbase依賴於hadoop,因此啟動和停止都是需要按照順序進行
如果安裝了獨立的zookeeper
啟動順序: hadoop-> zookeeper-> hbase
停止順序:hbase-> zookeeper-> hadoop
使用自帶的zookeeper
啟動順序: hadoop-> hbase
停止順序:hbase-> hadoop
如下所示:
cd /usr/local/hadoop
./sbin/start-all.sh
cd /usr/local/hbase
./bin/start-hbase.sh //啟動HBase
./bin/hbase shell //啟動hbase shell
這樣就可以進入hbase shell命令提示符狀態。下面我們在HBase數據庫中創建student表(注意:在關系型數據庫MySQL中,需要首先創建數據庫,然后再創建表,但是,在HBase數據庫中,不需要創建數據庫,只要直接創建表就可以):
hbase> list # 查看所有表
hbase> disable 'student' # 禁用表
hbase> drop 'student' # 刪除表
下面讓我們一起來創建一個student表,我們可以在hbase shell中使用下面命令創建:
hbase> create 'student','info'
hbase> describe 'student'
//首先錄入student表的第一個學生記錄
hbase> put 'student','1','info:name','Xueqian'
hbase> put 'student','1','info:gender','F'
hbase> put 'student','1','info:age','23'
//然后錄入student表的第二個學生記錄
hbase> put 'student','2','info:name','Weiliang'
hbase> put 'student','2','info:gender','M'
hbase> put 'student','2','info:age','24'
數據錄入結束后,可以用下面命令查看剛才已經錄入的數據:
//如果每次只查看一行,就用下面命令
hbase> get 'student','1'
//如果每次查看全部數據,就用下面命令
hbase> scan 'student'
准備工作二:配置Spark
在開始編程操作HBase數據庫之前,需要對做一些准備工作。
(1)請新建一個終端,執行下面命令,把HBase的lib目錄下的一些jar文件拷貝到Spark中,這些都是編程時需要引入的jar包,需要拷貝的jar文件包括:所有hbase開頭的jar文件、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar,可以打開一個終端按照以下命令來操作:
cd /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/
mkdir hbase
cd hbase
cp /usr/local/hbase/lib/hbase*.jar ./
cp /usr/local/hbase/lib/guava-12.0.1.jar ./
cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
只有這樣,后面編譯和運行過程才不會出錯。
編寫程序讀取HBase數據
如果要讓Spark讀取HBase,就需要使用SparkContext提供的newAPIHadoopRDD API將表的內容以RDD的形式加載到Spark中。
請在Linux系統中打開一個終端,然后執行以下命令:
cd /usr/local/spark/mycode
mkdir hbase
cd hbase
mkdir -p src/main/scala
cd src/main/scala
vim SparkOperateHBase.scala
然后,在SparkOperateHBase.scala文件中輸入以下代碼:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkOperateHBase {
def main(args: Array[String]) {
val conf = HBaseConfiguration.create()
val sc = new SparkContext(new SparkConf())
//設置查詢的表名
conf.set(TableInputFormat.INPUT_TABLE, "student")
val stuRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val count = stuRDD.count()
println("Students RDD Count:" + count)
stuRDD.cache()
//遍歷輸出
stuRDD.foreach({ case (_,result) =>
val key = Bytes.toString(result.getRow)
val name = Bytes.toString(result.getValue("info".getBytes,"name".getBytes))
val gender = Bytes.toString(result.getValue("info".getBytes,"gender".getBytes))
val age = Bytes.toString(result.getValue("info".getBytes,"age".getBytes))
println("Row key:"+key+" Name:"+name+" Gender:"+gender+" Age:"+age)
})
}
}
然后就可以用sbt打包編譯。不過,在編譯之前,需要新建一個simple.sbt文件,在simple.sbt配置文件中,需要知道scalaVersion、spark-core、hbase-client、hbase-common、hbase-server的版本號。在前面章節大數據-03-Spark入門的“編寫Scala獨立應用程序”部分,我們已經介紹了如何尋找scalaVersion和spark-core的版本號,這里不再贅述。現在介紹如何找到你自己電腦上安裝的HBase的hbase-client、hbase-common、hbase-server的版本號。
請在Linux系統中打開一個終端,輸入下面命令:
cd /usr/local/hbase # 這是筆者電腦的hbase安裝目錄
cd lib
ls
ls命令會把“/usr/local/hbase/lib”目錄下的所有jar文件全部列出來,其中,就可以看到下面三個文件:
hbase-client-1.1.2.jar
hbase-common-1.1.2.jar
hbase-server-1.1.2.jar
根據上面三個文件,我們就可以得知hbase-client、hbase-common、hbase-server的版本號是1.1.5(當然,你的電腦上可能不是這個版本號,請以你自己的版本號為准)。
有了這些版本號信息,我們就可以新建一個simple.sbt文件:
cd /usr/local/spark/mycode/hbase
vim simple.sbt
然后在simple.sbt中錄入下面內容:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.1.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.1.2"
保存該文件,退出vim編輯器。
然后,輸入下面命令:
find .
應該可以看到類似下面的文件結構:
.
./src
./src/main
./src/main/scala
./src/main/scala/SparkOperateHBase.scala
./simple.sbt
下面就可以運行sbt打包命令:
/usr/local/sbt/sbt package
打包成功以后,生成的 jar 包的位置為 /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar。
最后,通過 spark-submit 運行程序。我們就可以將生成的 jar 包通過 spark-submit 提交到 Spark 中運行了,命令如下:
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/hbase/*:/usr/local/hbase/conf --class "SparkOperateHBase" /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
特別強調,上面命令中,必須使用“–driver-class-path”參數指定依賴JAR包的路徑,而且必須把”/usr/local/hbase/conf”也加到路徑中。
執行后得到如下結果:
Students RDD Count:2
Row key:1 Name:Xueqian Gender:F Age:23
Row key:2 Name:Weiliang Gender:M Age:24
編寫程序向HBase寫入數據
下面編寫程序向HBase中寫入兩行數據。
請打開一個Linux終端,輸入如下命令:
cd /usr/local/spark/mycode/hbase
vim src/main/scala/SparkWriteHBase.scala
上面命令用vim編輯器新建了一個文件SparkWriteHBase.scala,然后,在SparkWriteHBase.scala文件中輸入下面代碼:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.spark._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
object SparkWriteHBase {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkWriteHBase").setMaster("local")
val sc = new SparkContext(sparkConf)
val tablename = "student"
sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
val job = new Job(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Result])
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val indataRDD = sc.makeRDD(Array("3,Rongcheng,M,26","4,Guanhua,M,27")) //構建兩行記錄
val rdd = indataRDD.map(_.split(',')).map{arr=>{
val put = new Put(Bytes.toBytes(arr(0))) //行健的值
put.add(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes(arr(1))) //info:name列的值
put.add(Bytes.toBytes("info"),Bytes.toBytes("gender"),Bytes.toBytes(arr(2))) //info:gender列的值
put.add(Bytes.toBytes("info"),Bytes.toBytes("age"),Bytes.toBytes(arr(3).toInt)) //info:age列的值
(new ImmutableBytesWritable, put)
}}
rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
}
}
保存該文件退出vim編輯器,然后,使用sbt打包編譯,命令如下:
/usr/local/sbt/sbt package
打包成功以后,生成的 jar 包的位置為 /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar。實際上,由於之前我們已經編寫了另外一個代碼文件SparkOperateHBase.scala,所以,simple-project_2.11-1.0.jar中實際包含了SparkOperateHBase.scala和SparkWriteHBase.scala兩個代碼文件的編譯結果(class文件),在運行命令時,可以通過–class后面的名稱參數來決定運行哪個程序, 這個名字就是scala文件名。
最后,通過 spark-submit 運行程序。我們就可以將生成的 jar 包通過 spark-submit 提交到 Spark 中運行了,命令如下:
/usr/local/spark/spark-2.3.0-bin-hadoop2.7/bin/spark-submit --driver-class-path /usr/local/spark/spark-2.3.0-bin-hadoop2.7/jars/hbase/*:/usr/local/hbase/conf --class "SparkWriteHBase" /usr/local/spark/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
執行后,我們可以切換到剛才的HBase終端窗口,在HBase shell中輸入如下命令查看結果:
hbase> scan 'student'