pyspark中的DataFrame等價於Spark SQL中的一個關系表。在pyspark中,DataFrame由Column和Row構成。
- pyspark.sql.SparkSession:是DataFrame和SQL函數的主要入口
- DataFrameReader:讀取數據,返回DataFrame
- DataFrameWriter:把DataFrame存儲到其他存儲系統
- pyspark.sql.DataFrame、pyspark.sql.Column和 pyspark.sql.Row
一,SparkSession類
在操作DataFrame之前,首先需要創建SparkSession,通過SparkSession來操作DataFrame。
1,創建SparkSession
通過Builder類來創建SparkSession,在Databricks Notebook中,spark是默認創建,表示一個SparkSession對象:
spark = SparkSession.builder \ .master("local") \ .appName("Word Count") \ .config("spark.some.config.option", "some-value") \ .getOrCreate()
函數注釋:
- master(master):用於設置要連接的Spark的master URL,例如local表示在本地運行,local[4] 在本地使用4核運行,
- appName(name):為application設置一個名字
- config(key=None, value=None, conf=None):設置SparkSession的配置選項,
- getOrCreate():獲得一個已存在的或者創建一個新的SparkSession
2,從常量數據中創建DataFrame
從RDD、list或pandas.DataFrame 創建DataFrame:
createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)
3,從SQL查詢中創建DataFrame
從一個給定的SQL查詢或Table中獲取DataFrame,舉個例子:
df.createOrReplaceTempView("table1") #use SQL query to fetch data df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1") #use table to fetch data df2 = spark.table("table1")
4,SparkSession的兩個重要屬性
read:該屬性是DataFrameReader 對象,用於讀取數據,返回DataFrame對象
readStream:該屬性是DataStreamReader對象,用於讀取Data Stream,返回 流式的DataFrame對象( streaming DataFrame)
二,DataFrameReader類
從外部存儲系統中讀取數據,返回DataFrame對象,通常使用SparkSession.read來訪問,通用語法是先調用format()函數來指定輸入數據的格式,后調用load()函數從數據源加載數據,並返回DataFrame對象:
df = spark.read.format('json').load('python/test_support/sql/people.json')
對於不同的格式,DataFrameReader類有細分的函數來加載數據:
df_csv = spark.read.csv('python/test_support/sql/ages.csv') df_json = spark.read.json('python/test_support/sql/people.json') df_txt = spark.read.text('python/test_support/sql/text-test.txt') df_parquet = spark.read.parquet('python/test_support/sql/parquet_partitioned') # read a table as a DataFrame df = spark.read.parquet('python/test_support/sql/parquet_partitioned') df.createOrReplaceTempView('tmpTable') spark.read.table('tmpTable')
還可以通過jdbc,從JDBC URL中構建DataFrame
jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
三,DataFrameWriter類
用於把DataFrame寫入到外部存儲系統中,通過DataFrame.write來訪問。
(df.write.format('parquet') .mode("overwrite") .saveAsTable('bucketed_table'))
函數注釋:
- format(source):指定底層輸出的源的格式
- mode(saveMode):當數據或表已經存在時,指定數據存儲的行為,保存的模式有:append、overwrite、error和ignore。
saveAsTable
(name, format=None, mode=None, partitionBy=None, **options):把DataFrame 存儲為表save
(path=None, format=None, mode=None, partitionBy=None, **options):把DataFrame存儲到數據源中
對於不同的格式,DataFrameWriter類有細分的函數來加載數據:
df.write.csv(os.path.join(tempfile.mkdtemp(), 'data')) df.write.json(os.path.join(tempfile.mkdtemp(), 'data')) df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data')) df.write.txt(os.path.join(tempfile.mkdtemp(), 'data')) #wirte data to external database via jdbc df.write.jdbc(url, table, mode=None, properties=None)
把DataFrame內容存儲到源中:
df.write.mode("append").save(os.path.join(tempfile.mkdtemp(), 'data'))
把DataFrame的內容存到表中:
df.write.saveAsTable(name='db_name.table_name',format='delta')
四,DataFrame操作
DataFrame等價於Spark SQL中的關系表,
1,常規操作
從parquet 文件中讀取數據,返回一個DataFrame對象:
people = spark.read.parquet("...")
從DataFrame對象返回一列:
ageCol = people.age
從DataFrame對象中row的集合:
people.collect()
從DataFrame對象中刪除列:
people.drop(*cols)
2,創建臨時視圖
可以創建全局臨時視圖,也可以創建本地臨時視圖,對於local view,臨時視圖的生命周期和SparkSession相同;對於global view,臨時視圖的生命周期由Spark application決定。
createOrReplaceGlobalTempView(name)
createGlobalTempView(name)
createOrReplaceTempView(name)
createTempView(name)
3,DataFrame數據的查詢
df.filter(df.age > 3) df.select('name', 'age') # join cond = [df.name == df3.name, df.age == df3.age] df.join(df3, cond, 'outer').select(df.name, df3.age) #group by df.groupBy('name').agg({'age': 'mean'})
參考文檔: