快速開始使用spark


1、版本說明

  • 在spark2.0版本以前,spakr編程接口是RDD(Resilient Distributed Dataset,彈性分布式數據集),spark2.0版本即以上,RDD被Dataset取代,Dataset比RDD更為強大,在底層得到了許多優化了。當然2.0+版本仍然支持RDD,但官方建議使用Dataset。

2、安全

  • spark的安全模式默認是關閉的,這意味着你可能收到攻擊。

3、利用Spark Shell進行交互式數據分析

  • Spark的shell提供了一種學習API的簡單方法,以及一種以交互方式分析數據的強大工具。
  • 可以通過使用scala或者python進行編程。
  • 在spark的安裝根目錄下啟動。

3.1、Scala方式

啟動 
./bin/spark-shell
讀取一個文件用來創建一個新的數據集Dataset
val textFile = spark.read.textFile("README.md")

對數據集進行操作
textFile.count()
textFile.first()
val linesWithSpark = textFile.filter(line => line.contains("Spark"))
textFile.filter(line => line.contains("Spark")).count()

3.2、python方式

啟動
./bin/pyspark
textFile = spark.read.text("README.md")
textFile.count()
textFile.first()
linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
textFile.filter(textFile.value.contains("Spark")).count()

4、Dataset的更多操作

1.查找文件中長度最大的字符串,並返回長度
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

2.實現wordcounts
val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts.collect()

5、緩存Caching

  • Spark還支持將數據集提取到群集范圍的內存緩存中。這在重復訪問數據時非常有用,例如查詢小的“熱”數據集或運行像PageRank這樣的迭代算法時。舉個簡單的例子,讓我們標記linesWithSpark要緩存的數據集:
linesWithSpark.cache()
linesWithSpark.count()

通過文件運行

  • 新建一個SimpleApp.scala
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}
  • 運行結果
    image.png


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM