pyspark 內容介紹(一)


 

pyspark 包介紹

內容

PySpark是針對Spark的Python API。根據網上提供的資料,現在匯總一下這些類的基本用法,並舉例說明如何具體使用。也是總結一下經常用到的這些公有類的使用方式。方便初學者查詢及使用。

Public 類們:

  • SparkContext:

    Spark 功能的主入口。

  • RDD:

    彈性分布式數據集,就是在Spark中的基礎抽象

  • Broadcast:

    一個在task之間重用的廣播變量。

  • Accumulator:

    一個“add-only” 共享變量,task只能增加值。

  • SparkConf:

    用於配置Spark.

  • SparkFiles:

    在job中訪問文件。

  • StorageLevel:

    更細粒度的緩存持久化級別。

     

將分為兩篇介紹這些類的內容,這里首先介紹SparkConf
1. class pyspark. SparkConf ( loadDefaults=True, _jvm=None, _jconf=None )

配置一個Spark應用,一般用來設置各種Spark的鍵值對作為參數。

大多數時候,使用SparkConf()來創建SparkConf對象,也用於載入來自spark.* Java系統的屬性值。此時,在SparkConf對象上設置的任何參數都有高於系統屬性的優先級。

對於單元測試,也能調用SparkConf(false)來略過額外的配置,無論系統屬性是什么都可以獲得相同的配置。

這個類中的設值方法都是支持鏈式結構的,例如,你可以這樣編寫配置conf.setMaster(“local”).setAppName(“My app”)

注意:

一旦SparkConf對象被傳遞給Spark,它就被復制並且不能被其他人修改。

contains ( key )

配置中是否包含一個指定鍵。

get ( key, defaultValue=None )

獲取配置的某些鍵值,或者返回默認值。

getAll ( )

得到所有的鍵值對的list。

set ( key, value )

設置配置屬性。

setAll ( pairs )

通過傳遞一個鍵值對的list,為多個參數賦值。

etAppName ( value )

設置應用名稱

setExecutorEnv ( key=None, value=None, pairs=None )

設置環境變量復制給執行器。

setIfMissing ( key, value )

如果沒有,則設置一個配置屬性。

setMaster ( value )

設置主連接地址。

setSparkHome ( value )

設置工作節點上的Spark安裝路徑。

toDebugString ( )

返回一個可打印的配置版本。

2. class pyspark. SparkContext ( master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'> )

Spark功能的主入口,SparkContext 代表到Spark 集群的連接,並且在集群上能創建RDD和broadcast。

PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
accumulator ( value, accum_param=None )

用指定的初始化值創建一個Accumulator累加器。使用AccumulatorParam對象定義如何添加數據類型的值。默認AccumulatorParams為整型和浮點型。如果其他類型需要自定義。

addFile ( path, recursive=False )

使用在每個節點上的Spark job添加文件下載。這里path 參數可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者URI。

在Spark的job中訪問文件,使用L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>}可以找到下載位置。

如果遞歸選項被設置為“TRUE”則路徑能被指定。當前路徑僅僅支持Hadoop文件系統。

 1 >>> from pyspark import SparkFiles
 2 >>> path = os.path.join(tempdir, "test.txt")
 3 >>> with open(path, "w") as testFile:
 4 ...    _ = testFile.write("100") 
 5 >>> sc.addFile(path)
 6 >>> def func(iterator):
 7 ...    with open(SparkFiles.get("test.txt")) as testFile:
 8 ...        fileVal = int(testFile.readline())
 9 ...        return [x * fileVal for x in iterator]
10 >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect()
11 [100, 200, 300, 400]

 

addPyFile ( path )

為所有將在SparkContext上執行的任務添加一個a.py或者.zip的附件。這里path 參數可以使本地文件也可以使在HDFS中的文件,也可以是HTTP、HTTPS或者FTP URI。

applicationId

Spark應用的唯一ID,它的格式取決於調度器實現。

  • 本地模式下像這樣的ID‘local-1433865536131’
  • 模式下像這樣的ID‘application_1433865536131_34483’
>>> sc.applicationId  
u'local-...'

 

binaryFiles ( path, minPartitions=None )

注意

  • 從HDFS上讀取二進制文件的路徑,本地文件系統(在所有節點上都可用),或者其他hadoop支持的文件系統URI黨組偶一個二進制數組。每個文件作為單獨的記錄,並且返回一個鍵值對,這個鍵就是每個文件的了路徑,值就是每個文件的內容。
  • 小文件優先選擇,大文件也可以,但是會引起性能問題。
binaryRecords ( path, recordLength )
  • path – 輸入文件路徑
  • recordLength – 分割記錄的長度(位數)
注意

從平面二進制文件中載入數據,假設每個記錄都是一套指定數字格式的數字(ByteBuffer),並且每個記錄位數的數是恆定的。

broadcast ( value )

廣播一個制度變量到集群,返回一個L{Broadcast<pyspark.broadcast.Broadcast>} 對象在分布式函數中讀取。這個變量將只發一次給每個集群。

cancelAllJobs ( )

取消所有已排程的或者正在運行的job。

cancelJobGroup ( groupId )

 

取消指定組的已激活job,查看SparkContext.setJobGroup更多信息。

 

defaultMinPartitions

當不被用戶指定時,默認Hadoop RDDs 為最小分區。

defaultParallelism

當不被用戶指定時,默認並行級別執行。(例如reduce task)

dump_profiles ( path )

轉存配置信息到目錄路徑下。

emptyRDD ( )

創建沒有分區或者元素的RDD。

getConf ( )
getLocalProperty ( key )

在當前線程中得到一個本地設置屬性。

classmethod getOrCreate ( conf=None )
參數:conf – SparkConf (optional)

獲取或者實例化一個SparkContext並且注冊為單例模式對象。

hadoopFile ( path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0 )、

用任意來自HDFS的鍵和值類讀取一個老的Hadoop輸入格式,本地系統(所有節點可用),或者任何支持Hadoop的文件系統的URI。這個機制是與sc.sequenceFile是一樣的。

Hadoop 配置可以作為Python的字典傳遞。這將被轉化成Java中的配置。

參數:

  • path – Hadoop文件路徑
  • inputFormatClass – 輸入的Hadoop文件的規范格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – 可寫鍵類的合格類名 (例如“org.apache.hadoop.io.Text”)
  • valueClass –可寫值類的合格類名 (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (默認為none)
  • valueConverter – (默認為none)
  • conf – Hadoop配置,作為一個字典傳值 (默認為none)
  • batchSize – Python對象的數量代表一個單一的JAVA對象 (默認 0, 表示自動匹配batchSize)
hadoopRDD ( inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0 )

讀取Hadoop輸入格式用任意鍵值類。與上面的類相似。

參數:

  • inputFormatClass – 輸入的Hadoop文件的規范格式(例如 “org.apache.hadoop.mapred.TextInputFormat”)
  • keyClass – 可寫鍵類的合格類名 (例如“org.apache.hadoop.io.Text”)
  • valueClass –可寫值類的合格類名 (e.g. “org.apache.hadoop.io.LongWritable”)
  • keyConverter – (默認為none)
  • valueConverter – (默認為none)
  • conf – Hadoop配置,作為一個字典傳值 (默認為none)
  • batchSize – Python對象的數量代表一個單一的JAVA對象 (默認 0, 表示自動匹配batchSize)
newAPIHadoopFile ( path, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0 )

與上面的功能類似.

newAPIHadoopRDD ( inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None, batchSize=0 )

任意Hadoop的配置作為參數傳遞。

parallelize ( c, numSlices=None )

分配一個本Python集合構成一個RDD。如果輸入代表了一個性能范圍,建議使用xrange。

>>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
[[0], [2], [3], [4], [6]]
>>> sc.parallelize(xrange(0, 6, 2), 5).glom().collect()
[[], [0], [], [2], [4]]

 

pickleFile ( name, minPartitions=None )

載入使用RDD.saveAsPickleFile方法保存的RDD。

>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
>>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

 

range ( start, end=None, step=1, numSlices=None )

創建一個int類型元素組成的RDD,從開始值到結束(不包含結束),里面都是按照步長增長的元素。這就要用到Python內置的函數range()。如果只有一個參數調用,這個參數就表示結束值,開始值默認為0.

參數:

  • start –起始值
  • end – 結束值(不包含)
  • step – 步長(默認: 1)
  • numSlices –RDD分區數量(切片數)

返回值:RDD

>>> sc.range(5).collect()
[0, 1, 2, 3, 4]
>>> sc.range(2, 4).collect()
[2, 3]
>>> sc.range(1, 7, 2).collect()
[1, 3, 5]

 

runJob ( rdd, partitionFunc, partitions=None, allowLocal=False )

執行指定的partitionFunc 在指定的分區,返回一個元素數組。如果不指定分區,則將運行在所有分區上。

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part])
[0, 1, 4, 9, 16, 25]

>>> myRDD = sc.parallelize(range(6), 3)
>>> sc.runJob(myRDD, lambda part: [x * x for x in part], [0, 2], True)
[0, 1, 16, 25]

 


 
  • sequenceFile(path, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, minSplits=None, batchSize=0)
  • 讀取Hadoop 的SequenceFile,機制如下:

    1.一個Java RDD通過SequenceFile或者其他輸入格式創建,需要鍵值的可寫類參數。

    2.序列化

    3.如果失敗,則對每個鍵值調用‘toString’。

    4.在Python上,PickleSerializer用來反序列化。

參數:

path –序列化文件路徑

keyClass – 可用鍵類(例如 “org.apache.hadoop.io.Text”)

valueClass – 可用值類 (例如 “org.apache.hadoop.io.LongWritable”)

keyConverter

valueConverter

minSplits – 數據集最低分割數(默認 min(2, sc.defaultParallelism))

batchSize – 代表一個JAVA對象Python對象的數量 (默認0, 自動)

 

setCheckpointDir(dirName)

 

設定作為檢查點的RDD的目錄,如果運行在集群上,則目錄一定時HDFS路徑。

setJobGroup ( groupId, description, interruptOnCancel=False )

分配一個組ID給所有被這個線程開啟的job。

通常,一個執行單位由多個Spark 的action或者job組成。應用程序可以將所有把所有job組成一個組,給一個組的描述。一旦設置好,Spark的web UI 將關聯job和組。

應用使用SparkContext.cancelJobGroup來取消組。

>>> import threading
>>> from time import sleep
>>> result = "Not Set"
>>> lock = threading.Lock()
>>> def map_func(x):
...     sleep(100)
...     raise Exception("Task should have been cancelled")
>>> def start_job(x):
...     global result
...     try:
...         sc.setJobGroup("job_to_cancel", "some description")
...         result = sc.parallelize(range(x)).map(map_func).collect()
...     except Exception as e:
...         result = "Cancelled"
...     lock.release()
>>> def stop_job():
...     sleep(5)
...     sc.cancelJobGroup("job_to_cancel")
>>> supress = lock.acquire()
>>> supress = threading.Thread(target=start_job, args=(10,)).start()
>>> supress = threading.Thread(target=stop_job).start()
>>> supress = lock.acquire()
>>> print(result)
Cancelled

 

如果對於job組,interruptOnCancel被設定為True,那么那么取消job將在執行線程中調用Thread.interrupt()。這對於確保任務實時停止是有作用的。但是默認情況下,HDFS可以通過標記節點為dead狀態來停止線程。

setLocalProperty ( key, value )

設定本地影響提交工作的屬性,例如Spark 公平調度池。

setLogLevel ( logLevel )

控制日志級別。重寫任何用戶自定義的日志設定。有效的日志級別包括:ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN。

classmethod setSystemProperty ( key, value )

設定Java系統屬性,例如spark.executor.memory,這一定要在實例化SparkContext之前被激活。

show_profiles ( )

打印配置信息到標准輸出。

sparkUser ( )

為運行SparkContext 的用戶獲得SPARK_USER

startTime

當SparkContext被發起,則返回新的時間紀元。

statusTracker ( )

Return StatusTracker object

返回StatusTracker對象

stop ( )

關閉SparkContext。

textFile ( name, minPartitions=None, use_unicode=True )

從HDFS中讀取一個text文件,本地文件系統(所有節點可用),或者任何支持Hadoop的文件系統的URI,然后返回一個字符串類型的RDD。

如果用戶use_unicode為False,則strings類型將為str(用utf-8編碼),這是一種比unicode更快、更小的編碼(Spark1.2以后加入)。

>>> path = os.path.join(tempdir, "sample-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello world!")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello world!']

 

uiWebUrl

返回由SparkContext的SparkUI實例化開啟的URL。

union ( rdds )

建立RDD列表的聯合。

支持不同序列化格式的RDD的unions()方法,需要使用默認的串行器將它們強制序列化(串行化):

>>> path = os.path.join(tempdir, "union-text.txt")
>>> with open(path, "w") as testFile:
...    _ = testFile.write("Hello")
>>> textFile = sc.textFile(path)
>>> textFile.collect()
[u'Hello']
>>> parallelized = sc.parallelize(["World!"])
>>> sorted(sc.union([textFile, parallelized]).collect())
[u'Hello', 'World!']

 

version

應用運行的Spark的版本。

wholeTextFiles ( path, minPartitions=None, use_unicode=True )

讀取HDFS的文本文件的路徑,這是一個本地文件系統(所有節點可用),或者任何支持Hadoop的文件系統的URI。每個文件被當做一個獨立記錄來讀取,然后返回一個鍵值對,鍵為每個文件的路徑,值為每個文件的內容。

如果用戶use_unicode為False,則strings類型將為str(用utf-8編碼),這是一種比unicode更快、更小的編碼(Spark1.2以后加入)。

舉例說明,如果有如下文件:

hdfs://a-hdfs-path/part-00000
hdfs://a-hdfs-path/part-00001
...
hdfs://a-hdfs-path/part-nnnnn

如果執行 rdd = sparkContext.wholeTextFiles(“hdfs://a-hdfs-path”), 那么rdd 包含:

(a-hdfs-path/part-00000, its content)
(a-hdfs-path/part-00001, its content)
...
(a-hdfs-path/part-nnnnn, its content)

注意

這種情況適合小文件,因為每個文件都會被載入到內存中。消耗很多內存啊!

>>> dirPath = os.path.join(tempdir, "files")
>>> os.mkdir(dirPath)
>>> with open(os.path.join(dirPath, "1.txt"), "w") as file1:
...    _ = file1.write("1")
>>> with open(os.path.join(dirPath, "2.txt"), "w") as file2:
...    _ = file2.write("2")
>>> textFiles = sc.wholeTextFiles(dirPath)
>>> sorted(textFiles.collect())
[(u'.../1.txt', u'1'), (u'.../2.txt', u'2')]

 

本篇接少了兩個類SparkContextSparkConf,下一篇將會介紹其余的幾個類的內容,這是一篇匯總性質的文章主要便於以后使用時知道具體類中的方法調用為剛剛接觸Spark和我差不多人提供參考。還有理解不到位的請多多理解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM