轉】Spark DataFrame小試牛刀


  原博文出自於:  https://segmentfault.com/a/1190000002614456        感謝!

 

 

三月中旬,Spark發布了最新的1.3.0版本,其中最重要的變化,便是DataFrame這個API的推出。DataFrame讓Spark具備了處理大規模結構化數據的能力,在比原有的RDD轉化方式易用的前提下,計算性能更還快了兩倍。這一個小小的API,隱含着Spark希望大一統「大數據江湖」的野心和決心。DataFrame像是一條聯結所有主流數據源並自動轉化為可並行處理格式的水渠,通過它Spark能取悅大數據生態鏈上的所有玩家,無論是善用R的數據科學家,慣用SQL的商業分析師,還是在意效率和實時性的統計工程師。

以一個常見的場景 -- 日志解析為例,有時我們需要用到一些額外的結構化數據(比如做IP和地址的映射),通常這樣的數據會存在MySQL,而訪問的方式有兩種:一是每個worker遠程去檢索數據庫,弊端是耗費額外的網絡I/O資源;二是使用JdbcRDD的API轉化為RDD格式,然后編寫繁復的函數去實現檢索,顯然要寫更多的代碼。而現在,Spark提供了一種新的選擇,一行代碼就能實現從MySQL到DataFrame的轉化,並且支持SQL查詢。

實例

首先我們在本地放置了一個JSON文件,文件內容如下:

 {"name":"Michael"} {"name":"Andy", "age":30} {"name":"Justin", "age":19} 

然后我們進入spark-shell,控制台的提示說明Spark為我們創建了一個叫sqlContext的上下文,注意,它是DataFrame的起點。
接下來我們希望把本地的JSON文件轉化為DataFrame

scala> val df = sqlContext.jsonFile("/path/to/your/jsonfile") df: org.apache.spark.sql.DataFrame = [age: bigint, name: string] 

從控制台的提示可以得知,我們成功創建了一個DataFrame的對象,包含agename兩個字段。
DataFrame自帶的玩法就多了:

// 輸出表結構 df.printSchema() // 選擇所有年齡大於21歲的人,只保留name字段 df.filter(df("age") > 21).select("name").show() // 選擇name,並把age字段自增 df.select("name", df("age") + 1).show() // 按年齡分組計數 df.groupBy("age").count().show() // 左聯表(注意是3個等號!) df.join(df2, df("name") === df2("name"), "left").show() 

此外,我們也可以把DataFrame對象轉化為一個虛擬的表,然后用SQL語句查詢,比如下面的命令就等同於df.groupBy("age").count().show()

df.registerTempTable("people")
sqlContext.sql("select age, count(*) from people group by age").show() 

當然,Python有同樣豐富的API(由於最終都是轉化為JVM bytecode執行,Python和Scala的效率是一樣的),而且Python還提供了類Pandas的操作語法。關於Python的API,可以參考Spark新年福音:一個用於大規模數據科學的API——DataFrame

MySQL

除了JSON之外,DataFrame現在已經能支持MySQL、Hive、HDFS、PostgreSQL等外部數據源,而對關系數據庫的讀取,是通過jdbc實現的。

對於不同的關系數據庫,必須在SPARK_CLASSPATH變量中加入對應connector的jar包,比如希望連接MySQL的話應該這么啟動spark-shell

SPARK_CLASSPATH=mysql-connector-java-x.x.x-bin.jar spark-shell 

下面要將一個MySQL表轉化為DataFrame對象:

val jdbcDF = sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password", "dbtable" -> "your_table")) 

然后十八般武藝又可以派上用場了。

Hive

Spark提供了一個HiveContext的上下文,其實是SQLContext的一個子類,但從作用上來說,sqlContext也支持Hive數據源。只要在部署Spark的時候加入Hive選項,並把已有的hive-site.xml文件挪到$SPARK_HOME/conf路徑下,我們就可以直接用Spark查詢包含已有元數據的Hive表了:

sqlContext.sql("select count(*) from hive_people").show() 

結語

Spark的目標在於成為一個跨環境、跨語言、跨工具的大數據處理和分析平台。DataFrame的推出很好詮釋了這一目標,從初步的使用來看確實很容易上手。隨着性能和穩定性的持續優化,我相信某一天所有玩數據的人,都可以使用Spark作為惟一的平台入口。

來自:建造者說


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM