class pyspark.sql.DataFrame(jdf, sql_ctx)
一個以列名為分組的分布式數據集合
一個DataFrame 相當於一個 與spark sql相關的table,可以使用SQLContext中的各種函數創建。
people = sqlContext.read.parquet("...")
Once created, it can be manipulated using the various domain-specific-language (DSL) functions defined in: DataFrame, Column。
To select a column from the data frame, use the apply method:
ageCol = people.age
一個更具體的例子
# To create DataFrame using SQLContext people = sqlContext.read.parquet("...") department = sqlContext.read.parquet("...") people.filter(people.age > 30).join(department, people.deptId == department.id).groupBy(department.name, "gender").agg({"salary": "avg", "age": "max"})
New in version 1.3.
agg(*exprs)
總計on the entire DataFrame without groups (df.groupBy.agg()的簡寫).
>>> df.agg({"age": "max"}).collect() [Row(max(age)=5)] >>> from pyspark.sql import functions as F >>> df.agg(F.min(df.age)).collect() [Row(min(age)=2)]
New in version 1.3.
alias(alias)
根據alias別名的設定返回一個新的DataFrame
>>> from pyspark.sql.functions import * >>> df_as1 = df.alias("df_as1") >>> df_as2 = df.alias("df_as2") >>> joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner') >>> joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect() [Row(name=u'Bob', name=u'Bob', age=5), Row(name=u'Alice', name=u'Alice', age=2)]
New in version 1.3.
approxQuantile(col, probabilities, relativeError)
計算一個用數表示的列的DataFrame近似的分位點.
這個算法的結果有以下確定性的范圍:如果DataFrame有N個元素,如果我們請求分位點的概率為p,錯誤率為err,算法將從DataFrame返回一個樣本x,x的精確rank值接近於(p * N)。更准確的說,
floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
這種方法實現Greenwald-Khanna算法的一個變體(增加速度優化),這個算法第一次是由 Greenwald and Khanna提出的在[[http://dx.doi.org/10.1145/375663.375670 Space-efficient Online Computation of Quantile Summaries]]
Parameters: col - 用數表示的列的name
probabilities-a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
relativeError - The relative target precision to achieve (>= 0).If set to zero, the exact quantiles are computed, which could be very expensive. Note that values greater than 1 are accepted but give the same result as 1.
Returns: the approximate quantiles at the given probabilities
New in version 2.0.
cache()
根據默認的存儲級別持久化(MEMORY_ONLY).
New in version 1.3.
coalesce(numPartitions)
返回一個恰好有numPartitions分區的新DataFrame
Similar to coalesce defined on an RDD,這個操作在一個窄依賴中進行,例如。如果從1000個分區到100個分區,不會出現shuffle,instead each of the 100 new partitions will claim 10 of the current partitions.
>>> df.coalesce(1).rdd.getNumPartitions()
1
New in version 1.4.
collect()
以list形式返回所有記錄,元素是每一行;
>>> df.collect() [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
New in version 1.3.
columns
以list形式返回所有的列的name
>>> df.columns ['age', 'name']
New in version 1.3.
corr(col1, col2, method=None)
計算一個DataFrame中兩列的相關性作為一個double值 ,目前只支持皮爾遜相關系數。DataFrame.corr() 和 DataFrameStatFunctions.corr()是彼此的別名。
Parameters: col1 - The name of the first column
col2 - The name of the second column
method - 相關的方法,目前只支持“pearson”
New in version 1.4.
count()
返回dataframe的行數
>>> df.count()
2
New in version 1.3.
cov(col1, col2)
計算給定列的協方差,有他們的names指定,作為一個double值。DataFrame.cov() 和 DataFrameStatFunctions.cov()是彼此的別名
Parameters: col1 - The name of the first column
col2- The name of the second column
New in version 1.4.
createOrReplaceTempView(name)
根據dataframe創建或者替代一個臨時視圖
這個視圖的生命周期是由創建這個dataframe的SparkSession決定的
>>> df.createOrReplaceTempView("people") >>> df2 = df.filter(df.age > 3) >>> df2.createOrReplaceTempView("people") >>> df3 = spark.sql("select * from people") >>> sorted(df3.collect()) == sorted(df2.collect()) True >>> spark.catalog.dropTempView("people")
New in version 2.0.
createTempView(name)
根據dataframe創建一個臨時視圖
這個視圖的生命周期是由創建這個dataframe的SparkSession決定的。如果這個視圖已經存在於catalog將拋出TempTableAlreadyExistsException異常。
>>> df.createTempView("people") >>> df2 = spark.sql("select * from people") >>> sorted(df.collect()) == sorted(df2.collect()) True >>> df.createTempView("people") Traceback (most recent call last): ... AnalysisException: u"Temporary table 'people' already exists;" >>> spark.catalog.dropTempView("people")
New in version 2.0
crosstab(col1, col2)
由給定的列計算一個雙向的頻率表.也被稱為一個列聯表。
每一列的不同值的數量應該小於1e4. 最多1e6 非零對頻率將被返回.
The first column of each row will be the distinct values of col1 and the column names will be the distinct values of col2.
The name of the first column will be $col1_$col2. Pairs that have no occurrences will have zero as their counts.
DataFrame.crosstab() 和 DataFrameStatFunctions.crosstab()是彼此的別名
Parameters: col1 - The name of the first column. Distinct items will make the first item of each row.
col2 - The name of the second column. Distinct items will make the column names of the DataFrame.
New in version 1.4.
cube(*cols)
使用指定的columns創建一個多維立方體為當前DataFrame,這樣我們可以在其上運行聚合
>>> df.cube("name", df.age).count().orderBy("name", "age").show() +-----+----+-----+ | name| age|count| +-----+----+-----+ | null|null| 2| | null| 2| 1| | null| 5| 1| |Alice|null| 1| |Alice| 2| 1| | Bob|null| 1| | Bob| 5| 1| +-----+----+-----+
New in version 1.4.
describe(*cols)
計算,統計數值型的列
包括 計數count, 平均值mean, 標准差stddev, 最小值min, 最大值max。如果cols給定,那么這個函數計算統計所有數值型的列。
Note This function is meant for exploratory data analysis, as we make no guarantee about the backward compatibility of the schema of the resulting DataFrame.
>>> df.describe().show() +-------+------------------+ |summary| age| +-------+------------------+ | count| 2| | mean| 3.5| | stddev|2.1213203435596424| | min| 2| | max| 5| +-------+------------------+ >>> df.describe(['age', 'name']).show() +-------+------------------+-----+ |summary| age| name| +-------+------------------+-----+ | count| 2| 2| | mean| 3.5| null| | stddev|2.1213203435596424| null| | min| 2|Alice| | max| 5| Bob| +-------+------------------+-----+
New in version 1.3.1.
distinct()
返回一個包含不同行的新的DataFrame,也就是對DataFrame中的行進行去重
>>> df.distinct().count()
2
New in version 1.3.
drop(col)
返回一個刪除了指定列的新的DataFrame
Parameters: col - a string name of the column to drop, or a Column to drop.
>>> df.drop('age').collect() [Row(name=u'Alice'), Row(name=u'Bob')] >>> df.drop(df.age).collect() [Row(name=u'Alice'), Row(name=u'Bob')] >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect() [Row(age=5, height=85, name=u'Bob')] >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect() [Row(age=5, name=u'Bob', height=85)]
New in version 1.4.
dropDuplicates(subset=None)
返回一個新的刪除重復行的DataFrame,選擇性地只考慮某些列
drop_duplicates()是dropDuplicates()的別名
>>> from pyspark.sql import Row >>> df = sc.parallelize([ \ ... Row(name='Alice', age=5, height=80), \ ... Row(name='Alice', age=5, height=80), \ ... Row(name='Alice', age=10, height=80)]).toDF() >>> df.dropDuplicates().show() +---+------+-----+ |age|height| name| +---+------+-----+ | 5| 80|Alice| | 10| 80|Alice| +---+------+-----+ >>> df.dropDuplicates(['name', 'height']).show() +---+------+-----+ |age|height| name| +---+------+-----+ | 5| 80|Alice| +---+------+-----+
New in version 1.4.
drop_duplicates(subset=None)
drop_duplicates()是dropDuplicates()的別名
New in version 1.4.
dropna(how='any', thresh=None, subset=None)
返回一個新的省略具有零值的行的DataFrame。DataFrame.dropna() 和 DataFrameNaFunctions.drop()是彼此的別名
Parameters: how – ‘any’ or ‘all’. 如果‘any’,如果這一行含有任何空值刪除這一行。如果是‘all’,只有這一行所有的值是空值才刪除這一行。
thresh – int, 如果沒有指定默認為None,當這一行少於thresh非空值時刪除這一行。這個參數重寫how參數。
subset – optional list of column names to consider.
>>> df4.na.drop().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
+---+------+-----+
New in version 1.3.1.
dtypes
以一個list形式返回所有列名和數據類型
>>> df.dtypes [('age', 'int'), ('name', 'string')]
New in version 1.3.
explain(extended=False)
在控制台打印(logical and physical)計划以便於調試用途
Parameters: extended-boolean。默認False。如果是False,僅打印physical計划
>>> df.explain() == Physical Plan == Scan ExistingRDD[age#0,name#1]
>>> df.explain(True) == Parsed Logical Plan == ... == Analyzed Logical Plan == ... == Optimized Logical Plan == ... == Physical Plan == ...
New in version 1.3.
fillna(value, subset=None)
替換空值,na.fill()的別名。DataFrame.fillna() 和 DataFrameNaFunctions.fill()是彼此的別名
Parameters: value – int, long, float, string, or dict. value值替換空值。如果value是一個dict,subset將被忽略,value必須從列名映射(string)替換值。這個替換值必須是an int, long, float, or string。
subset – optional list of column names to consider。列在subset中指定,沒有匹配的數據類型將被忽略。例如,如果value是string,subset包含一個非字符串的列,這個非字符串的列被忽略。
>>> df4.na.fill(50).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
| 10| 80|Alice|
| 5| 50| Bob|
| 50| 50| Tom|
| 50| 50| null|
+---+------+-----+
>>> df4.na.fill({'age': 50, 'name': 'unknown'}).show() +---+------+-------+ |age|height| name| +---+------+-------+ | 10| 80| Alice| | 5| null| Bob| | 50| null| Tom| | 50| null|unknown| +---+------+-------+
New in version 1.3.1.
filter(condition)
根據給定的condition過濾rows
where() 是 filter()的別名
Parameters: condition –a Column of types.BooleanType or a string of SQL expression.
>>> df.filter(df.age > 3).collect() [Row(age=5, name=u'Bob')] >>> df.where(df.age == 2).collect() [Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect() [Row(age=5, name=u'Bob')] >>> df.where("age = 2").collect() [Row(age=2, name=u'Alice')]
New in version 1.3.
first()
Returns the first row as a Row.
>>> df.first() Row(age=2, name=u'Alice')
New in version 1.3.
foreach(f)
f 函數適用於DataFrame的所有行
這是df.rdd.foreach()的縮寫
>>> def f(person): ... print(person.name) >>> df.foreach(f)
New in version 1.3.
foreachPartition(f)
f 函數適用於DataFrame的每個分區
是df.rdd.foreachPartition()的縮寫
>>> def f(people): ... for person in people: ... print(person.name) >>> df.foreachPartition(f)
New in version 1.3.
freqItems(cols, support=None)
Finding frequent items for columns, possibly with false positives. Using the frequent element count algorithm described in ※http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou§. DataFrame.freqItems() and DataFrameStatFunctions.freqItems() are aliases.
Note This function is meant for exploratory data analysis, as we make no guarantee about the backward compatibility of the schema of the resulting DataFrame.
Parameters: cols – Names of the columns to calculate frequent items for as a list or tuple of strings.
support – The frequency with which to consider an item ‘frequent’. Default is 1%. The support must be greater than 1e-4.
New in version 1.4.
groupBy(*cols)
根據指定的columns Groups the DataFrame,這樣可以在DataFrame上進行聚合。從所有可用的聚合函數中查看GroupedData
groupby()是groupBy()的一個別名。
Parameters: cols –list of columns to group by.每個元素應該是一個column name (string)或者一個expression (Column)。
>>> df.groupBy().avg().collect() [Row(avg(age)=3.5)] >>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect()) [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)] >>> sorted(df.groupBy(df.name).avg().collect()) [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)] >>> sorted(df.groupBy(['name', df.age]).count().collect()) [Row(name=u'Alice', age=2, count=1), Row(name=u'Bob', age=5, count=1)]
New in version 1.3.
groupby(*cols)
groupby()是groupBy()的一個別名。
New in version 1.4.
head(n=None)
返回前n行
注意,這個方法應該只被用於期望resulting array變小,所有的data加載進driver的內存。
Parameters: n – int,默認為 1.要返回多少行
Returns: 如果n大於1,返回a list of Row。如果n是1,返回單獨行
>>> df.head() Row(age=2, name=u'Alice') >>> df.head(1) [Row(age=2, name=u'Alice')]
New in version 1.3.
intersect(other)
返回一個新的DataFrame,新的DataFrame中的行是這個DataFrame與另一個DataFrame共有的行。這個函數也就是求交集
相當於SQL中的INTERSECT
New in version 1.3.
isLocal()
如果collect()和take()能在本地運行返回True (without any Spark executors)
New in version 1.3.
isStreaming
當這個Dataset包含一個或多個當數據到達時連續不斷的返回數據的數據源,此方法返回True。一個Dataset讀取流數據必須在DataStreamWriter中使用start()方法執行一個StreamingQuery。
Methods that return a single answer,(e.g., count() or collect()) will throw an AnalysisException when there is a streaming source present.
Note 實驗的
New in version 2.0.
join(other, on=None, how=None)
根據給定的join表達式與別的DataFrame join
Parameters: other - Right side of the join
on - a string for the join column name, a list of column names, a join expression (Column), or a list of Columns.
If on is a string or a list of strings indicating the name of the join column(s), the column(s) must exist on both sides, and this performs an equi-join.
how - str, default ‘inner’. One of inner, outer, left_outer, right_outer, leftsemi.
以下執行一個full outer join 在df1與df2之間
>>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect() [Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)] >>> df.join(df2, 'name', 'outer').select('name', 'height').collect() [Row(name=u'Tom', height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)] >>> cond = [df.name == df3.name, df.age == df3.age] >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect() [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] >>> df.join(df2, 'name').select(df.name, df2.height).collect() [Row(name=u'Bob', height=85)] >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect() [Row(name=u'Bob', age=5)]
New in version 1.3.
limit(num)
根據指定的num限制結果數量
>>> df.limit(1).collect() [Row(age=2, name=u'Alice')] >>> df.limit(0).collect() []
New in version 1.3.
na
Returns a DataFrameNaFunctions for handling missing values.
New in version 1.3.1.
orderBy(*cols, **kwargs)
按指定的列排序返回一個新的DataFrame
Parameters: cols - list of Column or column names to sort by
ascending – boolean or list of boolean (default True).Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the cols.
>>> df.sort(df.age.desc()).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] >>> df.sort("age", ascending=False).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] >>> df.orderBy(df.age.desc()).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] >>> from pyspark.sql.functions import * >>> df.sort(asc("age")).collect() [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')] >>> df.orderBy(desc("age"), "name").collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
New in version 1.3.
persist(storageLevel=StorageLevel(False, True, False, False, 1))
Sets the storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet.
If no storage level is specified defaults to (MEMORY_ONLY).
New in version 1.3.
printSchema()
以數的形式打印schema
>>> df.printSchema() root |-- age: integer (nullable = true) |-- name: string (nullable = true)
New in version 1.3.
randomSplit(weights, seed=None)
根據提供的權重隨機拆分DataFrame
Parameters: weights – doubles類型的list去拆分DataFrame。Weights will be normalized if they don’t sum up to 1.0
seed – 抽樣的seed
>>> splits = df4.randomSplit([1.0, 2.0], 24) >>> splits[0].count() 1
>>> splits[1].count()
3
New in version 1.4.
rdd
以一個pyspark.RDD形式返回Row的內容
New in version 1.3.
registerTempTable(name)
根據給定的name注冊RDD為一個臨時的table
這個臨時table的生命周期系於創建這個DataFrame的SQLContext
>>> df.registerTempTable("people") >>> df2 = spark.sql("select * from people") >>> sorted(df.collect()) == sorted(df2.collect()) True >>> spark.catalog.dropTempView("people")
Note Deprecated in 2.0, use createOrReplaceTempView instead.
New in version 1.3.
repartition(numPartitions, *cols)
返回一個新的DataFrame,這個DataFrame以給定的分區表達式分區。這個結果DataFrame是被散列分區的
numPartitions可以是一個int指定目標分區的數量或者是一個Column。如果是一個Column,it will be used as the first partitioning column。如果沒有指定,將使用默認的分區數
Changed in version 1.6: Added optional arguments to specify the partitioning columns. Also made numPartitions optional if partitioning columns are specified.
>>> df.repartition(10).rdd.getNumPartitions() 10 >>> data = df.union(df).repartition("age") >>> data.show() +---+-----+ |age| name| +---+-----+ | 5| Bob| | 5| Bob| | 2|Alice| | 2|Alice| +---+-----+ >>> data = data.repartition(7, "age") >>> data.show() +---+-----+ |age| name| +---+-----+ | 5| Bob| | 5| Bob| | 2|Alice| | 2|Alice| +---+-----+ >>> data.rdd.getNumPartitions() 7 >>> data = data.repartition("name", "age") >>> data.show() +---+-----+ |age| name| +---+-----+ | 5| Bob| | 5| Bob| | 2|Alice| | 2|Alice| +---+-----+
New in version 1.3.
replace(to_replace, value, subset=None)
以value替換to_replace返回一個新的DataFrame。DataFrame.replace()和DataFrameNaFunctions.replace()是彼此的別名
Parameters: to_replace – int, long, float, string, or list。 將被替代的值。如果該值是一個dict,那么value將被忽略,to_replace必須是column name (string)的映射來替換value。將被替代的值必須是an int, long, float, or string.
value – int, long, float, string, or list。用於替代別的值的值。這個替代的值必須是an int, long, float, or string。如果這個value是a list or tuple,value應該與to_replace有相同的長度。
subset – optional list of column names to consider。subset指定的Columns如果沒有匹配的數據類型將被忽略。例如,如果value是string,subset包含一個 non-string column,這個 non-string column將被忽略。
>>> df4.na.replace(10, 20).show()
+----+------+-----+
| age|height| name|
+----+------+-----+
| 20| 80|Alice|
| 5| null| Bob|
|null| null| Tom|
|null| null| null|
+----+------+-----+
>>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show() +----+------+----+ | age|height|name| +----+------+----+ | 10| 80| A| | 5| null| B| |null| null| Tom| |null| null|null| +----+------+----+
New in version 1.4.
rollup(*cols)
使用當前DataFrame給定的cols創建一個多維的rollup,這樣我們可以在其上進行聚合
>>> df.rollup("name", df.age).count().orderBy("name", "age").show() +-----+----+-----+ | name| age|count| +-----+----+-----+ | null|null| 2| |Alice|null| 1| |Alice| 2| 1| | Bob|null| 1| | Bob| 5| 1| +-----+----+-----+
New in version 1.4.
sample(withReplacement, fraction, seed=None)
返回這個DataFrame抽樣后的子集
>>> df.sample(False, 0.5, 42).count()
2
New in version 1.3.
sampleBy(col, fractions, seed=None)
Returns a stratified sample without replacement based on the fraction given on each stratum.
Parameters: col – column that defines strata
fractions – sampling fraction for each stratum. If a stratum is not specified, we treat its fraction as zero.
seed – random seed
Returns: a new DataFrame that represents the stratified sample
>>> from pyspark.sql.functions import col >>> dataset = sqlContext.range(0, 100).select((col("id") % 3).alias("key")) >>> sampled = dataset.sampleBy("key", fractions={0: 0.1, 1: 0.2}, seed=0) >>> sampled.groupBy("key").count().orderBy("key").show() +---+-----+ |key|count| +---+-----+ | 0| 5| | 1| 9| +---+-----+
New in version 1.5.
schema
以一個types.StructType類型返回DataFrame的schema
>>> df.schema
StructType(List(StructField(age,IntegerType,true),StructField(name,StringType,true)))
New in version 1.3.
select(*cols)
Projects a set of expressions and returns a new DataFrame.
Parameters: cols-list of column names (string) or expressions (Column). If one of the column names is ‘*’, that column is expanded to include all columns in the current DataFrame
>>> df.select('*').collect() [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')] >>> df.select('name', 'age').collect() [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] >>> df.select(df.name, (df.age + 10).alias('age')).collect() [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
New in version 1.3.
selectExpr(*expr)
Projects a set of SQL expressions and returns a new DataFrame.
這是select() 的變體,接受SQL表達式
>>> df.selectExpr("age * 2", "abs(age)").collect() [Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]
New in version 1.3.
show(n=20, truncate=True)
在控制台打印前n行
Parameters: n - 打印前n行.
truncate - 是否截斷長字符串並使網格右側排列
>>> df DataFrame[age: int, name: string] >>> df.show() +---+-----+ |age| name| +---+-----+ | 2|Alice| | 5| Bob| +---+-----+
New in version 1.3.
sort(*cols, **kwargs)
根據指定cols,返回一個新的DataFrame
Parameters: cols - 根據Column list或column name 排序
ascending - boolean or list of boolean (default True).升序和降序排序。指定多個排序訂單列表。如果指定一個列表,list的長度必須等於cols的長度。
>>> df.sort(df.age.desc()).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] >>> df.sort("age", ascending=False).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] >>> df.orderBy(df.age.desc()).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] >>> from pyspark.sql.functions import * >>> df.sort(asc("age")).collect() [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')] >>> df.orderBy(desc("age"), "name").collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')] >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect() [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
New in version 1.3.
sortWithinPartitions(*cols, **kwargs)
根據指定的cols排序,返回一個新的DataFrame在每個分區
Parameters: cols - 根據Column list或column name 排序.
ascending - boolean or list of boolean (default True).升序和降序排序。指定多個排序訂單列表。如果指定一個列表,list的長度必須等於cols的長度。
>>> df.sortWithinPartitions("age", ascending=False).show() +---+-----+ |age| name| +---+-----+ | 2|Alice| | 5| Bob| +---+-----+
New in version 1.6.
stat
由統計函數返回一個DataFrameStatFunctions
New in version 1.4.
subtract(other)
Return a new DataFrame containing rows in this frame but not in another frame.
相當於SQL中的EXCEPT
New in version 1.3.
take(num)
Returns the first num rows as a list of Row.
>>> df.take(2) [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
New in version 1.3.
toDF(*cols)
根據指定的cols返回一個新的class:DataFrame
Parameters: cols- list of new column names (string)
>>> df.toDF('f1', 'f2').collect() [Row(f1=2, f2=u'Alice'), Row(f1=5, f2=u'Bob')]
toJSON(use_unicode=True)
將一個DataFrame轉換成一個string的RDD
每一行轉變成了一個JSON中一個元素在返回的RDD
>>> df.toJSON().first() u'{"age":2,"name":"Alice"}'
New in version 1.3.
toLocalIterator()
返回一個包含DataFrame所有行的迭代器,迭代器將使用盡可能多的內存在這個DataFrame最大的分區
>>> list(df.toLocalIterator()) [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
New in version 2.0.
toPandas()
以Pandas pandas.DataFrame形式返回這個DataFrame的內容
Note that this method should only be used if the resulting Pandas’s DataFrame is expected to be small, as all the data is loaded into the driver’s memory
只有Pandas安裝和可獲得時可用
>>> df.toPandas() age name 0 2 Alice 1 5 Bob
New in version 1.3.
union(other)
返回一個新的DataFrame,包含本frame與other frame行的並集
相當於SQL中的 UNION ALL。To do a SQL-style set union (that does deduplication of elements), use this function followed by a distinct.
New in version 2.0.
unionAll(other)
返回一個新的DataFrame,包含本frame與other frame行的並集
Note Deprecated in 2.0, use union instead.
New in version 1.3.
unpersist(blocking=False)
標志這個DataFrame為非持久性,並且從內存和磁盤中刪除所有blocks
Note blocking default has changed to False to match Scala in 2.0.
New in version 1.3.
where(condition)
where() 是filter()的一個別名.
New in version 1.3.
withColumn(colName, col)
通過添加或替換與現有列有相同的名字的列,返回一個新的DataFrame
Parameters: colName - string,新colmun的name.
col - 新列的一個colmun表達式
>>> df.withColumn('age2', df.age + 2).collect() [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
New in version 1.3.
withColumnRenamed(existing, new)
通過重命名一個已存在的colmun返回一個新的DataFrame
Parameters: existing -string,被重命名的已存在的colmun.
col - string,此column的新name.
>>> df.withColumnRenamed('age', 'age2').collect() [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
New in version 1.3.
write
Interface for saving the content of the non-streaming DataFrame out into external storage.
保存非流式的DataFrame的內容到外部存儲器的接口
Returns: DataFrameWriter
New in version 1.4.
writeStream
Interface for saving the content of the streaming DataFrame out into external storage.
保存流式的DataFrame的內容到外部存儲器的接口
Note Experimental.
Returns: DataStreamWriter
New in version 2.0.
class pyspark.sql.GroupedData(jgd, sql_ctx)
一套DataFrame聚合方法,由DataFrame.groupBy()創建
Note Experimental
New in version 1.3.
agg(*exprs)
計算聚集並返回一個DataFrame結果
可用的聚合函數有avg, max, min, sum, count.
如果exprs是一個dict從string映射到string,然后key是列,在其上執行聚合,value是聚合函數
Alternatively, exprs can also be a list of aggregate Column expressions.
Parameters: exprs - a dict mapping from column name (string) to aggregate functions (string), or a list of Column.
>>> gdf = df.groupBy(df.name) >>> sorted(gdf.agg({"*": "count"}).collect()) [Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)] >>> from pyspark.sql import functions as F >>> sorted(gdf.agg(F.min(df.age)).collect()) [Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]
New in version 1.3.
avg(*cols)
為每組每個數值型的列計算平均值
mean() 是avg()的別名
Parameters: cols - list of column names (string). Non-numeric columns are ignored
>>> df.groupBy().avg('age').collect() [Row(avg(age)=3.5)] >>> df3.groupBy().avg('age', 'height').collect() [Row(avg(age)=3.5, avg(height)=82.5)]
New in version 1.3.
count()
計算每組記錄的數量
>>> sorted(df.groupBy(df.age).count().collect())
[Row(age=2, count=1), Row(age=5, count=1)]
New in version 1.3.
max(*cols)
計算每組每個數值列的最大值
>>> df.groupBy().max('age').collect() [Row(max(age)=5)] >>> df3.groupBy().max('age', 'height').collect() [Row(max(age)=5, max(height)=85)]
New in version 1.3.
mean(*cols)
計算每組每個數值型的列的平均值
mean() 是 avg() 的別名
Parameters: cols - list of column names (string). Non-numeric columns are ignored
>>> df.groupBy().mean('age').collect() [Row(avg(age)=3.5)] >>> df3.groupBy().mean('age', 'height').collect() [Row(avg(age)=3.5, avg(height)=82.5)]
New in version 1.3.
min(*cols)
計算每組每個數值型列的最小值
Parameters: cols - list of column names (string). Non-numeric columns are ignored
>>> df.groupBy().min('age').collect() [Row(min(age)=2)] >>> df3.groupBy().min('age', 'height').collect() [Row(min(age)=2, min(height)=80)]
New in version 1.3.
pivot(pivot_col, values=None)
Pivots a column of the current [[DataFrame]] and perform the specified aggregation.
There are two versions of pivot function: one that requires the caller to specify the list of distinct values to pivot on, and one that does not.
The latter is more concise but less efficient, because Spark needs to first compute the list of distinct values internally.
Parameters: pivot_col - Name of the column to pivot.
values - List of values that will be translated to columns in the output DataFrame.
# Compute the sum of earnings for each year by course with each course as a separate column
>>> df4.groupBy("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect() [Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]
# Or without specifying column values (less efficient)
>>> df4.groupBy("year").pivot("course").sum("earnings").collect() [Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
New in version 1.6.
sum(*cols)
計算每組每個數值型列的總和
Parameters: cols - list of column names (string). Non-numeric columns are ignored.
>>> df.groupBy().sum('age').collect() [Row(sum(age)=7)] >>> df3.groupBy().sum('age', 'height').collect() [Row(sum(age)=7, sum(height)=165)]
New in version 1.3.