Spark學習之JavaRdd


RDD 介紹

RDD,全稱Resilient Distributed Datasets(彈性分布式數據集),是Spark最為核心的概念,是Spark對數據的抽象。RDD是分布式的元素集合,每個RDD只支持讀操作,且每個RDD都被分為多個分區存儲到集群的不同節點上。除此之外,RDD還允許用戶顯示的指定數據存儲到內存和磁盤中,掌握了RDD編程是SPARK開發的第一步。

 

1:創建操作(creation operation):RDD的創建由SparkContext來負責。
2:轉換操作(transformation operation):將一個RDD通過一定操作轉換為另一個RDD。
3:行動操作(action operation):Spark為惰性計算,對RDD的行動操作都會觸發Spark作業的運行
4:控制操作(control operation):對RDD進行持久化等。

 

DEMO代碼地址:https://github.com/zhp8341/sparkdemo/blob/master/src/main/java/com/demo/spark/rdddemo/OneRDD.java

一:創建操作

創建RDD有兩種方式:
1 讀取一個數據集(SparkContext.textFile()) :

JavaDStreamlines=jssc.textFileStream("/Users/huipeizhu/Documents/sparkdata/input/"); 
JavaReceiverInputDStreamlines = jssc.socketTextStream("localhost", 9999);


2 讀取一個集合(SparkContext.parallelize()) :

Listlist = Arrays.asList(5, 4, 3, 2, 1);
JavaRDDrdd = sc.parallelize(list);

二:轉換操作

1:單個RDD轉換操作
map() : 對每個元素進行操作,返回一個新的RDD
System.out.println("RDD每個元素乘10:" + rdd.map(v -> v * 10)


filter() : 最每個元素進行篩選,返回符合條件的元素組成的一個新RDD
System.out.println("RDD去掉1的元素:" + rdd.filter(v -> v != 1));

flatMap() : 對每個元素進行操作,將返回的迭代器的所有元素組成一個新的RDD返回
r.dd.flatMap(x -> x.to(3)).collect()

distinct():去重操作
System.out.println("RDD去重操作:" + rdd.distinct());

rdd最大和最小值

Integer max=  rdd.reduce((v1, v2) -> Math.max(v1, v2));

Integer min=  rdd.reduce((v1, v2) -> Math.min(v1, v2))


2:兩個RDD的轉化操作:


[1, 2, 3] [3, 4, 5] 兩個個RDD簡單相關操作

union() :合並,不去重
System.out.println("兩個RDD集合:" + rdd1.union(rdd2).collect());

intersection() :交集
System.out.println("兩個RDD集合共同元素:" + rdd1.intersection(rdd2).collect());

cartesian() :笛卡兒積
System.out.println("和另外一個RDD集合的笛卡爾積:" + rdd1.cartesian(rdd2).collect());

subtract() : 移除相同的內容
rdd1.subtract(rdd2).collect()

 

三:行動操作


collect() :返回所有元素
System.out.println("原始數據:" + rdd.collect());

count() :返回元素個數
System.out.println("統計RDD的所有元素:" + rdd.count());

countByValue() : 各個元素出現的次數
System.out.println("每個元素出現的次數:" + rdd.countByValue());

take(num) : 返回num個元素
System.out.println("取出rdd返回2個元素:" + rdd.take(2));

top(num) : 返回前num個元素
System.out.println("取出rdd返回最前2個元素:" + rdd.top(2));


reduce(func) :並行整合RDD中的所有數據(最常用的)
System.out.println("整合RDD中所有數據(sum):" + rdd.reduce((v1, v2) -> v1 + v2));

foreach(func):對每個元素使用func
rdd.foreach(t -> System.out.print(t));


四:控制操作


cache():

persist():保留着RDD的依賴關系

checkpoint(level:StorageLevel):RDD[T]切斷RDD依賴關系

所謂的控制操作就是持久化
你能通過persist()或者cache()方法持久化一個rdd。首先,在action中計算得到rdd;然后,將其保存在每個節點的內存中。Spark的緩存是一個容錯的技術-如果RDD的任何一個分區丟失,它 可以通過原有的轉換(transformations)操作自動的重復計算並且創建出這個分區。
此外,我們可以利用不同的存儲級別存儲每一個被持久化的RDD。
Spark自動的監控每個節點緩存的使用情況,利用最近最少使用原則刪除老舊的數據。如果你想手動的刪除RDD,可以使用RDD.unpersist()方法。
在實際操作當中我們可以借助第三方進行數據持久化 如:redis


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM