尚硅谷大數據Spark-2019版最新


第一章:

一.介紹

Spark是基於內存的迭代計算

 

四.Local模式

僅僅本機運行

Local[k]代表有幾個線程在跑

Local[*]代表跑滿

 

五.spark使用

1.bin/spark-submit 參數,可以用來提交任務

參數如下

--master 指定Master的地址,默認為Local
--class: 你的應用的啟動類 (如 org.apache.spark.examples.SparkPi)
--deploy-mode: 是否發布你的驅動到worker節點(cluster) 或者作為一個本地客戶端 (client) (default: client)*
--conf: 任意的Spark配置屬性, 格式key=value. 如果值包含空格,可以加引號“key=value” 
application-jar: 打包好的應用jar,包含依賴. 這個URL在集群中全局可見。 比如hdfs:// 共享存儲系統, 如果是 file:// path, 那么所有的節點的path都包含同樣的jar
application-arguments: 傳給main()方法的參數
--executor-memory 1G 指定每個executor可用內存為1G
--total-executor-cores 2 指定每個executor使用的cup核數為2個

執行如下

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

 

2.bin/spark-shell,進入命令行環境,默認很多東西會創建好,比如sc變量

jsp命令查看java運行的程序

spark-shell提示的,網址,比如hadoop102:4040,是查看網頁版的程序運行狀態器,即Spark Jobs

yarn application -list,查看應用id

 

六.WordCount程序

1.load(textFile)

2.flat(flatMap)

3.group(groupBy或者map為元組)

4.聚合(reduceByKey)

5.打印(collect)

6.停止(sc.stop)

 

七.Idea開發

1.maven項目

2.引入spark依賴

3.添加build段落

4.新建scala,標記為source dir

5.新建scala文件。因為是maven build有引入,所以可以自動添加scala標記

6.new SparkContext,創建context

7.new SparkConf,創建配置,有一系列set方法

 

八.Yarn部署

yarn-client:有交互

yarn-cluster:無交互

區別是driver程序要哪里執行,運行節點

需要修改以下配置文件

 

需要啟動HDFS和Yarn集群

 

spark-submit要添加以下參數

 bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

 

日志查看:

1.修改配置文件spark-defaults.conf

spark.yarn.historyServer.address=hadoop102:18080
spark.history.ui.port=18080

2.重啟歷史服務

sbin/stop-history-server.sh stopping org.apache.spark.deploy.history.HistoryServer
sbin/start-history-server.sh starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark/logs/spark-atguigu-org.apache.spark.deploy.history.HistoryServer-1-hadoop102.out

3.提交任務

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

4.去hadoop網頁查看

 

spark-shell設置為master yarn模式

spark-shell --master yarn

 

spark程序打包:

使用以下maven插件

<plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>WordCount</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
      </plugin>

雙擊package部分

 

文件路徑查找:

默認從部署環境中查找

 

九.Standalone模式

Spark獨立部署,不依賴其他大數據組件

xcall jps:在所有集群上運行jps

 

十.Driver和Executor關系

Driver:創建Spark應用上下文環境對象的程序,是個驅動器

Executor:接收任務並執行任務

Driver發送任務

Executor向Driver反饋任務執行情況

所有算子里的計算功能(參數函數)都是Executor執行

如果Executor里用到了Driver里的變量,這個變量要可以序列化,方便Driver傳送這個變量給Executor

 

十一.歷史服務

可以在網頁上查看歷史任務

 

十二.高可用HA

利用zookeeper實現高可用,編寫配置文件即可

 

第二章.RDD

一.JAVA IO回顧

 輸入 輸出

字節 字符

 FileInputStream

BufferedInputStream

BufferedReader,InputStreamReader

 

二.RDD概述

RDD運算是裝飾者模式,將數據處理邏輯封裝,即對數據計算的抽象,不保留數據

可分區:適合並行計算

RDD屬性:

1)      一組分區(Partition),即數據集的基本組成單位;

2)      一個計算每個分區的函數;

3)      RDD之間的依賴關系;

4)      一個Partitioner,即RDD的分片函數;

5)      一個列表,存儲存取每個Partition的優先位置(preferred location)。

 依賴關系:血緣

 

移動數據不如移動計算

 

算子:

1.轉換算子,進行數據變換

2.行動算子,觸發計算,產生輸出

 

二.RDD編程

1.RDD創建

從集合里:makeRDD,parallelize

從文件:sc.textFile

從其他RDD

 

保存到文件:rdd.saveAsTextFile

 

RDD分區規則用hadoopFile,分片規則完全一樣

 

計算數據的分區與存儲數據到分區是分開的

hadoopfs按行存儲於讀取

 

三.RDD轉換算子

分為Value類型和Key-Value類型

 

四.Vaule類型轉換算子

1.map(func)

返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成

 

2.mapPartitions(func)

類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]。假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區。

目的是將分區里的計算集中分給一個Executor,容易OOM內存不夠用

 

3.mapPartitionsWithInex(func)

類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U];

可以根據分區號追蹤任務

 

4.flatMap(func)

類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)

 

5.glom()

將每一個分區形成一個數組,形成新的RDD類型時RDD[Array[T]]

 

6.groupBy(func)

分組,按照傳入函數的返回值進行分組。將相同的key對應的值放入一個迭代器。返回對偶元祖

 

7.filter(func)

過濾。返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成

 

8.sample(withReplacement, fraction, seed)

以指定的隨機種子隨機抽樣出數量為fraction的數據,withReplacement表示是抽出的數據是否放回,true為有放回的抽樣,false為無放回的抽樣,seed用於指定隨機數生成器種子

 

9.distinct([numTasks])

對源RDD進行去重后返回一個新的RDD。默認情況下,只有8個並行任務來操作,但是可以傳入一個可選的numTasks參數改變它

 

沒有shuffle性能好

 

10.coalesce(numPartitions)

縮減分區數,用於大數據集過濾后,提高小數據集的執行效率

 

11.repatition(numPartitions) 

根據分區數,重新通過網絡隨機洗牌所有數據

 

12.sortBy(func,[ascending], [numTasks])

使用func先對數據進行處理,按照處理后的數據比較結果排序,默認為正序

 

分區數量改變,task改變,並行度度改變

 

13.pipe(command, [envVars]) 

管道,針對每個分區,都執行一個shell腳本,返回輸出的RDD

 

五.雙Value

 兩個RDD集合操作,交並差笛卡爾積zip等

 

六.kv算子

這些算子被調用,要求數據一定要是kv格式

 

1.partitionBy

對pairRDD進行分區操作,如果原有的partionRDD和現有的partionRDD是一致的話就不進行分區, 否則會生成ShuffleRDD,即會產生shuffle過程

 

2.groupBykey

groupByKey也是對每個key進行操作,但只生成一個sequence

 

3.reduceBykey

在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,reduce任務的個數可以通過第二個可選的參數來設置

 

4.aggregateBykey

在kv對的RDD中,按key將value進行分組合並,合並時,將每個value和初始值作為seq函數的參數,進行計算,返回的結果作為一個新的kv對,然后再將結果按照key進行合並,最后將每個分組的value傳遞給combine函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函數,以此類推),將key與計算結果作為一個新的kv對輸出

 

5.foldBykey

aggregateByKey的簡化操作,seqop和combop相同

 

6.combineBykey

對相同K,把V合並成一個集合。第一個值也是用函數計算出來。得到一個V的計算值

 

7.sortByKey

在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD

 

8.mapValues

 針對於(K,V)形式的類型只對V進行操作

 

9.join

在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD

 

10.cogroup

在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD

 

七.行動算子

把RDD里的數據當做一個數組來處理

 

1.reduce

通過func函數聚集RDD中的所有元素,先聚合分區內數據,再聚合分區間數據

 

2.collect

在驅動程序中,以數組的形式返回數據集的所有元素

 

3.count

返回RDD中元素的個數

 

4.first

返回RDD中元素的個數

 

5.take

返回一個由RDD的前n個元素組成的數組

 

6.takeOrdered

返回該RDD排序后的前n個元素組成的數組

 

7.aggregate

aggregate函數將每個分區里面的元素通過seqOp和初始值進行聚合,然后用combine函數將每個分區的結果和初始值(zeroValue)進行combine操作。這個函數最終返回的類型不需要和RDD中元素類型一致

 

8.fold

折疊操作,aggregate的簡化操作,seqop和combop一樣

 

9.saveAsTextFile

將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本

 

10.saveAsSequenceFile

將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統

 

11.saveAsObjectFile
用於將RDD中的元素序列化成對象,存儲到文件中

 

12.countByKey

針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數

 

13.foreach(func)

 在數據集的每一個元素上,運行函數func進行更新

 

八.Task執行序列化

網絡里傳遞的對象一定要序列化

 

九.RDD依賴關系

 RDD里保存了依賴關系,出錯了可以重來一遍

toDebugString:查看血緣

dependicies:查看依賴

 

窄依賴:窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用,窄依賴我們形象的比喻為獨生子女

 

寬依賴:寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引起shuffle,總結:寬依賴我們形象的比喻為超生

 

十.DAG

有向無環圖 

DAG划分stage,寬依賴是划分Stage的依據

 

十一.任務划分

RDD任務切分中間分為:Application、Job、Stage和Task

 

十二.緩存

persist或者cache方法內存緩存

為的是存儲耗時動作

 

十三.檢查點 

 checkponit函數持久化,要先用setCheckoutDir設置

緩存和檢查點適合長鏈計算,防出錯

 

十四.RDD分區器

 通過使用RDD的partitioner 屬性來獲取 RDD 的分區方式

 

Hash分區

 HashPartitioner分區的原理:對於給定的key,計算其hashCode,並除以分區的個數取余,如果余數小於0,則用余數+分區的個數(否則加0),最后返回的值就是這個key所屬的分區ID。

 

Ranger分區

HashPartitioner分區弊端:可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據。

RangePartitioner作用:將一定范圍內的數映射到某一個分區內,盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都是比另一個分區內的元素小或者大,但是分區內的元素是不能保證順序的。簡單的說就是將一定范圍內的數映射到某一個分區內

 

自定義分區

要實現自定義的分區器,你需要繼承 org.apache.spark.Partitioner 類並實現下面三個方法。

(1)numPartitions: Int:返回創建出來的分區數。

(2)getPartition(key: Any): Int:返回給定鍵的分區編號(0到numPartitions-1)。

(3)equals():Java 判斷相等性的標准方法。這個方法的實現非常重要,Spark 需要用這個方法來檢查你的分區器對象是否和其他分區器實例相同,這樣 Spark 才可以判斷兩個 RDD 的分區方式是否相同

 

 使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法即可。Spark 中有許多依賴於數據混洗的方法,比如 join() 和 groupByKey(),它們也可以接收一個可選的 Partitioner 對象來控制輸出數據的分區方式

 

 

 十五.數據讀取與保存

Spark的數據讀取及數據保存可以從兩個維度來作區分:文件格式以及文件系統。

文件格式分為:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;

文件系統分為:本地文件系統、HDFS、HBASE以及數據庫。

使用RDD讀取JSON文件處理很復雜,同時SparkSQL集成了很好的處理JSON文件的方式,所以應用中多是采用SparkSQL處理JSON文件

 

讀取mysql:引入mysql依賴,通過JdbcRDD訪問

為避免連接數過多,可以以partition為單位操作數據庫,可能OOM

 

讀取hbase:引入hbase依賴,newAPIHadoopRDD創建

 

十六.累加器

Spark三大數據結構:

1.RDD:分布式數據集

2.廣播變量:分布式只讀共享變量

3.累加器:分布式只寫共享變量

 

創建累加器:sc.longAccumulator

add,value

也可以自定義累加器

 

十七.廣播變量

目的是提高效率,減少傳輸,傳送大變量

創建:broadcast

 

第三章.SparkSql

一.概述

Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了2個編程抽象:DataFrame和DataSet,並且作為分布式SQL查詢引擎的作用

Spark SQL是將SQL轉換成RDD,然后提交到集群執行,執行效率非常快

HDFS:數據沒有結構存儲;數倉:數據結構化(加標簽)分門別類存儲

Spark SQL通過DataFrame和DataSet解決了RDD里的數據沒有結構的問題

 

二.DataFrame於DataSet介紹

RDD是計算的抽象,DataFrame更近一步,是結構(Schema)的抽象,不過只包含數據表的字段名及給字段類型沒有類ORM映射(只能在SQL語句里用,不能在后續程序取結果集的時候用) 。DataSet添加了類型靜態編譯檢查,最新版本的DataFrame是Dataset[Row]的特例不能直接獲取列的屬性名,即ORM(把數據庫的東西放進類里映射到相應屬性),但區別是DataSet是以類來指定結構的,DataFram是一個個基本類型來指定結構的,即Row。

DataFrame沒有類,那么獲得的Row類型對象,只能用類似SQL獲取結果集的方式->getInt(0)這樣的方式取列的值,而不是直接用屬性名(或者用case匹配)

RDD,增加結構信息->DataFrame,增加類->DataSet

 

三.DataFrame操作

SparkSession是Spark最新的SQL查詢起始點

命令行里SparkSession的對象叫做spark

創建DataFrame方式:

1.通過Spark的數據源進行創建

spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet   schema   table   text   textFile

2.從一個存在的RDD進行轉換

3.還可以從Hive Table進行查詢返回

 

df.show:打印數據集結果

 

語法風格:一種是傳統SQL語句,一種是DSL以函數風格調用

 

SQL風格:

視圖:只能查不能改

表:能查能改

 

createTempView:創建臨時視圖

createGlobalView:創建全局視圖,訪問的時候加全局限定名,修飾視圖名

 

spark.sql:傳入SQL語句

 

show:顯示

 

DSL風格:

printSchema:打印表結構

select函數的參數即是列名,也是要計算的函數體語句

$"name":傳參的時候,不會與后面的數據進行計算來得到列名。遍歷每一行的時候,引用name列的值

好處一是編寫簡單,一個函數可以完成很多SQL語句,二是有類型檢查

 

RDD與DataFrame轉換

如果需要RDD與DF或者DS之間操作,那么都需要引入 import spark.implicits._  spark不是包名,而是sparkSession對象的名稱

手動轉換:toDF("name","age"),轉為DataFrame並制定Schema

 

反射轉換:

(1)創建一個樣例類

scala> case class People(name:String, age:Int)

(2)根據樣例類將RDD轉換為DataFrame

scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF

res2: org.apache.spark.sql.DataFrame = [name: string, age: int]

就是RDD的范型要是一個類,toDF可以反射

 

編程方式:

(1)導入所需的類型

scala> import org.apache.spark.sql.types._

import org.apache.spark.sql.types._

(2)創建Schema

scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)

structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))

(3)導入所需的類型

scala> import org.apache.spark.sql.Row

import org.apache.spark.sql.Row

(4)根據給定的類型創建二元組RDD

scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}

data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33

(5)根據數據及給定的schema創建DataFrame

scala> val dataFrame = spark.createDataFrame(data, structType)

dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]

手動構建DataFrame需要的源RDD數據和數據結構

 

DataFrame轉為為RDD

DataFrame的rdd方法,RDD里的泛型為Row

 

四.DataSet操作

Dataset是具有強類型的數據集合,需要提供對應的類型信息

創建:

1)創建一個樣例類

scala> case class Person(name: String, age: Long)

defined class Person

2)創建DataSet

scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()

caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

 

RDD轉換為DataSet

1)創建一個RDD

scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")

peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27

2)創建一個樣例類

scala> case class Person(name: String, age: Long)

defined class Person

3)將RDD轉化為DataSet

scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS()

 

DataSet轉為RDD:

調用rdd方法即可

 

DataFrame與DataSet互相操作:

1.DataFrame轉為DataSet

思路是提供類信息

(1)導入隱式轉換

import spark.implicits._

(2)創建樣例類

case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型

(3)轉換

val testDS = testDF.as[Coltest]

 

2.DataSet轉為DataFrame

這個很簡單,因為只是把case class封裝成Row

(1)導入隱式轉換

import spark.implicits._

(2)轉換

val testDF = testDS.toDF

 

RDD,DataFrame,DataSet關系

RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)

 

如果同樣的數據都給到這三個數據結構,他們分別計算之后,都會給出相同的結果。不同是的他們的執行效率和執行方式。

 

在后期的Spark版本中,DataSet會逐步取代RDD和DataFrame成為唯一的API接口。

 

五.IDEA寫Spark SQL

引入Spark SQL依賴

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.1.1</version></dependency>

 

六.自定義函數

udf:

SparkSession的udf.register注冊自定義的函數,一個函數名一個函數體 。用於遍歷每一項時使用

 

自定義聚合函數: 

弱類型用戶自定義聚合函數:通過繼承UserDefinedAggregateFunction來實現用戶自定義聚合函數

import org.apache.spark.sql.expressions.MutableAggregationBuffer

import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

import org.apache.spark.sql.SparkSession

object 
MyAverage extends UserDefinedAggregateFunction {


// 聚合函數輸入參數的數據類型 

def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
 // 聚合緩沖區中值得數據類型

def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
 // 返回值的數據類型 

def dataType: DataType = DoubleType
 // 對於相同的輸入是否一直返回相同的輸出。 
def deterministic: Boolean = true
 // 初始化

def initialize(buffer: MutableAggregationBuffer): Unit = { // 存工資的總額
 buffer(0) = 0L // 存工資的個數
 buffer(1) = 0L
 }
 // 相同Execute間的數據合並。 def update(buffer: MutableAggregationBuffer, input: Row): Unit =
{
if (!input.isNullAt(0))
{
buffer(0) = buffer.getLong(0) + input.getLong(0)

buffer(1) = buffer.getLong(1) + 1
}
}
 // 不同Execute間的數據合並 
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
 // 計算最終結果 def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
} 

// 注冊函數

spark.udf.register("myAverage", MyAverage)

 val df = spark.read.json("examples/src/main/resources/employees.json")
 df.createOrReplaceTempView("employees")

df.show()
 // +-------+------+
 // | name|salary| 
// +-------+------+
 // |Michael| 3000| 
// | Andy| 4500|
 // | Justin| 3500| 
// | Berta| 4000| 
// +-------+------+

 val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")

result.show()
 // +--------------+
 // |average_salary| 
// +--------------+ 
// | 3750.0| 
// +--------------+

 

強類型用戶自定義聚合函數:

通過繼承Aggregator來實現強類型自定義聚合函數。可以不用取buffer的序列,直接通過字段名獲得值

使用的時候先用toColumn.name轉為列,DSL方式來用

// Convert the function to a `TypedColumn` and give it a name

val averageSalary = MyAverage.toColumn.name("average_salary")
 val result = ds.select(averageSalary)

result.show()

 

七.不同數據源的讀取與保存

1.通用加載/保存方法

手動指定選項:

SparkSession提供的read.load方法用於通用加載數據,使用write.save保存數據

format方法指定數據類型

sql方法指定要在文件上運行的sql語句

 

文件保存選項:

mode,saveMode方法設置

Scala/Java

Any Language

Meaning

SaveMode.ErrorIfExists(default)

"error"(default)

如果文件存在,則報錯

SaveMode.Append

"append"

追加

SaveMode.Overwrite

"overwrite"

覆寫

SaveMode.Ignore

"ignore"

數據存在,則忽略

 

 

 

 

 

 

 

 

2.Hive

Hive里的語句除了跑SQL,也可以操作數據庫和表

Hive分Spark SQL自帶的內置Hive和外部Hive

配置:本地使用 --conf spark.sql.warehouse.dir=hdfs://hadoop102/spark-wearhouse 確定內置Hive的位置

 

注意:如果你使用的是內部的Hive,在Spark2.0之后,spark.sql.warehouse.dir用於指定數據倉庫的地址,如果你需要是用HDFS作為路徑,那么需要將core-site.xml和hdfs-site.xml 加入到Spark conf目錄,否則只會創建master節點上的warehouse目錄,查詢時會出現文件找不到的問題,這是需要使用HDFS,則需要將metastore刪除,重啟集群

 

外部Hive

1)     將Hive中的hive-site.xml拷貝或者軟連接到Spark安裝目錄下的conf目錄下。

2)     打開spark shell,注意帶上訪問Hive元數據庫的JDBC客戶端

$ bin/spark-shell  --jars mysql-connector-java-5.1.27-bin.jar

 

Saprk SQL CLI

Spark SQL CLI可以很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。在Spark目錄下執行如下

 

代碼里使用Hive

1)添加依賴:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-hive_2.11</artifactId>

    <version>2.1.1</version>

</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->

<dependency>

    <groupId>org.apache.hive</groupId>

    <artifactId>hive-exec</artifactId>

    <version>1.2.1</version>

</dependency>

(2)創建SparkSession時需要添加hive支持(紅色部分)

val warehouseLocation: String = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()

注意:藍色部分為使用內置Hive需要指定一個Hive倉庫地址。若使用的是外部Hive,則需要將hive-site.xml添加到ClassPath下。

 

 

第四章.SpardStreaming


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM