There are two main ways to create an RDD in PySpark: one is to read a file with spark.sparkContext.textFile (or sparkContext.textFile) to generate the RDD; the other is to create the RDD from an in-memory collection with spark.sparkContext.parallelize.
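A minimal sketch of the first approach, assuming the SparkSession spark built in step 1 below; the input path here is only an illustrative placeholder:

# Read a plain-text file: each line of the file becomes one string element of the RDD
# (the path below is a hypothetical placeholder)
rdd_text = spark.sparkContext.textFile(r'/home/Felix/pycharm_projects/test/input.txt')
print(rdd_text.count())   # number of lines
print(rdd_text.take(3))   # first three lines, as strings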
1. First, import the libraries and configure the environment (PyCharm on Linux is used here)
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import HiveContext

os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"   # needed when several Python versions are installed
spark = SparkSession.builder.master("local").appName("SparkOnHive").enableHiveSupport().getOrCreate()
2. Create the RDD data; the second approach (parallelize) is used here
data = [('Alex','male',3), ('Nancy','female',6), ['Jack','male',9]]   # mixed: tuples, lists, or a mix; the sub-elements need not even have the same length
rdd_ = spark.sparkContext.parallelize(data)
print(type(rdd_))          # supports list / tuple / dict, or a mix of them
print(rdd_.take(2))
rdd_collect = rdd_.collect()
print(rdd_collect)
print(rdd_collect[1])
As shown below, mixing types is also possible, but when the element lengths differ the RDD can no longer be converted directly into a DataFrame; attempting to do so raises: ValueError: Length of object (2) does not match with length of fields (3) (a sketch after the next code block reproduces this).
data = [('Alex','male',3), ['Nancy',6], {'sport':'tennis'}]   # mixed types and unequal lengths; the RDD simply treats each row as a single element
rdd_ = spark.sparkContext.parallelize(data)
print(type(rdd_))          # supports list / tuple / dict, or a mix of them
print(rdd_.take(2))
rdd_collect = rdd_.collect()
print(rdd_collect)
print(rdd_collect[1])
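To make the error concrete, a small sketch (using the same schema as in step 3 below) that attempts the conversion and prints the failure; with an RDD the row check is deferred, so the error only surfaces once an action such as show() runs:

schema = StructType([
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("num", StringType(), True)
])
try:
    spark.createDataFrame(rdd_, schema=schema).show()
except Exception as e:
    # expected to contain: ValueError: Length of object (2) does not match with length of fields (3)
    print(e)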
3. To convert an RDD directly into a DataFrame with spark.createDataFrame, the sub-elements must all have the same length, for example:
data = [('Alex','male',3), ('Nancy','female',6), ['Jack','male',9]]   # mixed
rdd_ = spark.sparkContext.parallelize(data)
# schema: the third argument True means the field is nullable
schema = StructType([
    StructField("name", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("num", StringType(), True)
])
df = spark.createDataFrame(rdd_, schema=schema)   # works when every row has the same structure
print(df.show())
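As an alternative sketch, if the column types do not need to be declared explicitly, the same RDD can be converted with toDF by passing only the column names; Spark then infers the types by sampling the rows:

# toDF infers the schema from the data itself, so 'num' comes back as a numeric type
# rather than the string type declared above
df2 = rdd_.toDF(["name", "gender", "num"])
df2.printSchema()
df2.show()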
For converting between DataFrames and Hive tables, see: https://www.cnblogs.com/qi-yuan-008/p/12494024.html
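Since Hive support was enabled when building the SparkSession in step 1, a minimal sketch of that round trip could look like this (the table name test_db.people is a hypothetical placeholder and its database must already exist):

# DataFrame -> Hive table
df.write.mode('overwrite').saveAsTable('test_db.people')
# Hive table -> DataFrame
df_from_hive = spark.sql('select * from test_db.people')
df_from_hive.show()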
4. Saving RDD data with saveAsTextFile: below, repartition(1) writes everything into a single partition, and the output path follows
rdd_.repartition(1).saveAsTextFile(r'some_path')
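saveAsTextFile writes the string representation of each element as one line, so reading the result back with textFile yields plain strings that need to be parsed again; a small sketch reusing the same placeholder path:

# Each saved line is the string form of the original element, e.g. "('Alex', 'male', 3)"
rdd_back = spark.sparkContext.textFile(r'some_path')
print(rdd_back.take(2))   # strings, not tuples; parse them (e.g. with ast.literal_eval) if structure is needed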
5. Saving DataFrame data: specify the output via a path
# save
file_path = r'/home/Felix/pycharm_projects/test/testfile.csv'
df.write.csv(path=file_path, header=True, sep=',', mode='overwrite')

file_path_2 = r'/home/Felix/pycharm_projects/test/testfile.parquet'
df.write.parquet(path=file_path_2, mode='overwrite')
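Note that df.write.csv treats testfile.csv as an output directory holding one part file per partition; if a single output file is wanted, the DataFrame can be coalesced to one partition first, a sketch:

# Collapse to one partition so the output directory contains a single part-*.csv file
df.coalesce(1).write.csv(path=file_path, header=True, sep=',', mode='overwrite')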
6. Read back the csv and parquet files saved above
dt1 = spark.read.csv(r'/home/Felix/pycharm_projects/test/testfile.csv', header=True)
print(dt1.show())
print('11111111')

dt1 = spark.read.parquet(r'/home/Felix/pycharm_projects/test/testfile.parquet')
print(dt1.show())
print('22222222')
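By default spark.read.csv reads every column back as a string; if typed columns are wanted, pass a schema or set inferSchema=True, a sketch:

# Without a schema, all CSV columns are strings; inferSchema samples the data to pick types
dt3 = spark.read.csv(r'/home/Felix/pycharm_projects/test/testfile.csv', header=True, inferSchema=True)
dt3.printSchema()   # here 'num' is inferred as an integer instead of a string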