pyspark讀取和存入數據的三種方法


pyspark讀取數據

方法一:從hdfs讀取

# -*- coding: utf-8 -*
from pyspark.sql import SparkSession, HiveContext,DataFrameWriter
import argparse
import time
import numpy as np
import pandas as pd


spark = SparkSession.builder.enableHiveSupport().appName("test").getOrCreate()

start = time.time()



### 數據載入方法1: hdfs上載入parquent格式

input = "/aaa/bbb/ccc"

data = spark.read.parquet(input)

data.show(5)


+-------------------+------+--------------------+
|        START_TIME|amount|           payerCode|
+-------------------+------+--------------------+
|2019-06-28 21:04:37|  10.7|692200000XXXXXXX|   
|2018-11-24 20:15:40|  19.9|602200000XXXXXXX|    
|2019-06-19 12:33:14|   2.0|692200000XXXXXXX|  
|2019-07-03 23:04:12|  5.27|622200000XXXXXXX|  
|2018-11-26 21:26:30|   2.0|622200000XXXXXXX|   
+-------------------+------+--------------------+


## pyspark讀取數據方法二:從hive中讀取

方法二:數據從數據庫讀取

####### 生成查詢的SQL語句,這個跟hive的查詢語句一樣,所以也可以加where等條件語句
hive_context= HiveContext(spark)
hive_read = "select * from {}.{}".format(hive_database, hive_table2)
  
####### 通過SQL語句在hive中查詢的數據直接是dataframe的形式
read_df = hive_context.sql(hive_read)


read_df.show(5)



+-------------------+------+--------------------+
|        START_TIME|amount|           payerCode|
+-------------------+------+--------------------+
|2019-06-28 21:04:37|  10.7|692200000XXXXXXX|   
|2018-11-24 20:15:40|  19.9|602200000XXXXXXX|    
|2019-06-19 12:33:14|   2.0|692200000XXXXXXX|  
|2019-07-03 23:04:12|  5.27|622200000XXXXXXX|  
|2018-11-26 21:26:30|   2.0|622200000XXXXXXX|   
+-------------------+------+--------------------+

方法3:讀取hdfs上的csv文件

 tttt = spark.read.csv(filepath,header=’true’,inferSchema=’true’,sep=’,’) 

pyspark數據存儲

方法1: 以parquent格式存儲到hdfs

data1.write.mode(SaveMode.Overwrite).parquet(output)

方法2:以Table的格式存入hive數據庫

##### 數據存入數據庫

hive_database = "testt0618"

data1 = data.limit(10)

1: 用saveAsTable()方法存入hive數據庫


hive_table1 = "ii"

data1.write.format("hive").mode("overwrite").saveAsTable('{}.{}'.format(hive_database, hive_table1))

2:利用sql語句存入hive數據庫

hive_table2 = "lll"

data1.registerTempTable('test_hive')
sqlContext.sql("create table {}.{} select * from test_hive".format(hive_database, hive_table2))

方法3:以csv格式存儲到hdfs

output = “/aaa/bbb/ccc”
data1.coalesce(1).write.option("sep", "#").option("header", "true").csv(output + "_text",mode='overwrite')


轉自:https://www.it610.com/article/1290521545663389696.htm


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM