【注】該系列文章以及使用到安裝包/測試數據 可以在《傾情大奉送--Spark入門實戰系列》獲取
1、Spark編程模型
1.1 術語定義
l應用程序(Application): 基於Spark的用戶程序,包含了一個Driver Program 和集群中多個的Executor;
l驅動程序(Driver Program):運行Application的main()函數並且創建SparkContext,通常用SparkContext代表Driver Program;
l執行單元(Executor): 是為某Application運行在Worker Node上的一個進程,該進程負責運行Task,並且負責將數據存在內存或者磁盤上,每個Application都有各自獨立的Executors;
l集群管理程序(Cluster Manager): 在集群上獲取資源的外部服務(例如:Standalone、Mesos或Yarn);
l操作(Operation):作用於RDD的各種操作分為Transformation和Action;
1.2 模型組成
Spark應用程序可分兩部分:Driver部分和Executor部分
1.2.1 Driver部分
Driver部分主要是對SparkContext進行配置、初始化以及關閉。初始化SparkContext是為了構建Spark應用程序的運行環境,在初始化SparkContext,要先導入一些Spark的類和隱式轉換;在Executor部分運行完畢后,需要將SparkContext關閉。
1.2.2 Executor部分
Spark應用程序的Executor部分是對數據的處理,數據分三種:
1.2.2.1 原生數據
包含原生的輸入數據和輸出數據
l對於輸入原生數據,Spark目前提供了兩種:
Ø Scala集合數據集:如Array(1,2,3,4,5),Spark使用parallelize方法轉換成RDD
Ø Hadoop數據集:Spark支持存儲在hadoop上的文件和hadoop支持的其他文件系統,如本地文件、HBase、SequenceFile和Hadoop的輸入格式。例如Spark使用txtFile方法可以將本地文件或HDFS文件轉換成RDD
l對於輸出數據,Spark除了支持以上兩種數據,還支持scala標量
Ø 生成Scala標量數據,如count(返回RDD中元素的個數)、reduce、fold/aggregate;返回幾個標量,如take(返回前幾個元素)。
Ø 生成Scala集合數據集,如collect(把RDD中的所有元素倒入 Scala集合類型)、lookup(查找對應key的所有值)。
Ø 生成hadoop數據集,如saveAsTextFile、saveAsSequenceFile
1.2.2.2 RDD
RDD具體在下一節中詳細描述,RDD提供了四種算子:
l輸入算子:將原生數據轉換成RDD,如parallelize、txtFile等
l轉換算子:最主要的算子,是Spark生成DAG圖的對象,轉換算子並不立即執行,在觸發行動算子后再提交給driver處理,生成DAG圖 --> Stage --> Task --> Worker執行。
l緩存算子:對於要多次使用的RDD,可以緩沖加快運行速度,對重要數據可以采用多備份緩存。
l行動算子:將運算結果RDD轉換成原生數據,如count、reduce、collect、saveAsTextFile等。
1.2.2.3 共享變量
在Spark運行時,一個函數傳遞給RDD內的patition操作時,該函數所用到的變量在每個運算節點上都復制並維護了一份,並且各個節點之間不會相互影響。但是在Spark Application中,可能需要共享一些變量,提供Task或驅動程序使用。Spark提供了兩種共享變量:
l廣播變量(Broadcast Variables):可以緩存到各個節點的共享變量,通常為只讀
– 廣播變量緩存到各個節點的內存中,而不是每個 Task
– 廣播變量被創建后,能在集群中運行的任何函數調用
– 廣播變量是只讀的,不能在被廣播后修改
– 對於大數據集的廣播, Spark 嘗試使用高效的廣播算法來降低通信成本
使用方法:
val broadcastVar = sc.broadcast(Array(1, 2, 3))
l累計器:只支持加法操作的變量,可以實現計數器和變量求和。用戶可以調用SparkContext.accumulator(v)創建一個初始值為v的累加器,而運行在集群上的Task可以使用“+=”操作,但這些任務卻不能讀取;只有驅動程序才能獲取累加器的值。
使用方法:
val accum = sc.accumulator(0)
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum + = x)
accum.value
val num=sc.parallelize(1 to 100)
2、RDD
2.1 術語定義
l彈性分布式數據集(RDD): Resillient Distributed Dataset,Spark的基本計算單元,可以通過一系列算子進行操作(主要有Transformation和Action操作);
l有向無環圖(DAG):Directed Acycle graph,反應RDD之間的依賴關系;
l有向無環圖調度器(DAG Scheduler):根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler;
l任務調度器(Task Scheduler):將Taskset提交給worker(集群)運行並回報結果;
l窄依賴(Narrow dependency):子RDD依賴於父RDD中固定的data partition;
l寬依賴(Wide Dependency):子RDD對父RDD中的所有data partition都有依賴。
2.2 RDD概念
RDD是Spark的最基本抽象,是對分布式內存的抽象使用,實現了以操作本地集合的方式來操作分布式數據集的抽象實現。RDD是Spark最核心的東西,它表示已被分區,不可變的並能夠被並行操作的數據集合,不同的數據集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache到內存中,每次對RDD數據集的操作之后的結果,都可以存放到內存中,下一個操作可以直接從內存中輸入,省去了MapReduce大量的磁盤IO操作。這對於迭代運算比較常見的機器學習算法, 交互式數據挖掘來說,效率提升非常大。
RDD 最適合那種在數據集上的所有元素都執行相同操作的批處理式應用。在這種情況下, RDD 只需記錄血統中每個轉換就能還原丟失的數據分區,而無需記錄大量的數據操作日志。所以 RDD 不適合那些需要異步、細粒度更新狀態的應用 ,比如 Web 應用的存儲系統,或增量式的 Web 爬蟲等。對於這些應用,使用具有事務更新日志和數據檢查點的數據庫系統更為高效。
2.2.1 RDD的特點
1.來源:一種是從持久存儲獲取數據,另一種是從其他RDD生成
2.只讀:狀態不可變,不能修改
3.分區:支持元素根據 Key 來分區 ( Partitioning ) ,保存到多個結點上,還原時只會重新計算丟失分區的數據,而不會影響整個系統
4.路徑:在 RDD 中叫世族或血統 ( lineage ) ,即 RDD 有充足的信息關於它是如何從其他 RDD 產生而來的
5.持久化:可以控制存儲級別(內存、磁盤等)來進行持久化
6.操作:豐富的動作 ( Action ) ,如Count、Reduce、Collect和Save 等
2.2.2 RDD基礎數據類型
目前有兩種類型的基礎RDD:並行集合(Parallelized Collections):接收一個已經存在的Scala集合,然后進行各種並行計算。 Hadoop數據集(Hadoop Datasets):在一個文件的每條記錄上運行函數。只要文件系統是HDFS,或者hadoop支持的任意存儲系統即可。這兩種類型的RDD都可以通過相同的方式進行操作,從而獲得子RDD等一系列拓展,形成lineage血統關系圖。
1. 並行化集合
並行化集合是通過調用SparkContext的parallelize方法,在一個已經存在的Scala集合上創建的(一個Seq對象)。集合的對象將會被拷貝,創建出一個可以被並行操作的分布式數據集。例如,下面的解釋器輸出,演示了如何從一個數組創建一個並行集合。
例如:val rdd = sc.parallelize(Array(1 to 10)) 根據能啟動的executor的數量來進行切分多個slice,每一個slice啟動一個Task來進行處理。
val rdd = sc.parallelize(Array(1 to 10), 5) 指定了partition的數量
2. Hadoop數據集
Spark可以將任何Hadoop所支持的存儲資源轉化成RDD,如本地文件(需要網絡文件系統,所有的節點都必須能訪問到)、HDFS、Cassandra、HBase、Amazon S3等,Spark支持文本文件、SequenceFiles和任何Hadoop InputFormat格式。
(1)使用textFile()方法可以將本地文件或HDFS文件轉換成RDD
支持整個文件目錄讀取,文件可以是文本或者壓縮文件(如gzip等,自動執行解壓縮並加載數據)。如textFile(”file:///dfs/data”)
支持通配符讀取,例如:
val rdd1 = sc.textFile("file:///root/access_log/access_log*.filter");
val rdd2=rdd1.map(_.split("t")).filter(_.length==6)
rdd2.count()
......
14/08/20 14:44:48 INFO HadoopRDD: Input split: file:/root/access_log/access_log.20080611.decode.filter:134217728+20705903
......
textFile()可選第二個參數slice,默認情況下為每一個block分配一個slice。用戶也可以通過slice指定更多的分片,但不能使用少於HDFS block的分片數。
(2)使用wholeTextFiles()讀取目錄里面的小文件,返回(用戶名、內容)對
(3)使用sequenceFile[K,V]()方法可以將SequenceFile轉換成RDD。SequenceFile文件是Hadoop用來存儲二進制形式的key-value對而設計的一種平面文件(Flat File)。
(4)使用SparkContext.hadoopRDD方法可以將其他任何Hadoop輸入類型轉化成RDD使用方法。一般來說,HadoopRDD中每一個HDFS block都成為一個RDD分區。
此外,通過Transformation可以將HadoopRDD等轉換成FilterRDD(依賴一個父RDD產生)和JoinedRDD(依賴所有父RDD)等。
2.2.3 例子:控制台日志挖掘
假設網站中的一個 WebService 出現錯誤,我們想要從數以 TB 的 HDFS 日志文件中找到問題的原因,此時我們就可以用 Spark 加載日志文件到一組結點組成集群的 RAM 中,並交互式地進行查詢。以下是代碼示例:
首先行 1 從 HDFS 文件中創建出一個 RDD ,而行 2 則衍生出一個經過某些條件過濾后的 RDD 。行 3 將這個 RDD errors 緩存到內存中,然而第一個 RDD lines 不會駐留在內存中。這樣做很有必要,因為 errors 可能非常小,足以全部裝進內存,而原始數據則會非常龐大。經過緩存后,現在就可以反復重用 errors 數據了。我們這里做了兩個操作,第一個是統計 errors 中包含 MySQL 字樣的總行數,第二個則是取出包含 HDFS 字樣的行的第三列時間,並保存成一個集合。
這里要注意的是前面曾經提到過的 Spark 的延遲處理。 Spark 調度器會將 filter 和 map 這兩個轉換保存到管道,然后一起發送給結點去計算。
2.3 轉換與操作
對於RDD可以有兩種計算方式:轉換(返回值還是一個RDD)與操作(返回值不是一個RDD)
l轉換(Transformations) (如:map, filter, groupBy, join等),Transformations操作是Lazy的,也就是說從一個RDD轉換生成另一個RDD的操作不是馬上執行,Spark在遇到Transformations操作時只會記錄需要這樣的操作,並不會去執行,需要等到有Actions操作的時候才會真正啟動計算過程進行計算。
l操作(Actions) (如:count, collect, save等),Actions操作會返回結果或把RDD數據寫到存儲系統中。Actions是觸發Spark啟動計算的動因。
2.3.1 轉換
reduce(func) |
通過函數func聚集數據集中的所有元素。Func函數接受2個參數,返回一個值。這個函數必須是關聯性的,確保可以被正確的並發執行 |
collect() |
在Driver的程序中,以數組的形式,返回數據集的所有元素。這通常會在使用filter或者其它操作后,返回一個足夠小的數據子集再使用,直接將整個RDD集Collect返回,很可能會讓Driver程序OOM |
count() |
返回數據集的元素個數 |
take(n) |
返回一個數組,由數據集的前n個元素組成。注意,這個操作目前並非在多個節點上,並行執行,而是Driver程序所在機器,單機計算所有的元素(Gateway的內存壓力會增大,需要謹慎使用) |
first() |
返回數據集的第一個元素(類似於take(1) |
saveAsTextFile(path) |
將數據集的元素,以textfile的形式,保存到本地文件系統,hdfs或者任何其它hadoop支持的文件系統。Spark將會調用每個元素的toString方法,並將它轉換為文件中的一行文本 |
saveAsSequenceFile(path) |
將數據集的元素,以sequencefile的格式,保存到指定的目錄下,本地系統,hdfs或者任何其它hadoop支持的文件系統。RDD的元素必須由key-value對組成,並都實現了Hadoop的Writable接口,或隱式可以轉換為Writable(Spark包括了基本類型的轉換,例如Int,Double,String等等) |
foreach(func) |
在數據集的每一個元素上,運行函數func。這通常用於更新一個累加器變量,或者和外部存儲系統做交互 |
2.3.2 操作
map(func) |
返回一個新的分布式數據集,由每個原元素經過func函數轉換后組成 |
filter(func) |
返回一個新的數據集,由經過func函數后返回值為true的原元素組成 |
flatMap(func) |
類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素) |
flatMap(func) |
類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素) |
sample(withReplacement, frac, seed) |
根據給定的隨機種子seed,隨機抽樣出數量為frac的數據 |
union(otherDataset) |
返回一個新的數據集,由原數據集和參數聯合而成 |
groupByKey([numTasks]) |
在一個由(K,V)對組成的數據集上調用,返回一個(K,Seq[V])對的數據集。注意:默認情況下,使用8個並行任務進行分組,你可以傳入numTask可選參數,根據數據量設置不同數目的Task |
reduceByKey(func, [numTasks]) |
在一個(K,V)對的數據集上使用,返回一個(K,V)對的數據集,key相同的值,都被使用指定的reduce函數聚合到一起。和groupbykey類似,任務的個數是可以通過第二個可選參數來配置的。 |
join(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)類型的數據集上調用,返回一個(K,(V,W))對,每個key中的所有元素都在一起的數據集 |
groupWith(otherDataset, [numTasks]) |
在類型為(K,V)和(K,W)類型的數據集上調用,返回一個數據集,組成元素為(K, Seq[V], Seq[W]) Tuples。這個操作在其它框架,稱為CoGroup |
cartesian(otherDataset) |
笛卡爾積。但在數據集T和U上調用時,返回一個(T,U)對的數據集,所有元素交互進行笛卡爾積。 |
flatMap(func) |
類似於map,但是每一個輸入元素,會被映射為0到多個輸出元素(因此,func函數的返回值是一個Seq,而不是單一元素) |
2.4 依賴類型
在 RDD 中將依賴划分成了兩種類型:窄依賴 (Narrow Dependencies) 和寬依賴 (Wide Dependencies) 。窄依賴是指 父 RDD 的每個分區都只被子 RDD 的一個分區所使用 。相應的,那么寬依賴就是指父 RDD 的分區被多個子 RDD 的分區所依賴。例如, Map 就是一種窄依賴,而 Join 則會導致寬依賴 ( 除非父 RDD 是 hash-partitioned ,見下圖 ) 。
l窄依賴(Narrow Dependencies )
Ø 子RDD 的每個分區依賴於常數個父分區(即與數據規模無關)
Ø 輸入輸出一對一的算子,且結果RDD 的分區結構不變,主要是map 、flatMap
Ø 輸入輸出一對一,但結果RDD 的分區結構發生了變化,如union 、coalesce
Ø 從輸入中選擇部分元素的算子,如filter 、distinct 、subtract 、sample
l寬依賴(Wide Dependencies )
Ø 子RDD 的每個分區依賴於所有父RDD 分區
Ø 對單個RDD 基於Key 進行重組和reduce,如groupByKey 、reduceByKey ;
Ø 對兩個RDD 基於Key 進行join 和重組,如join
2.5 RDD緩存
Spark可以使用 persist 和 cache 方法將任意 RDD 緩存到內存、磁盤文件系統中。緩存是容錯的,如果一個 RDD 分片丟失,可以通過構建它的 transformation自動重構。被緩存的 RDD 被使用的時,存取速度會被大大加速。一般的executor內存60%做 cache, 剩下的40%做task。
Spark中,RDD類可以使用cache() 和 persist() 方法來緩存。cache()是persist()的特例,將該RDD緩存到內存中。而persist可以指定一個StorageLevel。StorageLevel的列表可以在StorageLevel 伴生單例對象中找到:
object StorageLevel {
val NONE = new StorageLevel(false, false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false) // Tachyon
}
// 其中,StorageLevel 類的構造器參數如下:
class StorageLevel private( private var useDisk_ : Boolean, private var useMemory_ : Boolean, private var useOf
Spark的不同StorageLevel ,目的滿足內存使用和CPU效率權衡上的不同需求。我們建議通過以下的步驟來進行選擇:
l如果你的RDDs可以很好的與默認的存儲級別(MEMORY_ONLY)契合,就不需要做任何修改了。這已經是CPU使用效率最高的選項,它使得RDDs的操作盡可能的快;
l如果不行,試着使用MEMORY_ONLY_SER並且選擇一個快速序列化的庫使得對象在有比較高的空間使用率的情況下,依然可以較快被訪問;
l盡可能不要存儲到硬盤上,除非計算數據集的函數,計算量特別大,或者它們過濾了大量的數據。否則,重新計算一個分區的速度,和與從硬盤中讀取基本差不多快;
l如果你想有快速故障恢復能力,使用復制存儲級別(例如:用Spark來響應web應用的請求)。所有的存儲級別都有通過重新計算丟失數據恢復錯誤的容錯機制,但是復制存儲級別可以讓你在RDD上持續的運行任務,而不需要等待丟失的分區被重新計算;
l如果你想要定義你自己的存儲級別(比如復制因子為3而不是2),可以使用StorageLevel 單例對象的apply()方法;
l在不使用cached RDD的時候,及時使用unpersist方法來釋放它。
3、RDD動手實戰
在這里我們將對RDD的轉換與操作進行動手實戰,首先通過實驗我們能夠觀測到轉換的懶執行,並通過toDebugString()去查看RDD的LineAge,查看RDD在運行過程中的變換過程,接着演示了從文件讀取數據並進行大數據經典的單詞計數實驗,最后對搜狗提供的搜索數據進行查詢,在此過程中演示緩存等操作。
3.1 啟動Spark Shell
3.1.1 啟動Hadoop
在隨后的實驗中將使用到HDFS文件系統,需要進行啟動
$cd /app/hadoop/hadoop-2.2.0/sbin
$./start-dfs.sh
3.1.2 啟動Spark
$cd /app/hadoop/spark-1.1.0/sbin
$./start-all.sh
3.1.3 啟動Spark Shell
在spark客戶端(這里在hadoop1節點),使用spark-shell連接集群,各個Excetor分配的核數和內存可以根據需要進行指定
$cd /app/hadoop/spark-1.1.0/bin
$./spark-shell --master spark://hadoop1:7077 --executor-memory 1024m --driver-memory 1024m
啟動后查看啟動情況,如下圖所示:
3.2 上傳測試數據
搜狗日志數據可以從http://download.labs.sogou.com/dl/q.html 下載,其中完整版大概2GB左右,文件中字段分別為:訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結果中的排名\t用戶點擊的順序號\t用戶點擊的URL。其中SogouQ1.txt、SogouQ2.txt、SogouQ3.txt分別是用head -n 或者tail -n 從SogouQ數據日志文件中截取,分別包含100萬,200萬和1000萬筆數據,這些測試數據也放在該系列配套資源的data\sogou目錄下。
搜狗日志數據放在data\sogou下,把該目錄下的SogouQ1.txt、SogouQ2.txt和SogouQ3.txt解壓,然后通過下面的命令上傳到HDFS中/sogou目錄中
cd /home/hadoop/upload/
ll sogou
tar -zxf *.gz
hadoop fs -mkdir /sogou
hadoop fs -put sogou/SogouQ1.txt /sogou
hadoop fs -put sogou/SogouQ2.txt /sogou
hadoop fs -put sogou/SogouQ3.txt /sogou
hadoop fs -ls /sogou
3.3 轉換與操作
3.3.1 並行化集合例子演示
在該例子中通過parallelize方法定義了一個從1~10的數據集,然后通過map(_*2)對數據集每個數乘以2,接着通過filter(_%3==0)過濾被3整除的數字,最后使用toDebugString顯示RDD的LineAge,並通過collect計算出最終的結果。
val num=sc.parallelize(1 to 10)
val doublenum = num.map(_*2)
val threenum = doublenum.filter(_ % 3 == 0)
threenum.toDebugString
threenum.collect
在下圖運行結果圖中,我們可以看到RDD的LineAge演變,通過paralelize方法建立了一個ParalleCollectionRDD,使用map()方法后該RDD為MappedRDD,接着使用filter()方法后轉變為FilteredRDD。
在下圖中使用collect方法時觸發運行作業,通過任務計算出結果
以下語句和collect一樣,都會觸發作業運行
num.reduce (_ + _)
num.take(5)
num.first
num.count
num.take(5).foreach(println)
運行的情況可以通過頁面進行監控,在Spark Stages頁面中我們可以看到運行的詳細情況,包括運行的Stage id號、Job描述、提交時間、運行時間、Stage情況等,可以點擊作業描述查看更加詳細的情況:
在這個頁面上我們將看到三部分信息:作業的基本信息、Executor信息和Tasks的信息。特別是Tasks信息可以了解到作業的分片情況,運行狀態、數據獲取位置、耗費時間及所處的Executor等信息
3.3.2 Shuffle操作例子演示
在該例子中通過parallelize方法定義了K-V鍵值對數據集合,通過sortByKey()進行按照Key值進行排序,然后通過collect方法觸發作業運行得到結果。groupByKey()為按照Key進行歸組,reduceByKey(_+_)為按照Key進行累和,這三個方法的計算和前面的例子不同,因為這些RDD類型為寬依賴,在計算過程中發生了Shuffle動作。
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
kv1.sortByKey().collect
kv1.groupByKey().collect
kv1.reduceByKey(_+_).collect
調用groupByKey()運行結果
調用reduceByKey ()運行結果
我們在作業運行監控界面上能夠看到:每個作業分為兩個Stage,在第一個Stage中進行了Shuffle Write,在第二個Stage中進行了Shuffle Read。
在Stage詳細運行頁面中可以觀察第一個Stage運行情況,內容包括:Stage運行的基本信息、每個Executor運行信息和任務的運行信息,特別在任務運行中我們可以看到任務的狀態、數據讀取的位置、機器節點、耗費時間和Shuffle Write時間等。
在下面進行了distinct、union、join和cogroup等操作中涉及到Shuffle過程
val kv2=sc.parallelize(List(("A",4),("A",4),("C",3),("A",4),("B",5)))
kv2.distinct.collect
kv1.union(kv2).collect
val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))
kv1.join(kv3).collect
kv1.cogroup(kv3).collect
3.3.3文件例子讀取
這個是大數據經典的例子,在這個例子中通過不同方式讀取HDFS中的文件,然后進行單詞計數,最終通過運行作業計算出結果。本例子中通過toDebugString可以看到RDD的變化,
第一步 按照文件夾讀取計算每個單詞出現個數
在該步驟中RDD的變換過程為:HadoopRDD->MappedRDD-> FlatMappedRDD->MappedRDD->PairRDDFunctions->ShuffleRDD->MapPartitionsRDD
val text = sc.textFile("hdfs://hadoop1:9000/class3/directory/")
text.toDebugString
val words=text.flatMap(_.split(" "))
val wordscount=words.map(x=>(x,1)).reduceByKey(_+_)
wordscount.toDebugString
wordscount.collect
RDD類型的變化過程如下:
l 首先使用textFile()讀取HDFS數據形成MappedRDD,這里有可能有疑問,從HDFS讀取的數據不是HadoopRDD,怎么變成了MappedRDD。回答這個問題需要從Spark源碼進行分析,在sparkContext類中的textFile()方法讀取HDFS文件后,使用了map()生成了MappedRDD。
l 然后使用flatMap()方法對文件內容按照空格拆分單詞,拆分形成FlatMappedRDD
l 其次使用map(x=>(x(1),1))對上步驟拆分的單詞形成(單詞,1)數據對,此時生成的MappedRDD,最后使用reduceByKey()方法對單詞的頻度統計,由此生成ShuffledRDD,並由collect運行作業得出結果。
第二步 按照匹配模式讀取計算單詞個數
val rdd2 = sc.textFile("hdfs://hadoop1:9000/class3/directory/*.txt")
rdd2.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect
第三步 讀取gz壓縮文件計算單詞個數
val rdd3 = sc.textFile("hdfs://hadoop1:8000/class2/test.txt.gz")
rdd3.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect
3.3.4 搜狗日志查詢例子演示
搜狗日志數據可以從http://download.labs.sogou.com/dl/q.html 下載,其中完整版大概2GB左右,文件中字段分別為:訪問時間\t用戶ID\t[查詢詞]\t該URL在返回結果中的排名\t用戶點擊的順序號\t用戶點擊的URL。其中SogouQ1.txt、SogouQ2.txt、SogouQ3.txt分別是用head -n 或者tail -n 從SogouQ數據日志文件中截取,分別包含100萬,200萬和1000萬筆數據,這些測試數據也放在該系列配套資源的data\sogou目錄下。
第一步 上傳測試數據
搜狗日志數據放在data\sogou下,把該目錄下的SogouQ1.txt、SogouQ2.txt和SogouQ3.txt解壓,然后通過下面的命令上傳到HDFS中/sogou目錄中
cd /home/hadoop/upload/
ll sogou
tar -zxf *.gz
hadoop fs -mkdir /sogou
hadoop fs -put sogou/SogouQ1.txt /sogou
hadoop fs -put sogou/SogouQ2.txt /sogou
hadoop fs -put sogou/SogouQ3.txt /sogou
hadoop fs -ls /sogou
第二步 查詢搜索結果排名第1點擊次序排在第2的數據
val rdd1 = sc.textFile("hdfs://hadoop1:9000/sogou/SogouQ1.txt")
val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)
rdd2.count()
val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)
rdd3.count()
rdd3.toDebugString
該命令運行的過程如下:
l 首先使用textFile()讀入SogouQ1.txt文件,讀入后由HadoopRDD轉變為MadppedRDD;
l 然后通過rdd1.map(_.split("\t"))對讀入數據使用\t分隔符進行拆分,拆分后RDD類型不變即為MadppedRDD,對這些拆分后的數據使用filter(_.length==6)過濾每行為6個字段的數據,這時數據變為FilteredRDD;
l 運行rdd2.count()啟動對rdd2計數的作業,通過運行結果可以看到該數據集為100條;
l rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)表示對rdd2的數據的第4個字段搜索結果排名第一,第5個字段點擊次序排在第二的數據進行過濾,通過count()方法運行作業得出最終的結果;
使用toDebugString可以查看rdd3的RDD詳細變換過程,如下圖所示:
第三步 Session查詢次數排行榜並把結果保存在HDFS中
val rdd4 = rdd2.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)). sortByKey(false).map(x=>(x._2,x._1))
rdd4.toDebugString
rdd4.saveAsTextFile("hdfs://hadoop1:9000/class3/output1")
該命令運行的過程如下:
l rdd4的生成比較復雜,我們分步驟進行解析,軸線map(x=>(x(1),1))是獲取每行的第二個字段(用戶Session)計數為1,然后reduceByKey(_+_)是安排Key進行累和,即按照用戶Session號進行計數求查詢次數,其次map(x=>(x._2,x._1))是把Key和Value位置互換,為后面排序提供條件,使用sortByKey(false)對數據進行按Key值進行倒排,此時需要注意的是Key為查詢次數,最后通過map(x=>(x._2,x._1)再次交換Key和Value位置,得到了(用戶Session號,查詢次數)結果。該過程RDD的變化如下圖所示:
l 計算的結果通過如下命令可以查看到,可以看到由於輸入數據存放在2個節點上,所以結果也分為兩個文件
hadoop fs -ls /class3/output1
這是使用HDFS的getmerge合並這兩個文件並進行查看
$cd /app/hadoop/hadoop-2.2.0/bin
$hdfs dfs -getmerge hdfs://hadoop1:9000/class3/output1 /home/hadoop/upload/result
$cd /home/hadoop/upload/
$head result