1、搭建基本的 Spark + Hadoop 本地環境
https://blog.csdn.net/u011513853/article/details/52865076?tdsourcetag=s_pcqq_aiomsg
2、下載並安裝版本相互對應的 Spark 與 PySpark（兩者版本號需一致，例如 Spark 2.3.0 搭配 PySpark 2.3.0）
https://pypi.org/project/pyspark/2.3.0/#history
3、單詞統計（Word Count）測試
a、Python 版本
"""Word count with PySpark: count whitespace-separated words in a text file."""
import os
import shutil

from pyspark import SparkContext

inputpath = './data/wc.txt'
# saveAsTextFile writes a *directory* of part files, so despite the ".txt"
# suffix this path ends up being a directory.
outputpath = './data/out.txt'

sc = SparkContext('local', 'wordcount')
try:
    # Read the file as an RDD of lines (named "lines" rather than "input" so
    # the builtin input() is not shadowed).
    lines = sc.textFile(inputpath)
    # str.split() with no argument splits on runs of whitespace and drops
    # empty tokens, so repeated spaces / blank lines are not counted as "".
    words = lines.flatMap(lambda line: line.split())
    # Pair each word with 1, then sum the ones per word.
    counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)

    # Print on the driver. rdd.foreach(print) runs on the executors, so its
    # output only reaches this console in local mode. collect() pulls the
    # whole result to the driver, which is fine for small test data.
    for pair in counts.collect():
        print(pair)

    # saveAsTextFile fails if the target path already exists, so clear it
    # first (best-effort for directories, matching the original intent).
    if os.path.isdir(outputpath):
        shutil.rmtree(outputpath, ignore_errors=True)
    elif os.path.exists(outputpath):
        os.remove(outputpath)
    # Write the counts as part files inside the output directory.
    counts.saveAsTextFile(outputpath)
finally:
    # Always release the SparkContext, even if a step above fails.
    sc.stop()
b、Scala 版本
package com.wcount

import java.io.{File, PrintWriter}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/** Word-count example: counts whitespace-separated words in a text file with Spark. */
object ScalaWordCount {

  /**
   * Runs the word count and writes one `word:count` entry per line to the output file.
   *
   * @param args optional: args(0) = input path (default "./data/wc.txt"),
   *             args(1) = output file (default "./data/out.txt")
   */
  def main(args: Array[String]): Unit = {
    val inputPath: String = args.headOption.getOrElse("./data/wc.txt")
    val outputPath: String = if (args.length > 1) args(1) else "./data/out.txt"

    /**
     * SparkConf holds the Spark application's settings.
     * setMaster selects the run mode:
     *   local      - local mode, typically used for testing
     *   standalone - Spark's own built-in cluster resource scheduler
     *   yarn       - Hadoop YARN
     *   mesos      - the Mesos resource-scheduling framework
     * setAppName sets the application name.
     */
    val conf = new SparkConf().setMaster("local").setAppName("workJob")
    // SparkContext: the application's context and its single entry point to the cluster.
    val sc = new SparkContext(conf)
    try {
      val lines: RDD[String] = sc.textFile(inputPath)
      // Transformations are lazy: these printlns only fire once an action
      // (collect below) actually runs the job.
      val words: RDD[String] = lines.flatMap { line =>
        println("flatmap...........")
        // Split on runs of whitespace and drop empty tokens (leading spaces,
        // blank lines) so "" is never counted as a word.
        line.split("\\s+").filter(_.nonEmpty)
      }
      val pairs: RDD[(String, Int)] = words.map { word =>
        println("map............")
        (word, 1)
      }
      val result: RDD[(String, Int)] = pairs.reduceByKey(_ + _)

      // Bring the (small) result back to the driver before formatting it.
      // Assigning to a driver-side var inside rdd.foreach does not work: the
      // closure runs on a serialized copy, so the driver's variable never changes.
      val formatted: Array[String] =
        result.collect().map { case (word, count) => s"$word:$count" }
      formatted.foreach(line => println(line))

      // Write UTF-8 explicitly so non-ASCII words are not mangled by the
      // platform default charset; close the writer even if writing fails.
      val outWriter = new PrintWriter(new File(outputPath), "UTF-8")
      try formatted.foreach(line => outWriter.println(line))
      finally outWriter.close()
    } finally {
      // Always stop the context. The Spark web UI disappears at this point;
      // to inspect it after the job, pause deliberately before stopping
      // (e.g. wait for console input) rather than spinning in a loop.
      sc.stop()
    }
  }
}