【譯】使用Kafka、DynamoDB和Rockset設計實時ETA預測系統


原文:https://dzone.com/articles/designing-a-real-time-eta-prediction-system-using


 

總有些奇怪的時刻,城市處於封鎖狀態,很少人敢於走出去冒險。此時像外賣這樣的物流服務興起就不足為怪了。

當用戶下單后,大多數的此類應用都提供了准實時的訂單送達時間(ETA)追蹤功能。構建一個高伸縮性、分布式的實時ETA預測系統不是一件容易的事情,但如果我們能夠簡化這類系統的設計,是否情況會變得好一些呢?本文我們將對這類系統進行分解,將其拆解成幾部分使得每一部分對應於一個獨立的任務。

現在,我們看下構成這類系統的幾大組件。

  1. 發送騎手app:安裝到外賣員手機上的Android/iOS應用
  2. 用戶app:安裝在用戶手機上的Android/iOS應用
  3. Rockset:支持任意模型和服務的查詢引擎
  4. 消息隊列:用於組件之間的數據傳輸。本文使用Kafka
  5. KV存儲:用於保存訂單和模型參數。本文使用DynamoDB

模型輸入

騎手位置

為了獲得准確的ETA估計,你要獲取到外賣人員的位置,尤其是緯度和經度。這可以通過手機中的GPS輕松獲取。調用設備GPS服務能夠返回經緯度以及米級別的精確定位信息。你能夠在應用中啟動一個后台服務每10秒獲取一次GPS坐標。這些坐標間隔太密了以至於無法用它們來預測ETA。為了增加GPS的精度,我們使用geohash算法來增加GPS精度。geohash是一個地址信息的標准化N字符哈希值。每個geohash表示了M平方英里的面積。N和M成反比例,所以N越大,M越小。若要了解geohash,請戳這里

目前有很超多的庫都支持將經緯度轉換成geohash。這里我們使用一個github第三方庫來獲取geohash。待拿到geohash之后我們將它連同坐標信息寫入到Kafka。Rockset會從Kafka讀取對應的數據,並更新到一個名為locations的集合中。

訂單

用戶下的訂單存儲在DynamoDB中用於后續處理。一個訂單通常有很多狀態。所有狀態都在DynamoDB中更新,連同保存一些額外的數據,比如源地址、目標地址、訂單詳情等。一旦訂單被送達,到達的實際時間也要被保存在數據庫中。Rockset也需要從DynamoD的訂單表中讀取這些更新數據,然后將它們更新到名為orders的集合中。

ML模型

指數平滑法

我們從訂單表中能夠獲取到訂單實際到達時間以及源地址和目標地址。我們將其稱為TA。這樣,我們可以將外賣員的最新位置作為源地址,用戶地址作為目標地址,然后對所有這些TA統計平均值來計算大致的ETA。但是,這樣做不一定很精確,因為它並未考慮到哪些變化的因素,比如該地區存在的新施工情況或者是到目標地址存在更短的新路線。

為此,我們需要一個簡單,易於調試且具有良好准確性的預測模型。這就是引入指數平滑法的原因。一個指數平滑值的計算公式如下:

St = Alpha * Xt + (1 - Alpha) * St-1

其中,

St表示時刻t的平滑值

Xt表示時刻t的實際值

Alpha表示平滑因子

在我們的環境中,St表示ETA,Xt表示訂單表源-目標地址對的最近實際到達時間。那么,

ETAt = Alpha * TAt + (1 - Alpha) * ETAt-1

Rockset

當前系統的服務層需要滿足三個主要標准:

  1. 每分鍾可處理數百萬次寫入的能力。每個外賣人員手機上的應用程序將會以每5-10秒發送一次GPS坐標的速率進行坐標推送,從而每次都會產生一個新的ETA。一個大規模食品外送企業可能有將近10萬個外送人員。
  2. 這些數據獲取的延時要最低。如果要有非常棒的用戶交互體驗,我們必須要能夠在這些數據變更后及時地反映在用戶應用。
  3. 能夠實時處理schema的變更。未來我們可能需要保存額外的元數據,比如ETA預測准確度和模型版本等。我們可不想到時候再創建一個新的數據源只是為了應對增加新字段之用。

Rockset滿足以上所有的這些要求。它具有:

  • 動態伸縮能力:通過增加資源來實現處理更大容量數據的能力
  • 分布式查詢處理:多個計算節點並行查詢以最小化延時
  • 無schema抽取:實時支持schema的變更

Rockset提供了內置的Kafka連接器。我們使用這個連接器來提取外送人員的位置數據。

若要在Rockset中實現指數平滑法,我們創建兩個查詢Lambda。Rockset中的查詢Lambda是具有名字的泛型化SQL,能夠從一個專屬的REST endpoint中被執行。
1. calculate_ETA:這個查詢Labmda接收alpha,源地址和目標地址作為參數,返回一個指數平滑后的ETA。它運行的結果如下:

SELECT
   (:alpha * SUM(term)) + (POW((1 - :alpha), MAX(idx))* MIN_BY(ta_i, time_i)) as ans
