1. Java SparkCore Programming
The entry point is JavaSparkContext.
The basic RDD type is JavaRDD.
Another commonly used RDD type is JavaPairRDD.
Converting between JavaRDD and JavaPairRDD (a minimal sketch follows this list):
JavaRDD => JavaPairRDD: via the mapToPair function
JavaPairRDD => JavaRDD: via the map function
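The following is a minimal, self-contained sketch of these two conversions. The class name, sample data, and local master setting are assumptions for illustration only; it uses the same Spark 1.x anonymous-class style as the program in section 3.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

public class RddConversionSketch {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setMaster("local[*]").setAppName("rdd-conversion"));

        // sample data, assumed for illustration
        JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "b", "a"));

        // JavaRDD => JavaPairRDD: mapToPair wraps each element in a Tuple2
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // JavaPairRDD => JavaRDD: map turns each Tuple2 back into a single value
        JavaRDD<String> keys = pairs.map(new Function<Tuple2<String, Integer>, String>() {
            @Override
            public String call(Tuple2<String, Integer> t) throws Exception {
                return t._1();
            }
        });

        System.out.println(keys.collect());
        sc.stop();
    }
}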
2. Prerequisite
Before running, copy core-site.xml into the project's resources folder.
3. Program
package com.ibeifeng.senior;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;
import java.util.Iterator;

/**
 * Java implementation of the Spark WordCount program
 * Created by ibf on 02/15.
 */
public class JavaWordCountSparkCore {
    public static void main(String[] args) {
        String resultHDFSSavePath = "/beifeng/spark/result/wordcount/" + System.currentTimeMillis();
        // 1. Create the SparkConf configuration
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark-wordcount");

        // 2. Create the SparkContext; in the Java API this object is a JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. Read a file from HDFS to form an RDD
        // TODO: supply your own file path
        JavaRDD<String> rdd = sc.textFile("/hive/common.db/dept");

        // 4. RDD data processing
        // TODO: filter out special characters
        // 4.1 Split each line by calling flatMap
        JavaRDD<String> wordRDD = rdd.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                String line = s;
                if (line == null) line = "";
                String[] arr = line.split("\t");
                return Arrays.asList(arr);
            }
        });

        // 4.2 Convert the data into key/value pairs
        /**
         * The reduceByKey function is not defined on the RDD class itself; in Scala it is
         * reached through an implicit conversion to another class.<br/>
         * Java has no implicit conversions, so map cannot be used directly to build
         * key/value pairs; the dedicated mapToPair function must be called instead.
         */
        JavaPairRDD<String, Integer> wordCountRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // 4.3 Aggregate the results
        JavaPairRDD<String, Integer> resultRDD = wordCountRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 5. Output the results
        // 5.1 Write the results to HDFS
        resultRDD.saveAsTextFile(resultHDFSSavePath);
        // 5.2 Write the results to MySQL
        /**
         * SparkCore reads data through an InputFormat to form an RDD:
         * sc.newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
         *         conf: Configuration,
         *         fClass: Class[F],
         *         kClass: Class[K],
         *         vClass: Class[V])
         * The RDD saveAsXxx methods use an OutputFormat to write data out:
         * resultRDD.saveAsNewAPIHadoopDataset(conf: Configuration);
         */
        resultRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {

            @Override
            public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
                Class.forName("com.mysql.jdbc.Driver");
                String url = "jdbc:mysql://hadoop-senior01:3306/test";
                String username = "root";
                String password = "123456";
                Connection conn = null;
                try {
                    // 1. Create the connection
                    conn = DriverManager.getConnection(url, username, password);

                    // 2. Build the statement
                    String sql = "insert into wordcount values(?,?)";
                    PreparedStatement pstmt = conn.prepareStatement(sql);

                    // 3. Write out the result data
                    while (tuple2Iterator.hasNext()) {
                        Tuple2<String, Integer> t2 = tuple2Iterator.next();
                        pstmt.setString(1, t2._1());
                        pstmt.setLong(2, t2._2());

                        pstmt.executeUpdate();
                    }
                } finally {
                    // 4. Close the connection (guard against a failed getConnection)
                    if (conn != null) conn.close();
                }

            }
        });


    }
}
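The comment in step 5.2 notes that RDD reads go through an InputFormat (sc.newAPIHadoopRDD) and the saveAsXxx methods go through an OutputFormat. As a hedged illustration of that read path, the sketch below uses JavaSparkContext.newAPIHadoopFile with the new-API TextInputFormat; the class name, the readLines helper, and the path argument are assumptions for illustration, not part of the original program.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

public class NewApiHadoopReadSketch {
    // Reads text lines via the InputFormat-based API instead of sc.textFile
    public static JavaRDD<String> readLines(JavaSparkContext sc, String path) {
        Configuration hadoopConf = new Configuration();
        // newAPIHadoopFile yields a pair RDD of (byte offset, line text)
        JavaPairRDD<LongWritable, Text> raw = sc.newAPIHadoopFile(
                path, TextInputFormat.class, LongWritable.class, Text.class, hadoopConf);
        // keep only the line contents as plain Strings
        return raw.map(new Function<Tuple2<LongWritable, Text>, String>() {
            @Override
            public String call(Tuple2<LongWritable, Text> pair) throws Exception {
                return pair._2().toString();
            }
        });
    }
}

Such an RDD could replace the sc.textFile call in the program above when finer control over the InputFormat or Hadoop Configuration is needed.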