大數據基礎---Spark車輛監控項目


一、項目簡介

這是一個車輛監控項目。主要實現了三個功能:

1.計算每一個區域車流量最多的前3條道路。

2.計算道路轉換率

3.實時統計道路擁堵情況(當前時間,卡口編號,車輛總數,速度總數,平均速度)

二、項目結構

├─TrafficBySparkAndKafka
├─data
└─src
├─main
│ ├─java
│ │ └─vip
│ │ ├─producedate2hive(模擬數據到文件和Hive)
│ │ ├─shuai7boy
│ │ │ └─trafficTemp
│ │ │ ├─areaRoadFlow(每個區域top3道路速度統計。道路轉換率。)
│ │ │ ├─conf (獲取配置文件幫助類)
│ │ │ ├─constant (接口靜態類,防止硬編碼)
│ │ │ ├─dao
│ │ │ │ ├─factory (工廠類)
│ │ │ │ └─impl (接口實現類)
│ │ │ ├─domain (屬性定義類)
│ │ │ ├─jdbc (jdbc幫助類)
│ │ │ ├─rtmroad(實時統計道路擁堵情況)
│ │ │ ├─skynet
│ │ │ └─util (幫助類)
│ │ └─spark
│ │ └─spark
│ │ └─test (模擬實時數據)
│ ├─resources
│ └─scala
│ └─top
│ └─shuai7boy
│ └─trafficTemp
│ └─areaRoadFlow (利用scala和java互調用,實現top3道路速度統計)

└─test
└─java

三、數據源

數據源類型:

monitor_flow_action(每個攝像頭的監控數據)

當天日期 卡口編號 攝像頭編號 車牌號 拍攝時間 車速 道路編號 區域編號

2020-05-08  0001   34287  京M80025    2020-05-08 05:35:58    57 25 03
2020-05-08 0005   99132  京M80025    2020-05-08 05:51:28    149    50 04

monitor_camera_info(卡口和攝像頭對應編號)

0006    00443
0006   25745
0006   98681
0006   36400

存儲介質:

如果在本地運行的話,這里讀取的是本地文件。

如果在集群運行,對於批處理讀取的是Hive,對於流處理這里讀取Kafka。

四、數據轉換流程

1.計算每一個區域車流量最多的前3條道路。

  • 從表traffic.monitor_flow_action根據日期獲取車流量監控日志信息。

    擋在集群中時,traffic.monitor_flow_action代表的是hive中的表,當在本地運行時,traffic.monitor_flow_action是本地創建的臨時表。

  • 從area_info表中獲取區域信息。

    area_info是MySql中的表。

  • 根據步驟二獲取的區域信息,補全監控日志名稱。根據join,map即可拼接一個新的RDD,並將RDD轉換為DataFrame的臨時表tmp_car_flow_basic。

  • 統計各個區域的道路車流量。

    使用Spark SQL根據區域名稱,道路ID進行分組。即可統計出每個區域,每條道路對應的車流量。

  • 統計每個區域top3車流量。

    利用開窗函數進行統計。row_number() over(partition by area_name order by road_id desc)

用到的技術:Hive,Spark SQL,臨時表,MySql,JDBC,join,map,RDD轉換DataFrame。

2.計算道路轉換率

  • 從MySql拿出我們要對比的轉換路段

  • 從日志拿出指定日期的監控數據

  • 將監控數據轉換為鍵值對(car,row)格式

  • 計算每個路段的匹配情況。

    邏輯:將第三步拿到的數據,根據car進行分組,映射鍵值對。將軌跡信息根據時間進行排序,然后拼接。

    將我們指定的路段(第一步獲取到的)和上面拼接的數據進行比對,得出匹配情況。(路段,匹配次數)

  • 因為上面求的是多輛車的 (路段,匹配次數)。這步使用reduceByKey進行聚合,將相同路段進行匯總。

  • 獲取轉化率。

    轉換率=(這次路段的匹配度)/(上次路段的匹配度)即可得到。、

    這次路段的匹配度=(聚合數據.get(路段))

用到的技術:mapToPair,groupByKey,flatMapToPair(進來一輛車,出去多個對應路段信息),reduceByKey。

3.實時統計道路擁堵情況(根據車輛和車速判斷)

  • 根據日志獲取(卡口ID,汽車速度)格式數據
  • 獲取(卡口ID,(汽車速度,1))格式數據,后面的1代表車輛數
  • 獲取(卡口ID,(汽車總速度,總車輛數))
  • 打印車輛(卡口,總速度,總車輛,平均速度)

用到的技術:map,mapToPair,mapValues(僅僅針對value進行map,(key,(value,1))格式數據),reduceByKeyAndWindow。

項目開源地址

系列傳送門


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM