第一個Spark程序


1、Java下Spark開發環境搭建(from http://www.cnblogs.com/eczhou/p/5216918.html

1.1、jdk安裝

安裝oracle下的jdk,我安裝的是jdk 1.7,安裝完新建系統環境變量JAVA_HOME,變量值為“C:\Program Files\Java\jdk1.7.0_79”,視自己安裝路勁而定。

同時在系統變量Path下添加C:\Program Files\Java\jdk1.7.0_79\bin和C:\Program Files\Java\jre7\bin。

1.2 spark環境變量配置

去http://spark.apache.org/downloads.html網站下載相應hadoop對應的版本,我下載的是spark-1.6.0-bin-hadoop2.6.tgz,spark版本是1.6,對應的hadoop版本是2.6

解壓下載的文件,假設解壓 目錄為:D:\spark-1.6.0-bin-hadoop2.6。將D:\spark-1.6.0-bin-hadoop2.6\bin添加到系統Path變量,同時新建SPARK_HOME變量,變量值為:D:\spark-1.6.0-bin-hadoop2.6

1.3 hadoop工具包安裝

spark是基於hadoop之上的,運行過程中會調用相關hadoop庫,如果沒配置相關hadoop運行環境,會提示相關出錯信息,雖然也不影響運行,但是這里還是把hadoop相關庫也配置好吧。

1.3.1 去下載hadoop 2.6編譯好的包https://www.barik.net/archive/2015/01/19/172716/,我下載的是hadoop-2.6.0.tar.gz

1.3.2 解壓下載的文件夾,將相關庫添加到系統Path變量中:D:\hadoop-2.6.0\bin;同時新建HADOOP_HOME變量,變量值為:D:\hadoop-2.6.0

1.4 eclipse環境

直接新建java工程,將D:\spark-1.6.0-bin-hadoop2.6\lib下的spark-assembly-1.6.0-hadoop2.6.0.jar添加到工程中就可以了。

 

2、Java寫Spark WordCount程序

package cn.spark.study;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;


public  class WordCount {
     public  static  void main(String[] args) {
        
         // 創建 SparkConf對象,對程序進行必要的配置
        SparkConf conf =  new SparkConf()
        .setAppName( " WordCount ").setMaster( " local ");
        
         // 通過conf創建上下文對象
        JavaSparkContext sc =  new JavaSparkContext(conf);
        
         // 創建初始RDD
        JavaRDD<String> lines = sc.textFile( " D://spark.txt ");
        
         // ----用各種Transformation算子對RDD進行操作-----------------------------------------
        JavaRDD<String> words = lines.flatMap( new FlatMapFunction<String, String>() {

             private  static final  long serialVersionUID =  1L;

            @Override
             public Iterable<String> call(String line) throws Exception {
                 //  TODO Auto-generated method stub
                 return Arrays.asList(line.split( "   "));
            }
        });
        
        JavaPairRDD<String,Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() {

             private  static final  long serialVersionUID =  1L;

            @Override
             public Tuple2<String, Integer> call(String word) throws Exception {
                 //  TODO Auto-generated method stub
                 return  new Tuple2<String,Integer>(word, 1);
            }
        });
        
        JavaPairRDD<String,Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() {
            
             private  static final  long serialVersionUID =  1L;
            
            @Override
             public Integer call(Integer v1, Integer v2) throws Exception {
                 //  TODO Auto-generated method stub
                 return v1 + v2;
            }
        });
        
        
         // ----用一個 action 算子觸發job-----------------------------------------
        wordCounts. foreach( new VoidFunction<Tuple2<String,Integer>>() {
            
            @Override
             public  void call(Tuple2<String, Integer> wordCount) throws Exception {
                 //  TODO Auto-generated method stub
                System. out.println(wordCount._1 +  "  appeared  " + wordCount._2 +  "  times ");
            }
        });
    }

} 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM