第一章:
一.介紹
Spark是基於內存的迭代計算
四.Local模式
僅僅本機運行
Local[k]代表有幾個線程在跑
Local[*]代表跑滿
五.spark使用
1.bin/spark-submit 參數,可以用來提交任務
參數如下
--master 指定Master的地址,默認為Local --class: 你的應用的啟動類 (如 org.apache.spark.examples.SparkPi) --deploy-mode: 是否發布你的驅動到worker節點(cluster) 或者作為一個本地客戶端 (client) (default: client)* --conf: 任意的Spark配置屬性, 格式key=value. 如果值包含空格,可以加引號“key=value” application-jar: 打包好的應用jar,包含依賴. 這個URL在集群中全局可見。 比如hdfs:// 共享存儲系統, 如果是 file:// path, 那么所有的節點的path都包含同樣的jar application-arguments: 傳給main()方法的參數 --executor-memory 1G 指定每個executor可用內存為1G --total-executor-cores 2 指定每個executor使用的cup核數為2個
執行如下
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --executor-memory 1G \ --total-executor-cores 2 \ ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100
2.bin/spark-shell,進入命令行環境,默認很多東西會創建好,比如sc變量
jsp命令查看java運行的程序
spark-shell提示的,網址,比如hadoop102:4040,是查看網頁版的程序運行狀態器,即Spark Jobs
yarn application -list,查看應用id
六.WordCount程序
1.load(textFile)
2.flat(flatMap)
3.group(groupBy或者map為元組)
4.聚合(reduceByKey)
5.打印(collect)
6.停止(sc.stop)
七.Idea開發
1.maven項目
2.引入spark依賴
3.添加build段落
4.新建scala,標記為source dir
5.新建scala文件。因為是maven build有引入,所以可以自動添加scala標記
6.new SparkContext,創建context
7.new SparkConf,創建配置,有一系列set方法
八.Yarn部署
yarn-client:有交互
yarn-cluster:無交互
區別是driver程序要哪里執行,運行節點
需要修改以下配置文件
需要啟動HDFS和Yarn集群
spark-submit要添加以下參數
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100
日志查看:
1.修改配置文件spark-defaults.conf
spark.yarn.historyServer.address=hadoop102:18080 spark.history.ui.port=18080
2.重啟歷史服務
sbin/stop-history-server.sh stopping org.apache.spark.deploy.history.HistoryServer sbin/start-history-server.sh starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark/logs/spark-atguigu-org.apache.spark.deploy.history.HistoryServer-1-hadoop102.out
3.提交任務
bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master yarn \ --deploy-mode client \ ./examples/jars/spark-examples_2.11-2.1.1.jar \ 100
4.去hadoop網頁查看
spark-shell設置為master yarn模式
spark-shell --master yarn
spark程序打包:
使用以下maven插件
<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.0.0</version> <configuration> <archive> <manifest> <mainClass>WordCount</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin>
雙擊package部分
文件路徑查找:
默認從部署環境中查找
九.Standalone模式
Spark獨立部署,不依賴其他大數據組件
xcall jps:在所有集群上運行jps
十.Driver和Executor關系
Driver:創建Spark應用上下文環境對象的程序,是個驅動器
Executor:接收任務並執行任務
Driver發送任務
Executor向Driver反饋任務執行情況
所有算子里的計算功能(參數函數)都是Executor執行
如果Executor里用到了Driver里的變量,這個變量要可以序列化,方便Driver傳送這個變量給Executor
十一.歷史服務
可以在網頁上查看歷史任務
十二.高可用HA
利用zookeeper實現高可用,編寫配置文件即可
第二章.RDD
一.JAVA IO回顧
輸入 輸出
字節 字符
FileInputStream
BufferedInputStream
BufferedReader,InputStreamReader
二.RDD概述
RDD運算是裝飾者模式,將數據處理邏輯封裝,即對數據計算的抽象,不保留數據
可分區:適合並行計算
RDD屬性:
1) 一組分區(Partition),即數據集的基本組成單位;
2) 一個計算每個分區的函數;
3) RDD之間的依賴關系;
4) 一個Partitioner,即RDD的分片函數;
5) 一個列表,存儲存取每個Partition的優先位置(preferred location)。
依賴關系:血緣
移動數據不如移動計算
算子:
1.轉換算子,進行數據變換
2.行動算子,觸發計算,產生輸出
二.RDD編程
1.RDD創建
從集合里:makeRDD,parallelize
從文件:sc.textFile
從其他RDD
保存到文件:rdd.saveAsTextFile
RDD分區規則用hadoopFile,分片規則完全一樣
計算數據的分區與存儲數據到分區是分開的
hadoopfs按行存儲於讀取
三.RDD轉換算子
分為Value類型和Key-Value類型
四.Vaule類型轉換算子
1.map(func)
返回一個新的RDD,該RDD由每一個輸入元素經過func函數轉換后組成
2.mapPartitions(func)
類似於map,但獨立地在RDD的每一個分片上運行,因此在類型為T的RDD上運行時,func的函數類型必須是Iterator[T] => Iterator[U]。假設有N個元素,有M個分區,那么map的函數的將被調用N次,而mapPartitions被調用M次,一個函數一次處理所有分區。
目的是將分區里的計算集中分給一個Executor,容易OOM內存不夠用
3.mapPartitionsWithInex(func)
類似於mapPartitions,但func帶有一個整數參數表示分片的索引值,因此在類型為T的RDD上運行時,func的函數類型必須是(Int, Interator[T]) => Iterator[U];
可以根據分區號追蹤任務
4.flatMap(func)
類似於map,但是每一個輸入元素可以被映射為0或多個輸出元素(所以func應該返回一個序列,而不是單一元素)
5.glom()
將每一個分區形成一個數組,形成新的RDD類型時RDD[Array[T]]
6.groupBy(func)
分組,按照傳入函數的返回值進行分組。將相同的key對應的值放入一個迭代器。返回對偶元祖
7.filter(func)
過濾。返回一個新的RDD,該RDD由經過func函數計算后返回值為true的輸入元素組成
8.sample(withReplacement, fraction, seed)
以指定的隨機種子隨機抽樣出數量為fraction的數據,withReplacement表示是抽出的數據是否放回,true為有放回的抽樣,false為無放回的抽樣,seed用於指定隨機數生成器種子
9.distinct([numTasks])
對源RDD進行去重后返回一個新的RDD。默認情況下,只有8個並行任務來操作,但是可以傳入一個可選的numTasks參數改變它
沒有shuffle性能好
10.coalesce(numPartitions)
縮減分區數,用於大數據集過濾后,提高小數據集的執行效率
11.repatition(numPartitions)
根據分區數,重新通過網絡隨機洗牌所有數據
12.sortBy(func,[ascending], [numTasks])
使用func先對數據進行處理,按照處理后的數據比較結果排序,默認為正序
分區數量改變,task改變,並行度度改變
13.pipe(command, [envVars])
管道,針對每個分區,都執行一個shell腳本,返回輸出的RDD
五.雙Value
兩個RDD集合操作,交並差笛卡爾積zip等
六.kv算子
這些算子被調用,要求數據一定要是kv格式
1.partitionBy
對pairRDD進行分區操作,如果原有的partionRDD和現有的partionRDD是一致的話就不進行分區, 否則會生成ShuffleRDD,即會產生shuffle過程
2.groupBykey
groupByKey也是對每個key進行操作,但只生成一個sequence
3.reduceBykey
在一個(K,V)的RDD上調用,返回一個(K,V)的RDD,使用指定的reduce函數,將相同key的值聚合到一起,reduce任務的個數可以通過第二個可選的參數來設置
4.aggregateBykey
在kv對的RDD中,按key將value進行分組合並,合並時,將每個value和初始值作為seq函數的參數,進行計算,返回的結果作為一個新的kv對,然后再將結果按照key進行合並,最后將每個分組的value傳遞給combine函數進行計算(先將前兩個value進行計算,將返回結果和下一個value傳給combine函數,以此類推),將key與計算結果作為一個新的kv對輸出
5.foldBykey
aggregateByKey的簡化操作,seqop和combop相同
6.combineBykey
對相同K,把V合並成一個集合。第一個值也是用函數計算出來。得到一個V的計算值
7.sortByKey
在一個(K,V)的RDD上調用,K必須實現Ordered接口,返回一個按照key進行排序的(K,V)的RDD
8.mapValues
針對於(K,V)形式的類型只對V進行操作
9.join
在類型為(K,V)和(K,W)的RDD上調用,返回一個相同key對應的所有元素對在一起的(K,(V,W))的RDD
10.cogroup
在類型為(K,V)和(K,W)的RDD上調用,返回一個(K,(Iterable<V>,Iterable<W>))類型的RDD
七.行動算子
把RDD里的數據當做一個數組來處理
1.reduce
通過func函數聚集RDD中的所有元素,先聚合分區內數據,再聚合分區間數據
2.collect
在驅動程序中,以數組的形式返回數據集的所有元素
3.count
返回RDD中元素的個數
4.first
返回RDD中元素的個數
5.take
返回一個由RDD的前n個元素組成的數組
6.takeOrdered
返回該RDD排序后的前n個元素組成的數組
7.aggregate
aggregate函數將每個分區里面的元素通過seqOp和初始值進行聚合,然后用combine函數將每個分區的結果和初始值(zeroValue)進行combine操作。這個函數最終返回的類型不需要和RDD中元素類型一致
8.fold
折疊操作,aggregate的簡化操作,seqop和combop一樣
9.saveAsTextFile
將數據集的元素以textfile的形式保存到HDFS文件系統或者其他支持的文件系統,對於每個元素,Spark將會調用toString方法,將它裝換為文件中的文本
10.saveAsSequenceFile
將數據集中的元素以Hadoop sequencefile的格式保存到指定的目錄下,可以使HDFS或者其他Hadoop支持的文件系統
11.saveAsObjectFile
用於將RDD中的元素序列化成對象,存儲到文件中
12.countByKey
針對(K,V)類型的RDD,返回一個(K,Int)的map,表示每一個key對應的元素個數
13.foreach(func)
在數據集的每一個元素上,運行函數func進行更新
八.Task執行序列化
網絡里傳遞的對象一定要序列化
九.RDD依賴關系
RDD里保存了依賴關系,出錯了可以重來一遍
toDebugString:查看血緣
dependicies:查看依賴
窄依賴:窄依賴指的是每一個父RDD的Partition最多被子RDD的一個Partition使用,窄依賴我們形象的比喻為獨生子女
寬依賴:寬依賴指的是多個子RDD的Partition會依賴同一個父RDD的Partition,會引起shuffle,總結:寬依賴我們形象的比喻為超生
十.DAG
有向無環圖
DAG划分stage,寬依賴是划分Stage的依據
十一.任務划分
RDD任務切分中間分為:Application、Job、Stage和Task
十二.緩存
persist或者cache方法內存緩存
為的是存儲耗時動作
十三.檢查點
checkponit函數持久化,要先用setCheckoutDir設置
緩存和檢查點適合長鏈計算,防出錯
十四.RDD分區器
通過使用RDD的partitioner 屬性來獲取 RDD 的分區方式
Hash分區
HashPartitioner分區的原理:對於給定的key,計算其hashCode,並除以分區的個數取余,如果余數小於0,則用余數+分區的個數(否則加0),最后返回的值就是這個key所屬的分區ID。
Ranger分區
HashPartitioner分區弊端:可能導致每個分區中數據量的不均勻,極端情況下會導致某些分區擁有RDD的全部數據。
RangePartitioner作用:將一定范圍內的數映射到某一個分區內,盡量保證每個分區中數據量的均勻,而且分區與分區之間是有序的,一個分區中的元素肯定都是比另一個分區內的元素小或者大,但是分區內的元素是不能保證順序的。簡單的說就是將一定范圍內的數映射到某一個分區內
自定義分區
要實現自定義的分區器,你需要繼承 org.apache.spark.Partitioner 類並實現下面三個方法。
(1)numPartitions: Int:返回創建出來的分區數。
(2)getPartition(key: Any): Int:返回給定鍵的分區編號(0到numPartitions-1)。
(3)equals():Java 判斷相等性的標准方法。這個方法的實現非常重要,Spark 需要用這個方法來檢查你的分區器對象是否和其他分區器實例相同,這樣 Spark 才可以判斷兩個 RDD 的分區方式是否相同
使用自定義的 Partitioner 是很容易的:只要把它傳給 partitionBy() 方法即可。Spark 中有許多依賴於數據混洗的方法,比如 join() 和 groupByKey(),它們也可以接收一個可選的 Partitioner 對象來控制輸出數據的分區方式
十五.數據讀取與保存
Spark的數據讀取及數據保存可以從兩個維度來作區分:文件格式以及文件系統。
文件格式分為:Text文件、Json文件、Csv文件、Sequence文件以及Object文件;
文件系統分為:本地文件系統、HDFS、HBASE以及數據庫。
使用RDD讀取JSON文件處理很復雜,同時SparkSQL集成了很好的處理JSON文件的方式,所以應用中多是采用SparkSQL處理JSON文件
讀取mysql:引入mysql依賴,通過JdbcRDD訪問
為避免連接數過多,可以以partition為單位操作數據庫,可能OOM
讀取hbase:引入hbase依賴,newAPIHadoopRDD創建
十六.累加器
Spark三大數據結構:
1.RDD:分布式數據集
2.廣播變量:分布式只讀共享變量
3.累加器:分布式只寫共享變量
創建累加器:sc.longAccumulator
add,value
也可以自定義累加器
十七.廣播變量
目的是提高效率,減少傳輸,傳送大變量
創建:broadcast
第三章.SparkSql
一.概述
Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了2個編程抽象:DataFrame和DataSet,並且作為分布式SQL查詢引擎的作用
Spark SQL是將SQL轉換成RDD,然后提交到集群執行,執行效率非常快
HDFS:數據沒有結構存儲;數倉:數據結構化(加標簽)分門別類存儲
Spark SQL通過DataFrame和DataSet解決了RDD里的數據沒有結構的問題
二.DataFrame於DataSet介紹
RDD是計算的抽象,DataFrame更近一步,是結構(Schema)的抽象,不過只包含數據表的字段名及給字段類型沒有類ORM映射(只能在SQL語句里用,不能在后續程序取結果集的時候用) 。DataSet添加了類型靜態編譯檢查,最新版本的DataFrame是Dataset[Row]的特例不能直接獲取列的屬性名,即ORM(把數據庫的東西放進類里映射到相應屬性),但區別是DataSet是以類來指定結構的,DataFram是一個個基本類型來指定結構的,即Row。
DataFrame沒有類,那么獲得的Row類型對象,只能用類似SQL獲取結果集的方式->getInt(0)這樣的方式取列的值,而不是直接用屬性名(或者用case匹配)
RDD,增加結構信息->DataFrame,增加類->DataSet
三.DataFrame操作
SparkSession是Spark最新的SQL查詢起始點
命令行里SparkSession的對象叫做spark
創建DataFrame方式:
1.通過Spark的數據源進行創建
spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
2.從一個存在的RDD進行轉換
3.還可以從Hive Table進行查詢返回
df.show:打印數據集結果
語法風格:一種是傳統SQL語句,一種是DSL以函數風格調用
SQL風格:
視圖:只能查不能改
表:能查能改
createTempView:創建臨時視圖
createGlobalView:創建全局視圖,訪問的時候加全局限定名,修飾視圖名
spark.sql:傳入SQL語句
show:顯示
DSL風格:
printSchema:打印表結構
select函數的參數即是列名,也是要計算的函數體語句
$"name":傳參的時候,不會與后面的數據進行計算來得到列名。遍歷每一行的時候,引用name列的值
好處一是編寫簡單,一個函數可以完成很多SQL語句,二是有類型檢查
RDD與DataFrame轉換
如果需要RDD與DF或者DS之間操作,那么都需要引入 import spark.implicits._ 【spark不是包名,而是sparkSession對象的名稱】
手動轉換:toDF("name","age"),轉為DataFrame並制定Schema
反射轉換:
(1)創建一個樣例類
scala> case class People(name:String, age:Int)
(2)根據樣例類將RDD轉換為DataFrame
scala> peopleRDD.map{ x => val para = x.split(",");People(para(0),para(1).trim.toInt)}.toDF
res2: org.apache.spark.sql.DataFrame = [name: string, age: int]
就是RDD的范型要是一個類,toDF可以反射
編程方式:
(1)導入所需的類型
scala> import org.apache.spark.sql.types._
import org.apache.spark.sql.types._
(2)創建Schema
scala> val structType: StructType = StructType(StructField("name", StringType) :: StructField("age", IntegerType) :: Nil)
structType: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,IntegerType,true))
(3)導入所需的類型
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row
(4)根據給定的類型創建二元組RDD
scala> val data = peopleRDD.map{ x => val para = x.split(",");Row(para(0),para(1).trim.toInt)}
data: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:33
(5)根據數據及給定的schema創建DataFrame
scala> val dataFrame = spark.createDataFrame(data, structType)
dataFrame: org.apache.spark.sql.DataFrame = [name: string, age: int]
手動構建DataFrame需要的源RDD數據和數據結構
DataFrame轉為為RDD
DataFrame的rdd方法,RDD里的泛型為Row
四.DataSet操作
Dataset是具有強類型的數據集合,需要提供對應的類型信息
創建:
1)創建一個樣例類
scala> case class Person(name: String, age: Long)
defined class Person
2)創建DataSet
scala> val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
RDD轉換為DataSet
1)創建一個RDD
scala> val peopleRDD = sc.textFile("examples/src/main/resources/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = examples/src/main/resources/people.txt MapPartitionsRDD[3] at textFile at <console>:27
2)創建一個樣例類
scala> case class Person(name: String, age: Long)
defined class Person
3)將RDD轉化為DataSet
scala> peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS()
DataSet轉為RDD:
調用rdd方法即可
DataFrame與DataSet互相操作:
1.DataFrame轉為DataSet
思路是提供類信息
(1)導入隱式轉換
import spark.implicits._
(2)創建樣例類
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
(3)轉換
val testDS = testDF.as[Coltest]
2.DataSet轉為DataFrame
這個很簡單,因為只是把case class封裝成Row
(1)導入隱式轉換
import spark.implicits._
(2)轉換
val testDF = testDS.toDF
RDD,DataFrame,DataSet關系
RDD (Spark1.0) —> Dataframe(Spark1.3) —> Dataset(Spark1.6)
如果同樣的數據都給到這三個數據結構,他們分別計算之后,都會給出相同的結果。不同是的他們的執行效率和執行方式。
在后期的Spark版本中,DataSet會逐步取代RDD和DataFrame成為唯一的API接口。
五.IDEA寫Spark SQL
引入Spark SQL依賴
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.1</version> </dependency>
六.自定義函數
udf:
SparkSession的udf.register注冊自定義的函數,一個函數名一個函數體 。用於遍歷每一項時使用
自定義聚合函數:
弱類型用戶自定義聚合函數:通過繼承UserDefinedAggregateFunction來實現用戶自定義聚合函數
import org.apache.spark.sql.expressions.MutableAggregationBuffer import org.apache.spark.sql.expressions.UserDefinedAggregateFunction import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession object MyAverage extends UserDefinedAggregateFunction { // 聚合函數輸入參數的數據類型
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil) // 聚合緩沖區中值得數據類型
def bufferSchema: StructType = { StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil) } // 返回值的數據類型
def dataType: DataType = DoubleType // 對於相同的輸入是否一直返回相同的輸出。 def deterministic: Boolean = true // 初始化
def initialize(buffer: MutableAggregationBuffer): Unit = { // 存工資的總額 buffer(0) = 0L // 存工資的個數 buffer(1) = 0L } // 相同Execute間的數據合並。 def update(buffer: MutableAggregationBuffer, input: Row): Unit =
{ if (!input.isNullAt(0))
{ buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1 } } // 不同Execute間的數據合並 def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0) buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1) } // 計算最終結果 def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1) } // 注冊函數
spark.udf.register("myAverage", MyAverage) val df = spark.read.json("examples/src/main/resources/employees.json") df.createOrReplaceTempView("employees")
df.show() // +-------+------+ // | name|salary| // +-------+------+ // |Michael| 3000| // | Andy| 4500| // | Justin| 3500| // | Berta| 4000| // +-------+------+ val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show() // +--------------+ // |average_salary| // +--------------+ // | 3750.0| // +--------------+
強類型用戶自定義聚合函數:
通過繼承Aggregator來實現強類型自定義聚合函數。可以不用取buffer的序列,直接通過字段名獲得值
使用的時候先用toColumn.name轉為列,DSL方式來用
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary") val result = ds.select(averageSalary)
result.show()
七.不同數據源的讀取與保存
1.通用加載/保存方法
手動指定選項:
SparkSession提供的read.load方法用於通用加載數據,使用write.save保存數據
format方法指定數據類型
sql方法指定要在文件上運行的sql語句
文件保存選項:
mode,saveMode方法設置
Scala/Java |
Any Language |
Meaning |
SaveMode.ErrorIfExists(default) |
"error"(default) |
如果文件存在,則報錯 |
SaveMode.Append |
"append" |
追加 |
SaveMode.Overwrite |
"overwrite" |
覆寫 |
SaveMode.Ignore |
"ignore" |
數據存在,則忽略 |
2.Hive
Hive里的語句除了跑SQL,也可以操作數據庫和表
Hive分Spark SQL自帶的內置Hive和外部Hive
配置:本地使用 --conf spark.sql.warehouse.dir=hdfs://hadoop102/spark-wearhouse 確定內置Hive的位置
注意:如果你使用的是內部的Hive,在Spark2.0之后,spark.sql.warehouse.dir用於指定數據倉庫的地址,如果你需要是用HDFS作為路徑,那么需要將core-site.xml和hdfs-site.xml 加入到Spark conf目錄,否則只會創建master節點上的warehouse目錄,查詢時會出現文件找不到的問題,這是需要使用HDFS,則需要將metastore刪除,重啟集群
外部Hive
1) 將Hive中的hive-site.xml拷貝或者軟連接到Spark安裝目錄下的conf目錄下。
2) 打開spark shell,注意帶上訪問Hive元數據庫的JDBC客戶端
$ bin/spark-shell --jars mysql-connector-java-5.1.27-bin.jar
Saprk SQL CLI
Spark SQL CLI可以很方便的在本地運行Hive元數據服務以及從命令行執行查詢任務。在Spark目錄下執行如下
代碼里使用Hive
1)添加依賴:
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>
(2)創建SparkSession時需要添加hive支持(紅色部分)
val warehouseLocation: String = new File("spark-warehouse").getAbsolutePath
val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
注意:藍色部分為使用內置Hive需要指定一個Hive倉庫地址。若使用的是外部Hive,則需要將hive-site.xml添加到ClassPath下。
第四章.SpardStreaming