1 package stuSpark.com; 2 3 import scala.Tuple2; 4 5 import org.apache.spark.SparkConf; 6 import org.apache.spark.api.java.JavaPairRDD; 7 import org.apache.spark.api.java.JavaRDD; 8 import org.apache.spark.api.java.JavaSparkContext; 9 import org.apache.spark.api.java.function.FlatMapFunction; 10 import org.apache.spark.api.java.function.Function; 11 import org.apache.spark.api.java.function.Function2; 12 import org.apache.spark.api.java.function.PairFunction; 13 import org.apache.spark.storage.StorageLevel; 14 15 import java.util.Arrays; 16 import java.util.Iterator; 17 import java.util.List; 18 import java.util.regex.Pattern; 19 20 public final class JavaWordCount { 21 private static final Pattern SPACE = Pattern.compile(" "); 22 //pattern 對象是一個正則表達式的編譯表示 23 //compile()方法表示編譯此正則表達式regExp,返回regExp被編譯后的pattern 24 25 public static void main(String[] args) throws Exception { 26 27 // file 代表本地路徑,反之代表hdfs路徑 28 String filePath = "file:\\E:\\test.txt"; 29 30 SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount") 31 .setMaster("local[2]"); 32 //設置該程序名稱 設置本地模式 33 JavaSparkContext ctx = new JavaSparkContext(sparkConf); 34 //創建JavaSparkContext對象實例sc 35 36 JavaRDD<String> lines = ctx.textFile(filePath, 1); 37 //直接從集合轉化 sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)) 38 //從HDFS文件轉化 sc.textFile("hdfs://") 39 //從本地文件轉化 sc.textFile("file:/") 40 41 lines.cache(); 42 lines.persist(StorageLevel.MEMORY_ONLY()); 43 //持久化RDD 44 /* 45 * cache()方法表示:使用非序列化的方式將RDD的數據全部嘗試持久化到內存中, 46 * cache是一個transformtion,是lazy的,必須通過一個action觸發, 47 * 才能真正的將該RDD cache到內存中。 48 * 49 * persist()方法表示:手動選擇持久化級別,並使用指定的方式進行持久化 50 * DISK_ONLY:磁盤 51 DISK_ONLY_2:磁盤;雙副本 52 MEMORY_ONLY: 內存;反序列化;把RDD作為反序列化的方式存儲,假如RDD的內容存不下,剩余的分區在以后需要時會重新計算,不會刷到磁盤上。 53 MEMORY_ONLY_2:內存;反序列化;雙副本 54 MEMORY_ONLY_SER:內存;序列化;這種序列化方式,每一個partition以字節數據存儲,好處是能帶來更好的空間存儲,但CPU耗費高 55 MEMORY_ONLY_SER_2 : 內存;序列化;雙副本 56 MEMORY_AND_DISK:內存 + 磁盤;反序列化;雙副本;RDD以反序列化的方式存內存,假如RDD的內容存不下,剩余的會存到磁盤 57 MEMORY_AND_DISK_2 : 內存 + 磁盤;反序列化;雙副本 58 MEMORY_AND_DISK_SER:內存 + 磁盤;序列化 59 MEMORY_AND_DISK_SER_2:內存 + 磁盤;序列化;雙副本 60 * */ 61 62 63 // 並行化集合 64 //並行數組中一個很重要的參數是partitions,它來描述數組被切割的數據集數量。Spark會在每一個partitions上運行任務 65 List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); 66 //數組連接list,當更新其中之一時,另一個自動更新 67 JavaRDD<Integer> distData = ctx.parallelize(data); 68 //分發本地Scala集合以形成RDD 69 //初始化一個已經存在的集合 70 71 //filter()參數是函數,函數會過濾掉不符合條件的元素,返回值是新的RDD 72 lines.filter(new Function<String,Boolean>(){ 73 74 public Boolean call(String arg0) throws Exception { 75 // TODO Auto-generated method stub 76 return null; 77 } 78 79 }); 80 //map參數是函數,函數應用於RDD每一個元素,返回值是新的RDD 81 JavaRDD<Integer> lineLengths = lines 82 .map(new Function<String, Integer>() { 83 public Integer call(String s) { 84 return s.length(); 85 } 86 }); 87 //reduce聚集,但是傳入的函數是兩個參數輸入返回一個值,這個函數必須是滿足交換律和結合律 88 int totalLength = lineLengths 89 .reduce(new Function2<Integer, Integer, Integer>() { 90 public Integer call(Integer a, Integer b) { 91 return a + b; 92 } 93 }); 94 //flatMap和map差不多,但是flatMap生成的是多個結果 95 JavaRDD<String> words = lines 96 .flatMap(new FlatMapFunction<String, String>() { 97 //Iterable迭代的 98 public Iterable<String> call(String s) { 99 return Arrays.asList(SPACE.split(s)); 100 } 101 }); 102 //maptopair 將集合數據存為key value 103 JavaPairRDD<String, Integer> ones = words 104 .mapToPair(new PairFunction<String, String, Integer>() { 105 public Tuple2<String, Integer> call(String s) { 106 return new Tuple2<String, Integer>(s, 1); 107 } 108 }); 109 //reduceBykey 根據key聚集,對value進行操作 110 JavaPairRDD<String, Integer> counts = ones 111 .reduceByKey(new Function2<Integer, Integer, Integer>() { 112 public Integer call(Integer i1, Integer i2) { 113 return i1 + i2; 114 } 115 }); 116 //collect封裝返回一個數組 117 List<Tuple2<String, Integer>> output = counts.collect(); 118 for (Tuple2<?, ?> tuple : output) { 119 System.out.println(tuple._1() + ": " + tuple._2()); 120 } 121 ctx.stop(); 122 } 123 }
簡單的java項目開發,所需Jar包見百度網盤
鏈接:https://pan.baidu.com/s/1jqWwBBNIm1kbQoFSCppEZQ 密碼:y4xr