FROM
  (
      (
          SELECT
              order_id,
              ta_i,
              (ta_i * POW((1 - :alpha), (idx - 1))) AS term,
              time_i,
              idx
          FROM
              (
                  SELECT
                      order_id,
                      CAST(ta AS int) as ta_i,
                      time_i,
                      ROW_NUMBER() OVER(
                          ORDER BY
                              time_i DESC, order_id ASC
                      ) AS idx
                  FROM
                      commons.orders_fixed
                  WHERE
                      source_geohash = :source
                      AND
                      destination_geohash = :destination
                  ORDER BY
                      time_i DESC, order_id ASC
              ) AS idx
      ) AS terms
    )

2. calculate_speed:這個查詢Lambda接收order_id作為參數,然后返回外送人員的平均運輸速度。它執行以下的查詢:

SELECT Sum(St_distance(prev_geo, geo) / ( ts - prev_ts )) / Count(*) AS speed 
FROM   (SELECT geo, 
               Lead(geo, 1) 
                 OVER( 
                   ORDER BY ts DESC ) AS prev_geo, 
               ts, 
               Lead(ts, 1) 
                 OVER( 
                   ORDER BY ts DESC ) AS prev_ts 
        FROM   (SELECT St_geogpoint(Cast(lng AS DOUBLE), Cast(lat AS DOUBLE)) AS 
                       geo, 
                       order_id, 
                       Cast(timestamp AS INT)                                 AS 
                       ts 
                FROM   commons.locations 
                WHERE  order_id = :order_id) AS ts) AS speed  

預測ETA

用戶應用程序發起請求來預測ETA。它需要在API調用時傳入訂單號。這個請求被發送到查詢服務那里。之后查詢服務執行以下動作:

  1. 從DynamoDB獲取最新的平滑因子Alpha和Beta。這里,Alpha是平滑參數,Beta是在計算最終ETA時分配給歷史ETA的權重。有關更多詳細信息,請參閱步驟6
  2. 獲取訂單ID的目標geohash。
  3. 從locations集合中獲取騎手當前的geohash
  4. 使用前兩步獲取到的平滑因子alpha和騎手geohash數據觸發calculate_ETA查詢Lambda。我們把這個ETA稱為歷史ETA。
curl --request POST \
--url https://api.rs2.usw2.rockset.com/v1/orgs/self/ws/commons/lambdas/calculateETA/versions/f7d73fb5a786076c \
-H 'Authorization: YOUR ROCKSET API KEY' \
-H 'Content-Type: application/json' \
-d '{
  "parameters": [
    {
      "name": "alpha",
      "type": "float",
      "value": "0.7"
    },
    {
      "name": "destination",
      "type": "string",
      "value": "tdr38d"
    },
    {
      "name": "source",
      "type": "string",
      "value": "tdr706"
    }
  ]
  }'

  5. 使用當前訂單號觸發calulate_speed

curl --request POST \
--url https://api.rs2.usw2.rockset.com/v1/orgs/self/ws/commons/lambdas/calculate_speed/versions/cadaf89cba111c06 \
-H 'Authorization: YOUR ROCKSET API KEY' \
-H 'Content-Type: application/json' \
  -d '{
    "parameters": [
      {
        "name": "order_id",
        "type": "string",
        "value": "abc"
      }
    ]
  }

  6. 最后,計算待預測的ETA,公式如下:

預測ETA = Beta * (歷史ETA) + (1 - Beta) * 距離(騎手, 目的地)/速度

之后,將計算得出的ETA返回給用戶手機的應用程序上。

反饋回路

ML模型需要不斷訓練以提升預測准確性。 在我們的場景中,我們有必要重新訓練ML模型,以應對不斷變化的天氣情況,節日等問題。這是參數調整服務發揮作用的地方。

參數調整服務

一旦計算出預測ETA,我們要把它與實際ETA放入到一個名為predictions的集合中。要在Rockset中保存這部分數據的主要目的是創建一個實時儀表盤能夠監控模型准確度。這是確保用戶在應用程序上不會看到荒謬離譜ETA的關鍵保證。
這些做完之后,下一個問題是如何確定平滑因子Alpha。為此,我們創建了一個參數調優服務,它就是一個Flink批處理任務。我們獲取過去7~30天所有訂單的所有歷史ETA和TA數據,並利用它們的差值來計算Alpha和Beta。這可以使用諸如邏輯回歸的模型來完成。

一旦計算出Alpha和Beta,它們就會存儲在DynamoDB中名為smoothing_parameters的表中。查詢服務從消費者應用程序接收請求時,將從該表中獲取參數。
我們可以使用locations集合中的ETA數據每周訓練一次參數調優模型。

總結

本文的架構旨在設計一個每分鍾能夠處理1百萬請求切具有足夠靈活性以支持動態擴展的應用程序。這個架構同時允許開發人員切換或插入組件,比如添加新的特性(天氣因素)或添加一個過濾層來優化ETA的預測。這里,Rockset可以幫助我們解決三個首要的需求:

  1. 低延時復雜查詢:Rockset允許我們僅僅使用API調用的方式執行復雜查詢,比如指數平滑。這是通過查詢Lambda來完成的。Lambda同時還支持參數以允許我們查詢不同位置的參數。
  2. 實時、高伸縮性的數據抽取:如果平台上注冊有10萬個騎手且每個騎手手機上的應用程序每5秒發送一次GPS位置,那么每分鍾就需要處理120萬個請求。Rockset可以讓我們在事件發生的幾秒鍾內查詢到此數據。
  3. 多數據源:Rockset允許我們使用需要最少配置的完全托管的連接器從多個來源(例如Kafka和DynamoDB)提取數據。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM