運行環境
本文的具體運行環境如下:
- CentOS 7.6
- Spark 2.4
- Hadoop 2.6.0
- Java JDK 1.8
- Scala 2.10.5
一、下載安裝
首先在官網
https://spark.apache.org/downloads.html
下載對應版本的Spark
丟到你的服務器上 自己的路徑 比如 /user/hadoop/My_Spark
解壓
tar -xvf XXX.tar.gz(你的壓縮包名稱)
然后 記錄你的 路徑 /user/hadoop/My_Spark/spark-2.4.0-bin-hadoop2.7
配置spark用戶權限
sudo chown -R hadoop:hadoop ./spark # 此處的 hadoop 為你的用戶名 ./spark為你的路徑名
安裝后,需要在 ./conf/spark-env.sh 中修改 Spark 的 Classpath,執行如下命令拷貝一個配置文件:
- cd /user/hadoop/My_Spark/spark-2.4.0-bin-hadoop2.7
- cp ./conf/spark-env.sh.template ./conf/spark-env.sh
編輯 ./conf/spark-env.sh(vim ./conf/spark-env.sh
) ,在最后面加上如下一行:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
保存后,Spark 就可以啟動運行了。
當然 還少不了設置環境變量
vi ~/.bash_profile
在最后加入
# spark
export SPARK_HOME= (你的Spark路徑)
export PATH=$PATH:$SPARK_HOME/bin
運行 Spark 示例
注意,必須安裝 Hadoop 才能使用 Spark,但如果使用 Spark 過程中沒用到 HstudentS,不啟動 Hadoop 也是可以的。此外,接下來教程中出現的命令、目錄,若無說明,則一般以 Spark 的安裝目錄(/usr/local/spark)為當前路徑,請注意區分。
在 ./examples/src/main 目錄下有一些 Spark 的示例程序,有 Scala、Java、Python、R 等語言的版本。我們可以先運行一個示例程序 SparkPi(即計算 π 的近似值),執行如下命令:
- cd /user/hadoop/My_Spark/spark-2.4.0-bin-hadoop2.7 #你的路徑
- ./bin/run-example SparkPi
通過 Spark Shell 進行交互分析
Spark shell 提供了簡單的方式來學習 API,也提供了交互的方式來分析數據。Spark Shell 支持 Scala 和 Python,本教程選擇使用 Scala 來進行介紹。
Scala 是一門現代的多范式編程語言,志在以簡練、優雅及類型安全的方式來表達常用編程模式。它平滑地集成了面向對象和函數語言的特性。Scala 運行於 Java 平台(JVM,Java 虛擬機),並兼容現有的 Java 程序。
Scala 是 Spark 的主要編程語言,如果僅僅是寫 Spark 應用,並非一定要用 Scala,用 Java、Python 都是可以的。使用 Scala 的優勢是開發效率更高,代碼更精簡,並且可以通過 Spark Shell 進行交互式實時查詢,方便排查問題。
執行如下命令啟動 Spark Shell:
- ./bin/spark-shell
啟動成功后如圖所示,會有 “scala >” 的命令提示符。
成功啟動Spark Shell
基礎操作
Spark 的主要抽象是分布式的元素集合(distributed collection of items),稱為RDD(Resilient Distributed Dataset,彈性分布式數據集),它可被分發到集群各個節點上,進行並行操作。RDDs 可以通過 Hadoop InputFormats 創建(如 HstudentS),或者從其他 RDDs 轉化而來。
我們從本地路徑讀取一個預先准備好的student.txt文件
student.txt 文件內容如下 有四個字段 id name age score
在交互式窗口中輸入
import org.apache.spark.sql.types._
import org.apache.spark.sql.SparkSession
val spark=SparkSession.builder().getOrCreate()
val student = spark.read.option("header", true).option("sep",",").schema("id int,name string,age int,score int").csv("file:///u01/isi/app/hadoop/run/student.txt")
這里student是 spark的一個DataFrameReader 也就是RDD DataFrame 通過讀取本地文件獲得的。
整個相當於一個臨時的表
我們這里以CSV格式讀入 分隔符為 , 然后首行開啟 .schema表示以指定的模式讀入
代碼中通過 “file://” 前綴指定讀取本地文件。Spark shell 默認是讀取 HstudentS 中的文件,需要先上傳文件到 HstudentS 中,否則會有“org.apache.hadoop.mapred.InvalidInputException: Input path does not exist:”的錯誤。
讀入操作詳解
DataFrameReader支持若干種操作方式 可使用.option在后面具體加入參數。
截取部分參數如下
sep (default ,) |
設置單個字符作為每個字段和值的分隔符。 |
encoding (default UTF-8) |
根據給定的編碼類型解碼CSV文件。 |
quote (default ") |
設置用於轉義引號值的單個字符,分隔符可以是該值的一部分。如果要關閉引號,需要設置的不是null,而是一個空字符串。此行為與com. database .spark.csv不同。 |
escape (default \) |
設置一個字符,用於轉義已引用值中的引號 |
charToEscapeQuoteEscaping (default escape or \0) |
設置用於轉義引號字符的單個字符。當轉義和引號字符不同時,默認值為轉義字符,否則為\0。 |
comment (default empty string) |
:設置一個字符,用於跳過以該字符開頭的行。默認情況下,它是禁用的。 |
header (default false) |
使用第一行作為列的名稱。 |
DataFrame基本動作運算
可以用show() 方法來展示數據,show有以下幾種不同的使用方式:
show():顯示所有數據
show(n) :顯示前n條數據
show(true): 最多顯示20個字符,默認為true
show(false): 去除最多顯示20個字符的限制
show(n, true):顯示前n條並最多顯示20個自負
student.show() student.show(3) student.show(true) student.show(false) student.show(3,true)
輸入
student.show(student.count().toInt)

按成績倒序排序輸出
輸入student.sort(student("score").desc).show(student.count().toInt)
describe(cols: String*):獲取指定字段的統計信息
這個方法可以動態的傳入一個或多個String類型的字段名,結果仍然為DataFrame對象,用於統計數值類型字段的統計值,比如count, mean, stddev, min, max等。


求平均分數並輸出
student.agg(mean("score")).show //或者下面這種 兩種方式都可以 student.describe("score").where("summary ='mean'").show()
單個DataFrame操作
使用where篩選條件
where(conditionExpr: String):SQL語言中where關鍵字后的條件 ,傳入篩選條件表達式,可以用and和or。得到DataFrame類型的返回結果, 比如我們想得到用戶1或者使用助手1的操作記錄:
student.where("user=1 or type ='助手1'").show()
或者如上圖
student.describe("score").where("summary ='mean'").show()
select:獲取指定字段值
根據傳入的String類型字段名,獲取指定字段的值,以DataFrame類型返回,比如我們想要查找user和type兩列:
student.select("user","type").show()
好了,本次教程到此結束。剩下的自己看文檔吧。
退出
:quit