大數據技術-spark+hive+hbase研究


大數據 spark 研究

0基礎入門)

一 背景

基礎

Scala 語言基礎:Scala詳細總結(精辟版++)

spark 介紹    :  spark介紹

 

 

二 環境

部署spark

 

<![if !supportLists]>1、<![endif]>環境准備
1)配套軟件版本要求:

Java 6+ 

Python 2.6+. 

Scala version (2.10.x).


2)安裝好linuxjdkpython, 一般linux均會自帶安裝好jdkpython但注意jdk默認為openjdk,建議重新安裝oracle jdk


3IP10.171.29.191  hostnamemaster


2、安裝scala


1)下載scala
wget http://downloads.typesafe.com/scala/2.10.5/scala-2.10.5.tgz

2)解壓文件
tar -zxvf scala-2.10.5.tgz

3)配置環境變量
#vi/etc/profile
#SCALA VARIABLES START
export SCALA_HOME=/home/jediael/setupfile/scala-2.10.5
export PATH=$PATH:$SCALA_HOME/bin
#SCALA VARIABLES END

$ source /etc/profile
$ scala -version
Scala code runner version 2.10.5 -- Copyright 2002-2013, LAMP/EPFL

4)驗證scala
$ scala
Welcome to Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_51).
Type in expressions to have them evaluated.
Type :help for more information.

scala> 9*9
res0: Int = 81

3、安裝spark
1)下載spark
wget http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz

2)解壓spark
tar -zxvf http://mirror.bit.edu.cn/apache/spark/spark-1.3.1/spark-1.3.1-bin-hadoop2.6.tgz

3)配置環境變量
#vi/etc/profile
#SPARK VARIABLES START 
export SPARK_HOME=/mnt/jediael/spark-1.3.1-bin-hadoop2.6
export PATH=$PATH:$SPARK_HOME/bin 
#SPARK VARIABLES END

$ source /etc/profile

4)配置spark
 $ pwd
/mnt/jediael/spark-1.3.1-bin-hadoop2.6/conf

$ mv spark-env.sh.template spark-env.sh
$vi spark-env.sh
export SCALA_HOME=/home/jediael/setupfile/scala-2.10.5
export JAVA_HOME=/usr/java/jdk1.7.0_51
export SPARK_MASTER_IP=10.171.29.191
export SPARK_WORKER_MEMORY=512m 
export master=spark://10.171.29.191:7070

$vi slaves
localhost

5)啟動spark
pwd
/mnt/jediael/spark-1.3.1-bin-hadoop2.6/sbin
$ ./start-all.sh 
注意,hadoop也有start-all.sh腳本,因此必須進入具體目錄執行腳本

$ jps
30302 Worker
30859 Jps
30172 Master

4、驗證安裝情況
1)運行自帶示例
$ bin/run-example  org.apache.spark.examples.SparkPi

2)查看集群環境
http://master:8080/

3)進入spark-shell
$spark-shell

4)查看jobs等信息
http://master:4040/jobs/

 

 

部署開發環境

 

  下載安裝ScalaI IDEScala IDE

  

三 示例入門

  1 建議查看借鑒 spark安裝目錄地下的examples目錄

 

四 爬過的坑

 

1 開啟spark服務時,報錯 

#localhost port 22: Connection refused

  

解決:是因為沒有安裝openssh-server,輸入命令 sudo apt-get install openssh-server安裝之后,即可解決

 

2 在eclipse上建立的spark項目,無法運行,報錯:錯誤: 找不到或無法加載主類 

  問題出現條件,當在項目中添加spark的jar包時,就會出現項目報錯。

  

   解決:右鍵工程 ----> Properties --->Scala Compiler

  1 勾選 Use Project Settings

  2 在Scala Installation 下拉框選擇一個能用的Scala版本,點擊應用即可解決

 

 

 

 

 

 

 

 

 

