pyspark獲取和處理RDD數據


彈性分布式數據集(RDD)是一組不可變的JVM對象的分布集,可以用於執行高速運算,它是Apache Spark的核心。

在pyspark中獲取和處理RDD數據集的方法如下:

1. 首先是導入庫和環境配置(本測試在linux的pycharm上完成)

import os
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

os.environ["PYSPARK_PYTHON"]="/usr/bin/python3"

conf = SparkConf().setAppName('test_rdd')
sc = SparkContext('local', 'test', conf=conf)
spark = SparkSession(sc)

2. 然后,提供hdfs分區數據的路徑或者分區表名

txt_File = r"hdfs://host:port/apps/hive/warehouse/數據庫名.db/表名/分區名/part-m-00029.deflate"  # part-m-00029.deflate
# txt_File = r"hdfs://host:port/apps/hive/warehouse/數據庫名.db/表名"  # hive table,即也可直接根據表名讀取

3. sc.textFile進行讀取,得到RDD格式數據<還可以用 spark.sparkContext.parallelize(data) 來獲取RDD數據>,參數中還可設置數據被划分的分區數

txt_ = sc.textFile(txt_File) 

4. 基本操作:

type(txt_):顯示數據類型,這時屬於 'pyspark.rdd.RDD'

 

txt_.first():獲取第一條數據

 

txt_.take(2):獲取前2條數據,形成長度為2的list

 

txt_.take(2)[1].split('\1')[1]:表示獲取前兩條中的第[1]條數據(也就是第2條,因為python的索引是從0開始的),並以 '\1'字符分隔開(這要看你的表用什么作為分隔符的),形成list,再獲取該list的第2條數據

 

txt_.map(lambda x:x.split('\1')):使用lambda函數和map函數快速處理每一行數據,這里表示將每一行以 '\1'字符分隔開,每一行返回一個list;此時數據結構是:'pyspark.rdd.PipelinedRDD'

 

txt_.map(lambda x:(x, x.split('\1'))).filter(lambda y:y[0].startswith('北京')):表示在返回 (x, x.split('\1')) 后,進行篩選filter,獲取其中以 '北京' 開頭的行,並按照相同格式 (例如,這里是(x, x.split('\1'))格式,即原數據+分割后的列表數據) 返回數據

 

txt_.collect():返回所有RDD數據元素,當數據量很大時謹慎操作

 

txt_.toDF():不能直接轉成DataFrame格式,需要設置Schema

 

注意:RDD格式不能用show()方法

##


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM