Databricks 第2篇:pyspark.sql 簡介


pyspark中的DataFrame等價於Spark SQL中的一個關系表。在pyspark中,DataFrame由Column和Row構成。

  • pyspark.sql.SparkSession:是DataFrame和SQL函數的主要入口
  • DataFrameReader:讀取數據,返回DataFrame
  • DataFrameWriter:把DataFrame存儲到其他存儲系統
  • pyspark.sql.DataFrame、pyspark.sql.Column和 pyspark.sql.Row

一,SparkSession類

在操作DataFrame之前,首先需要創建SparkSession,通過SparkSession來操作DataFrame。

1,創建SparkSession

通過Builder類來創建SparkSession,在Databricks Notebook中,spark是默認創建,表示一個SparkSession對象:

spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

函數注釋:

  • master(master):用於設置要連接的Spark的master URL,例如local表示在本地運行,local[4] 在本地使用4核運行,
  • appName(name):為application設置一個名字
  • config(key=Nonevalue=Noneconf=None):設置SparkSession的配置選項,
  • getOrCreate():獲得一個已存在的或者創建一個新的SparkSession

2,從常量數據中創建DataFrame

從RDD、list或pandas.DataFrame 創建DataFrame:

createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True)

3,從SQL查詢中創建DataFrame

從一個給定的SQL查詢或Table中獲取DataFrame,舉個例子:

df.createOrReplaceTempView("table1")

#use SQL query to fetch data
df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")

#use table to fetch data
df2 = spark.table("table1")

4,SparkSession的兩個重要屬性

read:該屬性是DataFrameReader 對象,用於讀取數據,返回DataFrame對象

readStream:該屬性是DataStreamReader對象,用於讀取Data Stream,返回 流式的DataFrame對象( streaming DataFrame)

二,DataFrameReader類

從外部存儲系統中讀取數據,返回DataFrame對象,通常使用SparkSession.read來訪問,通用語法是先調用format()函數來指定輸入數據的格式,后調用load()函數從數據源加載數據,並返回DataFrame對象:

df = spark.read.format('json').load('python/test_support/sql/people.json')

對於不同的格式,DataFrameReader類有細分的函數來加載數據:

df_csv = spark.read.csv('python/test_support/sql/ages.csv')
df_json = spark.read.json('python/test_support/sql/people.json')
df_txt = spark.read.text('python/test_support/sql/text-test.txt')
df_parquet = spark.read.parquet('python/test_support/sql/parquet_partitioned')

# read a table as a DataFrame
df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
df.createOrReplaceTempView('tmpTable')
spark.read.table('tmpTable')

還可以通過jdbc,從JDBC URL中構建DataFrame

jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)

三,DataFrameWriter類

用於把DataFrame寫入到外部存儲系統中,通過DataFrame.write來訪問。

(df.write.format('parquet')  
    .mode("overwrite")
    .saveAsTable('bucketed_table'))

函數注釋:

  • format(source):指定底層輸出的源的格式
  • mode(saveMode):當數據或表已經存在時,指定數據存儲的行為,保存的模式有:append、overwrite、error和ignore。
  • saveAsTable(nameformat=Nonemode=NonepartitionBy=None**options):把DataFrame 存儲為表
  • save(path=Noneformat=Nonemode=NonepartitionBy=None**options):把DataFrame存儲到數據源中

對於不同的格式,DataFrameWriter類有細分的函數來加載數據:

df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
df.write.parquet(os.path.join(tempfile.mkdtemp(), 'data'))
df.write.txt(os.path.join(tempfile.mkdtemp(), 'data'))

#wirte data to external database via jdbc
df.write.jdbc(url, table, mode=None, properties=None)

把DataFrame內容存儲到源中:

df.write.mode("append").save(os.path.join(tempfile.mkdtemp(), 'data'))

把DataFrame的內容存到表中:

df.write.saveAsTable(name='db_name.table_name',format='delta')

四,DataFrame操作

DataFrame等價於Spark SQL中的關系表,

1,常規操作

從parquet 文件中讀取數據,返回一個DataFrame對象:

people = spark.read.parquet("...")

從DataFrame對象返回一列:

ageCol = people.age

從DataFrame對象中row的集合:

people.collect()

從DataFrame對象中刪除列:

people.drop(*cols)

2,創建臨時視圖

可以創建全局臨時視圖,也可以創建本地臨時視圖,對於local view,臨時視圖的生命周期和SparkSession相同;對於global view,臨時視圖的生命周期由Spark application決定。

createOrReplaceGlobalTempView(name)
createGlobalTempView(name)
createOrReplaceTempView(name)
createTempView(name)

3,DataFrame數據的查詢

df.filter(df.age > 3)
df.select('name', 'age')

# join
cond = [df.name == df3.name, df.age == df3.age]
df.join(df3, cond, 'outer').select(df.name, df3.age)

#group by 
df.groupBy('name').agg({'age': 'mean'})

 

參考文檔:

pyspark.sql module


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM