spark-shell使用指南. - 韓禹的博客


在2.0版本之前,Spark的主要編程接口是RDD(彈性分布式數據集),在2.0之后,則主推Dataset,他與RDD一樣是強類型,但更加優化。RDD接口仍然支持,但為了更優性能考慮還是用Dataset的好。

在spark目錄中運行bin/spark-shell,或將spark安裝目錄設為SPARK_HOME環境變量且將其$SPARK_HOME/bin加到PATH中,則以后可在任意目錄執行spark-shell即可啟動。

RDD可以從Hadoop的InputFormats文檔(如hdfs文檔)創建,也可讀寫本地文檔,也可由其他RDD經轉換而來。Dataset也具有這些性質。以讀取文檔為例,RDD時代可以在shell中通過sc.textFile(filename)直接讀取,在Dataset則需要通過spark.read.textFile(filename)讀取。

1. 讀取Dataset方式

val dataset = spark.read.textFile(source_path)

其中spark.read返回的是一個DataFrameReader,所以上述方法其加載文本文檔並返回一個string的Dataset,這個dataset僅包含單個名為”value”的列。 若文本文檔的目錄結構包含分區信息,在讀到的dataset中也將被忽略,要想將這些分區信息作為schema列信息的話,需要用 大專欄  spark-shell使用指南. - 韓禹的博客text API, 看textFile的實現, 其也是用的text的特殊參數。

1.1 查看內容

dataset.collect().foreach(println) 或者 dataset.take(10).foreach(println)

其中collect返回所有記錄,take(n)返回n條記錄。

2. 讀取Json為dataset並進行select操作

val dataset = spark.read.json(source_path)

spark.read.json可以返回DataFrame形式的數據

val data = dataset.select($"content", $"id", $"time").filter($"id"===01 && $"time"="2019-01-01")

返回 org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [content: string, gid: bigint ... 1 more field]

val dataC = data.select(unbase64($"content")).map(s => new String(s.getAs[Array[Byte]](0), "gb2312"))

將content中的base64的內容解碼為gb2312

val sample = dataC.take(10).foreach(println)

輸出


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM