Preface
- OS: Windows 10
- Date: March 2019
- Python version: Python 3.5.2
- Java version: jdk1.8.0_191
- Hadoop version: hadoop 2.8.5
- Spark version: spark-2.3.1-bin-hadoop2.7
- Reference link 1
- Reference link 2
- Reference link 3
Using Spark from Python
Load the required dependencies
import numpy as np  # used in the fourth example
from matplotlib import pyplot as plt  # used in the fourth example
from pyspark import SparkContext  # used in the third and fourth examples
from pyspark.sql import SparkSession  # used in the first and second examples
from pyspark.sql.types import FloatType  # used in the second example
First example
SparkSession.read.csv() and two different ways of inspecting the columns
sc = SparkSession.builder.appName('test').master('local').getOrCreate()
df = sc.read.csv('003.csv', header=True)  # read an external CSV file through the SparkSession
print(df.columns)  # list the column names, much like a pandas DataFrame
df.printSchema()  # show the table schema
sc.stop()
['x1', ' x2', ' x3', ' x4', ' x5', ' x6', ' x7']
root
|-- x1: string (nullable = true)
|-- x2: string (nullable = true)
|-- x3: string (nullable = true)
|-- x4: string (nullable = true)
|-- x5: string (nullable = true)
|-- x6: string (nullable = true)
|-- x7: string (nullable = true)
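Every column above comes back as string, and the header names keep their leading spaces. As a small follow-up, a minimal sketch (reusing the same 003.csv) that asks Spark to infer numeric types and strips the spaces from the column names:
sc = SparkSession.builder.appName('test').master('local').getOrCreate()
df = sc.read.csv('003.csv', header=True, inferSchema=True)  # inferSchema guesses numeric types instead of defaulting to string
df = df.toDF(*[c.strip() for c in df.columns])  # strip the leading spaces left in the header names
df.printSchema()
sc.stop()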
Second example
createDataFrame(), printSchema(), show(), collect(), etc.
sc = SparkSession.builder.getOrCreate()
data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
transform_data = sc.createDataFrame(data, FloatType())  # build a single-column DataFrame of floats
transform_data.printSchema()  # show the table schema
transform_data.show()  # show the whole table
print(data)
print(transform_data.collect())  # collect() returns the contents as a Python list (of Row objects here)
sc.stop()
root
|-- value: float (nullable = true)
+-----+
|value|
+-----+
| 1.0|
| 2.0|
| 3.0|
| 4.0|
| 5.0|
| 6.0|
| 7.0|
| 8.0|
| 9.0|
+-----+
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
[Row(value=1.0), Row(value=2.0), Row(value=3.0), Row(value=4.0), Row(value=5.0), Row(value=6.0), Row(value=7.0), Row(value=8.0), Row(value=9.0)]
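createDataFrame() also accepts a list of tuples together with a list of column names, which gives a multi-column DataFrame without declaring the types explicitly. A minimal sketch along those lines (the column names x and x_squared are made up for illustration):
sc = SparkSession.builder.getOrCreate()
rows = [(1.0, 1.0), (2.0, 4.0), (3.0, 9.0)]
df = sc.createDataFrame(rows, ['x', 'x_squared'])  # the schema is given as a plain list of column names
df.printSchema()  # both columns are inferred as double
df.show()
sc.stop()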
Third example
SparkContext.parallelize(), glom(), reduce(), map(), flatMap(), etc.
sc = SparkContext.getOrCreate()
data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
parallelize_data = sc.parallelize(data, 3)  # parallelize the list; the second argument is the number of partitions (if omitted, Spark picks a default)
# collect() converts the RDD into a Python list
print(parallelize_data.collect())
# glom() shows how the RDD is split across partitions, one sub-list per partition
print(parallelize_data.glom().collect())
# reduce() repeatedly takes two elements of the RDD and combines them with the lambda,
# replacing the pair with the result until a single value remains
# (note: reduce() returns a plain Python value, not an RDD)
print(parallelize_data.reduce(lambda a, b: a+b))
# map() applies the lambda to every element and returns a new RDD
print(parallelize_data.map(lambda x: (x, x**2)).collect())
# flatMap() replaces each element with several elements and flattens the result
print(parallelize_data.flatMap(lambda x: (x, x**2)).collect())
sc.stop()
[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
[[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]]
45.0
[(1.0, 1.0), (2.0, 4.0), (3.0, 9.0), (4.0, 16.0), (5.0, 25.0), (6.0, 36.0), (7.0, 49.0), (8.0, 64.0), (9.0, 81.0)]
[1.0, 1.0, 2.0, 4.0, 3.0, 9.0, 4.0, 16.0, 5.0, 25.0, 6.0, 36.0, 7.0, 49.0, 8.0, 64.0, 9.0, 81.0]
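The same RDD supports many more transformations than the ones shown above. A minimal sketch combining filter() with reduceByKey() on the same data (the even/odd keys are made up for illustration):
sc = SparkContext.getOrCreate()
rdd = sc.parallelize([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0], 3)
# filter() keeps only the elements for which the lambda returns True
print(rdd.filter(lambda x: x > 5.0).collect())  # [6.0, 7.0, 8.0, 9.0]
# map() into (key, value) pairs, then reduceByKey() sums the values that share a key
pairs = rdd.map(lambda x: ('even' if x % 2 == 0 else 'odd', x))
print(pairs.reduceByKey(lambda a, b: a + b).collect())  # e.g. [('odd', 25.0), ('even', 20.0)]
sc.stop()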
Fourth example
SparkContext.parallelize(), count(), stats(), mean(), stdev(), etc.
sc = SparkContext.getOrCreate()
total = 1000000
# generate one million points drawn uniformly from the square [-1, 1] x [-1, 1] and cache the RDD
dots = sc.parallelize([2.0 * np.random.random(2) - 1.0 for i in range(total)]).cache()
print("Number of random points:", dots.count())
stats = dots.stats()  # StatCounter with count, mean, stdev, max and min, computed per coordinate
print(stats)
print('Mean:', stats.mean())
print('stdev:', stats.stdev())
sc.stop()
Number of random points: 1000000
(count: 1000000, mean: [3.97265087e-04 6.62457038e-06], stdev: [0.57692566 0.57738814], max: [0.99999826 0.99999957], min: [-0.99999682 -0.99999845])
Mean: [3.97265087e-04 6.62457038e-06]
stdev: [0.57692566 0.57738814]
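The points above are drawn uniformly from the square [-1, 1] x [-1, 1], the usual setup for a Monte Carlo estimate of pi, which would also explain the matplotlib import that the fourth example never uses. A minimal sketch along those lines (the pi estimate and the scatter plot are additions, not part of the original output):
sc = SparkContext.getOrCreate()
total = 1000000
dots = sc.parallelize([2.0 * np.random.random(2) - 1.0 for i in range(total)]).cache()
# the fraction of points inside the unit circle approximates pi / 4
inside = dots.filter(lambda p: np.linalg.norm(p) <= 1.0).count()
print('Estimated pi:', 4.0 * inside / total)
# plot a small sample of the points so the figure stays light
sample = dots.takeSample(False, 2000)
plt.scatter([p[0] for p in sample], [p[1] for p in sample], s=2)
plt.gca().set_aspect('equal')
plt.show()
sc.stop()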