在2.0版本之前,Spark的主要編程接口是RDD(彈性分布式數據集),在2.0之后,則主推Dataset,他與RDD一樣是強類型,但更加優化。RDD接口仍然支持,但為了更優性能考慮還是用Dataset的好。
在spark目錄中運行bin/spark-shell,或將spark安裝目錄設為SPARK_HOME環境變量且將其$SPARK_HOME/bin加到PATH中,則以后可在任意目錄執行spark-shell即可啟動。
RDD可以從Hadoop的InputFormats文檔(如hdfs文檔)創建,也可讀寫本地文檔,也可由其他RDD經轉換而來。Dataset也具有這些性質。以讀取文檔為例,RDD時代可以在shell中通過sc.textFile(filename)直接讀取,在Dataset則需要通過spark.read.textFile(filename)讀取。
1. 讀取Dataset方式
val dataset = spark.read.textFile(source_path)
其中spark.read
返回的是一個DataFrameReader
,所以上述方法其加載文本文檔並返回一個string的Dataset,這個dataset僅包含單個名為”value”的列。 若文本文檔的目錄結構包含分區信息,在讀到的dataset中也將被忽略,要想將這些分區信息作為schema列信息的話,需要用 大專欄 spark-shell使用指南. - 韓禹的博客text API, 看textFile的實現, 其也是用的text的特殊參數。
1.1 查看內容
dataset.collect().foreach(println)
或者 dataset.take(10).foreach(println)
其中collect返回所有記錄,take(n)返回n條記錄。
2. 讀取Json為dataset並進行select操作
val dataset = spark.read.json(source_path)
spark.read.json可以返回DataFrame形式的數據
val data = dataset.select($"content", $"id", $"time").filter($"id"===01 && $"time"="2019-01-01")
返回 org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [content: string, gid: bigint ... 1 more field]
val dataC = data.select(unbase64($"content")).map(s => new String(s.getAs[Array[Byte]](0), "gb2312"))
將content中的base64的內容解碼為gb2312
val sample = dataC.take(10).foreach(println)
輸出