pyspark創建RDD數據、RDD轉DataFrame以及保存


pyspark創建RDD的方式主要有兩種,一種是通過spark.sparkContext.textFile 或者 sparkContext.textFile讀取生成RDD數據;另一種是通過spark.sparkContext.parallelize創建RDD數據。

1. 首先導入庫和進行環境配置(使用的是linux下的pycharm)

import os
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import HiveContext

os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"  #多個python版本時需要指定

spark = SparkSession.builder.master("local").appName("SparkOnHive").enableHiveSupport().getOrCreate()

2. 創建RDD數據,這里采用的是第二種方式

data = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] # mixed,可以元組、列表或者混合,子元素的長度也不一定要一樣
rdd_ = spark.sparkContext.parallelize(data)
print(type(rdd_))
# support: list\tuple\dict or mixed them
print(rdd_.take(2))
rdd_collect = rdd_.collect()
print(rdd_collect)
print(rdd_collect[1])

如下,混合也是可行的,但是長度不一致時,就不能直接轉成DataFrame了,否則會出現: ValueError: Length of object (2) does not match with length of fields (3)

data = [('Alex','male',3),['Nancy',6],{'sport':'tennis'}] # 混合,長度也不一致,相當於RDD把每一行當做一整個元素了
rdd_ = spark.sparkContext.parallelize(data)
print(type(rdd_))
# support: list\tuple\dict or mixed them
print(rdd_.take(2))
rdd_collect = rdd_.collect()
print(rdd_collect)
print(rdd_collect[1])

3. 如果RDD要直接轉成DataFrame,使用spark.createDataFrame,則子元素長度要一致,例如:

data = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] # mixed
rdd_ = spark.sparkContext.parallelize(data)

# schema
schema = StructType([
        # true代表不為空
        StructField("name", StringType(), True),
        StructField("gender", StringType(), True),
        StructField("num", StringType(), True)
    ])
df = spark.createDataFrame(rdd_, schema=schema)  # working when the struct of data is same.
print(df.show()) 

其中,DataFrame和hive table的相互轉換可見:https://www.cnblogs.com/qi-yuan-008/p/12494024.html

4. RDD數據的保存:saveAsTextFile,如下 repartition 表示使用一個分區,后面加上路徑即可

rdd_.repartition(1).saveAsTextFile(r'some_path') 

5. DataFrame數據的保存:通過路徑進行設置

# save
file_path = r'/home/Felix/pycharm_projects/test/testfile.csv'
df.write.csv(path=file_path, header=True, sep=',', mode='overwrite')

file_path_2 = r'/home/Felix/pycharm_projects/test/testfile.parquet'
df.write.parquet(path=file_path_2, mode='overwrite')

6. 讀取以上保存的csv和parquet文件

dt1 = spark.read.csv(r'/home/Felix/pycharm_projects/test/testfile.csv', header=True)
print(dt1.show())
print('11111111')
#
dt1 = spark.read.parquet(r'/home/Felix/pycharm_projects/test/testfile.parquet')
print(dt1.show())
print('22222222')

 

##

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM