Spark Basic Operations


Reading data from files

Dataset used: https://codeload.github.com/xsankar/fdps-v3/zip/master

Reading a single file

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

case class Employee(EmployeeID: String,
    LastName: String, FirstName: String, Title: String,
    BirthDate: String, HireDate: String,
    City: String, State: String, Zip: String, Country: String,
    ReportsTo: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.master", "local")
    conf.set("spark.app.name", "spark demo")
    val sc = new SparkContext(conf);
    // SQLContext is superseded by SparkSession in Spark 2.x and is not used below
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();

    import spark.implicits._ // required for the .as[Employee] conversion below
    // header (default false): uses the first line as names of columns.
    val employees = spark.read.option("header", "true")
        .csv("hdfs://m3:9820/NW-Employees.csv").as[Employee];

   employees.show();

  }
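
As a side note, because the CSV is read without the inferSchema option, every column comes back as a String, which is why all fields of the Employee case class are declared as String. A minimal alternative sketch (reusing the same file path as above) lets Spark infer the column types instead of mapping to the all-String case class:

    val employeesInferred = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://m3:9820/NW-Employees.csv")
    employeesInferred.printSchema()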

 Registering the data as a temporary view and querying it with SQL

case class Employee(EmployeeID: String,
    LastName: String, FirstName: String, Title: String,
    BirthDate: String, HireDate: String,
    City: String, State: String, Zip: String, Country: String,
    ReportsTo: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.master", "local")
    conf.set("spark.app.name", "spark demo")
    val sc = new SparkContext(conf);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();

    import spark.implicits._ // required for the .as[Employee] conversion below
    // header (default false): uses the first line as names of columns.
    val employees = spark.read.option("header", "true")
        .csv("hdfs://m3:9820/NW-Employees.csv").as[Employee];
    // Creates a temporary view using the given name
    employees.createOrReplaceTempView("employeesTable");
    // query via SQL; the table name in the statement is case-insensitive
    val records = spark.sql("select * from EmployeesTable");
    records.show();
    records.head(2);
    records.explain(true);

  }
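
The same result can also be obtained without SQL, using the Dataset API directly. A short sketch, reusing the employees Dataset defined above:

    // select a few columns with the Dataset API instead of spark.sql(...)
    val recordsApi = employees.select("LastName", "FirstName", "Title")
    recordsApi.show()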

 Join queries

case class Order(OrderID: String,
    CustomerID: String, EmployeeID: String, OrderDate: String,
    ShipCountry: String)

  case class OrderDetail(OrderID: String,
    ProductID: String, UnitPrice: String, Qty: String,
    Discount: String)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.master", "local")
    conf.set("spark.app.name", "spark demo")
    val sc = new SparkContext(conf);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();

    import spark.implicits._ // required for the .as[...] conversions below
    // header (default false): uses the first line as names of columns.
    val orders = spark.read.option("header", "true")
      .csv("hdfs://m3:9820/NW-Orders.csv").as[Order];
    val orderDetails = spark.read.option("header", "true")
      .csv("hdfs://m3:9820/NW-Order-Details.csv").as[OrderDetail];
    // Creates a temporary view using the given name
    orders.createOrReplaceTempView("orders")
    orderDetails.createOrReplaceTempView("orderDetails")
    // show() displays 20 rows by default unless a row count is given
    // orders.show();
    // orderDetails.show();
    // if a table is not given an alias, the alias defaults to the table name
    val joinResult = spark.sql("select o.OrderID, orderDetails.ProductID from orders o inner join orderDetails  on o.OrderID = orderDetails.OrderID")
    joinResult.show
    
  }
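
The same join can be written with the Dataset API. A short sketch, reusing the orders and orderDetails Datasets defined above:

    // inner join on OrderID, keeping one OrderID column plus the ProductID
    val joinApi = orders.join(orderDetails, orders("OrderID") === orderDetails("OrderID"))
      .select(orders("OrderID"), orderDetails("ProductID"))
    joinApi.show()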

 Reading and writing data

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.master", "local")
    conf.set("spark.app.name", "spark demo")
    val sc = new SparkContext(conf);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();

    import spark.implicits._ // brings in implicit encoders and $-column syntax
    // header (default false): uses the first line as names of columns.
    // inferSchema (default `false`): infers the input schema automatically from data. It
    // requires one extra pass over the data.
    // read data from file
    val cars = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://m3:9820/cars.csv");
    cars.show(5)
    cars.printSchema()   
    
    // write data out to a file
    // mode("overwrite") replaces any existing data at the target path
    // save as CSV
    cars.write.mode("overwrite").option("header", "true").csv("hdfs://m3:9820/cars_csv")
    
    // save as Parquet, partitioned by year
    cars.write.mode("overwrite").partitionBy("year").parquet("hdfs://m3:9820/cars_parquet")
  }
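
To check what was written, the Parquet output can be read back. A short sketch continuing from the block above (the path matches the one used in the write):

    // read the partitioned Parquet data back; the year partition column is restored from the directory layout
    val carsParquet = spark.read.parquet("hdfs://m3:9820/cars_parquet")
    carsParquet.printSchema()
    carsParquet.show(5)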

 Statistical methods

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.master", "local")
    conf.set("spark.app.name", "spark demo")
    val sc = new SparkContext(conf);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();

    import spark.implicits._ // brings in implicit encoders and $-column syntax
    // header (default false): uses the first line as names of columns.
    // inferSchema (default `false`): infers the input schema automatically from data. It
    // requires one extra pass over the data.
    // read data from file
    val cars = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://m3:9820/cars.csv");
    cars.show(5)
    cars.printSchema()   
    
    // describe() shows count, mean, stddev, min and max for the given column
    cars.describe("model").show()

    // groupBy groups the rows; avg computes the mean of a numeric column
    // (averaging the grouping column itself, as here, simply returns the year)
    cars.groupBy("year").avg("year").show()
    cars.show()
 
  }
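
A slightly more informative aggregation than averaging the grouping column itself is to count the rows per group. A short sketch, reusing the cars DataFrame and the year column from the block above:

    // number of records per year, ordered by year
    cars.groupBy("year").count().orderBy("year").show()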

  

Selecting columns and cross-tabulation

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.master", "local")
    conf.set("spark.app.name", "spark demo")
    val sc = new SparkContext(conf);
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();

    import spark.implicits._ // brings in implicit encoders and $-column syntax
    // header (default false): uses the first line as names of columns.
    // inferSchema (default `false`): infers the input schema automatically from data. It
    // requires one extra pass over the data.
    // read data from file
    val passagers = spark.read.option("header", "true").option("inferSchema", "true")
      .csv("hdfs://m3:9820/titanic3_02.csv");
     
    // Pclass,Survived,Name,Gender,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Boat,Body,HomeDest
    // select a subset of the Dataset's columns, producing a new Dataset
    val passagers1 = passagers.select(passagers("Pclass"), passagers("Survived"), 
           passagers("Gender"), passagers("Age"), passagers("SibSp"), 
           passagers("Parch"), passagers("Fare"))
           
    passagers1.show
    
    passagers1.printSchema()
    
    passagers1.groupBy("Gender").count.show
    
    passagers1.stat.crosstab("Survived", "SibSp").show
    
    
  }
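
The grouped aggregations can also be combined with numeric columns. A short sketch, reusing passagers1 from the block above, shows the average age of survivors versus non-survivors:

    // average age per Survived value
    passagers1.groupBy("Survived").avg("Age").show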

 Linear regression

import org.apache.spark.SparkConf
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    //conf.set("spark.master", "spark://m2:7077")
    conf.set("spark.master", "local[4]")
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    // obtain the SparkContext (not needed in this example)
    // val sc = spark.sparkContext
    // with inferSchema=true the column types are inferred from the data; the default is false, in which case every column is read as String
    // 1. load the data
    val cars = spark.read.option("header", "true").option("inferSchema", "true").csv("hdfs://m2:9820/car-milage.csv")
    /*cars.show(5)
    cars.printSchema()
    
    //  mpg|displacement| hp|torque|CRatio|RARatio|CarbBarrells|NoOfSpeed|length|width|weight|automatic|
    cars.describe("mpg", "hp", "weight", "automatic").show
    
    val corr = cars.stat.corr("hp", "weight")
    println("correlation is %2.4f".format(corr))
    
    val cov = cars.stat.cov("hp", "weight")
    // covariance
    println("covariance is %2.4f".format(cov))*/
    
    // Returns a new [[DataFrame]] that drops rows containing any null or NaN values.
    val cars1 = cars.na.drop()
    
    // 2. create a VectorAssembler to build the feature vector
    val assembler = new VectorAssembler()
    
    // input columns
    assembler.setInputCols(Array("displacement", "hp", "torque", "CRatio",
      "RARatio", "CarbBarrells" ,"NoOfSpeed" ,"length", "width" , "weight" ,"automatic"    
    ))   
    // output column
    assembler.setOutputCol("features")
    
    // transform: append the assembled features column
    val cars2 = assembler.transform(cars1)
    // cars2.show();
    
    // 3. split the data into training and test sets (by weight)
    
    val train = cars2.filter(cars2("weight") <= 4000)
    
    val test = cars2.filter(cars2("weight") > 4000)
    
    // test.show
    // 4. configure the linear regression parameters
    val linearReg = new LinearRegression
    // set the maximum number of iterations
    linearReg.setMaxIter(100)
    // set the regularization parameter
    linearReg.setRegParam(0.3)
    // set the ElasticNet mixing parameter alpha (default 0):
    //   alpha = 0 -> L2 penalty (ridge regression)
    //   alpha = 1 -> L1 penalty (lasso)
    //   0 < alpha < 1 -> a mix of L1 and L2 (elastic net)
    linearReg.setElasticNetParam(0.8)
    linearReg.setLabelCol("mpg") // mpg is the label column, i.e. the value being predicted
    
    // println("train count: " + train.count())
    // 5. fit the model on the training data
    val mdlLR = linearReg.fit(train)
     
    println("totalIterations: " + mdlLR.summary.totalIterations)
    
    // 6. use the trained model to predict on the test data
    val predictions = mdlLR.transform(test)
    predictions.show
    val evaluator = new RegressionEvaluator
    evaluator.setLabelCol("mpg")
    val rmse = evaluator.evaluate(predictions)
    // rmse root mean squared error
    println("root mean squared error = " + "%6.3f".format(rmse))
    
    evaluator.setMetricName("mse")
    val mse = evaluator.evaluate(predictions)
    // mean squared error
    println("mean squared error = " + "%6.3f".format(mse))
  }
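
It can also be useful to inspect the fitted model itself. A short sketch, continuing from the block above, prints the learned coefficients and the training summary:

    // one coefficient per input feature, in the order given to the VectorAssembler
    println("coefficients: " + mdlLR.coefficients)
    println("intercept: %.3f".format(mdlLR.intercept))
    println("training RMSE: %.3f".format(mdlLR.summary.rootMeanSquaredError))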

 Classification

import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.DoubleType

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.master", "spark://m2:7077")
    // conf.set("spark.master", "local[8]")
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    // obtain the SparkContext (not needed in this example)
    // val sc = spark.sparkContext
    // with inferSchema=true the column types are inferred from the data; the default is false, in which case every column is read as String
    // 1. load the data
    val passagers = spark.read.option("header", "true").option("inferSchema", "true")
         .csv("hdfs://m2:9820/titanic3_02.csv")
    // passagers.show()
    // passagers.printSchema()
 
    // 2. extract the feature columns
       val passagers1 = passagers.select(passagers("Pclass"), passagers("Survived").cast(DoubleType).as("Survived"),
           passagers("Gender"), passagers("Age"), passagers("SibSp"), passagers("Parch")
           , passagers("Fare")) 
       
       // VectorAssembler does not accept string columns, so index Gender into a numeric GenderCat column
       val indexer = new StringIndexer
       indexer.setInputCol("Gender")
       indexer.setOutputCol("GenderCat")
       val passagers2 = indexer.fit(passagers1).transform(passagers1)
       // passagers2.show
       
       // drop rows containing null or NaN values
       val passagers3 = passagers2.na.drop()
       println("total count:" + passagers2.count() + "  droped count is: " + (passagers2.count() - passagers3.count()))
       
       val vectorAssembler = new VectorAssembler
       vectorAssembler.setInputCols(Array("Pclass", "GenderCat", "Age", "SibSp", "Parch", "Fare"))
       vectorAssembler.setOutputCol("features")
       val passagers4 = vectorAssembler.transform(passagers3)
       // passagers4.show()
       
       // 3. split the data into training and test sets
       
       val Array(train, test) = passagers4.randomSplit(Array(0.9, 0.1))
       // train.show()
       
       val algtree = new DecisionTreeClassifier
       algtree.setLabelCol("Survived")
       algtree.setImpurity("gini")
       algtree.setMaxBins(32)
       // Maximum depth of the tree
       algtree.setMaxDepth(5)
       
       // fit the model
       val mdlTree = algtree.fit(train)
       // println(mdlTree.toDebugString)
       // println(mdlTree.toString)
       // println(mdlTree.featureImportances)
       
       // 4. use the model to make predictions on the test set
       val predictions = mdlTree.transform(test)
       predictions.show
       
       // 5. evaluate the model
       val evaluator = new MulticlassClassificationEvaluator
       evaluator.setLabelCol("Survived")
       // metric name used in evaluation
       // (supports `"f1"` (default), `"weightedPrecision"`,`"weightedRecall"`, `"accuracy"`)
       evaluator.setMetricName("accuracy")
       val accuracy = evaluator.evaluate(predictions)
       println("the accuracy is %.2f%%".format(accuracy))
       
  }
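
Beyond the single accuracy number, a cross-tabulation of the label against the prediction gives a simple confusion matrix. A short sketch, reusing the predictions DataFrame from the block above:

       // rows are the actual Survived values, columns are the predicted values
       predictions.stat.crosstab("Survived", "prediction").show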

  

Clustering

import org.apache.spark.SparkConf
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.master", "local[4]")
    // conf.set("spark.master", "local[8]")
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    // obtain the SparkContext (not needed in this example)
    // val sc = spark.sparkContext
    // with inferSchema=true the column types are inferred from the data; the default is false, in which case every column is read as String
    // 1. load the data
    val points = spark.read.option("header", "true").option("inferSchema", "true")
         .csv("hdfs://m2:9820/cluster-points-v2.csv")
     // points.show()
     // points.printSchema()
  
    // 2. assemble the X and Y columns into a feature vector
    val vectorAssembler = new VectorAssembler
    vectorAssembler.setInputCols(Array("X", "Y"))
    vectorAssembler.setOutputCol("features")
    val points1 = vectorAssembler.transform(points)
    // points1.show()
    // points1.printSchema()
    
    // 3. clustering is an unsupervised learning algorithm, so the data is not split into train and test sets; k-means is used here
    // setK(2) sets the number of clusters
    val algKmeans = new KMeans().setK(2)
    // fit the model
    val mdlKmeans = algKmeans.fit(points1)
    
    // 4. assign each point to a cluster using the model
    val predictions = mdlKmeans.transform(points1)
    // predictions.show
    // 5. evaluation: WSSSE, the sum of squared distances from each point to its cluster centre; smaller is better
    val wsse = mdlKmeans.computeCost(points1)
    println(wsse)
         
  }
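
The cluster centres found by k-means can be printed directly from the model. A short sketch continuing from the block above:

    // one centre per cluster, as a vector in the (X, Y) feature space
    mdlKmeans.clusterCenters.foreach(centre => println("cluster centre: " + centre))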

  Recommendation

import org.apache.spark.SparkConf
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions.split

  // parse one split row into a Rating(user, product, rating)
  def parseRating(row: Row): Rating = {
    val aList = row.getList[String](0)
    Rating(aList.get(0).toInt, aList.get(1).toInt, aList.get(2).toDouble)
  }

  // squared difference between the actual rating (column 2) and the prediction (column 3)
  def rowSqDiff(row: Row): Double = {
    math.pow(row.getDouble(2) - row.getFloat(3).toDouble, 2)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.set("spark.master", "local[4]")
    // conf.set("spark.master", "local[8]")
    // create the SparkSession
    val spark = SparkSession.builder().appName("spark sql").config(conf).getOrCreate();
    // obtain the SparkContext (not needed in this example)
    // val sc = spark.sparkContext
    // with inferSchema=true the column types are inferred from the data; the default is false, in which case every column is read as String
    val startTime = System.nanoTime()
    // 1. load the data
    
    val movies = spark.read.text("hdfs://m3:9820/movies.dat")
    // movies.show()
    // movies.printSchema()

    val ratings = spark.read.text("hdfs://m3:9820/ratings.dat")
    // ratings.show()
    // ratings.printSchema()

    val users = spark.read.text("hdfs://m3:9820/users.dat")
    // users.show()
    // users.printSchema()

    val ratings1 = ratings.select(split(ratings("value"), "::")).as("values")
    // ratings1.show
    // 2. convert each row into a Rating
    val rating2 = ratings1.rdd.map(parseRating(_))
    val rating3 = spark.createDataFrame(rating2)
    // rating3.show
    
    // 3. split the data into training and test sets
    val Array(train, test) = rating3.randomSplit(Array(0.8, 0.2))
    
    // 4. build the ALS model and train it
    val algAls = new ALS
    algAls.setItemCol("product")
    algAls.setRank(12)
    algAls.setRegParam(0.1) // regularization parameter
    algAls.setMaxIter(20)
    // fit the model
    val mdlReco = algAls.fit(train)

    // mdlReco.
    // 5. predict ratings for the test data
    val predictions = mdlReco.transform(test)
    predictions.show
    predictions.printSchema()
     
    // 6. evaluate the model
    // count the NaN predictions (users/items not seen during training) by filling them with 99999.0 and filtering on that value
    val nanState = predictions.na.fill(99999.0)
    println(nanState.filter(nanState("prediction") > 99998).count())
    nanState.filter(nanState("prediction") > 99998).show(5)
    //
    val pred = predictions.na.drop()
    println("Orig = "+predictions.count()+" Final = "+ pred.count() + " Dropped = "+ (predictions.count() - pred.count()))
    // Calculate RMSE & MSE
    val evaluator = new RegressionEvaluator()
		evaluator.setLabelCol("rating")
		var rmse = evaluator.evaluate(pred)
		println("Root Mean Squared Error = "+"%.3f".format(rmse))
		//
		evaluator.setMetricName("mse")
		var mse = evaluator.evaluate(pred)
		println("Mean Squared Error = "+"%.3f".format(mse))
		mse = pred.rdd.map(r => rowSqDiff(r)).reduce(_+_) / predictions.count().toDouble
		println("Mean Squared Error (Calculated) = "+"%.3f".format(mse))
		//
    //
    val elapsedTime = (System.nanoTime() - startTime) / 1e9
    println("Elapsed time: %.2f seconds".format(elapsedTime))
    
    // MatrixFactorizationModel 
  }
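
On Spark 2.2 or later, the NaN predictions handled manually above can be avoided by telling ALS to drop rows for users or items it did not see during training. This is a sketch of the relevant setting, not part of the original run:

    // with cold-start rows dropped, predictions contains no NaN values and the
    // manual na.fill / na.drop handling above is no longer needed
    algAls.setColdStartStrategy("drop")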

  