3 RDD 轉換成為DataFrame

 

  

     val conf = new SparkConf()

    conf.setAppName("SparkSQL")

    conf.setMaster("local")

    

    val sc = new SparkContext(conf)

    val sq = new SQLContext(sc)

    import sq.implicits._ // 加上這句話,才能隱式的轉換

    

    val people = sc.textFile("/wgx-linux/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

    people.registerTempTable("people")

    val result = sq.sql("select * from people")

 

   

4 RDD 操作分為transformation與action

 

在查詢數據時,無論先前是何種變換,都是transformation直到去取結果時,才帶有執行!若在沒取結果前,計算效率時間,是錯誤的!

五 基礎概念

HDFS

 

 

 

 

 

 

 

  一個Namenode節點管理成千上萬個DatanodesNamenode相當於資源管理器,Datanodes相當於數據資源。

一個文件分塊存儲到不同的Datanodes,每個塊都會有副本。

 

MapReduce

 

  假設有這么一個任務,需要計算出一個大文件存儲的最大的數據,下圖給出了mapReduce的計算過程。

   

 

RDD

 

 

 

   RDD將操作分為兩類:transformation與action。無論執行了多少次transformation操作,RDD都不會真正執行運算,只有當action操作被執行時,運算才會觸發。而在RDD的內部實現機制中,底層接口則是基於迭代器的,從而使得數據訪問變得更高效,也避免了大量中間結果對內存的消耗。

 

  例如map操作會返回MappedRDD,而flatMap則返回FlatMappedRDD。當我們執行map或flatMap操作時,不過是將當前RDD對象傳遞給對應的RDD對象而已transformation

 

 

DDL

 

 

 

  數據庫模式定義語言DDL(Data Definition Language),是用於描述數據庫中要存儲的現實世界實體的語言。一個數據庫模式包含該數據庫中所有實體的描述定義。

 

 

六  spark +hbase

 

 

 1 在Spark-evn.sh里添加hbase的庫(否則會報錯誤)

SPARK_CLASSPATH=/home/victor/software/hbase/lib/*

 

2 hbase數據結構

  

 

3 連接數據庫

    val sqconf = HBaseConfiguration.create()  

    sqconf.set("hbase.zookeeper.property.clientPort""2181")

    sqconf.set("hbase.zookeeper.quorum""localhost")

    val admin = new HBaseAdmin(sqconf)

 

 

4 增刪改查

val sqconf = HBaseConfiguration.create()  

     sqconf.set("hbase.zookeeper.property.clientPort""2181")

     sqconf.set("hbase.zookeeper.quorum""localhost")

     val admin = new HBaseAdmin(sqconf)

    

    

    // 建表

     if (!admin.isTableAvailable("test-user")) {//檢查表是否存在  

      print("Table Not Exists! Create Table")  

      val tableDesc = new HTableDescriptor("test-user")  //表名

      tableDesc.addFamily(new HColumnDescriptor("name".getBytes()))//添加列簇

      tableDesc.addFamily(new HColumnDescriptor("id".getBytes()))//添加列簇

      admin.createTable(tableDesc)//建表

      

      println("create table test-user")

      

      

     }

    

    // 增

    val table = new HTable(sqconf"test-user");  

    for (i <- 1 to 10) {  

      var put = new Put(Bytes.toBytes("row"+i)) 

      put.add(Bytes.toBytes("name"), Bytes.toBytes("name"), Bytes.toBytes("value " + i))//往列簇basic添加字段name值為value

      put.add(Bytes.toBytes("name"), Bytes.toBytes("xing"), Bytes.toBytesi))

      put.add(Bytes.toBytes("id"), Bytes.toBytes("id"), Bytes.toBytesi))

      table.put(put

    }  

    table.flushCommits()

    

    // 刪

    val delete = new Delete(Bytes.toBytes("row0"))//刪除row1數據

    table.delete(delete)

    table.flushCommits()

    // 改

    var put = new Put(Bytes.toBytes("row1")) 

    put.add(Bytes.toBytes("id"), Bytes.toBytes("id"), Bytes.toBytes"10001100"))

    table.put(put

    table.flushCommits()

    

    // 查

     val row1 =  new Get(Bytes.toBytes("row1"))

    val HBaseRow = table.get(row1)//獲取row為scutshuxue的數據

    if(HBaseRow != null && !HBaseRow.isEmpty){

      var result:AnyRef = null

      result = Bytes.toString(HBaseRow.getValue(Bytes.toBytes("id"), Bytes.toBytes("id")))//得到列簇為address屬性city的值

      println("result="+result)

}

 

 

5 帶條件的查詢

   import CompareFilter._

    // 帶條件的查詢

    // 查詢列族為id,列為id.值為3的數據行

    val filter  = new SingleColumnValueFilter(Bytes.toBytes("id"), Bytes.toBytes("id"),  

                    CompareOp.EQUAL, Bytes.toBytes("10001100")); 

    

    val scan = new Scan()

    scan.setFilter(filter)

    

    val scanner = table.getScanner(scan)

    var rsu = scanner.next()

    while(rsu != null){

      

      println("rowkey = " + rsu.getRow()); 

      println("value = " + rsu.getValueAsByteBuffer(Bytes.toBytes("id"), Bytes.toBytes("id"))); 

      rsu = scanner.next()

}

 

 

  

 

6 其他操作(group order 等)

  需要一個轉換,將hbase 轉換成RDD, 在轉化成DataFrame,注冊數據表,執行sql

sqconf.set(TableInputFormat.INPUT_TABLE"test-user")

    val usersRDD = sc.newAPIHadoopRDD(sqconfclassOf[TableInputFormat],

      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],

      classOf[org.apache.hadoop.hbase.client.Result])

      

      val personRDD = usersRDD.map(p => Peoson(p._2.getValue(Bytes.toBytes("name"),Bytes.toBytes("name")) +"",

          p._2.getValue(Bytes.toBytes("xing"),Bytes.toBytes("name"))+"",p._2.getValue(Bytes.toBytes("id"),Bytes.toBytes("id"))+""))

    

     if(personRDD != null){

       println("suc")

       println(personRDD.count())

     }

     else

       println("ERROR")

       

    println("to DataFrame suc")

    

    import sq.implicits._ 

    val people = personRDD.toDF()

    people.registerTempTable("people")

    

    val result = sq.sql("select * from people")

    

    result.map(t => "Name: " + t(0)).collect().foreach(println)

 

 

七 spark +hive 

   安裝spark之后,自帶有hive,所以不需要另外部署hive。

 

1 特點

 

  Hive不支持常規的SQL更新語句,如:數據插入,更新,刪除。

  Hive 數據查詢用時以數分鍾甚至數小時來進行計算即非即時性

  Hive 支持類sql的語法,即hql

 

2 hive導入數據

 

  def addDataToTable(sq:HiveContext){

    sq.sql("load data local inpath '/usr/wgx/test_user.txt' into table test_user1")

    sq.sql("load data local inpath '/usr/wgx/test_user_info.txt' into table test_user_info1")

  }

 

 

   

 

八 spark +mysql

 

九 調研分析

 

結果分析

注:1 order by 為隨機數排序

 

                                       (單位:秒)

方式

order by

 group

sum group

left join

數量

spark+hbase

46.727

15.490

13.320

45.943

100

spark+hive 

11.209

4.490

2.814

6.247

100

spark +mysql

 

 

 

 

100

 

續                                     (單位:秒)

方式

=

<>

<

>

數量

spark+hbase

11.682

29.947

10.484

10.849

100

spark+hive 

0.694

0.826

0.900

0.941

100

spark +mysql

 

 

 

 

100

 

   

                                      (單位:秒)

方式

order by

 group

sum group

left join

數量

spark+hbase

72.114

25.347

23.190

66.321

200

spark+hive 

12.009

5.026

3.385

8.620

200

spark +mysql

 

 

 

 

200

 

續                                   (單位:秒)

方式

=

<>

<

>

數量

spark+hbase

20.073

20.134

19.883

20.679

200

spark+hive 

1.33

1.564

1.359

1.355

200

spark +mysql

 

 

 

 

200

 

 

               

線性分析

 

 

          線性分析(spark +hbase

      (單位:秒)

 

100

200

600

order by

46.727

72.114

138.561

 group

15.490

25.347

51.306

sum group

13.320

23.190

50.565

left join

45.943

66.321

281.226 

[join1700萬】

=

11.682

20.073

46.582

<>

29.947

20.134

51.231

<

10.484

19.883

45.515

>

10.849

20.679

47.663

 

          線性分析(spark +hive

               (單位:秒)

 

100

200

600

order by

11.209

12.009

 

 group

4.490

5.026

 

sum group

2.814

3.385

 

left join

6.247

8.620

 

=

0.694

1.331

 

<>

0.826

1.564

 

<

0.900

1.359

 

>

0.941

1.355

 

 

 

 

十 hive hbase整合

 

整合方式一 sparkSQL 通過hive查詢hbase數據

 

原理:

 1 讀取hbase表,轉換成RDD

 2 將RDD轉換成對象模型RDD

 3 對象模型RDD注冊成虛擬臨時表

 4 從第三步的虛擬臨時表的數據導入hive表

 5 讀取hive表為RDD

 6 再將第五步的RDD注冊臨時表

 7 查詢

 

轉換效率

 

注:從hbase轉換成hive (單位:秒)

數量

200

400

600

時間

97.513

144.007

226.221

 

查詢效率線性分析(單位:秒)

 

200

400

600

order by

12.748

26.016

56.231

 group

5.114

7.871

21.625

sum group

3.765

5.869

9.379

left join

10.935

34.467

31.471

=

2.041

7.298

5.727

<>

2.662

5.534

8.502

<

1.907

4.115

5.499

>

2.120

4.049

5.644

 

 

2 整合方式二  以hive sql方式查詢hbase

 

原理:

spark 將hbase表轉換成RDD (模型轉換,並沒執行)

RDD 通過hive Context 注冊為臨時表

hive 執行查詢

 

整合效率

 

200

400

600

order by

 

 

148.701

 group

 

 

78.277

sum group

 

 

53.201

left join

 

 

314.468 [1]

=

 

 

46.615

<>

 

 

53.453

<

 

 

46.097

>

 

 

46.845

 

:600萬的表與1700萬數據表的join

 

整合方式三 hive表外部關聯hbase

 

  原理:

 

  在創建hive表時,在創建表的sql上加上對hbase表的關聯

  

sql("CREATE EXTERNAL TABLE user_t_info(id string,userId string,name string,phone string)"+

 

       "STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'"+

 

"WITH SERDEPROPERTIES (\"hbase.columns.mapping\" = \":key,id:id,info:name,info:phone \")"+

 

"TBLPROPERTIES(\"hbase.table.name\" = \"user_t_info\")")

 

   

整合效率

 

 

200

400

600

order by

 

 

240.608

 group

 

 

88.908

sum group

 

 

86.667

left join

 

 

 

=

 

 

79.768

<>

 

 

80.462

<

 

 

80.645

>

 

 

79.237

 

 

十一 結論分析

1 spark +hive 比spqrk+hbase 效率高

 

2 隨着數據量的增加,spark+hive 沒有成線性增加,spark+hbase大致成線性關系增加,總體上,spark+hive的增加幅度較小

 

3 shpark +hbase +hive 

  從hbase轉換成hive,數據量和時間大致成線性關系,比純線性關系好一點

  查詢效率來講,隨着數據量的增加,雖然時間有所增加,但幅度不大

 

十二 附錄

測試源碼

 

 

 

極點科技

誠信 專注 創新


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM