1 聯表
df1.join(df2,連接條件,連接方式)
如:df1.join(df2,[df1.a==df2.a], "inner").show()
連接方式:字符串類型, 如 "left" , 常用的有:inner, cross, outer, full, full_outer, left, left_outer, right, right_outer; 默認是 inner
連接條件: df1["a"] == df2["a"] 或 "a" 或 df1.a == df2.a , 如有多個條件的情況 如,[df1["a"] == df2["a"] ,df1["b"] == df2["b"] ] 或 (df.a > 1) & (df.b > 1)
需要注意的:
如果使用 "a" 進行連接,則會自動合並相同字段,只輸入一個。如 df1.join(df2,"a","left") 只輸出df1的 a字段,df2 的 a 字段是去掉了。
2 udf使用
需添加引用
from pyspark.sql.functions import udf
from pyspark.sql import functions as F
有兩種方式:
第一種
def get_tablename(a):
return "name"
get_tablename_udf = F.udf(get_tablename)
第二種
@udf
def get_tablename_udf (a):
return "name"
兩種方式的調用是一樣的
df.withColumn("tablename", get_tablename_udf (df[a"]))
3 分組
使用groupBy方法
單個字段:df.groupBy("a") 或 df.groupBy(df.a)
多個字段:df.groupBy([df.a, df.b]) 或 df.groupBy(["a", "b"])
需要注意的:
groupBy方法后面 一定要跟字段輸出方法,如:agg()、select()等
4 查詢條件
使用 filter() 或 where() ,兩者一樣的。
單條件: df.filter(df.a > 1) 或 df.filter("a > 1")
多條件:df.filter("a > 1 and b > 0 ") 或 df.filter((df.a > 1) & (df.b ==0)) 或 df.filter((df.a > 1) | (df.b ==0))
5 替換null值
使用 fillna() 或 fill()方法
df.fillna({"a":0, "b":""})
df.na.fill({"a":0, "b":""})
6 排序
使用 orderBy() 或 sort()方法
df.orderBy(df.a.desc())
df.orderBy(df["age"].desc(), df["name"].desc())
df.orderBy(["age", "name"], ascending=[0, 1])
df.orderBy(["age", "name"], ascending=False)
需要注意的:
ascending 默認為True 升序, False 降序
7 新增列
使用 withColumn() 或 alias()方法
df.withColumn("b",F.lit(999))
df.withColumn("b",df.a)
df.withColumn("b",df.a).withColumn("m","m1")
df.agg(F.lit(ggg).alias("b"))
df.select(F.lit(ggg).alias("b"))
需要注意的:
withColumn方法會覆蓋df里面原有的同名的列
8 重命名列名
使用 withColumnRenamed() 方法
df.withColumnRenamed("a","a1").withColumnRenamed("m","m1")
需要注意的點:
確定要重命名的列在df里面存在
9 創建新的DataFrame
使用createDataFrame()方法
第一種:spark.createDataFrame([(列1的數據, 列2的數據)], ['列名1', '列名2'])
第二種:spark.createDataFrame([{"列1":數據,“列2”:數據},{……}])
第三種:spark.createDataFrame([(列1的數據, 列2的數據)], '列名1: int, 列名2:string')
需要注意的:
數據集和列集合 個數要一致
spark為 SparkSession 對象, 例如:spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()
10 並集
使用union() 或 unionAll() 方法
df.union(df1)
需要注意的:
這兩個方法都不會主動消除重復項的,如需要,在后面跟distinct() 如:df.union(df1).distinct()
這兩個方法都是按照數據列的擺放順序進行合並,而不是根據列名
兩個結果集的列 數量要保證一樣大小
11 交集
使用 intersect()方法
df1.select("a").intersect(df2.select("a"))
返回 df1和df2 中 相同的a 字段
12 差集
使用 subtract()方法
df1.select("a").subtract(df2.select("a"))
返回 df1 有,而df2 沒有的 a 字段值。
需要注意的:
取的是df1的數據
13 判斷是否NULL值
使用isNull()方法 或 sql語句
df.where(df["a"].isNull())
df.where("a is null")
14 在計算條件中加入判斷
使用when() 方法
df.select(when(df.age == 2, 1).alias("age"))
age列的值:當滿足when條件,則輸出1 ,否則,輸出NULL
多個條件 :when((df.age == 2) & (df.name == '"name") , 1)
15 獲取前N條
使用 limit() 方法
結合orderBy使用
df = df.orderBy(df["PayAmount"].desc()).limit(500)
15 進行排名
使用 rank().over() 方法
結合Window.orderBy()
from pyspark.sql.window import Window
df = pay_df.select("PayAmount", F.rank().over(Window.orderBy(pay_df["PayAmount"].desc())).alias('rank_id'))
16 刪除列
使用 drop() 方法
df = df.drop("a")
也可以用select() 輸出想要的列,從而達到刪除效果
16 刪除重復項
使用dropDuplicates() 或 distinct() 方法
df =df.dropDuplicates() //所有列去重
df =df.dropDuplicates(["a", "b"]) //指定列去重,其他列按順序取第一行值
df =df.distinct() //所有列去重
17 包含某個字符
使用 contains() 方法
df = df.where(df["a"].contains("hello")) //查找 a字段中 包含了hello字符 的所有記錄
18 轉大小寫
使用 upper() 或 lower() 方法
df = df.select(F.upper(df["a"]).alias("A") ) // 將a字段值轉為大寫
19 分組求總數
countDistinct()
count()
比如:
df = spark.createDataFrame([{"a": 1, "c":1}, {"a": 2, "c":1}])
df.groupBy('a').agg(
F.count('a').alias('c1'),
F.countDistinct('a').alias('c2')
)
c1 = 2
c2 = 1