一、項目簡介
這是一個車輛監控項目。主要實現了三個功能:
1.計算每一個區域車流量最多的前3條道路。
2.計算道路轉換率
3.實時統計道路擁堵情況(當前時間,卡口編號,車輛總數,速度總數,平均速度)
二、項目結構
├─TrafficBySparkAndKafka
├─data
└─src
├─main
│ ├─java
│ │ └─vip
│ │ ├─producedate2hive(模擬數據到文件和Hive)
│ │ ├─shuai7boy
│ │ │ └─trafficTemp
│ │ │ ├─areaRoadFlow(每個區域top3道路速度統計。道路轉換率。)
│ │ │ ├─conf (獲取配置文件幫助類)
│ │ │ ├─constant (接口靜態類,防止硬編碼)
│ │ │ ├─dao
│ │ │ │ ├─factory (工廠類)
│ │ │ │ └─impl (接口實現類)
│ │ │ ├─domain (屬性定義類)
│ │ │ ├─jdbc (jdbc幫助類)
│ │ │ ├─rtmroad(實時統計道路擁堵情況)
│ │ │ ├─skynet
│ │ │ └─util (幫助類)
│ │ └─spark
│ │ └─spark
│ │ └─test (模擬實時數據)
│ ├─resources
│ └─scala
│ └─top
│ └─shuai7boy
│ └─trafficTemp
│ └─areaRoadFlow (利用scala和java互調用,實現top3道路速度統計)
│
└─test
└─java
三、數據源
數據源類型:
monitor_flow_action(每個攝像頭的監控數據)
當天日期 卡口編號 攝像頭編號 車牌號 拍攝時間 車速 道路編號 區域編號
2020-05-08 0001 34287 京M80025 2020-05-08 05:35:58 57 25 03
2020-05-08 0005 99132 京M80025 2020-05-08 05:51:28 149 50 04
monitor_camera_info(卡口和攝像頭對應編號)
0006 00443
0006 25745
0006 98681
0006 36400
存儲介質:
如果在本地運行的話,這里讀取的是本地文件。
如果在集群運行,對於批處理讀取的是Hive,對於流處理這里讀取Kafka。
四、數據轉換流程
1.計算每一個區域車流量最多的前3條道路。
-
從表traffic.monitor_flow_action根據日期獲取車流量監控日志信息。
擋在集群中時,traffic.monitor_flow_action代表的是hive中的表,當在本地運行時,traffic.monitor_flow_action是本地創建的臨時表。
-
從area_info表中獲取區域信息。
area_info是MySql中的表。
-
根據步驟二獲取的區域信息,補全監控日志名稱。根據join,map即可拼接一個新的RDD,並將RDD轉換為DataFrame的臨時表tmp_car_flow_basic。
-
統計各個區域的道路車流量。
使用Spark SQL根據區域名稱,道路ID進行分組。即可統計出每個區域,每條道路對應的車流量。
-
統計每個區域top3車流量。
利用開窗函數進行統計。row_number() over(partition by area_name order by road_id desc)
用到的技術:Hive,Spark SQL,臨時表,MySql,JDBC,join,map,RDD轉換DataFrame。
2.計算道路轉換率
-
從MySql拿出我們要對比的轉換路段
-
從日志拿出指定日期的監控數據
-
將監控數據轉換為鍵值對(car,row)格式
-
計算每個路段的匹配情況。
邏輯:將第三步拿到的數據,根據car進行分組,映射鍵值對。將軌跡信息根據時間進行排序,然后拼接。
將我們指定的路段(第一步獲取到的)和上面拼接的數據進行比對,得出匹配情況。(路段,匹配次數)
-
因為上面求的是多輛車的 (路段,匹配次數)。這步使用reduceByKey進行聚合,將相同路段進行匯總。
-
獲取轉化率。
轉換率=(這次路段的匹配度)/(上次路段的匹配度)即可得到。、
這次路段的匹配度=(聚合數據.get(路段))
用到的技術:mapToPair,groupByKey,flatMapToPair(進來一輛車,出去多個對應路段信息),reduceByKey。
3.實時統計道路擁堵情況(根據車輛和車速判斷)
- 根據日志獲取(卡口ID,汽車速度)格式數據
- 獲取(卡口ID,(汽車速度,1))格式數據,后面的1代表車輛數
- 獲取(卡口ID,(汽車總速度,總車輛數))
- 打印車輛(卡口,總速度,總車輛,平均速度)
用到的技術:map,mapToPair,mapValues(僅僅針對value進行map,(key,(value,1))格式數據),reduceByKeyAndWindow。