sparkSession 讀取 csv
1. 利用 sparkSession 作為 spark 切入點
2. 讀取 單個 csv 和 多個 csv
from pyspark.sql import SparkSession from pyspark.sql import SQLContext if __name__ == '__main__': scSpark = SparkSession \ .builder \ .appName("reading csv") \ .getOrCreate() # getOrCreate 獲取現有 SparkSession 或者 新建一個 SparkSession print(scSpark.sparkContext) data_file = 'csvdata/*.csv' # 讀取 csvdata 文件夾下所有 csv 文件,但是這些 csv 文件格式必須相同,也就是 列 相同 # data_file = 'xx.csv' # 讀取單個 csv 文件 sdfData = scSpark.read.csv(data_file, header=True, sep=",").cache() # .cache 緩存返回data,從而提高性能 print('Total Records = {}'.format(sdfData.count())) sdfData.show()
讀取一個文件夾下多個 csv 時,務必保持 csv 格式相同,否則會警告,但不報錯
19/10/15 02:29:32 WARN CSVDataSource: Number of column in CSV header is not equal to number of fields in the schema: Header length: 2, schema size: 4 19/10/15 02:29:32 WARN CSVDataSource: CSV header does not conform to the schema.
各種模式讀取文件
一個小項目:計算 GPS 頻次
原數據長這樣
1,108.99564,34.33999999,1 2,108.99564,34.3399087138298,1 3,108.99564,34.3398174376596,1 4,108.99564,34.3397261614894,1 5,108.99564,34.3396348853192,1 6,108.99564,34.3395436091489,1 7,108.99564,34.3394523329787,1 8,108.99564,34.3393610568085,1
Standalone 模式讀取本地文件
在 local 模式下,文件無特殊要求
在 standalone 模式下讀取本地文件,這個文件必須在每個節點上都存在,且路徑一致
簡易代碼
from __future__ import division from pyspark import SparkContext max_lng = 136 min_lng = 73 max_lat = 54 min_lat = 3 lng_stage = 1000 lat_stage = 1000 lng_step=(max_lng - min_lng) / (lng_stage) lat_step=(max_lat - min_lat) / (lat_stage) def mymap(data): # print(data, 1111111111111111111111111111111111) return data.split(',') def mygroup(data): # 對經緯度分區,打標簽 # print(data[1], type(data[1])) label_lng = round(float(data[1]) / lng_step, 1) label_lat = round(float(data[2]) / lat_step, 1) return (label_lng, label_lat) def mapkey(data): # 把標簽還原成經緯度 return data[0][0] * lng_step, data[0][1] * lat_step, data[1] # sc = SparkContext('local', 'gpsfreq') # 本地模式 sc = SparkContext('spark://hadoop10:7077', 'gpsfreq') # standalone 模式 rdd = sc.textFile('dwd.csv', 100) # standalone 模式 讀取本地文件,必須在每個節點上都有這個文件,且路徑一致 # print rdd.map(mymap).collect() print rdd.map(mymap).groupBy(mygroup).mapValues(len).map(mapkey).collect()
輸出長這樣
[(111.8943, 32.629799999999996, 11), (109.2861, 34.2006, 42), (109.0467, 35.2155, 77), (111.72420000000001, 33.1755, 11), (111.636, 32.8542, 7)]
Standalone 模式讀取 hdfs
需要啟動 hadoop,並把數據傳到 hdfs
hadoop fs -mkdir /spark
hadoop fs -put dwd.csv /spark
第一節中的代碼只需改動一句
rdd = sc.textFile('hdfs://hadoop10:9000/spark/dwd.csv')
spark on yarn 模式讀取 hdfs
yarn 模式只能讀取分布式數據,如 hdfs
異常記錄
1. 讀取本地文件會出現 文件不存在
2. 如下異常
Caused by: org.apache.spark.SparkException: Error from python worker: /usr/bin/python: No module named pyspark PYTHONPATH was: /usr/lib/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/filecache/46/__spark_libs__6155997154509109577.zip/spark-core_2.11-2.4.4.jar
解決方法
conf = SparkConf().setAppName('gps').setMaster('yarn') # 下面兩句解決 No module named pyspark conf.set('spark.yarn.dist.files','file:/usr/lib/spark/python/lib/pyspark.zip,file:/usr/lib/spark/python/lib/py4j-0.10.7-src.zip') conf.setExecutorEnv('PYTHONPATH','pyspark.zip:py4j-0.10.7-src.zip') sc = SparkContext(conf=conf) # yarn 模式必須讀取 集群 中的文件,不能讀本地 rdd = sc.textFile('hdfs://hadoop10:9000/spark/dwd.csv')
我重啟機器后發現沒有這兩句竟然也可以,所以這里作為參考
spark-submit 命令:即使我在代碼里指定了 master 為 yarn,spark-submit 運行時仍需指定 master,否則報錯 No module named pyspark 【上面第二個異常】
bin/spark-submit --master yarn gpsfreq.py # 參數必須在 py 文件前面
python 命令:如果是 python 命令執行,無需額外指定 master
python gpsfreq.py
我的理解是 spark 無法找到 pyspark,因為 spark 並沒有在任何地方和 pyspark 關聯;
而 python 可以找到 pyspark,因為在 PYTHONPATH 中設置了 pyspark 的搜索路徑
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/pyspark:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip:$PYTHONPATH
參考資料:
http://spark.apache.org/docs/latest/api/python/index.html 官網
https://www.360kuai.com/pc/99c84010bb76fd1fa?cota=4&kuai_so=1&sign=360_57c3bbd1&refer_scene=so_1
https://blog.csdn.net/cjhnbls/article/details/79254188