pyspark dataframe 常用操作


spark dataframe派生於RDD類,但是提供了非常強大的數據操作功能。當然主要對類SQL的支持。

 
在實際工作中會遇到這樣的情況,主要是會進行兩個數據集的篩選、合並,重新入庫。
 
首先加載數據集,然后在提取數據集的前幾行過程中,才找到limit的函數。
 
而合並就用到union函數,重新入庫,就是registerTemple注冊成表,再進行寫入到HIVE中。
 

 

1、union、unionAll、unionByName,row 合並(上下拼接)

data_all = data_neg.unionByName(data_pos)

2、dataframe 樣本抽樣

data_all.sample(False, 0.5, 1000).count()

3、條件過濾

data_all.filter("label >= 1").count()

4、注冊為臨時表,再使用spark.sql 對dataframe進行操作

res = predictions.select("user_log_acct", split_udf('probability').alias('probability'))

res.registerTempTable("tmp")
spark.sql("insert overwrite table dev.dev_result_temp select user_log_acct,probability from tmp")

spark.stop()

 

創建和保存spark dataframe:

spark.createDataFrame(data, schema=None, samplingRatio=None),直接創建
其中data是行或元組或列表或字典的RDD、list、pandas.DataFrame。

df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M'),
    ], ['id', 'weight', 'height', 'age', 'gender']) #直接創建Dataframe

df = spark.createDataFrame([{'name':'Alice','age':1},
	{'name':'Polo','age':1}]) #從字典創建

schema = StructType([
    StructField("id", LongType(), True),    
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
df = spark.createDataFrame(csvRDD, schema) #指定schema。

 spark.read 從文件中讀數據

>>> airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
>>> rdd = sc.textFile('python/test_support/sql/ages.csv') #可以用這種方法將用逗號分隔的rdd轉為dataframe
>>> df2 = spark.read.csv(rdd)
>>> df = spark.read.format('json').load('python/test_support/sql/people.json') 
>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
[('age', 'bigint'), ('name', 'string')]
>>> rdd = sc.textFile('python/test_support/sql/people.json')
>>> df2 = spark.read.json(rdd) 
>>> df = spark.read.text('python/test_support/sql/text-test.txt')
>>> df.collect()
[Row(value='hello'), Row(value='this')]
>>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
>>> df.collect()
[Row(value='hello\nthis')]

  

 

Spark function

1)foreach(f),應用f函數,將df的每一行作為f函數的輸入

例如:

def f(person):

    print(person.name)

df.foreach(f)

2) apply(udf)
3) map(f),應用f函數,作用對象為rdd的每一行

 

參考:https://blog.csdn.net/kittyzc/article/details/82862089 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM