Creating DataFrames

In Spark SQL, developers can easily convert all kinds of internal and external, single-machine and distributed data into DataFrames. The following Python sample code illustrates how varied and easy to use the DataFrame data sources in Spark SQL 1.3.0 are:
```python
# Construct a DataFrame from the Hive table "users"
users = sqlContext.table("users")

# Load a JSON file from S3
logs = sqlContext.load("s3n://path/to/data.json", "json")

# Load a Parquet file from HDFS
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")

# Access MySQL via JDBC
comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")

# Convert a plain RDD into a DataFrame
rdd = sparkContext.textFile("article.txt") \
                  .flatMap(lambda line: line.split()) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])

# Convert a local data collection into a DataFrame
data = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(data, ["name", "age"])

# Convert a Pandas DataFrame into a Spark DataFrame
# (a feature specific to the Python API)
sparkDF = sqlContext.createDataFrame(pandasDF)
```
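However a DataFrame is constructed, it supports the same query API. As a minimal sketch of what comes next (assuming the snippet above has been run in a Spark 1.3.0 Python shell, so that `sqlContext` and the `people` DataFrame already exist):

```python
# Inspect the schema inferred for the locally constructed DataFrame
people.printSchema()

# A simple transformation: keep rows with age above 21 and display them
people.filter(people["age"] > 21).show()

# DataFrames can also be queried with SQL after registering a temp table
people.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age > 21").show()
```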