學習隨筆--Spark java開發入門


  1 package stuSpark.com;
  2 
  3 import scala.Tuple2;
  4 
  5 import org.apache.spark.SparkConf;
  6 import org.apache.spark.api.java.JavaPairRDD;
  7 import org.apache.spark.api.java.JavaRDD;
  8 import org.apache.spark.api.java.JavaSparkContext;
  9 import org.apache.spark.api.java.function.FlatMapFunction;
 10 import org.apache.spark.api.java.function.Function;
 11 import org.apache.spark.api.java.function.Function2;
 12 import org.apache.spark.api.java.function.PairFunction;
 13 import org.apache.spark.storage.StorageLevel;
 14 
 15 import java.util.Arrays;
 16 import java.util.Iterator;
 17 import java.util.List;
 18 import java.util.regex.Pattern;
 19 
 20 public final class JavaWordCount {
 21     private static final Pattern SPACE = Pattern.compile(" ");
 22     //pattern 對象是一個正則表達式的編譯表示
 23     //compile()方法表示編譯此正則表達式regExp,返回regExp被編譯后的pattern
 24 
 25     public static void main(String[] args) throws Exception {
 26 
 27         // file 代表本地路徑,反之代表hdfs路徑
 28         String filePath = "file:\\E:\\test.txt";
 29 
 30         SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount")
 31                 .setMaster("local[2]");
 32         //設置該程序名稱   設置本地模式
 33         JavaSparkContext ctx = new JavaSparkContext(sparkConf);
 34         //創建JavaSparkContext對象實例sc
 35         
 36         JavaRDD<String> lines = ctx.textFile(filePath, 1);
 37         //直接從集合轉化 sc.parallelize(List(1,2,3,4,5,6,7,8,9,10))
 38         //從HDFS文件轉化 sc.textFile("hdfs://")
 39         //從本地文件轉化 sc.textFile("file:/")
 40         
 41         lines.cache();
 42         lines.persist(StorageLevel.MEMORY_ONLY());
 43         //持久化RDD
 44         /*
 45          * cache()方法表示:使用非序列化的方式將RDD的數據全部嘗試持久化到內存中,
 46          * cache是一個transformtion,是lazy的,必須通過一個action觸發,
 47          * 才能真正的將該RDD cache到內存中。
 48          * 
 49          * persist()方法表示:手動選擇持久化級別,並使用指定的方式進行持久化
 50          * DISK_ONLY:磁盤
 51         DISK_ONLY_2:磁盤;雙副本
 52         MEMORY_ONLY: 內存;反序列化;把RDD作為反序列化的方式存儲,假如RDD的內容存不下,剩余的分區在以后需要時會重新計算,不會刷到磁盤上。
 53         MEMORY_ONLY_2:內存;反序列化;雙副本
 54         MEMORY_ONLY_SER:內存;序列化;這種序列化方式,每一個partition以字節數據存儲,好處是能帶來更好的空間存儲,但CPU耗費高
 55         MEMORY_ONLY_SER_2 : 內存;序列化;雙副本
 56         MEMORY_AND_DISK:內存 + 磁盤;反序列化;雙副本;RDD以反序列化的方式存內存,假如RDD的內容存不下,剩余的會存到磁盤
 57         MEMORY_AND_DISK_2 : 內存 + 磁盤;反序列化;雙副本
 58         MEMORY_AND_DISK_SER:內存 + 磁盤;序列化  
 59         MEMORY_AND_DISK_SER_2:內存 + 磁盤;序列化;雙副本
 60          * */
 61         
 62 
 63         // 並行化集合
 64         //並行數組中一個很重要的參數是partitions,它來描述數組被切割的數據集數量。Spark會在每一個partitions上運行任務
 65         List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
 66         //數組連接list,當更新其中之一時,另一個自動更新
 67         JavaRDD<Integer> distData = ctx.parallelize(data);
 68         //分發本地Scala集合以形成RDD
 69         //初始化一個已經存在的集合
 70         
 71         //filter()參數是函數,函數會過濾掉不符合條件的元素,返回值是新的RDD
 72         lines.filter(new Function<String,Boolean>(){
 73 
 74             public Boolean call(String arg0) throws Exception {
 75                 // TODO Auto-generated method stub
 76                 return null;
 77             }
 78             
 79         });
 80         //map參數是函數,函數應用於RDD每一個元素,返回值是新的RDD
 81         JavaRDD<Integer> lineLengths = lines
 82                 .map(new Function<String, Integer>() {
 83                     public Integer call(String s) {
 84                         return s.length();
 85                     }
 86                 });
 87         //reduce聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律
 88         int totalLength = lineLengths
 89                 .reduce(new Function2<Integer, Integer, Integer>() {
 90                     public Integer call(Integer a, Integer b) {
 91                         return a + b;
 92                     }
 93                 });
 94         //flatMap和map差不多,但是flatMap生成的是多個結果
 95         JavaRDD<String> words = lines
 96                 .flatMap(new FlatMapFunction<String, String>() {
 97                     //Iterable迭代的
 98                     public Iterable<String> call(String s) {
 99                         return Arrays.asList(SPACE.split(s));
100                     }
101                 });
102         //maptopair 將集合數據存為key value
103         JavaPairRDD<String, Integer> ones = words
104                 .mapToPair(new PairFunction<String, String, Integer>() {
105                     public Tuple2<String, Integer> call(String s) {
106                         return new Tuple2<String, Integer>(s, 1);
107                     }
108                 });
109         //reduceBykey 根據key聚集,對value進行操作
110         JavaPairRDD<String, Integer> counts = ones
111                 .reduceByKey(new Function2<Integer, Integer, Integer>() {
112                     public Integer call(Integer i1, Integer i2) {
113                         return i1 + i2;
114                     }
115                 });
116         //collect封裝返回一個數組
117         List<Tuple2<String, Integer>> output = counts.collect();
118         for (Tuple2<?, ?> tuple : output) {
119             System.out.println(tuple._1() + ": " + tuple._2());
120         }
121         ctx.stop();
122     }
123 }

簡單的java項目開發,所需Jar包見百度網盤

鏈接:https://pan.baidu.com/s/1jqWwBBNIm1kbQoFSCppEZQ 密碼:y4xr


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM