《Spark Python API 官方文檔中文版》 之 pyspark.sql (四)


摘要:在Spark開發中,由於需要用Python實現,發現API與Scala的略有不同,而Python API的中文資料相對很少。每次去查英文版API的說明相對比較慢,還是中文版比較容易get到所需,所以利用閑暇之余將官方文檔翻譯為中文版,並親測Demo的代碼。在此記錄一下,希望對那些對Spark感興趣和從事大數據開發的人員提供有價值的中文資料,對PySpark開發人員的工作和學習有所幫助。

官網地址:http://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html           

pyspark.sql module

pyspark.sql.types module

Spark SQL和DataFrames重要的類有:
pyspark.sql.SQLContext DataFrame和SQL方法的主入口
pyspark.sql.DataFrame 將分布式數據集分組到指定列名的數據框中
pyspark.sql.Column DataFrame中的列
pyspark.sql.Row DataFrame數據的行
pyspark.sql.HiveContext 訪問Hive數據的主入口
pyspark.sql.GroupedData 由DataFrame.groupBy()創建的聚合方法集
pyspark.sql.DataFrameNaFunctions 處理丟失數據(空數據)的方法
pyspark.sql.DataFrameStatFunctions 統計功能的方法
pyspark.sql.functions DataFrame可用的內置函數
pyspark.sql.types 可用的數據類型列表
pyspark.sql.Window 用於處理窗口函數

1.class pyspark.sql.types.DataType

數據類型的基類。

1.1 fromInternal(obj)

將內部SQL對象轉換為本機Python對象。

1.2 json()

1.3 jsonValue()

1.4 needConversion()

這種類型是否需要在Python對象和內部SQL對象之間進行轉換?
這用於避免ArrayType / MapType / StructType的不必要的轉換。

1.5 simpleString()

1.6 toInternal(obj)

將Python對象轉換為內部SQL對象。

2.class pyspark.sql.types.NullType

Null類型
表示無的數據類型,用於無法推斷的類型。

3.class pyspark.sql.types.StringType

String 數據類型。

4.class pyspark.sql.types.BinaryType

二進制(字節數組)數據類型。

5.class pyspark.sql.types.BooleanType

Boolean 數據類型。

6.class pyspark.sql.types.DateType

Date (datetime.date) 數據類型。

7.class pyspark.sql.types.TimestampType

Timestamp (datetime.datetime) 數據類型。

8.class pyspark.sql.types.DecimalType(precision=10, scale=0)

Decimal (decimal.Decimal) 數據類型。
DecimalType必須具有固定的精度(最大總位數)和比例(點右邊的位數)。 例如,(5,2)可以支持[-999.99至999.99]之間的值。
precision可以達到38,scale要小於或等於precision。
創建DecimalType時,默認的precision和scale是(10,0)。 當從十進制對象中推斷模式時,它將是DecimalType(38,18)。
參數:●  precision – 最大的總位數 (默認: 10)
        ●  scale – 點右側的位數 (默認: 0)

9.class pyspark.sql.types.DoubleType

Double 數據類型,表示雙精度浮點數。

10.class pyspark.sql.types.FloatType

Float數據類型,表示單精度浮點數。

11.class pyspark.sql.types.ByteType

Byte 數據類型,即單個字節中的有符號整數。

12.class pyspark.sql.types.IntegerType

Int數據類型,即有符號的32位整數。

13.class pyspark.sql.types.LongType

Long數據類型,即有符號的64位整數。
如果數值超出[-9223372036854775808,9223372036854775807]的范圍,請使用DecimalType。

14.class pyspark.sql.types.ShortType

Short數據類型,即有符號的16位整數。

15.class pyspark.sql.types.ArrayType(elementType, containsNull=True)

數組數據類型。
參數:●  elementType – 數組中每個元素的DataType。
         ●  containsNull – 布爾值,數組是否可以包含null(None)值。

16.class pyspark.sql.types.MapType(keyType, valueType, valueContainsNull=True)

Map數據類型。
參數:●  keyType – map中key的數據類型。
        ●  valueType – map中value的數據類型。
        ●  valueContainsNull – 指示values是否可以包含null(無)值。
map數據類型中的鍵不允許為null(無)。

17.class pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None)

StructType中的一個字段。
參數:●  name – 字符串,字段的名稱。
        ●  dataType – 字段的數據類型。
        ●  nullable – boolean,該字段是否可以為null(None)。
        ●  metadata – 從字符串到簡單類型的字典,可以自動內部轉到JSON

18.class pyspark.sql.types.StructType(fields=None)

結構類型,由StructField的列表組成。
這是表示一個行的數據類型。

18.1 add(field, data_type=None, nullable=True, metadata=None)

通過添加新元素來構造一個StructType來定義schema。 該方法接受:
a:一個參數是一個StructField對象。
b:介於2到4之間的參數(name,data_type,nullable(可選),metadata(可選))。data_type參數可以是String或DataType對象。

>>> from pyspark.sql.types import *
>>> struct1 = StructType().add("f1", StringType(), True).add("f2", StringType(), True, None)
>>> struct2 = StructType([StructField("f1", StringType(), True),         StructField("f2", StringType(), True, None)])
>>> struct1 == struct2
True
>>> struct1 = StructType().add(StructField("f1", StringType(), True))
>>> struct2 = StructType([StructField("f1", StringType(), True)])
>>> struct1 == struct2
True
>>> struct1 = StructType().add("f1", "string", True)
>>> struct2 = StructType([StructField("f1", StringType(), True)])
>>> struct1 == struct2
True

參數:●  field – 字段的名稱或者StructField對象
        ●  data_type – 如果存在,則創建StructField的DataType
        ●  nullable – 要添加的字段是否可以是nullable (默認True)
        ●  metadata – 任何其他元數據(默認無)
返回:一個新的更新的StructType

pyspark.sql.functions module

內建函數的集合

1.pyspark.sql.functions.abs(col)

計算絕對值。

2.pyspark.sql.functions.acos(col)

計算給定值的余弦逆; 返回的角度在0到π的范圍內。

3.pyspark.sql.functions.add_months(start, months)

返回start后months個月的日期

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(add_months(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 5, 8))]

4.pyspark.sql.functions.approxCountDistinct(col, rsd=None)

返回col的近似不同計數的新列。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.agg(approxCountDistinct(df.age).alias('c')).collect()
[Row(c=2)]

5.pyspark.sql.functions.array(*cols)

創建一個新的數組列。
參數:●  cols – 列名(字符串)列表或具有相同數據類型的列表達式列表。

>>> df.select(array('age', 'age').alias("arr")).collect()
[Row(arr=[2, 2]), Row(arr=[5, 5])]
>>> df.select(array([df.age, df.age]).alias("arr")).collect()
[Row(arr=[2, 2]), Row(arr=[5, 5])]

6.pyspark.sql.functions.array_contains(col, value)

集合函數:如果數組包含給定值,則返回True。 收集元素和值必須是相同的類型。
參數:●  col – 數組中的列名稱

         ●  value – 數組中要檢查的值

>>> df = sqlContext.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>>> df.select(array_contains(df.data, "a")).collect()
[Row(array_contains(data,a)=True), Row(array_contains(data,a)=False)]

7.pyspark.sql.functions.asc(col)

基於給定列名稱的升序返回一個排序表達式。

8.pyspark.sql.functions.ascii(col)

計算字符串列的第一個字符的數值。

9.pyspark.sql.functions.asin(col)

計算給定值的正弦倒數; 返回的角度在負π/ 2到π/ 2的范圍內。

10.pyspark.sql.functions.atan(col)

計算給定值的正切倒數。

11.pyspark.sql.functions.atan2(col1, col2)

返回直角坐標(x,y)到極坐標(r,theta)轉換的角度theta。

12.pyspark.sql.functions.avg(col)

聚合函數:返回組中的值的平均值。

13.pyspark.sql.functions.base64(col)

計算二進制列的BASE64編碼,並將其作為字符串列返回。

14.pyspark.sql.functions.bin(col)

返回給定列的二進制值的字符串表示形式

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.select(bin(df.age).alias('c')).collect()
[Row(c=u'10'), Row(c=u'101')]

15.pyspark.sql.functions.bitwiseNOT(col)

不按位計算。

16.pyspark.sql.functions.broadcast(df)

將DataFrame標記為足夠小以用於廣播連接。

17.pyspark.sql.functions.cbrt(col)

計算給定值的立方根。

18.pyspark.sql.functions.ceil(col)

計算給定值的上限。

19.pyspark.sql.functions.coalesce(*cols)

返回不為空的第一列。

>>> cDf = sqlContext.createDataFrame([(None, None), (1, None), (None, 2)], ("a", "b"))
>>> cDf.show()
+----+----+
|   a|   b|
+----+----+
|null|null|
|   1|null|
|null|   2|
+----+----+
>>> cDf.select(coalesce(cDf["a"], cDf["b"])).show()
+-------------+
|coalesce(a,b)|
+-------------+
|         null|
|            1|
|            2|
+-------------+

注:使用coalesce需先引用,from pyspark.sql.functions import *

>>> cDf.select('*', coalesce(cDf["a"], lit(0.0))).show()
+----+----+---------------+
|   a|   b|coalesce(a,0.0)|
+----+----+---------------+
|null|null|            0.0|
|   1|null|            1.0|
|null|   2|            0.0|
+----+----+---------------+

20.pyspark.sql.functions.col(col)

根據給定的列名返回一個列。

21.pyspark.sql.functions.collect_list(col)

聚合函數:返回重復對象的列表。

22.pyspark.sql.functions.collect_set(col)

聚合函數:返回一組消除重復元素的對象。

23.pyspark.sql.functions.column(col)

根據給定的列名返回一個列。

24.pyspark.sql.functions.concat(*cols)[source]

將多個輸入字符串列連接成一個字符串列。

>>> df = sqlContext.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(concat(df.s, df.d).alias('s')).collect()
[Row(s=u'abcd123')]

25.pyspark.sql.functions.concat_ws(sep, *cols)[source]

使用給定的分隔符將多個輸入字符串列連接到一個字符串列中。

>>> df = sqlContext.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect()
[Row(s=u'abcd-123')]

26.pyspark.sql.functions.conv(col, fromBase, toBase)[source]

將字符串列中的數字從一個基數轉換為另一個基數。

>>> df = sqlContext.createDataFrame([("010101",)], ['n'])
>>> df.select(conv(df.n, 2, 16).alias('hex')).collect()
[Row(hex=u'15')]

27.pyspark.sql.functions.corr(col1, col2)

返回col1和col2的皮爾森相關系數的新列。

>>> from pyspark.sql.functions import *
>>> a = [x * x - 2 * x + 3.5 for x in range(20)]
>>> b = range(20)
>>> corrDf = sqlContext.createDataFrame(zip(a, b))
>>> corrDf = corrDf.agg(corr(corrDf._1, corrDf._2).alias('c'))
>>> corrDf.selectExpr('abs(c - 0.9572339139475857) < 1e-16 as t').collect()
[Row(t=True)]

28.pyspark.sql.functions.cos(col)

計算給定值的余弦。

29.pyspark.sql.functions.cosh(col)

計算給定值的雙曲余弦。

30.pyspark.sql.functions.count(col)

聚合函數:返回組中的項數量。

31.pyspark.sql.functions.countDistinct(col, *cols)

返回一列或多列的去重計數的新列。

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.agg(countDistinct(df.age, df.name).alias('c')).collect()
[Row(c=2)]
>>> df.agg(countDistinct("age", "name").alias('c')).collect()
[Row(c=2)]

32.pyspark.sql.functions.crc32(col)

計算二進制列的循環冗余校驗值(CRC32),並將該值作為bigint返回。

>>> from pyspark.sql.functions import *
>>> sqlContext.createDataFrame([('ABC',)], ['a']).select(crc32('a').alias('crc32')).collect()
[Row(crc32=2743272264)]

33.pyspark.sql.functions.cumeDist()

窗口函數:.. note ::在1.6中不推薦使用,而是使用cume_dist。

34.pyspark.sql.functions.cume_dist()

窗口函數:返回窗口分區內值的累積分布,即在當前行下面的行的分數。

35.pyspark.sql.functions.current_date()

以日期列的形式返回當前日期。

36.pyspark.sql.functions.current_timestamp()

將當前時間戳作為時間戳列返回。

37.pyspark.sql.functions.date_add(start, days)

返回start后days天的日期

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_add(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 9))]

38.pyspark.sql.functions.date_format(date, format)

將日期/時間戳/字符串轉換為由第二個參數給定日期格式指定格式的字符串值。
一個模式可能是例如dd.MM.yyyy,可能會返回一個字符串,如“18 .03.1993”。 可以使用Java類java.text.SimpleDateFormat的所有模式字母。
注意:盡可能使用像年份這樣的專業功能。 這些受益於專門的實施。

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(date_format('a', 'MM/dd/yyy').alias('date')).collect()
[Row(date=u'04/08/2015')]

39.pyspark.sql.functions.date_sub(start, days)

返回start前days天的日期

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['d'])
>>> df.select(date_sub(df.d, 1).alias('d')).collect()
[Row(d=datetime.date(2015, 4, 7))]

40.pyspark.sql.functions.datediff(end, start)

返回從start到end的天數。

>>> df = sqlContext.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
>>> df.select(datediff(df.d2, df.d1).alias('diff')).collect()
[Row(diff=32)]

41.pyspark.sql.functions.dayofmonth(col)

將給定日期的月份的天解壓為整數。

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofmonth('a').alias('day')).collect()
[Row(day=8)]

42.pyspark.sql.functions.dayofyear(col)

將給定日期的年份中的某一天提取為整數。

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(dayofyear('a').alias('day')).collect()
[Row(day=98)]

43.pyspark.sql.functions.decode(col, charset)

使用提供的字符集(“US-ASCII”,“ISO-8859-1”,“UTF-8”,“UTF-16BE”,“UTF-16LE”,“UTF-16”之一)從二進制計算第一個參數到字符串中

44.pyspark.sql.functions.denseRank()

窗口函數:.. note ::在1.6中不推薦使用,而是使用dense_rank。

45.pyspark.sql.functions.dense_rank()

窗口函數:返回窗口分區內的行的等級,沒有任何間隙。
rank和denseRank的區別在於,當有關系時,denseRank在排序順序上沒有差距。 也就是說,如果你使用密集排名進行比賽,並且有三個人排在第二位,那么你會說所有三個排在第二位,下一個排在第三位。

46.pyspark.sql.functions.desc(col)

基於給定列名稱的降序返回一個排序表達式。

47.pyspark.sql.functions.encode(col, charset)

使用提供的字符集(‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’之一)將第一個參數從字符串計算為二進制

48.pyspark.sql.functions.exp(col)

計算給定值的指數。

49.pyspark.sql.functions.explode(col)

返回給定數組或映射中每個元素的新行。

>>> from pyspark.sql import Row
>>> eDF = sqlContext.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
>>> eDF.select(explode(eDF.intlist).alias("anInt")).collect()
[Row(anInt=1), Row(anInt=2), Row(anInt=3)]
>>> eDF.select(explode(eDF.mapfield).alias("key", "value")).show()
+---+-----+
|key|value|
+---+-----+
|  a|    b|
+---+-----+

50.pyspark.sql.functions.expm1(col)

計算給定值的指數減1。

51.pyspark.sql.functions.expr(str)

將表達式字符串分析到它表示的列中

>>> l=[('Alice',2),('Bob',5)]
>>> df = sqlContext.createDataFrame(l,['name','age'])
>>> df.select(expr("length(name)")).collect()
[Row(length(name)=5), Row(length(name)=3)]

52.pyspark.sql.functions.factorial(col)

計算給定值的階乘。

>>> df = sqlContext.createDataFrame([(5,)], ['n'])
>>> df.select(factorial(df.n).alias('f')).collect()
[Row(f=120)]

53.pyspark.sql.functions.first(col)

聚合函數:返回組中的第一個值。

54.pyspark.sql.functions.floor(col)

計算給定值的最小。

55.pyspark.sql.functions.format_number(col, d)

將數字X格式化為像'#, - #, - #.-'這樣的格式,四舍五入到小數點后的位置,並以字符串形式返回結果。
參數:●  col – 要格式化的數值的列名稱
         ●  d – N小數位

>>> from pyspark.sql.functions import *
>>> sqlContext.createDataFrame([(5,)], ['a']).select(format_number('a',4).alias('v')).collect()
[Row(v=u'5.0000')]

56.pyspark.sql.functions.format_string(format, *cols)

以printf樣式格式化參數,並將結果作為字符串列返回。
參數:●  format – 要格式化的格式
         ●  cols - 要格式化的列
p.s.這里官網可能有誤,參數與format_number一樣了。

>>> from pyspark.sql.functions import *
>>> df = sqlContext.createDataFrame([(5, "hello")], ['a', 'b'])
>>> df.select(format_string('%d %s', df.a, df.b).alias('v')).collect()
[Row(v=u'5 hello')]

57.pyspark.sql.functions.from_unixtime(timestamp, format='yyyy-MM-dd HH:mm:ss')

將來自unix時期(1970-01-01 00:00:00 UTC)的秒數轉換為以給定格式表示當前系統時區中該時刻的時間戳的字符串。

58.pyspark.sql.functions.from_utc_timestamp(timestamp, tz)

假設時間戳是UTC,並轉換為給定的時區

>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(from_utc_timestamp(df.t, "PST").alias('t')).collect()
[Row(t=datetime.datetime(1997, 2, 28, 2, 30))]

59.pyspark.sql.functions.get_json_object(col, path)

從基於指定的json路徑的json字符串中提取json對象,並返回提取的json對象的json字符串。 如果輸入的json字符串無效,它將返回null。
參數:●  col – json格式的字符串列
         ●  path – 提取json對象的路徑

>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
>>> df = sqlContext.createDataFrame(data, ("key", "jstring"))
>>> df.select(df.key, get_json_object(df.jstring, '$.f1').alias("c0"),get_json_object(df.jstring, '$.f2').alias("c1") ).collect()
[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]

60.pyspark.sql.functions.greatest(*cols)

返回列名稱列表的最大值,跳過空值。 該功能至少需要2個參數。 如果所有參數都為空,它將返回null

>>> df = sqlContext.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
>>> df.select(greatest(df.a, df.b, df.c).alias("greatest")).collect()
[Row(greatest=4)]

61.pyspark.sql.functions.hex(col) 

計算給定列的十六進制值,可以是StringType,BinaryType,IntegerType或LongType

>>> sqlContext.createDataFrame([('ABC', 3)], ['a', 'b']).select(hex('a'), hex('b')).collect()
[Row(hex(a)=u'414243', hex(b)=u'3')]

62.pyspark.sql.functions.hour(col)

將給定日期的小時數提取為整數。

>>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(hour('a').alias('hour')).collect()
[Row(hour=13)]

63.pyspark.sql.functions.hypot(col1, col2)

計算sqrt(a ^ 2 ^ + b ^ 2 ^),無中間上溢或下溢。

64.pyspark.sql.functions.initcap(col)

在句子中將每個單詞的第一個字母翻譯成大寫。

>>> sqlContext.createDataFrame([('ab cd',)], ['a']).select(initcap("a").alias('v')).collect()
[Row(v=u'Ab Cd')]

65.pyspark.sql.functions.input_file_name()

為當前Spark任務的文件名創建一個字符串列。

66.pyspark.sql.functions.instr(str, substr)

找到給定字符串中第一次出現substr列的位置。 如果其中任一參數為null,則返回null。
注:位置不是從零開始的,但是基於1的索引,如果在str中找不到substr,則返回0。

>>> df = sqlContext.createDataFrame([('abcd',)], ['s',])
>>> df.select(instr(df.s, 'b').alias('s')).collect()
[Row(s=2)]

67.pyspark.sql.functions.isnan(col)

如果列是NaN,則返回true的表達式。

>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
>>> df.select(isnan("a").alias("r1"), isnan(df.a).alias("r2")).collect()
[Row(r1=False, r2=False), Row(r1=True, r2=True)]

68.pyspark.sql.functions.isnull(col)

如果列為null,則返回true的表達式

>>> df = sqlContext.createDataFrame([(1, None), (None, 2)], ("a", "b"))
>>> df.select(isnull("a").alias("r1"), isnull(df.a).alias("r2")).collect()
[Row(r1=False, r2=False), Row(r1=True, r2=True)]

69.pyspark.sql.functions.json_tuple(col, *fields)

根據給定的字段名稱為json列創建一個新行。
參數:●  col – json格式的字符串列
        ●  fields – 要提取的字段列表

>>> data = [("1", '''{"f1": "value1", "f2": "value2"}'''), ("2", '''{"f1": "value12"}''')]
>>> df = sqlContext.createDataFrame(data, ("key", "jstring"))
>>> df.select(df.key, json_tuple(df.jstring, 'f1', 'f2')).collect()
[Row(key=u'1', c0=u'value1', c1=u'value2'), Row(key=u'2', c0=u'value12', c1=None)]

70.pyspark.sql.functions.kurtosis(col)

聚合函數:返回組中的值的峰度。

71.pyspark.sql.functions.lag(col, count=1, default=None)

窗口函數:返回當前行之前偏移行的值;如果當前行之前的行數小於偏移量,則返回defaultValue。 例如,一個偏移量將返回窗口分區中任何給定點的前一行。
這相當於SQL中的LAG函數。
參數:●  col – 列或表達式的名稱
         ●  count – 要延伸的行數
        ●  default – 默認值

72.pyspark.sql.functions.last(col)

聚合函數:返回組中的最后一個值。

73.pyspark.sql.functions.last_day(date)

返回給定日期所屬月份的最后一天。

>>> df = sqlContext.createDataFrame([('1997-02-10',)], ['d'])
>>> df.select(last_day(df.d).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]

74.pyspark.sql.functions.lead(col, count=1, default=None)

Window函數:返回當前行之后偏移行的值;如果當前行之后的行數小於偏移行,則返回defaultValue。 例如,一個偏移量將返回窗口分區中任意給定點的下一行。
這相當於SQL中的LEAD函數。
參數:●  col – 列或表達式的名稱
         ●  count – 要延伸的行數
         ●  default – 默認值

75.pyspark.sql.functions.least(*cols)

返回列名稱列表的最小值,跳過空值。 該功能至少需要2個參數。 如果所有參數都為空,它將返回null

>>> df = sqlContext.createDataFrame([(1, 4, 3)], ['a', 'b', 'c'])
>>> df.select(least(df.a, df.b, df.c).alias("least")).collect()
[Row(least=1)]

76.pyspark.sql.functions.length(col)

計算字符串或二進制表達式的長度

>>> sqlContext.createDataFrame([('ABC',)], ['a']).select(length('a').alias('length')).collect()
[Row(length=3)]

 77.pyspark.sql.functions.levenshtein(left, right)

計算兩個給定字符串的Levenshtein距離。

>>> from pyspark.sql.functions import *
>>> df0 = sqlContext.createDataFrame([('kitten', 'sitting',)], ['l', 'r'])
>>> df0.select(levenshtein('l', 'r').alias('d')).collect()
[Row(d=3)]

78.pyspark.sql.functions.lit(col)

創建一個文字值的列

79.pyspark.sql.functions.locate(substr, str, pos=0)

找到第一個出現的位置在位置pos后面的字符串列中。
注:位置不是從零開始,而是從1開始。 如果在str中找不到substr,則返回0。
參數: substr – 一個字符串
         str – 一個StringType的列
         pos – 起始位置(基於零)

>>> df = sqlContext.createDataFrame([('abcd',)], ['s',])
>>> df.select(locate('b', df.s, 1).alias('s')).collect()
[Row(s=2)]

80.pyspark.sql.functions.log(arg1, arg2=None)

返回第二個參數的第一個基於參數的對數。
如果只有一個參數,那么這個參數就是自然對數。

>>> df.select(log(10.0, df.age).alias('ten')).map(lambda l: str(l.ten)[:7]).collect()
['0.30102', '0.69897']
>>> df.select(log(df.age).alias('e')).map(lambda l: str(l.e)[:7]).collect()
['0.69314', '1.60943']

81.pyspark.sql.functions.log10(col)

計算Base 10中給定值的對數。

82.pyspark.sql.functions.log1p(col)

計算給定值的自然對數加1。

83.pyspark.sql.functions.log2(col)

返回參數的基數為2的對數。

>>> sqlContext.createDataFrame([(4,)], ['a']).select(log2('a').alias('log2')).collect()
[Row(log2=2.0)]

84.pyspark.sql.functions.lower(col)

將字符串列轉換為小寫。

85.pyspark.sql.functions.lpad(col, len, pad)

用pad填充字符串列的寬度len

>>> df = sqlContext.createDataFrame([('abcd',)], ['s',])
>>> df.select(lpad(df.s, 6, '#').alias('s')).collect()
[Row(s=u'##abcd')]

86.pyspark.sql.functions.ltrim(col)

從左端修剪指定字符串值的空格。

87.pyspark.sql.functions.max(col)

聚合函數:返回組中表達式的最大值。

88.pyspark.sql.functions.md5(col)

計算MD5摘要並以32個字符的十六進制字符串的形式返回值。

>>> sqlContext.createDataFrame([('ABC',)], ['a']).select(md5('a').alias('hash')).collect()
[Row(hash=u'902fbdd2b1df0c4f70b4a5d23525e932')]

89.pyspark.sql.functions.mean(col)

聚合函數:返回組中的值的平均值

90.pyspark.sql.functions.min(col)

聚合函數:返回組中表達式的最小值。

91.pyspark.sql.functions.minute(col)

提取給定日期的分鍾數為整數

>>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(minute('a').alias('minute')).collect()
[Row(minute=8)]

92.pyspark.sql.functions.monotonicallyIncreasingId()

注意在1.6中不推薦使用monotonically_increasing_id

93.pyspark.sql.functions.monotonically_increasing_id()

生成單調遞增的64位整數的列。

生成的ID保證是單調遞增和唯一的,但不是連續的。 當前的實現將分區ID放在高31位,並將每個分區內的記錄號放在低33位。 假設
數據幀的分區少於10億個,每個分區少於80億條記錄

例如,考慮一個DataFrame有兩個分區,每個分區有三個記錄。 該表達式將返回以下ID:0,1,2,8589934592(1L << 33),
8589934593,8589934594

>>> df0 = sc.parallelize(range(2), 2).mapPartitions(lambda x: [(1,), (2,), (3,)]).toDF(['col1'])
>>> df0.select(monotonically_increasing_id().alias('id')).collect()
[Row(id=0), Row(id=1), Row(id=2), Row(id=8589934592), Row(id=8589934593), Row(id=8589934594)]

94.pyspark.sql.functions.month(col)

將給定日期的月份提取為整數

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(month('a').alias('month')).collect()
[Row(month=4)]

95.pyspark.sql.functions.months_between(date1, date2)

返回date1和date2之間的月數。

>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00', '1996-10-30')], ['t', 'd'])
>>> df.select(months_between(df.t, df.d).alias('months')).collect()
[Row(months=3.9495967...)]

96.pyspark.sql.functions.nanvl(col1, col2)

如果不是NaN,則返回col1;如果col1是NaN,則返回col2
兩個輸入都應該是浮點列(DoubleType或FloatType)

>>> df = sqlContext.createDataFrame([(1.0, float('nan')), (float('nan'), 2.0)], ("a", "b"))
>>> df.select(nanvl("a", "b").alias("r1"), nanvl(df.a, df.b).alias("r2")).collect()
[Row(r1=1.0, r2=1.0), Row(r1=2.0, r2=2.0)]

97.pyspark.sql.functions.next_day(date, dayOfWeek)

返回晚於日期列值的第一個日期
星期幾參數不區分大小寫,並接受:“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”.

>>> df = sqlContext.createDataFrame([('2015-07-27',)], ['d'])
>>> df.select(next_day(df.d, 'Sun').alias('date')).collect()
[Row(date=datetime.date(2015, 8, 2))]

98.pyspark.sql.functions.ntile(n)

窗口函數:在有序的窗口分區中返回ntile組ID(從1到n)。 例如,如果n是4,則第一季度行將得到值1,第二季度將得到2,第三季
度將得到3,並且最后一個季度將得到4。
這相當於SQL中的NTILE函數。

99.pyspark.sql.functions.percentRank()

窗口函數:.. note ::在1.6中不推薦使用,而是使用percent_rank

100.pyspark.sql.functions.percent_rank()

窗口函數:返回窗口分區內行的相對等級(即百分位數)

101.pyspark.sql.functions.pow(col1, col2)

返回引發第二個參數的第一個參數的值。

102.pyspark.sql.functions.quarter(col)

提取給定日期的四分之一整數

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(quarter('a').alias('quarter')).collect()
[Row(quarter=2)]

103.pyspark.sql.functions.rand(seed=None)

用i.i.d生成一個隨機列 來自U的樣本[0.0,1.0]。

104.pyspark.sql.functions.randn(seed=None)

用i.i.d生成一列 來自標准正態分布的樣本。

105.pyspark.sql.functions.rank()

窗口函數:返回窗口分區內的行的等級

rank和denseRank的區別在於,當有關系時,denseRank在排序順序上沒有差距。 也就是說,如果你使用密集排名進行比賽,並且有
三個人排在第二位,那么你會說所有三個排在第二位,下一個排在第三位。

這相當於SQL中的RANK函數。

106.pyspark.sql.functions.regexp_extract(str, pattern, idx)

從指定的字符串列中提取由java正則表達式標識的特定(idx)組

>>> df = sqlContext.createDataFrame([('100-200',)], ['str'])
>>> df.select(regexp_extract('str', '(\d+)-(\d+)', 1).alias('d')).collect()
[Row(d=u'100')]

107.pyspark.sql.functions.regexp_replace(str, pattern, replacement)

將與regexp匹配的指定字符串值的所有子字符串替換為rep

>>> df = sqlContext.createDataFrame([('100-200',)], ['str'])
>>> df.select(regexp_replace('str', '(\d+)', '--').alias('d')).collect()
[Row(d=u'-----')]

108.pyspark.sql.functions.repeat(col, n)

重復一個字符串列n次,並將其作為新的字符串列返回

>>> df = sqlContext.createDataFrame([('ab',)], ['s',])
>>> df.select(repeat(df.s, 3).alias('s')).collect()
[Row(s=u'ababab')]

109.pyspark.sql.functions.reverse(col)

反轉字符串列並將其作為新的字符串列返回

110.pyspark.sql.functions.rint(col)

返回值最接近參數的double值,等於一個數學整數。

111.pyspark.sql.functions.round(col, scale=0)

如果scale> = 0,將e的值舍入為小數點的位數,或者在scale <0的時候將其舍入到整數部分。

>>> sqlContext.createDataFrame([(2.546,)], ['a']).select(round('a', 1).alias('r')).collect()
[Row(r=2.5)]

112.pyspark.sql.functions.rowNumber()

窗口函數:.. note:1.6中不推薦使用,而是使用row_number

113.pyspark.sql.functions.row_number()

窗口函數:返回窗口分區內從1開始的連續編號。

114.pyspark.sql.functions.rpad(col, len, pad)

右鍵將字符串列填充到寬度為len的pad

>>> df = sqlContext.createDataFrame([('abcd',)], ['s',])
>>> df.select(rpad(df.s, 6, '#').alias('s')).collect()
[Row(s=u'abcd##')]

115.pyspark.sql.functions.rtrim(col)

從右端修剪指定字符串值的空格

116.pyspark.sql.functions.second(col)

將給定日期的秒數提取為整數

>>> df = sqlContext.createDataFrame([('2015-04-08 13:08:15',)], ['a'])
>>> df.select(second('a').alias('second')).collect()
[Row(second=15)]

117.pyspark.sql.functions.sha1(col)

返回SHA-1的十六進制字符串結果

>>> sqlContext.createDataFrame([('ABC',)], ['a']).select(sha1('a').alias('hash')).collect()
[Row(hash=u'3c01bdbb26f358bab27f267924aa2c9a03fcfdb8')]

118.pyspark.sql.functions.sha2(col, numBits)

返回SHA-2系列散列函數(SHA-224,SHA-256,SHA-384和SHA-512)的十六進制字符串結果。 numBits表示結果的所需位長度,其值
必須為224,256,384,512或0(相當於256)

>>> digests = df.select(sha2(df.name, 256).alias('s')).collect()
>>> digests[0]
Row(s=u'3bc51062973c458d5a6f2d8d64a023246354ad7e064b1e4e009ec8a0699a3043')
>>> digests[1]
Row(s=u'cd9fb1e148ccd8442e5aa74904cc73bf6fb54d1d54d333bd596aa9bb4bb4e961')

119.pyspark.sql.functions.shiftLeft(col, numBits)

移動給定值numBits左側

>>> sqlContext.createDataFrame([(21,)], ['a']).select(shiftLeft('a', 1).alias('r')).collect()
[Row(r=42)]

120.pyspark.sql.functions.shiftRight(col, numBits)

將給定值numBits右移

>>> sqlContext.createDataFrame([(42,)], ['a']).select(shiftRight('a', 1).alias('r')).collect()
[Row(r=21)]

121.pyspark.sql.functions.shiftRightUnsigned(col, numBits)

無符號移位給定值numBits的權利

>>> df = sqlContext.createDataFrame([(-42,)], ['a'])
>>> df.select(shiftRightUnsigned('a', 1).alias('r')).collect()
[Row(r=9223372036854775787)]

122.pyspark.sql.functions.signum(col)

計算給定值的符號

123.pyspark.sql.functions.sin(col)

計算給定值的正弦值

124.pyspark.sql.functions.sinh(col)

 計算給定值的雙曲正弦值

125.pyspark.sql.functions.size(col)

集合函數:返回存儲在列中的數組或映射的長度
參數:col – 列或表達式名稱

>>> df = sqlContext.createDataFrame([([1, 2, 3],),([1],),([],)], ['data'])
>>> df.select(size(df.data)).collect()
[Row(size(data)=3), Row(size(data)=1), Row(size(data)=0)]

126.pyspark.sql.functions.skewness(col)

聚合函數:返回組中值的偏度

127.pyspark.sql.functions.sort_array(col, asc=True)

集合函數:按升序對給定列的輸入數組進行排序。
參數:col – 列或表達式名稱

>>> df = sqlContext.createDataFrame([([2, 1, 3],),([1],),([],)], ['data'])
>>> df.select(sort_array(df.data).alias('r')).collect()
[Row(r=[1, 2, 3]), Row(r=[1]), Row(r=[])]
>>> df.select(sort_array(df.data, asc=False).alias('r')).collect()
[Row(r=[3, 2, 1]), Row(r=[1]), Row(r=[])]

128.pyspark.sql.functions.soundex(col)

返回字符串的SoundEx編碼

>>> df = sqlContext.createDataFrame([("Peters",),("Uhrbach",)], ['name'])
>>> df.select(soundex(df.name).alias("soundex")).collect()
[Row(soundex=u'P362'), Row(soundex=u'U612')]

129.pyspark.sql.functions.sparkPartitionId()

注意在1.6中不推薦使用spark_partition_id。

130.pyspark.sql.functions.spark_partition_id()

Spark任務的分區ID列
請注意,這是不確定的,因為它取決於數據分區和任務調度

>>> df.repartition(1).select(spark_partition_id().alias("pid")).collect()
[Row(pid=0), Row(pid=0)]

131.pyspark.sql.functions.split(str, pattern)

將模式分割(模式是正則表達式)。
注:pattern是一個字符串表示正則表達式。

>>> df = sqlContext.createDataFrame([('ab12cd',)], ['s',])
>>> df.select(split(df.s, '[0-9]+').alias('s')).collect()
[Row(s=[u'ab', u'cd'])]

132.pyspark.sql.functions.sqrt(col)

計算指定浮點值的平方根

133.pyspark.sql.functions.stddev(col)

聚合函數:返回組中表達式的無偏樣本標准差

134.pyspark.sql.functions.stddev_pop(col)

聚合函數:返回一個組中表達式的總體標准差

135.pyspark.sql.functions.stddev_samp(col)

聚合函數:返回組中表達式的無偏樣本標准差

136.pyspark.sql.functions.struct(*cols)

創建一個新的結構列。
列:cols – 列名稱(字符串)列表或列表達式列表

>>> df.select(struct('age', 'name').alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]
>>> df.select(struct([df.age, df.name]).alias("struct")).collect()
[Row(struct=Row(age=2, name=u'Alice')), Row(struct=Row(age=5, name=u'Bob'))]

137.pyspark.sql.functions.substring(str, pos, len)

子字符串從pos開始,長度為len,當str是字符串類型時,或者返回從字節pos開始的字節數組的片段,當str是二進制類型時,長度
為len

>>> df = sqlContext.createDataFrame([('abcd',)], ['s',])
>>> df.select(substring(df.s, 1, 2).alias('s')).collect()
[Row(s=u'ab')]

138.pyspark.sql.functions.substring_index(str, delim, count)

在計數定界符delimiter之前,返回字符串str的子串。 如果count是正數,則返回最后一個分隔符左邊的數字(從左數起)。 如果
計數為負數,則返回最后一個分隔符右邊的數字(從右數起)。 substring_index搜索delim時執行區分大小寫的匹配

 

>>> df = sqlContext.createDataFrame([('a.b.c.d',)], ['s'])
>>> df.select(substring_index(df.s, '.', 2).alias('s')).collect()
[Row(s=u'a.b')]
>>> df.select(substring_index(df.s, '.', -3).alias('s')).collect()
[Row(s=u'b.c.d')]

139.pyspark.sql.functions.sum(col)

聚合函數:返回表達式中所有值的總和。

140.pyspark.sql.functions.sumDistinct(col)

聚合函數:返回表達式中不同值的總和

141.pyspark.sql.functions.tan(col)

計算給定值的正切值

142.pyspark.sql.functions.tanh(col)

計算給定值的雙曲正切

143.pyspark.sql.functions.toDegrees(col)

將以弧度度量的角度轉換為以度數度量的近似等效角度。

144.pyspark.sql.functions.toRadians(col)

將以度數度量的角度轉換為以弧度測量的近似等效角度

145.pyspark.sql.functions.to_date(col)

將StringType或TimestampType的列轉換為DateType

>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_date(df.t).alias('date')).collect()
[Row(date=datetime.date(1997, 2, 28))]

146.pyspark.sql.functions.to_utc_timestamp(timestamp, tz)

假定給定的時間戳在給定的時區並轉換為UTC

>>> df = sqlContext.createDataFrame([('1997-02-28 10:30:00',)], ['t'])
>>> df.select(to_utc_timestamp(df.t, "PST").alias('t')).collect()
[Row(t=datetime.datetime(1997, 2, 28, 18, 30))]

147.pyspark.sql.functions.translate(srcCol, matching, replace)

一個函數通過匹配的字符轉換srcCol中的任何字符。 替換中的字符對應於匹配的字符。當字符串中的任何字符與匹配中的字符匹配
時,翻譯將發生

>>> sqlContext.createDataFrame([('translate',)], ['a']).select(translate('a', "rnlt", "123").alias('r')).collect()
[Row(r=u'1a2s3ae')]

148.pyspark.sql.functions.trim(col)

修剪指定字符串列的兩端空格。

149.pyspark.sql.functions.trunc(date, format)

返回截斷到格式指定單位的日期

參數: format – ‘year’, ‘YYYY’, ‘yy’ or ‘month’, ‘mon’, ‘mm’

>>> df = sqlContext.createDataFrame([('1997-02-28',)], ['d'])
>>> df.select(trunc(df.d, 'year').alias('year')).collect()
[Row(year=datetime.date(1997, 1, 1))]
>>> df.select(trunc(df.d, 'mon').alias('month')).collect()
[Row(month=datetime.date(1997, 2, 1))]

150.pyspark.sql.functions.udf(f, returnType=StringType)

創建一個表示用戶定義函數(UDF)的列表達式。

>>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> df.select(slen(df.name).alias('slen')).collect()
[Row(slen=5), Row(slen=3)]

151.pyspark.sql.functions.unbase64(col)

解碼BASE64編碼的字符串列並將其作為二進制列返回

152.pyspark.sql.functions.unhex

十六進制的反轉。 將每對字符解釋為十六進制數字,並轉換為數字的字節表示形式

>>> sqlContext.createDataFrame([('414243',)], ['a']).select(unhex('a')).collect()
[Row(unhex(a)=bytearray(b'ABC'))]

153.pyspark.sql.functions.unix_timestamp(timestamp=None, format='yyyy-MM-dd HH:mm:ss')

使用默認時區和默認語言環境,將具有給定模式的時間字符串(默認為'yyyy-MM-dd HH:mm:ss')轉換為Unix時間戳(以秒為單位
),如果失敗則返回null。

如果時間戳記為“無”,則返回當前時間戳。

154.pyspark.sql.functions.upper(col)

將字符串列轉換為大寫

155.pyspark.sql.functions.var_pop(col)

聚合函數:返回組中值的總體方差

156.pyspark.sql.functions.var_samp(col)

聚合函數:返回組中值的無偏差

157.pyspark.sql.functions.variance(col)

聚合函數:返回組中值的總體方差

158.pyspark.sql.functions.weekofyear(col)

將一個給定日期的星期數解壓為整數。

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(weekofyear(df.a).alias('week')).collect()
[Row(week=15)]

159.pyspark.sql.functions.when(condition, value)

評估條件列表並返回多個可能的結果表達式之一。 如果不調用Column.otherwise(),則不匹配條件返回None

參數:condition – 一個布爾的列表達式.
        value – 一個文字值或一個Column表達式

>>> df.select(when(df['age'] == 2, 3).otherwise(4).alias("age")).collect()
[Row(age=3), Row(age=4)]

>>> df.select(when(df.age == 2, df.age + 1).alias("age")).collect()
[Row(age=3), Row(age=None)]

160.pyspark.sql.functions.year(col)

將給定日期的年份提取為整數

>>> df = sqlContext.createDataFrame([('2015-04-08',)], ['a'])
>>> df.select(year('a').alias('year')).collect()
[Row(year=2015)]

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM