問題重述
能夠讀取給定的數據文件
出租車GPS數據文件(taxi_gps.txt)
北京區域中心坐標及半徑數據文件(district.txt)
能夠輸出以下統計信息
A:該出租車GPS數據文件(taxi_gps.txt)包含多少量車?
B:北京每個城區的車輛位置點數(每輛車有多個位置點,允許重復)
開發環境:
開發軟件 Pycharm
開發語言:Python
系統macOS Mojave
Spark版本 spark-2.1.1-bin-hadoop2.7
Scala版本Scala-2.10.4
Python版本Python3.7
實驗原理
輸出A:
以第一列統計車輛數,去重
輸出B:
1.從(district.txt)文件中取第一個區的記錄,獲得其名稱D1、中心坐標M(x0,y0)和半徑r;
2.從(taxi_gps.txt)中獲取第一條位置點記錄,獲得其坐標N(x1,y1)
3.利用歐幾里得距離計算公式計算點M和N的距離dis,如果dis<r,則認為該位置記錄屬於區域D1;得到<D1,1>
4.繼續從2開始循環,獲得第二個位置記錄;直至所有記錄遍歷完。
5.繼續從1開始循環,獲得第二個區的記錄
數據說明
待統計區域中心數據格式
區域名稱:北京城區拼音,例:haidianqu, chaoyangqu, dongchengqu
區域中心GPS經度:格式ddd.ddddddd,以度為單位。
區域中心GPS緯度:格式dd.ddddddd,以度為單位。
區域半徑:格式dd,以km為單位
出租車GPS數據格式說明
數據以ASCII文本表示,以逗號為分隔符,以回車換行符(0x0D 0x0A)結尾。數據項及順序:車輛標識、觸發事件、運營狀態、GPS時間、GPS經度、GPS緯度,、GPS速度、GPS方向、GPS狀態
車輛標識:6個字符
觸發事件:0=變空車,1=變載客,2=設防,3=撤防,4=其它
運營狀態:0=空車,1=載客,2=駐車,3=停運,4=其它
GPS時間:格式yyyymmddhhnnss,北京時間
GPS經度:格式ddd.ddddddd,以度為單位。
GPS緯度:格式dd.ddddddd,以度為單位。
GPS速度:格式ddd,取值000-255內整數,以公里/小時為單位。
GPS方位:格式ddd,取值000-360內整數,以度為單位。
GPS狀態:0=無效,1=有效
結束串:回車符+換行符
實驗步驟
A:該出租車GPS數據文件(taxi_gps.txt)包含多少量車?
- 初始化spark
sc = SparkContext('local', 'wordcount')
導入我們的GPS數據,並將其創建為RDD(彈性分布式數據)。
input = sc.textFile("file:/Users/wangxingwu/Desktop/spark樣例程序/python-Word_Count/pycharm/Word_Count/taxi_gps.txt")
- 利用split()函數對PDD數據進行拆分,並第一列數據導入新的RDD中,已知我們的數據由“,”分隔。
- 利用distinct()函數對新的RDD數據進行去重復,而后利用count()函數進行RDD數據的統計,即可得到所有出租車的數量(不重復)。
words = input.flatMap(lambda line: line.split(',')[0:1])#取第一列元素 words = words.distinct()# print("數據中包含的汽車個數為:",words.count())
4.除了3中的方法,還可以通過wordcount的事例程序進行處理,我們將新的RDD進行詞頻統計(Map+Reduce),最后統計單詞(字符串)數量即可。
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y) print("利用wordcount計算的汽車數量:",counts.count())
實驗結果
通過進行記數,我們得到汽車的數量為11264輛。
代碼
#!/usr/bin/env python # _*_ coding:utf-8 _*_ from pyspark import SparkContext #輸入路徑 sc = SparkContext('local', 'wordcount') # 讀取文件 input = sc.textFile("file:/Users/wangxingwu/Desktop/spark樣例程序/python-Word_Count/pycharm/Word_Count/taxi_gps.txt") # 切分單詞 words = input.flatMap(lambda line: line.split(',')[0:1])#取第一列元素 #print(words.collect()) print("數據中包含的汽車個數為:",words.count()) words = words.distinct()#不重復 print("去重后包含汽車的個數為:",words.count()) # 轉換成鍵值對並計數 counts = words.map(lambda word: (word, 1)).reduceByKey(lambda x, y: x + y) print("利用wordcount計算的汽車數量:",counts.count()) # 輸出結果 counts.foreach(print)
B:北京每個城區的車輛位置點數(每輛車有多個位置點,允許重復)
- 初始化spark
sc = SparkContext('local', 'wordcount')
讀取兩個文件的數據,創建生成2個RDD
input1 = sc.textFile("file:/Users/wangxingwu/Desktop/spark樣例程序/python-Word_Count/pycharm/Word_Count/taxi_gps.txt") input2 = sc.textFile("file:/Users/wangxingwu/Desktop/spark樣例程序/python-Word_Count/pycharm/Word_Count/district.txt")
通過flatMap()函數進行以“,”為分隔符的拆分,其中在GPS文件中只需要每行數據的經緯度數據。
distance0= input1.map(lambda line: line.split(',')[4:6])#取第5,6列元素,汽車坐標 distance1= input2.map(lambda line: line.split(','))#取
2. 因為要定位汽車GPS的位置所屬街區,我們需要對每條數據進行所有街區的計算分析當兩點距離小於區域半徑則該點屬於該區域。所有我將兩表進行笛卡爾積(cartesian())的運算。
distance = distance0.cartesian(distance1)
3. 我們現在有了連接好的表,現在需要對其進行運算判斷,這里首先需要編寫兩點GPS坐標距離公式(具體函數不是重點暫不贅述詳見代碼)。
4. 難點來了,就是對RDD數據的操作、以及Python中,lambda的語法:
首先我們要明確一點,RDD數據支持兩種類型操作:轉化操作(transform)、行動操作(action)
我們首先需要對之前笛卡爾積后的RDD的數據帶入函數中運算,且在區域內的我將其標記為1,否之為0
而后希望每條數據(x0,y0,x,y,0 or 1)=>(region,1),而后進行wordcount即可統計出每個地區的汽車數量。
在轉化上面我們就要用到map()函數進行RDD數據轉換,通過Python中,lambda的語句即可實現,而后我們進行RDD數據的行動操作,這里我需要剔除所有的(region,0)數據,利用函數filter(lambda x:x[1]==1)。
現在基本上我們工作只需要最后的一步reduceByKey(lambda x,y:x+y),這個函數就是對每一個key值相同的元素進行一種求和。
counts = distance.map(lambda d0:(d0[1][0], include_number(get_distance_hav(d0[0],d0[1][1:3]),float(d0[1][3])))).filter(lambda x:x[1]==1).reduceByKey(lambda x,y:x+y
實驗結果
代碼
#!/usr/bin/env python # _*_ coding:utf-8 _*_ from pyspark import SparkContext from math import sin,asin,cos,radians,fabs,sqrt EARTH_RADIUS=6371#地球半徑 def hav(theta): s = sin(theta / 2) return s * s def get_distance_hav(x,y): "用haversine公式計算球面兩點間的距離。" # 經緯度轉換成弧度 lat0=float(x[0]) lng0=float(x[1]) lat1=float(y[0]) lng1=float(y[1]) lat0 = radians(lat0) lat1 = radians(lat1) lng0 = radians(lng0) lng1 = radians(lng1) dlng = fabs(lng0 - lng1) dlat = fabs(lat0 - lat1) h = hav(dlat) + cos(lat0) * cos(lat1) * hav(dlng) distance = 2 * EARTH_RADIUS * asin(sqrt(h)) return distance def include_number(distance,r): if distance<=r: return 1 else: return 0 sc = SparkContext('local', 'wordcount') # 讀取文件 input1 = sc.textFile("file:/Users/wangxingwu/Desktop/spark樣例程序/python-Word_Count/pycharm/Word_Count/taxi_gps.txt") input2 = sc.textFile("file:/Users/wangxingwu/Desktop/spark樣例程序/python-Word_Count/pycharm/Word_Count/district.txt") # 切分單詞 distance0= input1.map(lambda line: line.split(',')[4:6])#取第5,6列元素,汽車坐標 distance1= input2.map(lambda line: line.split(','))#取 distance = distance0.cartesian(distance1)#兩表進行笛卡爾積 # 轉換成鍵值對並計數 counts = distance.map(lambda d0:(d0[1][0], include_number(get_distance_hav(d0[0],d0[1][1:3]),float(d0[1][3])))).filter(lambda x:x[1]==1).reduceByKey(lambda x,y:x+y) counts.foreach(print)