PySpark SQL 基本操作


記錄備忘:

轉自: https://www.jianshu.com/p/177cbcb1cb6f 

 

 

數據拉取

加載包:

from __future__ import print_function

import pandas as pd

from pyspark.sql import HiveContext

from pyspark import SparkContext,SparkConf

from sqlalchemy import create_engine

import datetime

import pyspark.sql.functions as F

 

conf = SparkConf().setAppName("abc")

sc = SparkContext(conf=conf)

hiveCtx = HiveContext(sc)

 

# 創建dataframe

d = [{'name': 'Alice', 'age': 1},{'name': 'Bob', 'age': 5}]

df = sqlContext.createDataFrame(d)

df.show() 

 

sql = "" # 拉數SQL

df  = hiveCtx.sql(sql)

  

 

數據探索

df.show() # 不加參數默認展示前20行

df.count() 

df.printSchema() 

df.columns

 

數據處理

df.select('age','name') # 帶show才能看到結果

df.select(df.age.alias('age_value'),'name')

df.filter(df.name=='Alice')

  

 

函數和UDF

pyspark.sql.functions里有許多常用的函數,可以滿足日常絕大多數的數據處理需求;當然也支持自己寫的UDF,直接拿來用。

自帶函數

根據官方文檔,以下是部分函數說明:

'lit': 'Creates a :class:`Column` of literal value.',

'col': 'Returns a :class:`Column` based on the given column name.',

'column': 'Returns a :class:`Column` based on the given column name.',

'asc': 'Returns a sort expression based on the ascending order of the given column name.',

'desc': 'Returns a sort expression based on the descending order of the given column name.',

 

'upper': 'Converts a string expression to upper case.',

'lower': 'Converts a string expression to upper case.',

'sqrt': 'Computes the square root of the specified float value.',

'abs': 'Computes the absolutle value.',

 

'max': 'Aggregate function: returns the maximum value of the expression in a group.',

'min': 'Aggregate function: returns the minimum value of the expression in a group.',

'first': 'Aggregate function: returns the first value in a group.',

'last': 'Aggregate function: returns the last value in a group.',

'count': 'Aggregate function: returns the number of items in a group.',

'sum': 'Aggregate function: returns the sum of all values in the expression.',

'avg': 'Aggregate function: returns the average of the values in a group.',

'mean': 'Aggregate function: returns the average of the values in a group.',

'sumDistinct': 'Aggregate function: returns the sum of distinct values in the expression.',

 

---------------------------

 

df.select(F.max(df.age))

df.select(F.min(df.age))

df.select(F.avg(df.age)) # 也可以用mean,一樣的效果

df.select(F.countDistinct(df.age)) # 去重后統計

df.select(F.count(df.age)) # 直接統計,經試驗,這個函數會去掉缺失值會再統計

 

from pyspark.sql import Window

df.withColumn("row_number", F.row_number().over(Window.partitionBy("a","b","c","d").orderBy("time"))).show() # row_number()函數

  

數據寫出

寫入集群分區表

all_bike.rdd.map(lambda line: u','.join(map(lambda x:unicode(x),line))).saveAsTextFile('/user/hive/warehouse/bi.db/bikeid_without_3codes_a_d/dt={}'.format(t0_uf)) #轉化為RDD寫入HDFS路徑

  

還有一種方法,是先把dataframe創建成一個臨時表,再用hive sql的語句寫入表的分區

bike_change_2days.registerTempTable('bike_change_2days')
sqlContext.sql("insert into bi.bike_changes_2days_a_d partition(dt='%s') select citycode,biketype,detain_bike_flag,bike_tag_onday,bike_tag_yesterday,bike_num from bike_change_2days"%(date))

 

寫入集群非分區表

df_spark.write.mode("append").insertInto('bi.pesudo_bike_white_list') # 直接使用write.mode方法insert到指定的集群表

 

可以先將PySpark DataFrame轉化成Pandas DataFrame,然后用pandasto_sql方法插入數據庫

 

寫出本地

df.write.csv()

 

 

Pandas DataFrame互相轉換

如果你熟悉Pandas包,並且PySpark處理的中間數據量不是太大,那么可以直接轉換成pandas DataFrame,然后轉化成常規操作。
df.toPandas() # PySpark DataFrame轉化成Pandas DataFrame



import pandas as pd
df_p = pd.DataFrame(dict(num=range(3),char=['a','b','c']))
df_s = sqlContext.createDataFrame(df_p) # pandas dataframe轉化成PySpark DataFrame
type(df_s)

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM