原文引自:http://blog.csdn.net/fengzhimohan/article/details/78535143
該案例中,我們將假設我們需要統計一個 10 萬人口的所有人的平均年齡,當然如果您想測試 Spark 對於大數據的處理能力,您可以把人口數放的更大,比如 1 億人口,當然這個取決於測試所用集群的存儲容量。假設這些年齡信息都存儲在一個文件里,並且該文件的格式如下,第一列是 ID,第二列是年齡。如下圖格式:

以下利用java隨機生成10萬個人口年齡文件
1 import java.io.File; 2 import java.io.FileWriter; 3 import java.io.IOException; 4 import java.util.Random; 5 6 /** 7 * Created by Administrator on 2017/11/13. 8 */ 9 public class DataFileGenerator { 10 public static void main(String[] args){ 11 File file = new File("F:\\DataFile.txt"); 12 try { 13 FileWriter fileWriter = new FileWriter(file); 14 Random rand = new Random(); 15 for (int i=1;i<=100000;i++){ 16 fileWriter.write(i +" " + (rand.nextInt(100)+1)); 17 fileWriter.write(System.getProperty("line.separator")); 18 } 19 fileWriter.flush(); 20 fileWriter.close(); 21 22 }catch(IOException e){ 23 e.printStackTrace(); 24 } 25 } 26 }
場景分析:
要計算平均年齡,那么首先需要對源文件對應的 RDD 進行處理,也就是將它轉化成一個只包含年齡信息的 RDD,其次是計算元素個數即為總人數,然后是把所有年齡數加起來,最后平均年齡=總年齡/人數。
對於第一步我們需要使用 map 算子把源文件對應的 RDD 映射成一個新的只包含年齡數據的 RDD,很顯然需要對在 map 算子的傳入函數中使用 split 方法,得到數組后只取第二個元素即為年齡信息;第二步計算數據元素總數需要對於第一步映射的結果 RDD 使用 count 算子;第三步則是使用 reduce 算子對只包含年齡信息的 RDD 的所有元素用加法求和;最后使用除法計算平均年齡即可。
以下實現對平均年齡的計算的代碼:
1 import org.apache.spark.SparkConf; 2 import org.apache.spark.api.java.JavaRDD; 3 import org.apache.spark.api.java.JavaSparkContext; 4 import org.apache.spark.api.java.function.FlatMapFunction; 5 import org.apache.spark.api.java.function.Function; 6 import org.apache.spark.api.java.function.Function2; 7 import java.util.Arrays; 8 9 /** 10 * Created by Administrator on 2017/11/13. 11 */ 12 public class AvgAgeCalculator { 13 public static void main(String[] args){ 14 15 SparkConf sparkConf = new SparkConf().setAppName("AvgAgeCalculator").setMaster("local[3]"); 16 JavaSparkContext sc = new JavaSparkContext(sparkConf); 17 //讀取文件 18 JavaRDD<String> dataFile = sc.textFile("F:\\DataFile.txt"); 19 //數據分片並取第二個數 20 JavaRDD<String> ageData = dataFile.flatMap(new FlatMapFunction<String, String>() { 21 @Override 22 public Iterable<String> call(String s) throws Exception { 23 return Arrays.asList(s.split(" ")[1]); 24 } 25 }); 26 //求出所有年齡個數。 27 long count = ageData.count(); 28 //轉換數據類型 29 JavaRDD<Integer> ageDataInt = ageData.map(new Function<String, Integer>() { 30 @Override 31 public Integer call(String s) throws Exception { 32 return Integer.parseInt(String.valueOf(s)); 33 } 34 }); 35 //求出年齡的和 36 Integer totalAge = ageDataInt.reduce(new Function2<Integer, Integer, Integer>() { 37 @Override 38 public Integer call(Integer x, Integer y) throws Exception { 39 return x+y; 40 } 41 }); 42 //平均值結果為double類型 43 Double avgAge = totalAge.doubleValue()/count; 44 /*System.out.println(ageData.collect()); 45 System.out.println(count);*/ 46 System.out.println("Total Age:" + totalAge + "; Number of People:" + count ); 47 System.out.println("Average Age is " + avgAge); 48 49 } 50 }
運行結果:

從結果可以看出,計算出所以年齡的總和,以及總人數,以及平均年齡值。看似簡單的例子,在對數組取值和數據類型轉換時候需要特別的注意。
