彈性分布式數據集(RDD)是一組不可變的JVM對象的分布集,可以用於執行高速運算,它是Apache Spark的核心。
在pyspark中獲取和處理RDD數據集的方法如下:
1. 首先是導入庫和環境配置(本測試在linux的pycharm上完成)
import os from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession os.environ["PYSPARK_PYTHON"]="/usr/bin/python3" conf = SparkConf().setAppName('test_rdd') sc = SparkContext('local', 'test', conf=conf) spark = SparkSession(sc)
2. 然后,提供hdfs分區數據的路徑或者分區表名
txt_File = r"hdfs://host:port/apps/hive/warehouse/數據庫名.db/表名/分區名/part-m-00029.deflate" # part-m-00029.deflate # txt_File = r"hdfs://host:port/apps/hive/warehouse/數據庫名.db/表名" # hive table,即也可直接根據表名讀取
3. sc.textFile進行讀取,得到RDD格式數據<還可以用 spark.sparkContext.parallelize(data) 來獲取RDD數據>,參數中還可設置數據被划分的分區數
txt_ = sc.textFile(txt_File)
4. 基本操作:
type(txt_):顯示數據類型,這時屬於 'pyspark.rdd.RDD'

txt_.first():獲取第一條數據
txt_.take(2):獲取前2條數據,形成長度為2的list
txt_.take(2)[1].split('\1')[1]:表示獲取前兩條中的第[1]條數據(也就是第2條,因為python的索引是從0開始的),並以 '\1'字符分隔開(這要看你的表用什么作為分隔符的),形成list,再獲取該list的第2條數據
txt_.map(lambda x:x.split('\1')):使用lambda函數和map函數快速處理每一行數據,這里表示將每一行以 '\1'字符分隔開,每一行返回一個list;此時數據結構是:'pyspark.rdd.PipelinedRDD'
txt_.map(lambda x:(x, x.split('\1'))).filter(lambda y:y[0].startswith('北京')):表示在返回 (x, x.split('\1')) 后,進行篩選filter,獲取其中以 '北京' 開頭的行,並按照相同格式 (例如,這里是(x, x.split('\1'))格式,即原數據+分割后的列表數據) 返回數據
txt_.collect():返回所有RDD數據元素,當數據量很大時謹慎操作
txt_.toDF():不能直接轉成DataFrame格式,需要設置Schema
注意:RDD格式不能用show()方法。
##
