PySpark—DataFrame筆記


本人CSDN同篇文章:PySpark—DataFrame筆記
 DataFrame基礎 + 示例,為了自查方便匯總了關於PySpark-dataframe相關知識點,集合了很多篇博客和知乎內容,結合了自身實踐,加上了更多示例和講解方便理解,本文內容較多配合目錄看更方便。

 如有任何問題或者文章錯誤歡迎大家留言批評指正,感謝閱讀。

什么是DataFrame?

DataFrames通常是指本質上是表格形式的數據結構。它代表行,每個行都包含許多觀察值。
行可以具有多種數據格式(異構),而列可以具有相同數據類型(異構)的數據。
DataFrame通常除數據外還包含一些元數據。例如,列名和行名。
我們可以說DataFrames是二維數據結構,類似於SQL表或電子表格。
DataFrames用於處理大量結構化和半結構化數據

連接本地spark

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName('my_app_name') \
    .getOrCreate()

Spark初始化設置

from pyspark.sql import SparkSession

# SparkSession 配置
spark = SparkSession.builder \
    .appName("My test") \
    .getOrCreate()
# spark.conf.set("spark.executor.memory", "1g")
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
sc = spark.sparkContext
sc.setLogLevel("WARN")   

SparkSession 介紹

參考文章:
SparkSession思考與總結:https://blog.csdn.net/yyt8582/article/details/81840031
SparkSession的認識:https://www.cnblogs.com/zzhangyuhang/p/9039695.html
spark配置:https://spark.apache.org/docs/latest/configuration.html

 (1)為何出現SparkSession

 SparkSession 本質上是SparkConf、SparkContext、SQLContext、HiveContext和StreamingContext這些環境的集合,避免使用這些來分別執行配置、Spark環境、SQL環境、Hive環境和Streaming環境。SparkSession現在是讀取數據、處理元數據、配置會話和管理集群資源的入口。

 (2)SparkSession創建RDD

from pyspark.sql.session import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.master("local") \
        .appName("My test") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    sc = spark.sparkContext

    data = [1, 2, 3, 4, 5, 6, 7, 8, 9]
    rdd = sc.parallelize(data)

 (3)SparkSession實例化參數:

 通過靜態類Builder來實例化。Builder 是 SparkSession 的構造器。 通過 Builder, 可以添加各種配置。可以通SparkSession.builder 來創建一個 SparkSession 的實例,並通過 stop 函數來停止 SparkSession。Builder又有很多方法,包括:

Builder 的方法如下:

(1)appName函數
appName(String name)
用來設置應用程序名字,會顯示在Spark web UI中

(2)master函數
master(String master)
設置Spark master URL 連接,比如"local" 設置本地運行,"local[4]"本地運行4cores,或則"spark://master:7077"運行在spark standalone 集群。

(3)config函數
這里有很多重載函數。其實從這里我們可以看出重載函數,是針對不同的情況,使用不同的函數,但是他們的功能都是用來設置配置項的。
spark.some.config.option和some-value是configuation性質的鍵值對完成。例如spark configuration properties和yarn properties等
  1、.config(SparkConf conf)
  根據給定的SparkConf設置配置選項列表。
  2、config(String key, boolean value)
  設置配置項,針對值為boolean的
  3、config(String key, double value)
  設置配置項,針對值為double的
  4、config(String key, long value)
  設置配置項,針對值為long 的
  5、config(String key, String value)
  設置配置項,針對值為String 的

(4)getOrCreate函數
getOrCreate()
獲取已經得到的 SparkSession,或則如果不存在則創建一個新的基於builder選項的SparkSession

(5)enableHiveSupport函數
表示支持Hive,包括 鏈接持久化Hive metastore, 支持Hive serdes, 和Hive用戶自定義函數

(6)withExtensions函數
withExtensions(scala.Function1<SparkSessionExtensions,scala.runtime.BoxedUnit> f)
這允許用戶添加Analyzer rules, Optimizer rules, Planning Strategies 或者customized parser.這一函數我們是不常見的。

DF創建

 (1)直接創建

# 直接創建Dataframe
df = spark.createDataFrame([
        (1, 144.5, 5.9, 33, 'M'),
        (2, 167.2, 5.4, 45, 'M'),
        (3, 124.1, 5.2, 23, 'F'),
        (4, 144.5, 5.9, 33, 'M'),
        (5, 133.2, 5.7, 54, 'F'),
        (3, 124.1, 5.2, 23, 'F'),
        (5, 129.2, 5.3, 42, 'M'),
    ], ['id', 'weight', 'height', 'age', 'gender']) 

 (2)從字典創建

df = spark.createDataFrame([{'name':'Alice','age':1},
    {'name':'Polo','age':1}]) 

 (3)指定schema創建

schema = StructType([
    StructField("id", LongType(), True),   
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])
df = spark.createDataFrame(csvRDD, schema)

 (4)讀文件創建

airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')

 (5)從pandas dataframe創建

import pandas as pd
from pyspark.sql import SparkSession

colors = ['white','green','yellow','red','brown','pink']
color_df=pd.DataFrame(colors,columns=['color'])
color_df['length']=color_df['color'].apply(len)

color_df=spark.createDataFrame(color_df)
color_df.show()

DF的架構查看

df.printSchema()

describe統計屬性

# 例如特定列中的行總數,其均值,標准差,列的最小值和最大值/空值計數
df.describe(["age"]).show()

dtypes查看字段類型

# 查看列的類型 ,同pandas
color_df.dtypes

# [('color', 'string'), ('length', 'bigint')]

查看列名/行數

# 查看有哪些列 ,同pandas
df.columns
# ['color', 'length']

# 行數
df.count()

# 列數
len(df.columns)

distinct查找列唯一值

df.select('id').distinct()
  .rdd.map(lambda r: r[0]).collect()

show顯示

# show和head函數顯示數據幀的前N行
df.show(5)
df.head(5)

統計分析

 (1)頻繁項目

# 查找每列出現次數占總的30%以上頻繁項目
df.stat.freqItems(["id", "gender"], 0.3).show()
+------------+----------------+
|id_freqItems|gender_freqItems|
+------------+----------------+
|      [5, 3]|          [M, F]|
+------------+----------------+

 (2)交叉表

# 分組統計,交叉分析
# 計算給定列的成對頻率表
df.crosstab('Age', 'Gender').show()
Output:
+----------+-----+------+
|Age_Gender|    F|     M|
+----------+-----+------+
|      0-17| 5083| 10019|
|     46-50|13199| 32502|
|     18-25|24628| 75032|
|     36-45|27170| 82843|
|       55+| 5083| 16421|
|     51-55| 9894| 28607|
|     26-35|50752|168835|
+----------+-----+------

select選擇和切片篩選

 (1)列的選擇

# 選擇一列的幾種方式,比較麻煩,不像pandas直接用df['cols']就可以了
# 需要在filter,select等操作符中才能使用
color_df.select('length').show()
color_df.select(color_df.length).show()
color_df.select(color_df[0]).show()
color_df.select(color_df['length']).show()
color_df.filter(color_df['length']>=4).show()   # filter方法

 (2)選擇幾列的方法

color_df.select('length','color').show()
# 如果是pandas,似乎要簡單些
color_df[['length','color']]

 (3)多列選擇和切片

# 3.多列選擇和切片
color_df.select('length','color')
        .select(color_df['length']>4).show()

 (4)between 范圍選擇

color_df.filter(color_df.length.between(4,5) )
        .select(color_df.color.alias('mid_length')).show()

 (5)聯合篩選

# 這里使用一種是 color_df.length, 另一種是color_df[0]
color_df.filter(color_df.length>4)
        .filter(color_df[0]!='white').show()

 (6)filter運行類SQL

color_df.filter("color='green'").show()

color_df.filter("color like 'b%'").show()

 (7)where方法的SQL

color_df.where("color like '%yellow%'").show()

 (8)直接使用SQL語法

# 首先dataframe注冊為臨時表,然后執行SQL查詢
color_df.createOrReplaceTempView("color_df")
spark.sql("select count(1) from color_df").show()

drop刪除一列

# 刪除一列
color_df.drop('length').show()

# pandas寫法
df.drop(labels=['a'],axis=1)

withColumn新增/修改列

withColumn(colName, col)
通過為原數據框添加一個新列或替換已存在的同名列而返回一個新數據框。colName —— 是一個字符串, 為新列的名字。必須是已存在的列的名字
col —— 為這個新列的 Column 表達式。必須是含有列的表達式。如果不是它會報錯 AssertionError: col should be Column

 (1)新增一列

# 列名可以是原有列,也可以是新列
df.withColumn('page_count', df.page_count+100)
df.withColumn('new_page_count', df.page_count+100)

 (2)lit新增一列常量

# lit新增一列常量
import pyspark.sql.functions as F
df = df.withColumn('mark', F.lit(1))

withColumnRenamed更改列名:

 (1)直接修改

# 修改單個列名
new_df = df.withColumnRenamed('old_name', 'new_name')

 (2)聚合后修改

一、withColumnRenamed()方式修改列名:
# 重新命名聚合后結果的列名(需要修改多個列名就跟多個:withColumnRenamed)
# 聚合之后不修改列名則會顯示:count(member_name)
df_res.agg({'member_name': 'count', 'income': 'sum', 'num': 'sum'})
      .withColumnRenamed("count(member_name)", "member_num").show()

二、利用pyspark.sql中的functions修改列名:
from pyspark.sql import functions as F
df_res.agg(
    F.count('member_name').alias('mem_num'),
    F.sum('num').alias('order_num'),
    F.sum("income").alias('total_income')
).show()

cast修改列數據類型

from pyspark.sql.types import IntegerType

# 下面兩種修改方式等價
df = df.withColumn("height", df["height"].cast(IntegerType()))
df = df.withColumn("weight", df.weight.cast('int'))
print(df.dtypes)

sort排序

 (1)單字段排序

# spark排序
color_df.sort('color',ascending=False).show()

# pandas的排序
df.sort_values(by='b')

 (2)多字段排序

color_df.filter(color_df['length']>=4)
        .sort('length', 'color', ascending=False).show()

 (3)混合排序

color_df.sort(color_df.length.desc(),color_df.color.asc())                               
        .show()

 (4)orderBy排序

color_df.orderBy('length','color').show()

toDF

toDF(*cols)
Parameters:
  cols – list of new column names (string)
  
# 返回具有新指定列名的DataFrame
df.toDF('f1', 'f2')

DF與RDD互換

rdd_df = df.rdd	  # DF轉RDD
df = rdd_df.toDF()  # RDD轉DF

DF和Pandas互換

pandas_df = spark_df.toPandas()	
spark_df = sqlContext.createDataFrame(pandas_df)

union合並+去重:

nodes_cust = edges.select('tx_ccl_id', 'cust_id') # 客戶編號 
nodes_cp = edges.select('tx_ccl_id', 'cp_cust_id') # 交易對手編號 
nodes_cp = nodes_cp.withColumnRenamed('cp_cust_id', 'cust_id') # 統一節點列名 
nodes = nodes_cust.union(nodes_cp).dropDuplicates(['cust_id'])

count行數/列數

# 行數
df.count()

# 列數
len(df.columns)

缺失值

 (1)計算列中的空值數目

# 計算一列空值數目
df.filter(df['col_name'].isNull()).count()

# 計算每列空值數目
for col in df.columns:
    print(col, "\t", "with null values: ", 
          df.filter(df[col].isNull()).count())

 (2)刪除有缺失值的行

# 1、刪除有缺失值的行
df2 = df.dropna()

# 2、或者
df2 = df.na.drop()

 (3)平均值填充缺失值

from pyspark.sql.functions import when
import pyspark.sql.functions as F

# 計算各個數值列的平均值
def mean_of_pyspark_columns(df, numeric_cols):
    col_with_mean = []
    for col in numeric_cols:
        mean_value = df.select(F.avg(df[col]))
        avg_col = mean_value.columns[0]
        res = mean_value.rdd.map(lambda row: row[avg_col]).collect()
        col_with_mean.append([col, res[0]])
    return col_with_mean

# 用平均值填充缺失值
def fill_missing_with_mean(df, numeric_cols):
    col_with_mean = mean_of_pyspark_columns(df, numeric_cols)
    for col, mean in col_with_mean:
        df = df.withColumn(col, when(df[col].isNull() == True, F.lit(mean)).otherwise(df[col]))
    return df

if __name__ == '__main__':
    # df需要自行創建
    numeric_cols = ['age2', 'height2']  # 需要填充空值的列
    df = fill_missing_with_mean(df, numeric_cols)  # 空值填充
    df.show()

替換值

 (1)replace 全量替換

# 替換pyspark dataframe中的任何值,而無需選擇特定列
df = df.replace('?',None)
df = df.replace('ckd \t','ckd')

 (2)functions 部分替換

# 只替換特定列中的值,則不能使用replace.而使用pyspark.sql.functions
# 用classck的notckd替換no
import pyspark.sql.functions as F
df = df.withColumn('class',
                   F.when(df['class'] == 'no', F.lit('notckd'))
                    .otherwise(df['class']))

groupBy + agg 聚合

 (1)agg

agg(self, *exprs)計算聚合並將結果返回為:`DataFrame`
可用的聚合函數有“avg”、“max”、“min”、“sum”、“count”。
:param exprs:從列名(字符串)到聚合函數(字符串)的dict映射,
或:類:`Column`的列表。

# 官方接口示例
>>> gdf = df.groupBy(df.name)
>>> sorted(gdf.agg({"*": "count"}).collect())
[Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]

>>> from pyspark.sql import functions as F
>>> sorted(gdf.agg(F.min(df.age)).collect())
[Row(name=u'Alice', min(age)=2), Row(name=u'Bob', min(age)=5)]

 (2)sum

# 獲得兩列總分數和總人數,groupBy可以根據多列分組
df = df.groupBy('anchor_id')
	   .agg({"live_score": "sum", "live_comment_count": "sum"})
	   .withColumnRenamed("sum(live_score)", "total_score")
	   .withColumnRenamed("sum(live_comment_count)", "total_people")

 (3)avg

# avg方法計算平均得分
df = df.groupBy("course_id")
       .agg({"score": "avg"})
       .withColumnRenamed("avg(score)", "avg_score")

 (4)count

# count方法計算資源個數
df = df.groupBy("course_id")
       .agg({"comment": "count"})
       .withColumnRenamed("count(comment)", "comment_count")

 (5)max/min

# max取最大值min取最小值
df = df.groupBy("org_id")
       .agg({"publish_date": "max"})
       .withColumnRenamed("max(publish_date)", "active_time")

 (6)collect_list()

# collect_list()將groupBy的數據處理成列表
from pyspark.sql import functions as F
edges.show()
df = edges.groupBy("tx_ccl_id").agg(F.collect_list("cust_id"))
          .withColumnRenamed("collect_list(cust_id)", "comment_list")
df.show()
# edge.show():結果
+-------+----------+-----------+---------+
|cust_id|cp_cust_id|drct_tx_amt|tx_ccl_id|
+-------+----------+-----------+---------+
|     18|        62|  7646.5839|        0|
|     88|        41|  7683.6484|        0|
|     90|        68| 16184.5801|        0|
|     95|         5| 11888.3697|        0|
………………………………………………………………………………………………………………
# df.show():結果
+---------+---------------------+
|tx_ccl_id|collect_list(cust_id)|
+---------+---------------------+
|        0| [18, 88, 90, 95, ...|
|        1| [1077, 1011, 1004...|

join 連接

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.join

join(other, on=None, how=None)
# 使用給定的連接表達式與另一個DataFrame連接
Parameters:
other – 連接的右端
on – 聯接列名的字符串、列名列表、聯接表達式(列)或列列表。如果on是指示聯接列名稱的字符串或字符串列表,則該列必須存在於兩邊,這將執行equi聯接。
how – str, 默認inner連接. 必須是以下的其中一個: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.

 (1)根據單個字段連接

# 外連接
>>> df.join(df2, df.name == df2.name, 'outer')
      .select(df.name, df2.height).collect()
[Row(name=None, height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]

# 外連接(與上個示例等價)
>>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
[Row(name='Tom', height=80), Row(name='Bob', height=85), Row(name='Alice', height=None)]

 (2)根據多個字段連接

# 根據多個字段連接
>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
[Row(name='Alice', age=2), Row(name='Bob', age=5)]

# 根據多個字段連接(與上個示例等價)
>>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
[Row(name='Bob', age=5)]

差集/並集/交集

# 創建數據
df = spark.createDataFrame((
      (1, "asf"),
      (2, "2143"),
      (3, "rfds")
    )).toDF("label", "sentence")

df2 = spark.createDataFrame((
      (1, "asf"),
      (2, "2143"),
      (4, "f8934y")
    )).toDF("label", "sentence")

 (1)差集

newDF = df.select("sentence")
		  .subtract(df2.select("sentence"))
newDF.show()

+--------+
|sentence|
+--------+
|  f8934y|
+--------+

 (2)交集

newDF = df.select("sentence")
		  .intersect(df2.select("sentence"))
newDF.show()

+--------+
|sentence|
+--------+
|     asf|
|    2143|
+--------+

 (3)並集

newDF = df.select("sentence")
		  .union(df2.select("sentence"))
newDF.show()

+--------+
|sentence|
+--------+
|     asf|
|    2143|
|  f8934y|
|     asf|
|    2143|
|    rfds|
+--------+

 (4)並集+去重

newDF = df.select("sentence")
          .union(df2.select("sentence")).distinct()
newDF.show()

+--------+
|sentence|
+--------+
|    rfds|
|     asf|
|    2143|
|  f8934y|
+--------+

UDF自定義函數

# 創建用戶自定義函數
# UDF對表中的每一行進行函數處理,返回新的值
udf(f=None, returnType=StringType)
Parameters:
  f – python函數(如果用作獨立函數)
  returnType – 用戶定義函數的返回類型。

示例如下:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType

# 自定義函數1
def to_upper(s):
    if s is not None:
        return s.upper()

# 自定義函數2
def add_one(x):
    if x is not None:
        return x + 1

# 注冊udf函數
slen_udf = udf(lambda s: len(s), IntegerType())
to_upper_udf = udf(to_upper, StringType())
add_one_udf = udf(add_one, IntegerType())

if __name__ == '__main__':
	df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
	# 法一:
	# df.select(slen("name").alias("slen(name)"), 
				to_upper_udf(df["name"]), 
				add_one_udf(df["age"])).show()  # 與下面一句等價
	df.select(slen_udf("name").alias("slen(name)"), 
			  to_upper_udf("name"), 
			  add_one_udf("age")).show()

	# 法二:
	df = df.withColumn("slen(name)", slen_udf(df["name"]))
	df = df.withColumn("age", add_one_udf(df["age"]))
	df = df.withColumn("name", to_upper_udf("name"))
	df.show()

explode分割

# 為給定數組或映射中的每個元素返回一個新行
from pyspark.sql.functions import split, explode

df = sc.parallelize([(1, 2, 3, 'a b c'),
                     (4, 5, 6, 'd e f'),
                     (7, 8, 9, 'g h i')])
        .toDF(['col1', 'col2', 'col3', 'col4'])
df.withColumn('col4', explode(split('col4', ' '))).show()
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   a|
|   1|   2|   3|   b|
|   1|   2|   3|   c|
|   4|   5|   6|   d|
|   4|   5|   6|   e|
|   4|   5|   6|   f|
|   7|   8|   9|   g|
|   7|   8|   9|   h|
|   7|   8|   9|   i|
+----+----+----+----+

# 示例二
from pyspark.sql import Row
from pyspark.sql.functions import explode

eDF = spark.createDataFrame([Row(
    a=1, 
    intlist=[1, 2, 3], 
    mapfield={"a": "b"})])
eDF.select(explode(eDF.intlist).alias("anInt")).show()
+-----+
|anInt|
+-----+
|    1|
|    2|
|    3|
+-----+

DF和python變量互轉

 在sparkSQL編程的時候,經常需要獲取DataFrame的信息,然后python做其他的判斷或計算,比如獲取dataframe的行數以判斷是否需要等待,獲取dataframe的某一列或第一行信息以決定下一步的處理,等等。

##&ensp;(1)獲取第一行的值
```python
# 獲取第一行的值,返回普通python變量
# 由於 first() 返回的是 Row 類型,可以看做是dict類型,
# 在只有一列的情況下可以用 [0] 來獲取值。
value = df.select('columns_name').first()[0] 

 (2)獲取第一行的多個值

#獲取第一行的多個值,返回普通python變量
# first() 返回的是 Row 類型,可以看做是dict類型,用 row.col_name 來獲取值
row = df.select('col_1', 'col_2').first()
col_1_value = row.col_1
col_2_value = row.col_2

 (3)獲取一列/多列的所有值

# 獲取一列的所有值,或者多列的所有值
# collect()函數將分布式的dataframe轉成local類型的 list-row格式
rows= df.select('col_1', 'col_2').collect()
value = [[ row.col_1, row.col_2 ] for row in rows ]

不常用的一些

 (1)getField

# 在StructField中通過名稱獲取字段。
from pyspark.sql import Row
df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
df.select(df.r.getField("b")).show()  # 用法與下面等價
df.select(df.r.a).show()

 (2)isNotNull

# 如果當前表達式不為空,則為true
from pyspark.sql import Row
df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
df.filter(df.height.isNotNull()).collect()
# [Row(height=80, name='Tom')]

 (3)isNull

# 如果當前表達式為空,則為true
from pyspark.sql import Row
df = spark.createDataFrame([Row(name='Tom', height=80), Row(name='Alice', height=None)])
df.filter(df.height.isNull()).collect()
# [Row(height=None, name='Alice')]

 (4)isin

# 如果自變量的求值包含該表達式的值,則該表達式為true
df[df.name.isin("Bob", "Mike")].collect()
# [Row(age=5, name='Bob')]
df[df.age.isin([1, 2, 3])].collect()
# [Row(age=2, name='Alice')]

 (5)like

# Column根據SQL LIKE匹配返回布爾值。
df.filter(df.name.like('Al%')).collect()
# [Row(age=2, name='Alice')]

 (6)otherwise

otherwise(value)
# 計算條件列表,並返回多個可能的結果表達式之一,如果otherwise()未調用,則為不匹配的條件返回None

from pyspark.sql import functions as F
df.select(df.name, F.when(df.age > 3, 1).otherwise(0)).show()
+-----+-------------------------------------+
| name|CASE WHEN (age > 3) THEN 1 ELSE 0 END|
+-----+-------------------------------------+
|Alice|                                    0|
|  Bob|                                    1|
+-----+-------------------------------------+

 (7)when

when(condition, value)
Parameters:
  condition – 布爾Column表達式
  value – 文字值或Column表達式
# 計算條件列表,並返回多個可能的結果表達式之一.如果otherwise()未調用,則為不匹配的條件返回None

from pyspark.sql import functions as F
>>> df.select(df.name, F.when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0)).show()
+-----+------------------------------------------------------------+
| name|CASE WHEN (age > 4) THEN 1 WHEN (age < 3) THEN -1 ELSE 0 END|
+-----+------------------------------------------------------------+
|Alice|                                                          -1|
|  Bob|                                                           1|
+-----+------------------------------------------------------------+

后續需要整理學習:

PySpark SQL常用語法:https://www.jianshu.com/p/177cbcb1cb6f
PySpark︱DataFrame操作指南:增/刪/改/查/合並/統計與數據處理:
https://blog.csdn.net/sinat_26917383/article/details/80500349?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2

參考文章:

pyspark官方文檔:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
關於spark的博客集合:https://blog.csdn.net/qq_32023541/column/info/19893
pyspark配置config:https://www.cnblogs.com/Tw1st-Fate/p/11094344.html

DataFrame基礎:https://blog.csdn.net/suzyu12345/article/details/79673493
DataFrame:https://www.jianshu.com/p/cb0fec7a4f6d
列累積求和:https://blog.csdn.net/XnCSD/article/details/90676259
dataframe,排序並排名:https://blog.csdn.net/a1272899331/article/details/90268141

pyspark sql使用總結:https://blog.csdn.net/weixin_44053979/article/details/89296224
pyspark 分組取前幾個:https://blog.csdn.net/weixin_40161254/article/details/88817225
Dataframe使用的坑 與 經歷:https://cloud.tencent.com/developer/article/1435995
Pandas 和 PySpark 的 DataFrame 相互轉換:http://fech.in/2018/pyspark_and_pandas/
讀寫dataframe:https://blog.csdn.net/suzyu12345/article/details/79673473#31-寫到csv
DataFrame操作指南:增/刪/改/查/合並/統計與數據處理:
https://blog.csdn.net/sinat_26917383/article/details/80500349?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM