pyspark的安裝配置


1、搭建基本spark+Hadoop的本地環境

  https://blog.csdn.net/u011513853/article/details/52865076?tdsourcetag=s_pcqq_aiomsg

2、下載對應的spark與pyspark的版本進行安裝

  https://pypi.org/project/pyspark/2.3.0/#history

3、單詞統計測試

  a、python版本

import os
import shutil

from pyspark import SparkContext

inputpath = './data/wc.txt'
outputpath = './data/out.txt'

sc = SparkContext('local', 'wordcount')

# 讀取文件
input = sc.textFile(inputpath)
# 切分單詞
words = input.flatMap(lambda line: line.split(' '))
# 轉換成鍵值對並計數
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y)

# 輸出結果
counts.foreach(print)

# 刪除輸出目錄
if os.path.exists(outputpath):
    shutil.rmtree(outputpath, True)

# 將統計結果寫入結果文件
counts.saveAsTextFile(outputpath)

  

  b、scala版本

package com.wcount

import java.io.{File, PrintWriter}

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ScalaWordCount {

  def main(args: Array[String]): Unit = {
    /**
      * SparkConf:表示spark application的參數,
      *   setMaster:表示運行的模式:
      *
      *       local:本地模式,一般用於測試
      *       standalone:spark集群自帶的資源調度模式
      *       yarn:hadoop
      *       mesos:資源調度框架
      *   setAppName:設置application的名稱
      */
    val conf = new SparkConf().setMaster("local").setAppName("workJob")
    /**
      * SparkContext:spark application的上下文環境,通往集群的唯一入口
      */
    val sc = new SparkContext(conf)

//    val session: SparkSession = SparkSession.builder.appName("wc").master("local").getOrCreate()


    val lines: RDD[String] = sc.textFile("./data/wc.txt")
    val words: RDD[String] = lines.flatMap(line => {
      println("flatmap...........")
      line.split(" ")
    })
    val tuple: RDD[(String, Int)] = words.map(word => {
      println("map............")
      new Tuple2(word, 1)
    })
    val result: RDD[(String, Int)] = tuple.reduceByKey((v1: Int, v2: Int) => v1 + v2)
    //result.foreach(println)

    //文件寫入
    val outWriter = new PrintWriter(new File("./data/out.txt"))
    var wt:String = ""

    for (item<-result){
      wt =item._1.toString+":"+item._2.toString+" "
      println(wt)
    }
    println(wt)
    outWriter.println(wt)
    outWriter.close()

    while (true){

    }
    //    sc.textFile("./data/wc").flatMap(line => {line.split(" ")}).map(word => {new Tuple2(word, 1)}).reduceByKey((v1: Int, v2: Int) => v1 + v2).foreach(println)
    sc.stop()
  }
}

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM