Pyspark常用API總結


DF 類似於二維表的數據結果

mame

age

狗山石 23

 

獲取df的列名: df.columns

顯示當前值 打印 df.show() show(2) show括號里面傳入參數可以顯示查看幾行 show(2,False)  False 是否全部顯示 False 不隱藏
獲取前10行數據 df.limit(10) 里面傳遞的一個整形 后面加上show() 可以打印

獲取列值key df.select(["key"]) 傳入參數寫法 df.select([df[x] for x in keys]) 后面加上show() 可以打印

將每一行轉化為json 並將行名,命名為wang df.select(to_json(struct([df["key"]])).alias("wang")).show() 

把df格式轉化列表 db.collect()
計算總數 db.count()
取出 db.take() 里面必須傳入參數 除去2個
設置分區個數 db.repartition(5) 設置有5個partition

對 partition進行單獨處理 db.foreachPartition(f) f 是一個函數
def f(iterator):
for x in iterator:
    print(x) # 讀取每個x,即每一條數據
    print(x.asDict()) # 把 row的數據轉化為 字典類型

news_data_rdd = df.rdd.mapPartitions(f).cache()  

news_data_rdd = df.rdd.mapPartitions(lambda iterator: insert_from_memory(iterator, cur_index_name)).cache()

使用 mapPartitions 必須在此啟動 news_data_rdd.count() 啟動 news_data_rdd

 

 

 df.select() 操作

from pyspark.sql.functions import to_json, struct,concat

# 將每一行轉化為json 並將行名,命名為wang
df.select(to_json(struct([df["key"]])).alias("wang"))
# 將每一行轉化為字符串 並將行名,命名為data

df.select(concat(*df.columns).alias('data'))

# 在窗口調試后面加上 show() 可以打印

 

 

 

 

 

 df.select() 操作  # 在窗口調試后面加上 show() 可以打印

df.select(["*"]) # 選擇全部數據
df.select(["name"]) # 選擇對應列操作

df 的寫入操作

df.select(to_json(struct(["key","json"])).alias("value")).write.format("kafka").option("kafka.bootstrap.servers",','.join(["emr2-header-1.ipa.aidigger.com:6667", "emr2-header-2.ipa.
aidigger.com:6667"])).option("topic","text").save()

 

df.write 寫入操作

寫入kafka  
to_json(struct(["key","json"])).alias("value")  把df轉化為json格式

df.select(to_json(struct(["key","json"])).alias("value")).write.format("kafka").option("kafka.bootstrap.servers",','.join(["ip", "ip
"])).option("topic","主題名字").save()

from pyspark.sql.functions import to_json, struct,concat
df.select(concat(*df.columns).alias('data')).show()


 

 

 

 

 

 

 

 

 

 

 

 

 

 

收藏的博客 

PySpark SQL常用語法 df   https://www.jianshu.com/p/177cbcb1cb6f

使用PySpark將kafka數據寫入ElasticSearch  https://blog.csdn.net/qq_37050993/article/details/90606527

Pyspark DataFrame讀寫  https://www.jianshu.com/p/d1f6678db183

pyspark讀寫操作  https://blog.csdn.net/zyj20200/article/details/81697786#33-%E5%86%99%E5%88%B0hive

pyspark系列--日期函數 https://blog.csdn.net/suzyu12345/article/details/79673569

 

pyspark系列  https://blog.csdn.net/suzyu12345/category_6653162.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM