spark shell操作


RDD有兩種類型的操作 ,分別是Transformation(返回一個新的RDD)和Action(返回values)。

1.Transformation:根據已有RDD創建新的RDD數據集build

(1)map(func):對調用map的RDD數據集中的每個element都使用func,然后返回一個新的RDD,這個返回的數據集是分布式的數據集。

(2)filter(func) :對調用filter的RDD數據集中的每個元素都使用func,然后返回一個包含使func為true的元素構成的RDD。

(3)flatMap(func):和map很像,但是flatMap生成的是多個結果。

(4)mapPartitions(func):和map很像,但是map是每個element,而mapPartitions是每個partition。

(5)mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一個split上,所以func中應該有index。

(6)sample(withReplacement,faction,seed):抽樣。

(7)union(otherDataset):返回一個新的dataset,包含源dataset和給定dataset的元素的集合。

(8)distinct([numTasks]):返回一個新的dataset,這個dataset含有的是源dataset中的distinct的element。

(9)groupByKey(numTasks):返回(K,Seq[V]),也就是Hadoop中reduce函數接受的key-valuelist。

(10)reduceByKey(func,[numTasks]):就是用一個給定的reduce func再作用在groupByKey產生的(K,Seq[V]),比如求和,求平均數。

(11)sortByKey([ascending],[numTasks]):按照key來進行排序,是升序還是降序,ascending是boolean類型。

2.Action:在RDD數據集運行計算后,返回一個值或者將結果寫入外部存儲

(1)reduce(func):就是聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律的。

(2)collect():一般在filter或者足夠小的結果的時候,再用collect封裝返回一個數組。

(3)count():返回的是dataset中的element的個數。

(4)first():返回的是dataset中的第一個元素。

(5)take(n):返回前n個elements。

(6)takeSample(withReplacement,num,seed):抽樣返回一個dataset中的num個元素,隨機種子seed。

(7)saveAsTextFile(path):把dataset寫到一個textfile中,或者HDFS,或者HDFS支持的文件系統中,Spark把每條記錄都轉換為一行記錄,然后寫到file中。

(8)saveAsSequenceFile(path):只能用在key-value對上,然后生成SequenceFile寫到本地或者Hadoop文件系統。

(9)countByKey():返回的是key對應的個數的一個map,作用於一個RDD。

(10)foreach(func):對dataset中的每個元素都使用func。

用戶id(buyer_id),商品id(goods_id),收藏日期(dt)

用戶id   商品id    收藏日期  
10181   1000481   2010-04-04 16:54:31  
20001   1001597   2010-04-07 15:07:52  
20001   1001560   2010-04-07 15:08:27  
20042   1001368   2010-04-08 08:20:30  
20067   1002061   2010-04-08 16:45:33  
20056   1003289   2010-04-12 10:50:55  
20056   1003290   2010-04-12 11:57:35  
20056   1003292   2010-04-12 12:05:29  
20054   1002420   2010-04-14 15:24:12  
20055   1001679   2010-04-14 19:46:04  
20054   1010675   2010-04-14 15:23:53  
20054   1002429   2010-04-14 17:52:45  
20076   1002427   2010-04-14 19:35:39  
20054   1003326   2010-04-20 12:54:44  
20056   1002420   2010-04-15 11:24:49  
20064   1002422   2010-04-15 11:35:54  
20056   1003066   2010-04-15 11:43:01  
20056   1003055   2010-04-15 11:43:06  
20056   1010183   2010-04-15 11:45:24  
20056   1002422   2010-04-15 11:45:49  
20056   1003100   2010-04-15 11:45:54  
20056   1003094   2010-04-15 11:45:57  
20056   1003064   2010-04-15 11:46:04  
20056   1010178   2010-04-15 16:15:20  
20076   1003101   2010-04-15 16:37:27  
20076   1003103   2010-04-15 16:37:05  
20076   1003100   2010-04-15 16:37:18  
20076   1003066   2010-04-15 16:37:31  
20054   1003103   2010-04-15 16:40:14  
20054   1003100   2010-04-15 16:40:16  

現要求統計用戶收藏數據中,每個用戶收藏商品數量。

1.在Linux上,創建/data/spark3/wordcount目錄,用於存儲實驗所需的數據。

mkdir -p /data/spark3/wordcount  

切換目錄到/data/spark3/wordcount下,並從http://192.168.1.100:60000/allfiles/spark3/wordcount/buyer_favorite下載實驗數據。

cd /data/spark3/wordcount  
wget http://192.168.1.100:60000/allfiles/spark3/wordcount/buyer_favorite  

2.使用jps查看Hadoop以及Spark的相關進程是否已經啟動,若未啟動則執行啟動命令。

jps  
/apps/hadoop/sbin/start-all.sh  
/apps/spark/sbin/start-all.sh  

將Linux本地/data/spark3/wordcount/buyer_favorite文件,上傳到HDFS上的/myspark3/wordcount目錄下。若HDFS上/myspark3目錄不存在則需提前創建。

hadoop fs -mkdir -p /myspark3/wordcount  
hadoop fs -put /data/spark3/wordcount/buyer_favorite /myspark3/wordcount  

3.啟動spark-shell

spark-shell  

4.編寫Scala語句,統計用戶收藏數據中,每個用戶收藏商品數量。

先在spark-shell中,加載數據。

val rdd = sc.textFile("hdfs://localhost:9000/myspark3/wordcount/buyer_favorite");  

執行統計並輸出。

rdd.map(line=> (line.split('\t')(0),1)).reduceByKey(_+_).collect  

去重:使用spark-shell,對上述實驗中,用戶收藏數據文件進行統計。根據商品ID進行去重,統計用戶收藏數據中都有哪些商品被收藏。

1.在Linux上,創建/data/spark3/distinct,用於存儲實驗數據。

mkdir -p /data/spark3/distinct  

切換到/data/spark3/distinct目錄下,並從http://192.168.1.100:60000/allfiles/spark3/distinct/buyer_favorite下載實驗數據。

cd /data/spark3/distinct  
wget http://192.168.1.100:60000/allfiles/spark3/distinct/buyer_favorite  

2.使用jps查看Hadoop,Spark的進程。保證Hadoop、Spark框架相關進程為已啟動狀態。

3.將/data/spark3/distinct/buyer_favorite文件,上傳到HDFS上的/myspark3/distinct目錄下。若HDFS目錄不存在則創建。

hadoop fs -mkdir -p /myspark3/distinct  
hadoop fs -put /data/spark3/distinct/buyer_favorite /myspark3/distinct  

4.在Spark窗口,編寫Scala語句,統計用戶收藏數據中,都有哪些商品被收藏。

先加載數據,創建RDD。

val rdd = sc.textFile("hdfs://localhost:9000/myspark3/distinct/buyer_favorite");  

對RDD進行統計並將結果打印輸出。

rdd.map(line => line.split('\t')(1)).distinct.collect  

排序:電商網站都會對商品的訪問情況進行統計,現有一個goods_visit文件,存儲了電商網站中的各種商品以及此各個商品的點擊次數。

商品id(goods_id) 點擊次數(click_num)

商品ID  點擊次數  
1010037 100  
1010102 100  
1010152 97  
1010178 96  
1010280 104  
1010320 103  
1010510 104  
1010603 96  
1010637 97  

現根據商品的點擊次數進行排序,並輸出所有商品。

輸出結果樣式:

點擊次數 商品ID  
96  1010603  
96  1010178  
97  1010637  
97  1010152  
100 1010102  
100 1010037  
103 1010320  
104 1010510  
104 1010280  

1.在Linux上,創建/data/spark3/sort,用於存儲實驗數據。

mkdir -p /data/spark3/sort  

切換到/data/spark3/sort目錄下,並從http://192.168.1.100:60000/allfiles/spark3/sort/goods_visit下載實驗數據。

cd /data/spark3/sort  
wget http://192.168.1.100:60000/allfiles/spark3/sort/goods_visit  

2.將/data/spark3/sort/goods_visit文件,上傳到HDFS上的/spark3/sort/目錄下。若HDFS目錄不存在則需提前創建。

hadoop fs -mkdir -p /myspark3/sort  
hadoop fs -put /data/spark3/sort/goods_visit /myspark3/sort  

3.在Spark窗口,加載數據,將數據轉變為RDD。

val rdd1 = sc.textFile("hdfs://localhost:9000/myspark3/sort/goods_visit");  

對RDD進行統計並將結果打印輸出。

rdd1.map(line => ( line.split('\t')(1).toInt, line.split('\t')(0) ) ).sortByKey(true).collect  
4.輸出結果樣式為:


Join:現有某電商在2011年12月15日的部分交易數據。數據有訂單表orders和訂單明細表order_items,表結構及數據分別為:

orders表:(訂單id order_id, 訂單號 order_number, 買家ID buyer_id, 下單日期 create_dt)

訂單ID 訂單號 用戶ID 下單日期
52304 111215052630 176474 2011-12-15 04:58:21
52303 111215052629 178350 2011-12-15 04:45:31
52302 111215052628 172296 2011-12-15 03:12:23
52301 111215052627 178348 2011-12-15 02:37:32
52300 111215052626 174893 2011-12-15 02:18:56
52299 111215052625 169471 2011-12-15 01:33:46
52298 111215052624 178345 2011-12-15 01:04:41
52297 111215052623 176369 2011-12-15 01:02:20
52296 111215052622 178343 2011-12-15 00:38:02
52295 111215052621 178342 2011-12-15 00:18:43
52294 111215052620 178341 2011-12-15 00:14:37
52293 111215052619 178338 2011-12-15 00:13:07
order_items表:(明細ID item_id, 訂單ID order_id, 商品ID goods_id )


明細ID 訂單ID 商品ID
252578 52293 1016840
252579 52293 1014040
252580 52294 1014200
252581 52294 1001012
252582 52294 1022245
252583 52294 1014724
252584 52294 1010731
252586 52295 1023399
252587 52295 1016840
252592 52296 1021134
252593 52296 1021133
252585 52295 1021840
252588 52295 1014040
252589 52296 1014040
252590 52296 1019043

orders表和order_items表,通過訂單id進行關聯,是一對多的關系。

下面開啟spark-shell,查詢在當天該電商網站,都有哪些用戶購買了什么商品。

1.在Linux上,創建/data/spark3/join,用於存儲實驗數據。

```bash
mkdir -p /data/spark3/join  

切換目錄到/data/spark3/join目錄下,並從http://192.168.1.100:60000/allfiles/spark3/join/order_items及http://192.168.1.100:60000/allfiles/spark3/join/orders下載實驗數據。

cd /data/spark3/join  
wget http://192.168.1.100:60000/allfiles/spark3/join/order_items  
wget http://192.168.1.100:60000/allfiles/spark3/join/orders  

2.在HDFS上創建/myspark3/join目錄,並將Linux上/data/spark3/join目錄下的數據,上傳到HDFS。

hadoop fs -mkdir -p /myspark3/join  
hadoop fs -put /data/spark3/join/orders /myspark3/join  
hadoop fs -put /data/spark3/join/order_items /myspark3/join  

3.在Spark窗口創建兩個RDD,分別加載orders文件以及order_items文件中的數據。

val rdd1 = sc.textFile("hdfs://localhost:9000/myspark3/join/orders");  
val rdd2 = sc.textFile("hdfs://localhost:9000/myspark3/join/order_items");  

4.我們的目的是查詢每個用戶購買了什么商品。所以對rdd1和rdd2進行map映射,得出關鍵的兩個列的數據。

val rdd11 = rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) )  
val rdd22 = rdd2.map(line=> (line.split('\t')(1), line.split('\t')(2)) )  
5.將rdd11以及rdd22中的數據,根據Key值,進行Join關聯,得到最終結果。

```scala
val rddresult = rdd11 join rdd22  

6.最后將結果輸出,查看輸出效果。

rddresult.collect  

最終的執行結果為:

7.將輸出數據進行格式化:

(52294,(178341,1014200)),  
(52294,(178341,1001012)),  
(52294,(178341,1022245)),  
(52294,(178341,1014724)),  
(52294,(178341,1010731)),  
(52296,(178343,1021134)),  
(52296,(178343,1021133)),  
(52296,(178343,1014040)),  
(52296,(178343,1019043)),  
(52295,(178342,1023399)),  
(52295,(178342,1016840)),  
(52295,(178342,1021840)),  
(52295,(178342,1014040)),  
(52293,(178338,1016840)),  
(52293,(178338,1014040)) 

可以看到上面數據關聯后一共有3列,分別為訂單ID,用戶ID,商品ID。

求平均值:電商網站都會對商品的訪問情況進行統計。現有一個goods_visit文件,存儲了全部商品及各商品的點擊次數。還有一個文件goods,記錄了商品的基本信息。兩張表的數據結構如下:

goods表:商品ID(goods_id),商品狀態(goods_status),商品分類id(cat_id),評分(goods_score)

goods_visit表:商品ID(goods_id),商品點擊次數(click_num)

商品表(goods)及商品訪問情況表(goods_visit)可以根據商品id進行關聯。現在統計每個分類下,商品的平均點擊次數是多少?

1.在Linux上,創建目錄/data/spark3/avg,用於存儲實驗數據。

mkdir -p /data/spark3/avg  

切換到/data/spark3/avg目錄下,並從http://192.168.1.100:60000/allfiles/spark3/avg/goods以及http://192.168.1.100:60000/allfiles/spark3/avg/goods_visit兩個網址下載實驗數據。

cd /data/spark3/avg  
wget http://192.168.1.100:60000/allfiles/spark3/avg/goods  
wget http://192.168.1.100:60000/allfiles/spark3/avg/goods_visit  

2.在HDFS上創建目錄/myspark3/avg,並將Linux/data/spark3/avg目錄下的數據,上傳到HDFS的/myspark3/avg。

hadoop fs -mkdir -p /myspark3/avg  
hadoop fs -put /data/spark3/avg/goods /myspark3/avg  
hadoop fs -put /data/spark3/avg/goods_visit /myspark3/avg  

3.在Spark窗口創建兩個RDD,分別加載goods文件以及goods_visit文件中的數據。

val rdd1 = sc.textFile("hdfs://localhost:9000/myspark3/avg/goods")  
val rdd2 = sc.textFile("hdfs://localhost:9000/myspark3/avg/goods_visit")  

4.我們的目的是統計每個分類下,商品的平均點擊次數,我們可以分三步來做。

首先,對rdd1和rdd2進行map映射,得出關鍵的兩個列的數據。

val rdd11 = rdd1.map(line=> (line.split('\t')(0), line.split('\t')(2)) )  
val rdd22 = rdd2.map(line=> (line.split('\t')(0), line.split('\t')(1)) )  

用collect()方法啟動程序。

rdd11.collect  

查看rdd11的結果如下:

rdd11.collect  
res2: Array[(String, String)] = Array((1000002,52137), (1000003,52137), (1000004,52137), (1000006,52137),  
(1000007,52137), (1000008,52137), (1000010,52137), (1000011,52137), (1000015,52137), (1000018,52137),  
(1000020,52137), (1000021,52137), (1000025,52137), (1000028,52137), (1000030,52137), (1000033,52137),  
(1000035,52137), (1000037,52137), (1000041,52137), (1000044,52137), (1000048,52137), (1000050,52137),  
(1000053,52137), (1000057,52137), (1000059,52137), (1000063,52137), (1000065,52137), (1000067,52137),  
(1000071,52137), (1000073,52137), (1000076,52137), (1000078,52137), (1000080,52137), (1000082,52137),  
(1000084,52137), (1000086,52137), (1000087,52137), (1000088,52137), (1000090,52137), (1000091,52137),  
(1000094,52137), (1000098,52137), (1000101,52137), (1000103,52137), (1000106,52...  
scala>>  

用collect()方法啟動程序。

rdd22.collect  

查看rdd22的結果如下:

rdd22.collect  

res3: Array[(String, String)] = Array((1010000,4), (1010001,0), (1010002,0), (1010003,0), (1010004,0),
(1010005,0), (1010006,74), (1010007,0), (1010008,0), (1010009,1081), (1010010,0), (1010011,0), (1010012,0),
(1010013,44), (1010014,1), (1010018,0), (1010019,542), (1010020,1395), (1010021,18), (1010022,13), (1010023,27),
(1010024,22), (1010025,295), (1010026,13), (1010027,1), (1010028,410), (1010029,2), (1010030,8), (1010031,6),
(1010032,729), (1010033,72), (1010034,3), (1010035,328), (1010036,153), (1010037,100), (1010038,4), (1010039,3),
(1010040,69), (1010041,1), (1010042,1), (1010043,21), (1010044,268), (1010045,11), (1010046,1), (1010047,1),
(1010048,59), (1010049,15), (1010050,19), (1010051,424), (1010052,462), (1010053,9), (1010054,41), (1010055,64),
(1010056,10), (1010057,3), (...
scala>
然后,將rdd11以及rdd22中的數據根據商品ID,也就是key值進行關聯,得到一張大表。表結構變為:(商品id,(商品分類,商品點擊次數))

view plain copy
val rddjoin = rdd11 join rdd22
用collect()方法啟動程序。

view plain copy
rddjoin.collect
查看rddjoin的結果如下:

view plain copy
rddjoin.collect

res4: Array[(String, (String, String))] = Array((1013900,(52137,0)), (1010068,(52007,1316)), (1018970,(52006,788)),
(1020975,(52091,68)), (1019960,(52111,0)), (1019667,(52045,16)), (1010800,(52137,6)), (1019229,(52137,20)), (1022649,
(52119,90)), (1020382,(52137,0)), (1022667,(52021,150)), (1017258,(52086,0)), (1021963,(52072,83)), (1015809,(52137,285)),
(1024340,(52084,0)), (1011043,(52132,0)), (1011762,(52137,2)), (1010976,(52132,34)), (1010512,(52090,8)), (1023965,(52095,0)),
(1017285,(52069,41)), (1020212,(52026,46)), (1010743,(52137,0)), (1020524,(52064,52)), (1022577,(52090,13)), (1021974,(52069,22)),
(1010543,(52137,0)), (1010598,(52136,53)), (1017212,(52108,45)), (1010035,(52006,328)), (1010947,(52089,8)), (1020964,(52071,86)),
(1024001,(52063,0)), (1020191,(52046,0)), (1015739,(...
scala>
最后,在大表的基礎上,進行統計。得到每個分類,商品的平均點擊次數。

view plain copy
rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>{(x._1, x._2._1*1.0/x._2._2)}).collect
將結果輸出,查看輸出效果。

view plain copy
scala> rddjoin.map(x=>{(x._2._1, (x._2._2.toLong, 1))}).reduceByKey((x,y)=>{(x._1+y._1, x._2+y._2)}).map(x=>
{(x._1, x._2._1*1.0/x._2._2)}).collect
res40: Array[(String, Double)] = Array((52009,463.3642857142857), (52135,36.69230769230769), (52128,9.0), (52072,42.8),
(52078,16.5), (52137,34.735241502683365), (52047,20.96551724137931), (52050,0.0), (52056,24.57894736842105),
(52087,17.008928571428573), (52085,31.17142857142857), (52007,547.3076923076923), (52052,19.6), (52081,50.833333333333336),
(52016,106.75), (52058,34.23170731707317), (52124,0.0), (52092,28.453703703703702), (52065,8.644444444444444), (52106,22.5),
(52120,96.7843137254902), (52027,114.7), (52089,17.81159420289855), (52098,57.793103448275865), (52038,74.2), (52061,52.609375),
(52104,49.0), (52014,45.4), (52012,53.26), (52100,22.0), (52043,23.0), (52030,532.48), (52023,150.0), (52083,57.857142857142854),
(52041,40.0), (52049,18.058823529411764), (52074,33.17647058...
scala>


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM