Pyspark-SQL 官方 API 的一些梳理(上)


 在 Pyspark 操縱 spark-SQL 的世界里借助 session 這個客戶端來對內容進行操作和計算。里面涉及到非常多常見常用的方法,本篇文章回來梳理一下這些方法和操作。

 

class pyspark.sql.SparkSession 類

下面是一個初始化 spark session 的方法,接下來我會依次來介紹相關函數代表的意義。

>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

 

SparkSession.builder:

Builder for SparkSession 這個就是生成一個 sparksession 實例。他下面有一些支持的函數

master: 設置 spark master 的 url 。由於我們集群使用的是 spark on yarn 的模式,所以可以用選擇不設置這個參數。

appName: 在程序運行的時候看到的應用名稱。

config: 其實其他參數也是調用 .config 設置相應的 config 參數,例如 .master 就是調用了 .config("spark.master", master)。

enableHiveSupport: 啟動對 hive 的支持。例如支持 hivesql 以及 hive udf 等。

getOrCreate: 得到一個現成的 SparkSession ,如果沒有就生成一個。

 

SparkSession.catalog:

提供一個接口來操作 create drop alter query 庫或者表

 

SparkSession.createDataFrame:

可以獲得從 rdd python list 和 pandas df 創建 df 的能力。下面貼一下官方的例子:

>>> l = [('Alice', 1)]
>>> spark.createDataFrame(l).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataFrame(l, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]


>>> d = [{'name': 'Alice', 'age': 1}]
>>> spark.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]


>>> rdd = sc.parallelize(l)
>>> spark.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df = spark.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]


>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
>>> df2 = spark.createDataFrame(person)
>>> df2.collect()
[Row(name=u'Alice', age=1)]


>>> from pyspark.sql.types import *
>>> schema = StructType([
...    StructField("name", StringType(), True),
...    StructField("age", IntegerType(), True)])
>>> df3 = spark.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]


>>> spark.createDataFrame(df.toPandas()).collect()  
[Row(name=u'Alice', age=1)]
>>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()  
[Row(0=1, 1=2)]


>>> spark.createDataFrame(rdd, "a: string, b: int").collect()
[Row(a=u'Alice', b=1)]
>>> rdd = rdd.map(lambda row: row[1])
>>> spark.createDataFrame(rdd, "int").collect()
[Row(value=1)]
>>> spark.createDataFrame(rdd, "boolean").collect() 
Traceback (most recent call last):
    ...
Py4JJavaError: ...

 

SparkSession.sql:

使用 sql 方法返回的是 df 例如:

>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]

 

SparkSession.table:

這個可以返回指定表的 df 

>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True

 

由於 SQLContext 在 Spark 2.0 版本以后已經和 Session 類合並了,所以兩個類一起說了。

目前看來大量 SQLContext 的方法都被整合進了 SparkSession.catalog 方法中。

 

SparkSession.catalog.cacheTable(TABLE_NAME):

緩存相關表到內存方便使用

 

SparkSession.catalog.clearCache:

清楚所有表的緩存

 

SparkSession.catalog.dropTempView():

丟棄掉本地計算使用的臨時視圖,如果之前這個視圖之前被 cache 過了。調用這個方法將會注銷 cache。如果丟棄視圖成功會返回 true 否則就返回 false

>>> sqlContext.registerDataFrameAsTable(df, "table1")
>>> sqlContext.dropTempTable("table1")

 

DataFrame.createOrReplaceTempView(TABLE_NAME):

使用現成的 df 創建臨時視圖。

>>> df.createOrReplaceTempView("people")
>>> df2 = df.filter(df.age > 3)
>>> df2.createOrReplaceTempView("people")
>>> df3 = spark.sql("select * from people")
>>> sorted(df3.collect()) == sorted(df2.collect())
True

 

SparkSession.catalog.registerFunction:

非常重要的一個函數用於給計算套上 udf。這個函數幾經變動。首先在 1.2 版本里面在 sqlContext 里面首次實現,后來在 2.0 版本被遷移到了 SparkSession.catalog 里面實現。然后在 spark2.3.0 會遷移到 SparkSession.udf 里面實現。SparkSession.udf.register 這個函數會提供功能。由於會完全兼容之前的情況,這里我們暫時把它划分到 catalog 里。

Parameters:    
name – name of the UDF
f – python function
returnType – a pyspark.sql.types.DataType object
>>> SparkSession.catalog.registerFunction("stringLengthString", lambda x: len(x))
>>> SparkSession.sql("SELECT stringLengthString('test')").collect()
[Row(stringLengthString(test)=u'4')]

>>> from pyspark.sql.types import IntegerType >>> SparkSession.catalog.registerFunction("stringLengthInt", lambda x: len(x), IntegerType()) >>> SparkSession.sql("SELECT stringLengthInt('test')").collect() [Row(stringLengthInt(test)=4)]

>>> from pyspark.sql.types import IntegerType >>> SparkSession.udf.register("stringLengthInt", lambda x: len(x), IntegerType()) >>> SparkSession.sql("SELECT stringLengthInt('test')").collect() [Row(stringLengthInt(test)=4)]

 

SparkSession.catalog.listDatabases:

列舉目前的目前可以讀取的數據庫

 

SparkSession.catalog.listTables:

列舉目前的目前可以讀取的表

 

class pyspark.sql.DataFrame(jdf, sql_ctx)
前面說完了 SparkSession 現在這個 df 類,是我們在 python 當中操縱的對象。df 就相當於我們在 py 里面獲得的一個被映射的表一樣。同樣 spark.sql 提供給了我們非常豐富的方法。
 
DataFrame().alias(alias):
重命名 df 
>>> from pyspark.sql.functions import *
>>> df_as1 = df.alias("df_as1")
>>> df_as2 = df.alias("df_as2")
>>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
>>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()
[Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)]

 

DataFrame().checkpoint(eager=True):

返回這個數據集的檢查點版本,檢查點可以用來截斷這個DataFrame的邏輯計划,這在計划可能呈指數增長的迭代算法中特別有用。它將保存到SparkContext.setCheckpointDir()設置的檢查點目錄中的文件中。

 

DataFrame().collect():

返回所有記錄的 list 。

>>> df.collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]

 

DataFrame().columns():

以 list 的格式返回所有列名

>>> df.columns
['age', 'name']

 

DataFrame().count():

返回 df 所有記錄一共多少條

>>> df.count()
2

 

DataFrame().createGlobalTempView():

通過 df 創建一個全局的臨時表。他的生命周期跟 spark 應用的生命周期相關聯。如果視圖中已經存在這個名字了會拋出 TempTableAlreadyExistsException 的錯誤。

>>> df.createGlobalTempView("people")
>>> df2 = spark.sql("select * from global_temp.people")
>>> sorted(df.collect()) == sorted(df2.collect())
True
>>> df.createGlobalTempView("people")  
Traceback (most recent call last):
...
AnalysisException: u"Temporary table 'people' already exists;"
>>> spark.catalog.dropGlobalTempView("people")

這個跟之前談到的 df 方法 createOrReplaceTempView 非常類似,createOrReplaceTempView 這個生命周期是跟隨 SparkSession 這個生命周期的。

 

DataFrame().createOrReplaceGlobalTempView():

這個對比 createGlobalTempView 不同的點是不會拋出 TempTableAlreadyExistsException 的錯誤,會直接替換。

 

DataFrame().createTempView():

這個對比 createOrReplaceTempView 如果表已經存在則會拋出 TempTableAlreadyExistsException 的錯誤。

 

DataFrame().crossJoin():

和另外一個 df 取笛卡爾積例如

>>> df.select("age", "name").collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df2.select("name", "height").collect() [Row(name=u'Tom', height=80), Row(name=u'Bob', height=85)]
>>> df.crossJoin(df2.select("height")).select("age", "name", "height").collect() [Row(age=2, name=u'Alice', height=80), Row(age=2, name=u'Alice', height=85), Row(age=5, name=u'Bob', height=80), Row(age=5, name=u'Bob', height=85)]

 

DataFrame().describe():

為數字和字符串計算統計數據,例如:

>>> df.describe(['age']).show() +-------+------------------+ |summary| age| +-------+------------------+ | count| 2| | mean| 3.5| | stddev|2.1213203435596424| | min| 2| | max| 5| +-------+------------------+ >>> df.describe().show() +-------+------------------+-----+ |summary| age| name| +-------+------------------+-----+ | count| 2| 2| | mean| 3.5| null| | stddev|2.1213203435596424| null| | min| 2|Alice| | max| 5| Bob| +-------+------------------+-----+

 

DataFrame().distinct():

返回新的 df ,這個新的 df 不包含重復的數據

>>> df.distinct().count()
2

 

DataFrame().drop():

返回一個新的 df 丟棄掉指定的列, 如果模式不包含給定的列名,則此模式為 no-op(no operation),不刪除任何列。

>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]

>>> df.join(df2, 'name', 'inner').drop('age', 'height').collect()
[Row(name=u'Bob')]

 

DataFrame().dropDuplicates(subset=None):

返回一個新的 df ,這個 df 里面不再有重復的記錄。可選參數可以讓我們選擇關心的字段進行去重。

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show() +---+------+-----+ |age|height| name| +---+------+-----+ | 5| 80|Alice| | 10| 80|Alice| +---+------+-----+
>>> df.dropDuplicates(['name', 'height']).show() +---+------+-----+ |age|height| name| +---+------+-----+ | 5| 80|Alice| +---+------+-----+

 

DataFrame().dropna(how='any'thresh=Nonesubset=None): 

返回一個省略了 null 值的行 DataFrame.dropna() 和 DataFrameNaFunctions.drop() 互為同名函數。他的 option 參數

how: any or all ,any 代表列中有任意數據為空就會被過濾,all 代表所有列為空才會被過濾。

thresh: 指定被過濾的空值個數,這個值會覆蓋 how 參數的指定。比如 thresh=3 如果小於3個 thresh 則會被過濾。

subset: 指定某些列被考慮在其中。

 

DataFrame().dtypes:

返回所有列名和其類型

>>> df.dtypes
[('age', 'int'), ('name', 'string')]

 

DataFrame().fillna(valuesubset=None):

替換空值,和na.fill() 互為同名函數。

value: 替換的值,可以是 int, long, float, string, or dict,如果是 dict 的話 key 應當是列值, value 應該是空值的替換值,如果是 dict 則 subset 不生效。

subset: 指定需要忽略替換的列。

>>> df4.na.fill(50).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10|    80|Alice|
|  5|    50|  Bob|
| 50|    50|  Tom|
| 50|    50| null|
+---+------+-----+
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show() +---+------+-------+ |age|height| name| +---+------+-------+ | 10| 80| Alice| | 5| null| Bob| | 50| null| Tom| | 50| null|unknown| +---+------+-------+

 

DataFrame().filter(condition):

使用條件過濾 df records,where 和 filter 是同名函數。

>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]

>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]

>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]

>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]

 

DataFrame().first:

返回第一條 df

 

DataFrame().foreach(f):

定義一個函數,會讓每個 df 都執行該函數

>>> def f(person):
...     print(person.name)
>>> df.foreach(f)

 

DataFrame().foreachPartition(f):

定義一個函數,會讓每個 partitions 都執行這個函數

>>> def f(people):
...     for person in people:
...         print(person.name)
>>> df.foreachPartition(f)

 

DataFrame().groupBy(*col):

是 group_by 的同名函數,可以使用 agg 方法對其進行各種各樣的聚合, spark sql 專門有個類為其提供了非常多的處理函數。See GroupedData for all the available aggregate functions.

>>> df.groupBy().avg().collect()
[Row(avg(age)=3.5)]
>>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
[Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(df.name).avg().collect()) [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
>>> sorted(df.groupBy(['name', df.age]).count().collect()) [Row(name=u'Alice', age=2, count=1), Row(name=u'Bob', age=5, count=1)]

 

DataFrame().head(n=None):

返回頭幾個 df 默認情況是 1 如果超過1 會返回一個列表。

>>> df.head()
Row(age=2, name=u'Alice')
>>> df.head(1) [Row(age=2, name=u'Alice')]

 

DataFrame().hint(name*parameters):

給當前的 df 加上一些 hint 

>>> df.join(df2.hint("broadcast"), "name").show()
+----+---+------+
|name|age|height|
+----+---+------+
| Bob|  5|    85|
+----+---+------+

 

DataFrame().join(otheron=Nonehow=None):

使用給定的條件和其他 df 進行 join。

other: 另外一個 df

on: a string for the join column name, a list of column names, a join expression (Column), or a list of Columns. If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.

how: str, default inner. Must be one of: innercrossouterfullfull_outerleftleft_outerrightright_outerleft_semi, and left_anti.

>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]
>>> df.join(df2, 'name', 'outer').select('name', 'height').collect() [Row(name=u'Tom', height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]
>>> cond = [df.name == df3.name, df.age == df3.age] >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect() [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.join(df2, 'name').select(df.name, df2.height).collect() [Row(name=u'Bob', height=85)]
>>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect() [Row(name=u'Bob', age=5)]

 

DataFrame().limit:

限制拿多少條

 

DataFrame().na:

返回類  DataFrameNaFunctions 用於處理空值

 

DataFrame().orderBy(*cols**kwargs):

返回一個被指定 col 排序好的 df

>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> df.orderBy(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> from pyspark.sql.functions import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]

>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

 

DataFrame().printSchema(): 

打印出該 df 的 schema

>>> df.printSchema()
root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

 

DataFrame().rdd():

將 df 轉換成 rdd 返回

 

DataFrame().registerTempTable():

將 df 注冊成一個臨時生命周期是 SparkSession 的周期,這個跟上面  createOrReplaceTempView 互為同名函數,就是調用該函數生成的。

 

DataFrame().repartition(numPartitions*cols):

返回一個新的 df,這個新的 df 被給定的 numPartitions 數量進行 hash 重分區。numPartitions可以是指定分區或列的目標數量的int。如果它是一個列,它將被用作第一個分區列。如果沒有指定,則使用默認分區數。

>>> df.repartition(10).rdd.getNumPartitions()
10
>>> data = df.union(df).repartition("age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  5|  Bob|
|  2|Alice|
|  2|Alice|
+---+-----+
>>> data = data.repartition(7, "age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
|  2|Alice|
|  5|  Bob|
|  2|Alice|
|  5|  Bob|
+---+-----+
>>> data.rdd.getNumPartitions()
7
>>> data = data.repartition("name", "age")
>>> data.show()
+---+-----+
|age| name|
+---+-----+
|  5|  Bob|
|  5|  Bob|
|  2|Alice|
|  2|Alice|
+---+-----+

 

DataFrame().replace(to_replacevalue=Nonesubset=None):

返回一個 df 用參數位置2的值替換掉參數位置是1的值。DataFrame.replace() and DataFrameNaFunctions.replace()互為同名函數。

>>> df4.na.replace(10, 20).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
|  20|    80|Alice|
|   5|  null|  Bob|
|null|  null|  Tom|
|null|  null| null|
+----+------+-----+

>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+------+----+
| age|height|name|
+----+------+----+
|  10|    80|   A|
|   5|  null|   B|
|null|  null| Tom|
|null|  null|null|
+----+------+----+

 

DataFrame().select(*cols):

返回 select 能找到的數據

>>> df.select('*').collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.select('name', 'age').collect() [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.select(df.name, (df.age + 10).alias('age')).collect() [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]

 

DataFrame().show(n=20truncate=True):

打印前 n 行數據到控制台

>>> df
DataFrame[age: int, name: string]
>>> df.show() +---+-----+ |age| name| +---+-----+ | 2|Alice| | 5| Bob| +---+-----+
>>> df.show(truncate=3) +---+----+ |age|name| +---+----+ | 2| Ali| | 5| Bob| +---+----+

 

DataFrame().sort(*cols**kwargs):

根據給定的 cols 進行排序之后返回新的 df

>>> df.sort(df.age.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.sort("age", ascending=False).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(df.age.desc()).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from pyspark.sql.functions import * >>> df.sort(asc("age")).collect() [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]

 

DataFrame().sort(*cols**kwargs):

df 查看 df 的存儲級別

>>> df.storageLevel
StorageLevel(False, False, False, False, 1)
>>> df.cache().storageLevel StorageLevel(True, True, False, True, 1)
>>> df2.persist(StorageLevel.DISK_ONLY_2).storageLevel StorageLevel(True, False, False, False, 2)

存儲級別又分為

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

 

DataFrame().subtract(ohter):

取目標 df ohter 和 df 的補集。

This is equivalent to EXCEPT in SQL.

 

DataFrame().take(num)
返回 df 前 num 條數據
>>> df.take(2)
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]

 

DataFrame().toDF(*cols)

返回新的 df 根據 list 字段順序

>>> df.toDF('f1', 'f2').collect()
[Row(f1=2, f2=u'Alice'), Row(f1=5, f2=u'Bob')]

 

DataFrame().toJSON(use_unicode=True):

>>> df.toJSON().first()
u'{"age":2,"name":"Alice"}'

 

DataFrame().toLocalIterator():

返回一個包含所有 df row 的迭代器,迭代器將消耗與此DataFrame中最大分區相同的內存。

 

DataFrame().toPandas():

將目前的 df 轉換成 pandas 的 df 

 

DataFrame().union(other):

DataFrame().unionAll(other) 這個和 union 互為同名函數。2.0 版本統一使用 union 函數了。 union 函數是 sql 函數 unionall 函數的功能。

 

DataFrame().withColumn(colNamecol):

返回一個新的 df 根據給定的 colName 和 col 本身增加一列。

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]

 

DataFrame().withColumnRenamed(existingnew):

返回一個新的 df 重命名現在已有的一個 col 名稱。

>>> df.withColumnRenamed('age', 'age2').collect()
[Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]

 

DataFrame().write():

非流接口向外部存儲寫入數據

 

DataFrame().writeStream():

流接口向外部存儲寫入數據

 

 

Reference:

https://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html  pyspark.sql module

https://blog.csdn.net/do_yourself_go_on/article/details/74739260


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM