《Spark Python API 官方文檔中文版》 之 pyspark.sql (三)


摘要:在Spark開發中,由於需要用Python實現,發現API與Scala的略有不同,而Python API的中文資料相對很少。每次去查英文版API的說明相對比較慢,還是中文版比較容易get到所需,所以利用閑暇之余將官方文檔翻譯為中文版,並親測Demo的代碼。在此記錄一下,希望對那些對Spark感興趣和從事大數據開發的人員提供有價值的中文資料,對PySpark開發人員的工作和學習有所幫助。

官網地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html            

pyspark.sql module

Module Context

Spark SQL和DataFrames重要的類有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 將分布式數據集分組到指定列名的數據框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame數據的行
pyspark.sql.HiveContext 訪問Hive數據的主入口
pyspark.sql.GroupedData 由DataFrame.groupBy()創建的聚合方法集
pyspark.sql.DataFrameNaFunctions 處理丟失數據(空數據)的方法
pyspark.sql.DataFrameStatFunctions 統計功能的方法
pyspark.sql.functions DataFrame可用的內置函數
pyspark.sql.types 可用的數據類型列表
pyspark.sql.Window 用於處理窗口函數

4.class pyspark.sql.GroupedData(jdf, sql_ctx)

由DataFrame.groupBy()創建的DataFrame上的一組聚合方法。

4.1 agg(*exprs)

計算聚合並將結果作為DataFrame返回。
可用的集合函數是avg,max,min,sum,count。
如果exprs是從字符串到字符串的單個字典映射,那么鍵是要執行聚合的列,值是聚合函數。
另外,exprs也可以是聚合列表達式的列表。
參數:●  exprs – 從列名(字符串)到聚集函數(字符串)的字典映射或列的列表。

>>> gdf = df.groupBy(df.name)
>>> gdf.agg({"*": "count"}).collect()
[Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]
>>> from pyspark.sql import functions as F
>>> gdf.agg(F.min(df.age)).collect()
[Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]

4.2 avg(*args)

計算每個組的每個數字列的平均值。
mean()是avg()的別名。
參數:●  cols – 列名稱列表(字符串),非數字列被忽略。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.groupBy().avg('age').collect()
[Row(avg(age)=3.5)]
>>> l3=[('Alice',2,85),('Bob',5,80)]
>>> df3 = sqlContext.createDataFrame(l3,['name','age','height'])
>>> df3.groupBy().avg('age', 'height').collect()
[Row(avg(age)=3.5, avg(height)=82.5)]

4.3 count()

統計每個組的記錄數。

>>> df.groupBy(df.age).count().collect()
[Row(age=2, count=1), Row(age=5, count=1)] 

4.4 max(*args)

計算每個組的每個數字列的最大值。

>>> df.groupBy().max('age').collect()
[Row(max(age)=5)]
>>> df3.groupBy().max('age', 'height').collect()
[Row(max(age)=5, max(height)=85)]

4.5 mean(*args)

計算每個組的每個數字列的平均值。
mean()是avg()的別名。
參數:●  cols – 列名稱列表(字符串),非數字列被忽略。

>>> df.groupBy().mean('age').collect()
[Row(avg(age)=3.5)]
>>> df3.groupBy().mean('age', 'height').collect()
[Row(avg(age)=3.5, avg(height)=82.5)]

4.6 min(*args)

計算每個組的每個數字列的最小值。
參數:●  cols – 列名稱列表(字符串),非數字列被忽略。

>>> df.groupBy().min('age').collect()
[Row(min(age)=2)]
>>> df3.groupBy().min('age', 'height').collect()
[Row(min(age)=2, min(height)=80)]

4.7 pivot(pivot_col, values=None)

旋轉當前[[DataFrame]]的列並執行指定的聚合。 有兩個版本的透視函數:一個需要調用者指定不同值的列表以進行透視,另一個不需要。 后者更簡潔但效率更低,因為Spark需要首先在內部計算不同值的列表。
參數:●  pivot_col – 要旋轉的列的名稱。
      ●  values – 將被轉換為輸出DataFrame中的列的值的列表。

// 計算每個課程每年的收入總和作為一個單獨的列
>>> l4=[(2012,'dotNET',10000),(2012,'dotNET',5000),(2012,'Java',20000),(2013,'dotNET',48000),(2013,'Java',30000)]
>>> df4 = sqlContext.createDataFrame(l4,['year','course','earnings'])
>>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect() 
[Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]
// 或者不指定列值(效率較低)
>>> df4.groupBy("year").pivot("course").sum("earnings").collect() 
[Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]

4.8 sum(*args) 

計算每個組的每個數字列的總和。
參數:●  cols – 列名稱列表(字符串),非數字列被忽略。

>>> df.groupBy().sum('age').collect()
[Row(sum(age)=7)]
>>> df3.groupBy().sum('age', 'height').collect()
[Row(sum(age)=7, sum(height)=165)]

5.class pyspark.sql.Column(jc)

DataFrame中的一列。
列實例可以通過以下方式創建:

# 1. Select a column out of a DataFrame
df.colName
df["colName"]
# 2. Create from an expression
df.colName + 1
1 / df.colName

5.1 alias(*alias)

使用新名稱返回此列的別名(在返回多個列的表達式情況下如explode)。

>>> df.select(df.age.alias("age2")).collect()
[Row(age2=2), Row(age2=5)]

5.2 asc()

基於給定列名稱的升序返回一個排序表達式。

5.3 astype(dataType)

將列轉換為dataType類型。

>>> df.select(df.age.astype("string").alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
>>> from pyspark.sql.types import StringType
>>> df.select(df.age.astype(StringType()).alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]

5.4 between(lowerBound, upperBound)

一個布爾表達式,如果此表達式的值位於給定列之間,則該表達式的值為true。

>>> df.select(df.name, df.age.between(2, 4)).show()
+-----+--------------------------+
| name|((age >= 2) && (age <= 4))|
+-----+--------------------------+
|Alice|                      true|
|  Bob|                     false|
+-----+--------------------------+

5.5 bitwiseAND(other)

二元運算符

5.6 bitwiseOR(other)

二元運算符

5.7 bitwiseXOR(other)

二元運算符

5.8 cast(dataType)

將列轉換為dataType類型。

>>> df.select(df.age.cast("string").alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]
>>> df.select(df.age.cast(StringType()).alias('ages')).collect()
[Row(ages=u'2'), Row(ages=u'5')]

5.9 desc()

基於給定列名稱的降序返回一個排序表達式。

5.10 endswith(other)

二元運算符

5.11 getField(name) 

在StructField中通過名稱獲取字段的表達式。

>>> from pyspark.sql import Row
>>> df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
>>> df.select(df.r.getField("b")).show()
+----+
|r[b]|
+----+
|   b|
+----+
>>> df.select(df.r.a).show()
+----+
|r[a]|
+----+
|   1|
+----+

5.12 getItem(key)

從列表中獲取位置序號項,或者通過字典的key獲取項的表達式。

>>> df = sc.parallelize([([1, 2], {"key": "value"})]).toDF(["l", "d"])
>>> df.select(df.l.getItem(0), df.d.getItem("key")).show()
+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+
>>> df.select(df.l[0], df.d["key"]).show()
+----+------+
|l[0]|d[key]|
+----+------+
|   1| value|
+----+------+

5.13 inSet(*cols)

一個布爾表達式,如果此表達式的值由參數的評估值包含,則該值被評估為true。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df[df.name.inSet("Bob", "Mike")].collect()
[Row(name=u'Bob', age=5)]
>>> df[df.age.inSet([1, 2, 3])].collect()
[Row(name=u'Alice', age=2)]

注:在1.5中已過時,用Column.isin()代替。

5.14 isNotNull()

如果當前表達式不為null,則為真。

5.15 isNull()

如果當前表達式為null,則為真。

5.16 isin(*cols)

一個布爾表達式,如果此表達式的值由參數的評估值包含,則該值被評估為true。

>>> df[df.name.isin("Bob", "Mike")].collect()
[Row(name=u'Bob', age=5)]
>>> df[df.age.isin([1, 2, 3])].collect()
[Row(name=u'Alice', age=2)]

5.17 like(other)

二元運算符

5.18 otherwise(value)

評估條件列表並返回多個可能的結果表達式之一。 如果不調用Column.otherwise(),則不匹配條件返回None。
例如,請參閱pyspark.sql.functions.when()
參數:● value – 一個文字值或一個Column表達式。

>>> from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
+-----+---------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0|
+-----+---------------------------------+
|Alice|                                0|
|  Bob|                                1|
+-----+---------------------------------+

5.19 over(window)

定義一個窗口列。
參數:window – 一個WindowSpec
返回:一列
注:Window方法僅再HiveContext1.4支持。

5.20 rlike(other)

二元運算符

5.21 startswith(other)

二元運算符

5.22 substr(startPos, length)

返回一個新列,它是列的一個子字符串。
參數:● startPos – 其實位置 (int或者Column)
           ● length – 子串的長度(int或者Column)

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.select(df.name.substr(1, 3).alias("col")).collect()
[Row(col=u'Ali'), Row(col=u'Bob')]

5.23 when(condition, value)

評估條件列表並返回多個可能的結果表達式之一。 如果不調用Column.otherwise(),則不匹配條件返回None。
例如,請參閱pyspark.sql.functions.when()。
參數:● condition – 一個布爾類型的列表達式。
           ● value – 一個文字值或一個列表達式。

>>> from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+--------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0|
+-----+--------------------------------------------------------+
|Alice|                                                      -1|
|  Bob|                                                       1|
+-----+--------------------------------------------------------+

6. class pyspark.sql.Row

DataFrame中的一行,其中的字段可以像屬性一樣訪問。
Row可以用來通過使用命名參數來創建一個行對象,字段將按名稱排序。

>>> from pyspark.sql import Row
>>> row = Row(name="Alice", age=11)
>>> row
Row(age=11, name='Alice')
>>> row['name'], row['age']
('Alice', 11)
>>> row.name, row.age
('Alice', 11)

Row也可以用來創建另一個Row像類一樣,然后它可以被用來創建Row對象,比如

>>> Person = Row("name", "age")
>>> Person
<Row(name, age)>
>>> Person("Alice", 11)
Row(name='Alice', age=11)

6.1 asDict(recursive=False)

作為字典返回
參數:● recursive – 將嵌套的Row轉換為字典(默認值:False)。

>>> Row(name="Alice", age=11).asDict() == {'name': 'Alice', 'age': 11}
True
>>> row = Row(key=1, value=Row(name='a', age=2))
>>> row.asDict() == {'key': 1, 'value': Row(age=2, name='a')}
True
>>> row.asDict(True) == {'key': 1, 'value': {'name': 'a', 'age': 2}}
True

7. class pyspark.sql.DataFrameNaFunctions(df)

在DataFrame中處理丟失的數據的功能。

7.1 drop(how='any', thresh=None, subset=None)

返回一個新的DataFrame,省略含有空值的行。DataFrame.dropna()和 DataFrameNaFunctions.drop()是彼此的別名。
參數:● how – 'any'或者'all'.如果為'any', 如果它包含任何空值,則丟掉一行。如果為'all',只有當它的所有值都為空時才丟掉一行。
           ● thresh – 默認值為None,如果指定為int,刪除小於閾值的非空值的行。 這將覆蓋how參數。
           ● subset – 要考慮的列名的可選列表。

>>> l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)]
>>> df4 = sqlContext.createDataFrame(l4,['name','age','height'])
>>> df4.na.drop().show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 10|    80|
+-----+---+------+

7.2 fill(value, subset=None)

DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.
替換null值,是na.fill()的別名。 DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的別名。
參數:● value – 整形,長整形,浮點型,字符串,或者字典。用來替換空值的值。如果值是字典,則subset將被忽略,值必須是從列名(字符串)到要替換值的映射。替換值必須是整形,長整形,浮點型或字符串。
           ● subset – 要替換的列名的可選列表。在subset指定的列,如果不具有匹配的數據類型會被忽略。例如,如果value是一個字符串,並且subset包含一個非字符串列,那么非字符串列將被忽略。

>>> df4.na.fill(50).show()
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 10|    80|
|  Bob|  5|    50|
|  Tom| 50|    50|
| null| 50|    50|
+-----+---+------+
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
+-------+---+------+
|   name|age|height|
+-------+---+------+
|  Alice| 10|    80|
|    Bob|  5|  null|
|    Tom| 50|  null|
|unknown| 50|  null|
+-------+---+------+

7.3 replace(to_replace, value, subset=None)

返回用另外一個值替換了一個值的新的DataFrame。DataFrame.replace() 和 DataFrameNaFunctions.replace()是彼此的別名。
參數:● to_replace – 整形,長整形,浮點型,字符串,或者列表。要替換的值。如果值是字典,那么值會被忽略,to_replace必須是一個從列名(字符串)到要替換的值的映射。要替換的值必須是一個整形,長整形,浮點型,或者字符串。
          ● value – 整形,長整形,浮點型,字符串或者列表。要替換為的值。要替換為的值必須是一個整形,長整形,浮點型,或者字符串。如果值是列表或者元組,值應該和to_replace有相同的長度。
          ● subset – 要考慮替換的列名的可選列表。在subset指定的列如果沒有匹配的數據類型那么將被忽略。例如,如果值是字符串,並且subset參數包含一個非字符串的列,那么非字符串的列被忽略。

>>> l4=[('Alice',10,80),('Bob',5,None),('Tom',None,None),(None,None,None)]
>>> df4 = sqlContext.createDataFrame(l4,['name','age','height'])
>>> df4.na.replace(10, 20).show()
+-----+----+------+
| name| age|height|
+-----+----+------+
|Alice|  20|    80|
|  Bob|   5|  null|
|  Tom|null|  null|
| null|null|  null|
+-----+----+------+
>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
+----+----+------+
|name| age|height|
+----+----+------+
|   A|  10|    80|
|   B|   5|  null|
| Tom|null|  null|
|null|null|  null|
+----+----+------+

8. class pyspark.sql.DataFrameStatFunctions(df)

DataFrame的統計函數的功能。

8.1 corr(col1, col2, method=None)

以雙精度值計算DataFrame的兩列的相關性。目前只支持皮爾森相關系數. DataFrame.corr() and DataFrameStatFunctions.corr() 互為別名。

參數:● col1 – 第一列的名稱
           ● col2 – 第二列的名稱
           ● method – 相關方法,目前只支持“皮爾森”

8.2 cov(col1, col2)

計算給定列的樣本協方差(由它們的名稱指定)作為雙精度值。DataFrame.cov() and DataFrameStatFunctions.cov() 互為別名。

參數:● col1 – 第一列的名稱
           ● col2 – 第二列的名稱

8.3 crosstab(col1, col2)

計算給定列的成對頻率表. 也被稱為應急表. 每列的去重后不同值的數量應小於1e4. 最多1e6非零對頻率將被返回. 每行的第一列將是col1的不同值,列名將是col2的不同值.第一列的名稱應該為$col1_$col2. 沒有出現的對數將為零. DataFrame.crosstab() and DataFrameStatFunctions.crosstab() 互為別名

參數:● col1 – 第一列的名稱. 去重項將成為每一行的第一項。
           ● col2 – 第二列的名稱. 去重項將成為DataFrame的列名稱。

8.4 freqItems(cols, support=None)

找到列的頻繁項,可能有誤差。使用“http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou”中描述的頻繁元素計數算法。 DataFrame.freqItems() and DataFrameStatFunctions.freqItems()互為別名。

注:此功能用於探索性數據分析,因為我們不保證所生成的DataFrame的模式的向后兼容性。
參數:● cols – 用於計算頻繁項的列的名稱,為字符串的列表或元組。
           ● support –“頻繁”項目的頻率。 默認值是1%,必須大於1e-4。

8.5 sampleBy(col, fractions, seed=None)

根據每層上給出的分數返回一個沒有更換的分層樣本。
參數:● col – 定義分層的列
           ● fractions – 每層的抽樣比例,如果沒有指定層,我們將其分數視為零。
           ● seed – 隨機值
返回: 一個代表分層樣本的新DataFrame

>>> from pyspark.sql.functions import col
>>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key"))
>>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0)
>>> sampled.groupBy("key").count().orderBy("key").show()
+---+-----+
|key|count|
+---+-----+
|  0|    5|
|  1|    9|
+---+-----+

9. class pyspark.sql.Window

用於在DataFrame中定義窗口的實用函數。
例如:

>>> # PARTITION BY country ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
>>> window = Window.partitionBy("country").orderBy("date").rowsBetween(-sys.maxsize, 0)

>>> # PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
>>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)

9.1 static orderBy(*cols)

用定義的順序創建一個WindowSpec。

9.2 static partitionBy(*cols)

用定義的分區創建一個WindowSpec。

10. class pyspark.sql.WindowSpec(jspec)

定義分區,排序和框邊界的窗口規范。
使用Window中的靜態方法創建一個WindowSpec

10.1 orderBy(*cols)

定義WindowSpec中的排序列。
參數:● cols – 列或表達式的名稱

10.2 partitionBy(*cols)

定義WindowSpec中的分區列。
參數:● cols – 列或表達式的名稱

10.3 rangeBetween(start, end)

定義從開始(包含)到結束(包含)的框邊界。
start, end都是相對於當前行。 例如,“0”表示“當前行”,而“-1”表示在當前行之前一次,“5”表示當前行之后五次關閉。
參數:● start – 開始邊界(包括)。 如果這是-sys.maxsize(或更低),則該框架是無限的。
           ● end – 結束邊界(包括)。如果這是sys.maxsize(或更高),則該框架是無限的。

10.4 rowsBetween(start, end)

定義從開始(包含)到結束(包含)的框邊界。
start, end都是相對於當前行。 例如,“0”表示“當前行”,而“-1”表示在當前行之前一次,“5”表示當前行之后五次關閉。
參數:● start – 開始邊界(包括)。 如果這是-sys.maxsize(或更低),則該框架是無限的。
           ● end – 結束邊界(包括)。如果這是sys.maxsize(或更高),則該框架是無限的。

11. class pyspark.sql.DataFrameReader(sqlContext)

用於從外部存儲系統(例如文件系統,鍵值存儲等)加載DataFrame的接口。 使用SQLContext.read()來訪問這個。

11.1 format(source)

指定輸入數據源格式。
參數:● source – string,數據源名稱,例如:'json','parquet'。

people.json文件內容:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

>>> df = sqlContext.read.format('json').load('/test/people.json') 
>>> df.dtypes
[('age', 'bigint'), ('name', 'string')]

11.2 jdbc

(url,table,column=None,lowerBound=None,upperBound=None,numPartitions=None,predicates=None,properties=None)
構建一個DataFrame表示通過JDBC URL url命名的table和連接屬性連接的數據庫表。
column參數可用於對表進行分區,然后根據傳遞給此函數的參數並行檢索它。
predicates參數給出了一個適合包含在WHERE子句中的列表表達式; 每一個都定義了DataFrame的一個分區。
注:不要在大型集群上並行創建太多分區; 否則Spark可能會使外部數據庫系統崩潰。

參數:● url – 一個JDBC URL
           ● table – 表名稱
      ● column – 用於分區的列
      ● lowerBound – 分區列的下限
      ● upperBound – 分區列的上限
      ● numPartitions – 分區的數量
      ● predicates – 表達式列表
      ● properties – JDBC數據庫連接參數,任意字符串的標簽/值的列表。通常至少應該包括一個“用戶”和“密碼”屬性。
返回 : 一個DataFrame

11.3 json(path, schema=None)

加載一個JSON文件(每行一個對象)或一個存儲JSON對象的字符串RDD(每個記錄一個對象),並返回結果為:class`DataFrame`。
如果未指定schema參數,則此函數會經過一次輸入以確定輸入模式。
參數:● path - 字符串表示JSON數據集的路徑,或者存儲JSON對象的字符串的RDD
           ● schema – 輸入模式的可選StructType。
你可以設置以下特定於JSON的選項來處理非標准的JSON文件:
* primitivesAsString (默認false): 將所有原始值推斷為字符串類型
* allowComments (默認false): 忽略JSON記錄中的Java / C++樣式注釋
* allowUnquotedFieldNames (默認false): 允許未加引號的JSON字段名稱
* allowSingleQuotes (默認true): 允許除雙引號外的單引號
* allowNumericLeadingZeros (默認false): 允許數字中的前導零(例如00012)

>>> df1 = sqlContext.read.json('/test/people.json')
>>> df1.dtypes
[('age', 'bigint'), ('name', 'string')]
>>> rdd = sc.textFile('/test/people.json')
>>> df2 = sqlContext.read.json(rdd)
>>> df2.dtypes
[('age', 'bigint'), ('name', 'string')]

11.4 load(path=None, format=None, schema=None, **options)

從數據源加載數據並將其作為:class`DataFrame`返回。
參數:● path - 可選字符串或文件系統支持的數據源的字符串列表
   ● format – 數據源格式的可選字符串。 默認為“parquet”
           ● schema – 輸入模式的可選StructType。
           ● options – 所有其他字符串選項。

注:parquet_partitioned文件夾路徑為:spark-1.6.2-bin-hadoop2.6\python\test_support\sql\parquet_partitioned
       people.json和people1.json文件路徑為:spark-1.6.2-bin-hadoop2.6\python\test_support\sql

>>> df = sqlContext.read.load('/test/parquet_partitioned', opt1=True,opt2=1, opt3='str')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
>>> df = sqlContext.read.format('json').load(['/test/people.json','/test/people1.json'])
>>> df.dtypes
[('age', 'bigint'), ('aka', 'string'), ('name', 'string')]

11.5 option(key, value)

為基礎數據源添加一個輸入選項。

11.6 options(**options)

為基礎數據源添加多個輸入選項。

11.7 orc(path)

加載ORC文件,將結果作為DataFrame返回。
注:目前ORC支持只能與HiveContext一起使用。

11.8 parquet(*paths)

加載parquet文件, 將結果作為DataFrame返回。

>>> df = sqlContext.read.parquet('/test/parquet_partitioned')
>>> df.dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]

11.9 schema(schema)

指定輸入的schema.
某些數據源(例如JSON)可以從數據自動推斷輸入模式。通過在這里指定模式,底層數據源可以跳過模式推斷步驟,從而加速數據加載。
參數:● schema – 一個StructType對象

11.10 table(tableName)

以DataFrame的形式返回指定的表。
參數:● tableName – 字符串的表名稱

>>> df = sqlContext.read.parquet('/test/parquet_partitioned')
>>> df.registerTempTable('tmpTable')
>>> sqlContext.read.table('tmpTable').dtypes
[('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]

11.11 text(paths)

加載一個文本文件並返回一個名為"value"的單個字符串列的[[DataFrame]]。
文本文件中的每一行都是生成的DataFrame中的新行。
參數:●  paths – 字符串或字符串列表,用於輸入路徑。

>>> df = sqlContext.read.text('/test/text-test.txt')
>>> df.collect()
[Row(value=u'hello'), Row(value=u'this')]

12. class pyspark.sql.DataFrameWriter(df)

用於將[[DataFrame]]寫入外部存儲系統(例如文件系統,鍵值存儲等)的接口。使用DataFrame.write()來訪問這個。

12.1 format(source)

指定基礎輸出數據源。
參數:●  source – 字符串,數據源的名稱,例如 'json','parquet'。

>>> df.write.format('json').save(os.path.join(tempfile.mkdtemp(), 'data'))

12.2 insertInto(tableName, overwrite=False)

將DataFrame的內容插入到指定的表中。
它要求DataFrame類的架構與表的架構相同。
可以覆蓋任何現有的數據。

12.3 jdbc(url, table, mode=None, properties=None)

通過JDBC將DataFrame的內容保存到外部數據庫表中。
注:不要在大型集群上並行創建太多分區; 否則Spark可能會使外部數據庫系統崩潰。
參數:● url – 一個形式為jdbc:subprotocol:subname的JDBC URL
   ● table – 外部數據庫中表的名稱。
           ● mode – 指定數據已經存在時保存操作的行為:
      ● append: 將此DataFrame的內容附加到現有數據。
      ● overwrite: 覆蓋現有數據。
      ● ignore: 如果數據已經存在,靜默地忽略這個操作。
   ● error (默認): 如果數據已經存在,則拋出異常。
      ● properties – JDBC數據庫連接參數,任意字符串標簽/值的列表。 通常至少應該包括一個“用戶”和“密碼”屬性。

12.4 json(path, mode=None)

以指定的路徑以JSON格式保存DataFrame的內容。
參數:● path – 任何Hadoop支持的文件系統中的路徑。
           ● mode –指定數據已經存在時保存操作的行為。
           ● append: 將此DataFrame的內容附加到現有數據。
           ● overwrite: 覆蓋現有數據。
           ● ignore: 如果數據已經存在,靜默地忽略這個操作。
           ● error (默認): 如果數據已經存在,則拋出異常。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.write.json('file:///data/dfjson')
[root@slave1 dfjson]# ll
total 8
-rw-r--r-- 1 root root  0 Nov 24 12:08 part-r-00000-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root 25 Nov 24 12:08 part-r-00001-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root  0 Nov 24 12:08 part-r-00002-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root 23 Nov 24 12:08 part-r-00003-edbd9c5e-87b2-41f4-81ba-cd59c8ca490e
-rw-r--r-- 1 root root  0 Nov 24 12:08 _SUCCESS
[root@slave1 dfjson.json]# cat part*
{"name":"Alice","age":2}
{"name":"Bob","age":5}

12.5 mode(saveMode)

指定數據或表已經存在的行為。
選項包括:
  append: 將此DataFrame的內容附加到現有數據。
  overwrite: 覆蓋現有數據。
  error: 如果數據已經存在,則拋出異常。
  ignore: 如果數據已經存在,靜默地忽略這個操作。

>>> df.write.mode('append').parquet(os.path.join(tempfile.mkdtemp(), 'data'))

12.6 option(key, value)

 添加一個底層數據源的輸出選項。

12.7 options(**options)

添加底層數據源的多個輸出選項。

12.8 orc(path, mode=None, partitionBy=None)

以指定的路徑以ORC格式保存DataFrame的內容。
注:目前ORC支持只能與HiveContext一起使用。
參數:● path – 任何Hadoop支持的文件系統中的路徑。
           ● mode –指定數據已經存在時保存操作的行為:
      append: 將此DataFrame的內容附加到現有數據。
      overwrite: 覆蓋現有數據。
      ignore: 如果數據已經存在,靜默地忽略這個操作。
      error (默認): 如果數據已經存在,則拋出異常。
           ● partitionBy – 分區列的名稱。

>>> orc_df = hiveContext.read.orc('python/test_support/sql/orc_partitioned')
>>> orc_df.write.orc(os.path.join(tempfile.mkdtemp(), 'data'))

12.9 parquet(path, mode=None, partitionBy=None)

將DataFrame的內容以Parquet格式保存在指定的路徑中。
參數:● path – 任何Hadoop支持的文件系統中的路徑。
           ● mode – 指定數據已經存在時保存操作的行為。
      append: 將此DataFrame的內容附加到現有數據。
      overwrite: 覆蓋現有數據。
      ignore: 如果數據已經存在,靜默地忽略這個操作。
      error (默認): 如果數據已經存在,則拋出異常。
     ● partitionBy – 分區列的名稱。

>>> df.write.parquet("file:///data/dfparquet")

[root@slave1 dfparquet]# ll
total 24
-rw-r--r-- 1 root root 285 Nov 24 12:23 _common_metadata
-rw-r--r-- 1 root root 750 Nov 24 12:23 _metadata
-rw-r--r-- 1 root root 285 Nov 24 12:23 part-r-00000-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 534 Nov 24 12:23 part-r-00001-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 285 Nov 24 12:23 part-r-00002-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root 523 Nov 24 12:23 part-r-00003-36364710-b925-4a3a-bd11-b295b6bd7c2e.gz.parquet
-rw-r--r-- 1 root root   0 Nov 24 12:23 _SUCCESS

12.10 partitionBy(*cols)

按文件系統上的給定列對輸出進行分區。
如果指定,則輸出將在文件系統上進行布局,類似於Hive的分區方案。
參數:● cols – 列的名稱.

>>> df.write.partitionBy('year', 'month').parquet(os.path.join(tempfile.mkdtemp(), 'data'))

12.11 save(path=None, format=None, mode=None, partitionBy=None, **options)

將DataFrame的內容保存到數據源。
數據源由format和一組options指定。 如果未指定format,則將使用由spark.sql.sources.default配置的缺省數據源。
參數:● path – Hadoop支持的文件系統中的路徑。
           ● format – 用於保存的格式。
           ● mode – 指定數據已經存在時保存操作的行為。
      append: 將此DataFrame的內容附加到現有數據。
      overwrite: 覆蓋現有數據。
      ignore: 如果數據已經存在,靜默地忽略這個操作。
      error (默認): 如果數據已經存在,則拋出異常。
            ● partitionBy – 分區列的名稱。
            ● options – all other string options

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.write.mode('append').save("file:///data/dfsave")

12.12 saveAsTable(name, format=None, mode=None, partitionBy=None, **options)

將DataFrame的內容保存為指定的表格。
在表已經存在的情況下,這個函數的行為依賴於由mode函數指定的保存模式(默認為拋出異常)。 當模式為覆蓋時,[[DataFrame]]的模式不需要與現有表的模式相同。

append: 將此DataFrame的內容附加到現有數據。
overwrite: 覆蓋現有數據。
error: 如果數據已經存在,則拋出異常。
ignore: 如果數據已經存在,靜默地忽略這個操作。

參數:● name – 表名
           ● format – 用於保存的格式
           ● mode – 追加,覆蓋,錯誤,忽略之一(默認:錯誤)
           ● partitionBy – 分區列的名稱
           ● options – 所有其他字符串選項

12.13 text(path)

將DataFrame的內容保存在指定路徑的文本文件中。
DataFrame必須只有一個字符串類型的列。每行成為輸出文件中的新行。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM