033 Java Spark的編程


1.Java SparkCore編程

  入口是:JavaSparkContext
    基本的RDD是:JavaRDD
    其他常用RDD: JavaPairRDD
  JavaRDD和JavaPairRDD轉換:
    JavaRDD => JavaPairRDD: 通過mapToPair函數
    JavaPairRDD => JavaRDD: 通過map函數轉換

 

2.前提

  運行前將core-site.xml復制到resource文件夾中

 

3.程序  

  1 package com.ibeifeng.senior;
  2 
  3 import org.apache.spark.SparkConf;
  4 import org.apache.spark.api.java.JavaPairRDD;
  5 import org.apache.spark.api.java.JavaRDD;
  6 import org.apache.spark.api.java.JavaSparkContext;
  7 import org.apache.spark.api.java.function.FlatMapFunction;
  8 import org.apache.spark.api.java.function.Function2;
  9 import org.apache.spark.api.java.function.PairFunction;
 10 import org.apache.spark.api.java.function.VoidFunction;
 11 import scala.Tuple2;
 12 
 13 import java.sql.Connection;
 14 import java.sql.DriverManager;
 15 import java.sql.PreparedStatement;
 16 import java.util.Arrays;
 17 import java.util.Iterator;
 18 
 19 /**
 20  * Java實現Spark的WordCount程序
 21  * Created by ibf on 02/15.
 22  */
 23 public class JavaWordCountSparkCore {
 24     public static void main(String[] args) {
 25         String resultHDFSSavePath = "/beifeng/spark/result/wordcount/" + System.currentTimeMillis();
 26         // 1. 創建SparkConf配置信息
 27         SparkConf conf = new SparkConf()
 28                 .setMaster("local[*]")
 29                 .setAppName("spark-wordcount");
 30 
 31         // 2. 創建SparkContext對象,在java編程中,該對象叫做JavaSparkContext
 32         JavaSparkContext sc = new JavaSparkContext(conf);
 33 
 34         // 3. 從hdfs讀取文件形成RDD
 35         // TODO: 文件路徑自行給定
 36         JavaRDD<String> rdd = sc.textFile("/hive/common.db/dept");
 37 
 38         // 4. RDD數據處理
 39         // TODO: 過濾特殊字符
 40         // 4.1 行數據的分割,調用flatMap函數
 41         JavaRDD<String> wordRDD = rdd.flatMap(new FlatMapFunction<String, String>() {
 42             @Override
 43             public Iterable<String> call(String s) throws Exception {
 44                 String line = s;
 45                 if (line == null) line = "";
 46                 String[] arr = line.split("\t");
 47                 return Arrays.asList(arr);
 48             }
 49         });
 50 
 51         // 4.2 將數據轉換為key/value鍵值對
 52         /**
 53          * RDD的reduceByKey函數不是RDD類中,通過隱式轉換后,存在於其他類中<br/>
 54          * Java由於不存在隱式轉換,所以不能直接調用map函數進行key/value鍵值對轉換操作,必須調用特定的函數
 55          * */
 56         JavaPairRDD<String, Integer> wordCountRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
 57             @Override
 58             public Tuple2<String, Integer> call(String s) throws Exception {
 59                 return new Tuple2<String, Integer>(s, 1);
 60             }
 61         });
 62 
 63         // 4.3 聚合結果
 64         JavaPairRDD<String, Integer> resultRDD = wordCountRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
 65 
 66             @Override
 67             public Integer call(Integer v1, Integer v2) throws Exception {
 68                 return v1 + v2;
 69             }
 70         });
 71 
 72         // 5. 結果輸出
 73         // 5.1 結果輸出到HDFS
 74         resultRDD.saveAsTextFile(resultHDFSSavePath);
 75         // 5.2 結果輸出到MySQL
 76         /**
 77          * SparkCore RDD數據的讀入是通過InputFormat來讀入數據形成RDD的
 78          *  sc.newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
 79          conf: Configuration,
 80          fClass: Class[F],
 81          kClass: Class[K],
 82          vClass: Class[V])
 83          * RDD的saveASxxxx相關方法是利用OutputFormat來進行數據輸出的
 84          * resultRDD.saveAsNewAPIHadoopDataset(conf: Configuration);
 85          */
 86         resultRDD.foreachPartition(new VoidFunction<java.util.Iterator<Tuple2<String, Integer>>>() {
 87 
 88             @Override
 89             public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
 90                 Class.forName("com.mysql.jdbc.Driver");
 91                 String url = "jdbc:mysql://hadoop-senior01:3306/test";
 92                 String username = "root";
 93                 String password = "123456";
 94                 Connection conn = null;
 95                 try {
 96                     // 1. 創建connection連接
 97                     conn = DriverManager.getConnection(url, username, password);
 98 
 99                     // 2. 構建statement
100                     String sql = "insert into wordcount values(?,?)";
101                     PreparedStatement pstmt = conn.prepareStatement(sql);
102 
103                     // 3. 結果數據輸出
104                     while (tuple2Iterator.hasNext()) {
105                         Tuple2<String, Integer> t2 = tuple2Iterator.next();
106                         pstmt.setString(1, t2._1());
107                         pstmt.setLong(2, t2._2());
108 
109                         pstmt.executeUpdate();
110                     }
111                 } finally {
112                     // 4. 關閉連接
113                     conn.close();
114                 }
115 
116             }
117         });
118 
119 
120     }
121 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM