spark官方文檔 翻譯之 pyspark.sql module


pyspark.sql module

Module Context --spark SQL 、dataFrames的重要類

pyspark.sql.SQLContext --DataFrame 和 SQL 功能的主要進入點

pyspark.sql.DataFrame --以列命令的分布式數據集合

pyspark.sql.Column --DataFrame中以列表示的數據,也就是一列數據

pyspark.sql.Row --DataFrame中以行表示的數據,也就是一行數據

pyspark.sql.HiveContext --對Apache Hive中存儲數據進行訪問的主要進入點

pyspark.sql.GroupedData--DataFrame.groupBy() 返回的聚合方法

pyspark.sql.DataFrameNaFunctions --處理丟失數據的方法(null values).

pyspark.sql.DataFrameStatFunctions --統計功能的方法

pyspark.sql.functions  --DataFrame可用的內置函數列表

pyspark.sql.types --可用的數據類型列表。

pyspark.sql.Window --處理窗口功能


 

class pyspark.sql.SparkSession(sparkContext, jsparkSession=None)
  spark程序 Dataset 和DataFrame API 的入口
  一個用於創建DataFrame,以表的形式記錄DataFrame,在表上執行SQL,存儲表讀取文件的sparkSession。用下面的方式創建sparkSession:

>>> spark = SparkSession.builder \
...     .master("local") \
...     .appName("Word Count") \
...     .config("spark.some.config.option", "some-value") \
...     .getOrCreate()

  class Builder
    SparkSession的創建
     appName(name)
       為應用程序設置一個名字,這個名字將顯示在spark的web UI上。
       如果不設置名字,將隨機產生一個名字
       Parameters: name – 應用程序名
       New in version 2.0.

  config(key=None, value=None, conf=None)
    設置配置選項,使用此方式設置的配置將自動應用到SparkConf和SparkSession的配置中

    對現存的SparkConf,配置conf參數

>>> from pyspark.conf import SparkConf
>>> SparkSession.builder.config(conf=SparkConf())
<pyspark.sql.session...

    對於(key, value)鍵值對,可以省略參數名

>>> SparkSession.builder.config("spark.some.config.option", "some-value")
<pyspark.sql.session...

    Parameters:
      key – 配置屬性的鍵名稱(string)
      value – 配置屬性的值
      conf – SparkConf的一個實例
    New in version 2.0.

    enableHiveSupport()
      提供hive支持,包括連通一個持久化的hive元存儲,支持hive並行轉換,hive用戶自定義函數。
    New in version 2.0.

    getOrCreate()
      得到一個現存在的SparkSession,如果沒有現存的SparkSession,創建一個基於選項的新的SparkSession
      這個方法首先會檢查是否存在一個管局有效的默認SparkSession,如果有那么返回這個SparkSession,如果沒有創建一個SparkSession,重新分配為全局有效的默認SparkSession。

SparkSession as the global default.
>>> s1 = SparkSession.builder.config("k1", "v1").getOrCreate()
>>> s1.conf.get("k1") == s1.sparkContext.getConf().get("k1") == "v1"
True

    如果返回一個現存的SparkSession,配置的選項將應用到這個現存的SparkSession

>>> s2 = SparkSession.builder.config("k2", "v2").getOrCreate()
>>> s1.conf.get("k1") == s2.conf.get("k1")
True
>>> s1.conf.get("k2") == s2.conf.get("k2")
True

    New in version 2.0.

    master(master)
      設置spark master的連接URL,例如“local”運行在本地,“local[4]”運行在本地使用4個核,“spark://master:7077”運行在spark standalone模式的集群上。
      Parameters: master – spark master的URL
    New in version 2.0.

  SparkSession.builder = <pyspark.sql.session.Builder object at 0x7f8396ea2cd0>

  SparkSession.catalog
    一個接口用戶可能用來創建,刪除,更改,查詢數據庫,表,函數等等。
  New in version 2.0.

  SparkSession.conf
    spark運行時的配置接口
    一個接口用戶可以用來得到或者設置與Spark SQL相關的spark和hadoop配置。當得到配置的value,默認給基本的sparkContext設置此值,如果有的話。
  New in version 2.0.

  SparkSession.createDataFrame(data, schema=None, samplingRatio=None)
    根據一個RDD或者一個列表或者一個pandas.DataFrame創建DataFrame
    當schema是list,每一列的類型將從data推斷
    當schema是None, 它將嘗試推斷schema(列名稱和類型)根據data,應該是一個RDD的行,或namedtuple,還是dict
    當schema是DataType或者datatype string,schema必須匹配真實的data,或exception將在運行時拋出。如果給定的schema不是StructType,
    它將作為它僅有的字段包含在一個StructType中,這個字段名字將是“value”,每條記錄將包含在一個元組中,最后將轉換成行。

    如果schema推斷是必須的,samplingRatio(抽樣比例)將被用來推斷schema的行的比例。如果samplingRatio是None,第一行將被使用。
    Parameters: data – 一個任和一種SQL數據(e.g. row, tuple, int, boolean, etc.)表示的RDD,或者list,或pandas.DataFrame.
          schema – a DataType or a datatype string or a list of column names, default is None. The data type string format equals to DataType.simpleString, except that top level struct type can omit the struct<> and                 atomic types use typeName() as their format,
                例如. 使用byte代替非常小的整數for ByteType. 我們也可以是使用int作為一個短名稱for IntegerType.
          samplingRatio – 用作推斷每行樣本比例
    Returns: DataFrame
    Changed in version 2.0: The schema parameter can be a DataType or a datatype string after 2.0. If it’s not a StructType, it will be wrapped into a StructType and each record will also be wrapped into a tuple.

>>> a = [('Alice', 1)]
>>> spark.createDataFrame(a).collect()
[Row(_1=u'Alice', _2=1)]
>>> spark.createDataFrame(a, ['name', 'age']).collect()
[Row(name=u'Alice', age=1)]
>>> d = [{'name': 'Alice', 'age': 1}]
>>> spark.createDataFrame(d).collect()
[Row(age=1, name=u'Alice')]
>>> rdd = sc.parallelize(a)
>>> spark.createDataFrame(rdd).collect()
[Row(_1=u'Alice', _2=1)]
>>> df = spark.createDataFrame(rdd, ['name', 'age'])
>>> df.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
>>> df2 = spark.createDataFrame(person)
>>> df2.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql.types import *
>>> schema = StructType([
...    StructField("name", StringType(), True),
...    StructField("age", IntegerType(), True)])
>>> df3 = spark.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]
>>> spark.createDataFrame(df.toPandas()).collect()  
[Row(name=u'Alice', age=1)]
>>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()  
[Row(0=1, 1=2)]
>>> spark.createDataFrame(rdd, "a: string, b: int").collect()
[Row(a=u'Alice', b=1)]
>>> rdd = rdd.map(lambda row: row[1])
>>> spark.createDataFrame(rdd, "int").collect()
[Row(value=1)]
>>> spark.createDataFrame(rdd, "boolean").collect() 
Traceback (most recent call last):
    ...
Py4JJavaError: ...

  SparkSession.newSession()
    返回一個新的SparkSession作為新的session,新的session擁有各自的SQLConf,臨時注冊的views and UDFs,但是共享的SparkContext and table會存儲起來
  New in version 2.0.

  SparkSession.range(start, end=None, step=1, numPartitions=None)
    創建一個以單獨的Long類型列名為id的DataFrame,包含元素在一個范圍內從start到end(exclusive獨有的)步長值為step
  Parameters: start – 起始值
        end – 結束值 (exclusive)
        step – 增值步長 (default: 1)
        numPartitions – DataFrame的分區數
  Returns: DataFrame

 

>>> spark.range(1, 7, 2).collect()
[Row(id=1), Row(id=3), Row(id=5)]

  如果只指定一個參數,他將被用於end值

>>> spark.range(3).collect()
[Row(id=0), Row(id=1), Row(id=2)]

  New in version 2.0.

  SparkSession.read
    返回一個DataFrameReader,它可以被用來從DataFrame中讀取數據
  Returns: DataFrameReader
  New in version 2.0.

  SparkSession.readStream
    返回一個DataFrameReader,它可以被用來從一個streaming DataFrame中讀取數據流
  Note Experimental.
  Returns: DataStreamReader
  New in version 2.0.

  SparkSession.sparkContext
    返回一個潛在的SparkContext.
  New in version 2.0.

  SparkSession.sql(sqlQuery)
    返回一個DataFrame,值為給定的SQL查詢語句查詢的結果
  Returns: DataFrame

>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]

  New in version 2.0.

  SparkSession.stop()
    停止潛在的SparkContext
  New in version 2.0.

  SparkSession.streams
    Returns a StreamingQueryManager that allows managing all the StreamingQuery StreamingQueries active on this context.
    Note Experimental.
  Returns: StreamingQueryManager
  New in version 2.0.

  SparkSession.table(tableName)
    返回指定的表作為DataFrame
  Returns: DataFrame

>>> df.createOrReplaceTempView("table1")
>>> df2 = spark.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True

  New in version 2.0.

  SparkSession.udf
    Returns a UDFRegistration for UDF registration.

    Returns: UDFRegistration
  New in version 2.0.

  SparkSession.version
    The version of Spark on which this application is running.
  New in version 2.0.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM