spark教程(七)-文件讀取案例


sparkSession 讀取 csv

1. 利用 sparkSession 作為 spark 切入點

2. 讀取 單個 csv 和 多個 csv

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext


if __name__ == '__main__':
    scSpark = SparkSession \
        .builder \
        .appName("reading csv") \
        .getOrCreate()      # getOrCreate 獲取現有 SparkSession 或者 新建一個 SparkSession

    print(scSpark.sparkContext)

    data_file = 'csvdata/*.csv'    # 讀取 csvdata 文件夾下所有 csv 文件,但是這些 csv 文件格式必須相同,也就是 列 相同
    # data_file = 'xx.csv'           # 讀取單個 csv 文件
    sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache() # .cache  緩存返回data,從而提高性能
    print('Total Records = {}'.format(sdfData.count()))
    sdfData.show()

 

讀取一個文件夾下多個 csv 時,務必保持 csv 格式相同,否則會警告,但不報錯

19/10/15 02:29:32 WARN CSVDataSource: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 2, schema size: 4

19/10/15 02:29:32 WARN CSVDataSource: CSV header does not conform to the schema.

 

各種模式讀取文件

一個小項目:計算 GPS 頻次

原數據長這樣

1,108.99564,34.33999999,1
2,108.99564,34.3399087138298,1
3,108.99564,34.3398174376596,1
4,108.99564,34.3397261614894,1
5,108.99564,34.3396348853192,1
6,108.99564,34.3395436091489,1
7,108.99564,34.3394523329787,1
8,108.99564,34.3393610568085,1

 

Standalone 模式讀取本地文件

在 local 模式下,文件無特殊要求

在 standalone 模式下讀取本地文件,這個文件必須在每個節點上都存在,且路徑一致

 

簡易代碼

from __future__ import division
from pyspark import SparkContext

max_lng = 136
min_lng = 73
max_lat = 54
min_lat = 3
lng_stage = 1000
lat_stage = 1000

lng_step=(max_lng - min_lng) / (lng_stage)
lat_step=(max_lat - min_lat) / (lat_stage)

def mymap(data):
    # print(data, 1111111111111111111111111111111111)
    return data.split(',')

def mygroup(data):
    # 對經緯度分區,打標簽
    # print(data[1], type(data[1]))
    label_lng = round(float(data[1]) / lng_step, 1)
    label_lat = round(float(data[2]) / lat_step, 1)
    return (label_lng, label_lat)

def mapkey(data):
    # 把標簽還原成經緯度
    return data[0][0] * lng_step, data[0][1] * lat_step, data[1]


# sc = SparkContext('local', 'gpsfreq')       # 本地模式
sc = SparkContext('spark://hadoop10:7077', 'gpsfreq')     # standalone 模式
rdd = sc.textFile('dwd.csv', 100)       # standalone 模式 讀取本地文件,必須在每個節點上都有這個文件,且路徑一致

# print rdd.map(mymap).collect()
print rdd.map(mymap).groupBy(mygroup).mapValues(len).map(mapkey).collect()

 

輸出長這樣

[(111.8943, 32.629799999999996, 11), (109.2861, 34.2006, 42), (109.0467, 35.2155, 77), (111.72420000000001, 33.1755, 11), (111.636, 32.8542, 7)]

 

Standalone 模式讀取 hdfs

需要啟動 hadoop,並把數據傳到 hdfs

hadoop fs -mkdir /spark
hadoop fs -put dwd.csv /spark

第一節中的代碼只需改動一句

rdd = sc.textFile('hdfs://hadoop10:9000/spark/dwd.csv')

 

spark on yarn 模式讀取 hdfs

yarn 模式只能讀取分布式數據,如 hdfs 

 

異常記錄

1. 讀取本地文件會出現  文件不存在

2. 如下異常

Caused by: org.apache.spark.SparkException: 
Error from python worker:
  /usr/bin/python: No module named pyspark
PYTHONPATH was:
  /usr/lib/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/46/__spark_libs__6155997154509109577.zip/spark-core_2.11-2.4.4.jar

 

解決方法

conf = SparkConf().setAppName('gps').setMaster('yarn')
# 下面兩句解決 No module named pyspark
conf.set('spark.yarn.dist.files','file:/usr/lib/spark/python/lib/pyspark.zip,file:/usr/lib/spark/python/lib/py4j-0.10.7-src.zip')
conf.setExecutorEnv('PYTHONPATH','pyspark.zip:py4j-0.10.7-src.zip')

sc = SparkContext(conf=conf)
# yarn 模式必須讀取 集群 中的文件,不能讀本地
rdd = sc.textFile('hdfs://hadoop10:9000/spark/dwd.csv')

我重啟機器后發現沒有這兩句竟然也可以,所以這里作為參考 

 

spark-submit 命令:即使我在代碼里指定了 master 為 yarn,spark-submit 運行時仍需指定 master,否則報錯 No module named pyspark  【上面第二個異常】

bin/spark-submit --master yarn gpsfreq.py    # 參數必須在 py 文件前面

python 命令:如果是 python 命令執行,無需額外指定 master

python gpsfreq.py

 

我的理解是 spark 無法找到 pyspark,因為 spark 並沒有在任何地方和 pyspark 關聯;

而 python 可以找到 pyspark,因為在 PYTHONPATH 中設置了 pyspark 的搜索路徑

export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH

 

 

參考資料:

http://spark.apache.org/docs/latest/api/python/index.html  官網

https://www.360kuai.com/pc/99c84010bb76fd1fa?cota=4&kuai_so=1&sign=360_57c3bbd1&refer_scene=so_1 

https://blog.csdn.net/cjhnbls/article/details/79254188


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM