Flink/CEP/規則引擎/風控


 

 


 

 

基於 Apache Flink 和規則引擎的實時風控解決方案 ​

對一個互聯網產品來說,典型的風控場景包括:注冊風控、登陸風控、交易風控、活動風控等,而風控的最佳效果是防患於未然,所以事前事中和事后三種實現方案中,又以事前預警和事中控制最好。這要求風控系統一定要有實時性。本文就介紹一種實時風控解決方案。風控是業務場景的產物,風控系統直接服務於業務系統,與之相關的還有懲罰系統和分析系統,各系統關系與角色如下:

 

 

業務系統,通常是 APP + 后台 或者 web,是互聯網業務的載體,風險從業務系統觸發;

  • 風控系統,為業務系統提供支持,根據業務系統傳來的數據或埋點信息來判斷當前用戶或事件有無風險;
  • 懲罰系統,業務系統根據風控系統的結果來調用,對有風險的用戶或事件進行控制或懲罰,比如增加驗證碼、限制登陸、禁止下單等等;
  • 分析系統,該系統用以支持風控系統,根據數據來衡量風控系統的表現,比如某策略攔截率突然降低,那可能意味着該策略已經失效,又比如活動商品被搶完的時間突然變短,這表明總體活動策略可能有問題等等,該系統也應支持運營/分析人員發現新策略;

其中風控系統和分析系統是本文討論的重點,而為了方便討論,我們假設業務場景如下:

 

風控系統有規則和模型兩種技術路線,規則的優點是簡單直觀、可解釋性強、靈活,所以長期活躍在風控系統之中,但缺點是容易被攻破,一但被黑產猜中就會失效,於是在實際的風控系統中,往往需要再結合上基於模型的風控環節來增加健壯性。但限於篇幅,本文中我們只重點討論一種基於規則的風控系統架構,當然如果有模型風控的訴求,該架構也完全支持。規則就是針對事物的條件判斷,我們針對注冊、登陸、交易、活動分別假設幾條規則,比如:

  • 某 IP 最近 1 小時注冊賬號數超過 10 個;
  • 某賬號群體最近 1 小時購買優惠商品超過 100 件;

規則可以組合成規則組,為了簡單起見,我們這里只討論規則。

  • 事實,即被判斷的主體和屬性,如上面規則的賬號及登陸次數、IP 和注冊次數等;
  • 指標閾值,判斷的依據,比如登陸次數的臨界閾值,注冊賬號數的臨界閾值等;

規則可由運營專家憑經驗填寫,也可由數據分析師根據歷史數據發掘,但因為規則在與黑產的攻防之中會被猜中導致失效,所以無一例外都需要動態調整。

 

  • 實時風控數據流,由紅線標識,同步調用,為風控調用的核心鏈路;
  • 准實時指標數據流,由藍線標識,異步寫入,為實時風控部分准備指標數據;
  • 准實時/離線分析數據流,由綠線標識,異步寫入,為風控系統的表現分析提供數據;

實時風控是整個系統的核心,被業務系統同步調用,完成對應的風控判斷。前面提到規則往往由人編寫並且需要動態調整,所以我們會把風控判斷部分與規則管理部分拆開。規則管理后台為運營服務,由運營人員去進行相關操作:

  • 場景管理,決定某個場景是否實施風控,比如活動場景,在活動結束后可以關閉該場景;
  • 黑白名單,人工/程序找到系統的黑白名單,直接過濾;
  • 規則管理,管理規則,包括增刪或修改,比如登陸新增 IP 地址判斷,比如下單新增頻率校驗等;
  • 閾值管理,管理指標的閾值,比如規則為某 IP 最近 1 小時注冊賬號數不能超過 10 個,那 1 和 10 都屬於閾值;

講完管理后台,那規則判斷部分的邏輯也就十分清晰了,分別包括前置過濾、事實數據准備、規則判斷三個環節。

2.1.1 前置過濾

 

業務系統在特定事件(如注冊、登陸、下單、參加活動等)被觸發后同步調用風控系統,附帶相關上下文,比如 IP 地址,事件標識等,規則判斷部分會根據管理后台的配置決定是否進行判斷,如果是,接着進行黑白名單過濾,都通過后進入下一個環節。

2.1.2 實時數據准備

 

在進行判斷之前,系統必須要准備一些事實數據,比如:

  • 注冊場景,假如規則為單一 IP 最近 1 小時注冊賬號數不超過 10 個,那系統需要根據 IP 地址去 Redis/Hbase 找到該 IP 最近 1 小時注冊賬號的數目,比如 15;
  • 登陸場景,假如規則為單一賬號最近 3 分鍾登陸次數不超過 5 次,那系統需要根據賬號去 Redis/Hbase 找到該賬號最近 3 分鍾登陸的次數,比如 8;

Redis/Hbase 的數據產出我們會在第 2.2 節准實時數據流中進行介紹。

2.2.3 規則判斷

 

在得到事實數據之后,系統會根據規則和閾值進行判斷,然后返回結果,整個過程便結束了。整個過程邏輯上是清晰的,我們常說的規則引擎主要在這部分起作用,一般來說這個過程有兩種實現方式:

  • 借助成熟的規則引擎,比如 Drools,Drools 和 Java 環境結合的非常好,本身也非常完善,支持很多特性,不過使用比較繁瑣,有較高門檻,可參考文章【1】;
  • 基於 Groovy 等動態語言自己完成,這里不做贅述。可參考文章【2】;

這部分屬於后台邏輯,為風控系統服務,准備事實數據。把數據准備與邏輯判斷拆分,是出於系統的性能/可擴展性的角度考慮的。前邊提到,做規則判斷需要事實的相關指標,比如最近一小時登陸次數,最近一小時注冊賬號數等等,這些指標通常有一段時間跨度,是某種狀態或聚合,很難在實時風控過程中根據原始數據進行計算,因為風控的規則引擎往往是無狀態的,不會記錄前面的結果。同時,這部分原始數據量很大,因為用戶活動的原始數據都要傳過來進行計算,所以這部分往往由一個流式大數據系統來完成。在這里我們選擇 Flink,Flink 是當今流計算領域無可爭議的 No.1,不管是性能還是功能,都能很好的完成這部分工作。

  • 業務系統把埋點數據發送到 Kafka;

  • Flink 訂閱 Kafka,完成原子粒度的聚合;

注:Flink 僅完成原子粒度的聚合是和規則的動態變更邏輯相關的。舉例來說,在注冊場景中,運營同學會根據效果一會要判斷某 IP 最近 1 小時的注冊賬號數,一會要判斷最近 3 小時的注冊賬號數,一會又要判斷最近 5 小時的注冊賬號數……也就是說這個最近 N 小時的 N 是動態調整的。那 Flink 在計算時只應該計算 1 小時的賬號數,在判斷過程中根據規則來讀取最近 3 個 1 小時還是 5 個 1 小時,然后聚合后進行判斷。因為在 Flink 的運行機制中,作業提交后會持續運行,如果調整邏輯需要停止作業,修改代碼,然后重啟,相當麻煩;同時因為 Flink 中間狀態的問題,重啟還面臨着中間狀態能否復用的問題。所以假如直接由 Flink 完成 N 小時的聚合的話,每次 N 的變動都需要重復上面的操作,有時還需要追數據,非常繁瑣。

  • Flink 把匯總的指標結果寫入 Redis 或 Hbase,供實時風控系統查詢。兩者問題都不大,根據場景選擇即可。

通過把數據計算和邏輯判斷拆分開來並引入 Flink,我們的風控系統可以應對極大的用戶規模。前面的東西靜態來看是一個完整的風控系統,但動態來看就有缺失了,這種缺失不體現在功能性上,而是體現在演進上。即如果從動態的角度來看一個風控系統的話,我們至少還需要兩部分,一是衡量系統的整體效果,一是為系統提供規則/邏輯升級的依據。

  • 判斷規則是否多余,比如某規則從來沒攔截過任何事件;
  • 判斷規則是否有漏洞,比如在舉辦某個促銷活動或發放代金券后,福利被領完了,但沒有達到預期效果;
  • 發現全局規則,比如某人在電子產品的花費突然增長了 100 倍,單獨來看是有問題的,但整體來看,可能很多人都出現了這個現象,原來是蘋果發新品了……
  • 識別某種行為的組合,單次行為是正常的,但組合是異常的,比如用戶買菜刀是正常的,買車票是正常的,買繩子也是正常的,去加油站加油也是正常的,但短時間內同時做這些事情就不是正常的。
  • 群體識別,比如通過圖分析技術,發現某個群體,然后給給這個群體的所有賬號都打上群體標簽,防止出現那種每個賬號表現都正常,但整個群體卻在集中薅羊毛的情況。

這便是分析系統的角色定位,在他的工作中有部分是確定性的,也有部分是探索性的,為了完成這種工作,該系統需要盡可能多的數據支持,如:

  • 業務系統的數據,業務的埋點數據,記錄詳細的用戶、交易或活動數據;
  • 風控攔截數據,風控系統的埋點數據,比如某個用戶在具有某些特征的狀態下因為某條規則而被攔截,這條攔截本身就是一個事件數據;

這是一個典型的大數據分析場景,架構也比較靈活,我僅僅給出一種建議的方式。

 

相對來說這個系統是最開放的,既有固定的指標分析,也可以使用機器學習/數據分析技術發現更多新的規則或模式,限於篇幅,這里就不詳細展開了。http://archive.keyllo.com/L-編程/drools-從Drools規則引擎到風控反洗錢系統v0.3.2.pdfhttps://www.jianshu.com/p/d6f45f91bedehttps://jinfei21.github.io/2018/09/29/基於規則的風控系統/https://sq.163yun.com/blog/article/183314611296591872https://sq.163yun.com/blog/article/213006222321659904https://github.com/sunpeak/riskcontrol


 

 

Apache Flink 及大數據領域盛會 Flink Forward Asia 2019 將於 11月28-30日在北京國家會議中心舉辦,大會議程已上線,點擊閱讀原文可了解大會議程詳情。

 

點擊圖片可查看 Flink Forward Asia 2019 詳情

 


 

 

 

從滴滴的Flink CEP引擎說起

CEP業務場景

復雜事件處理(Complex Event Process,簡稱CEP)用來檢測無盡數據流中的復雜模 式,擁有從不同的數據行中辨識查找模式的能力。模式匹配是復雜事件處理的一個強 大援助。 例子包括受一系列事件驅動的各種業務流程,例如在安全應用中偵測異常行為;在金 融應用中查找價格、交易量和其他行為的模式。其他常見的用途如欺詐檢測應用和傳 感器數據的分析等。

說了這么多可能還是覺得比較抽象,那么我們可以看看這次滴滴分享的FlinkCEP在滴滴中的業務場景。

 

吐槽時刻:

雖然,業務場景ppt寫的很好,但是最近幾次順風車事故,給大家留下了糟糕的印象。大數據沒用起來,cep其實應該也可以用在順風車安全檢測上吧。

Flink CEP

Flink的CEP是基於Flink Runtime構建的實時數據規則引擎,擅長解決跨事件的匹配問題。
可以看看,滴滴的屁屁踢上給出的兩個demo

Flink CEP的特點

 

動態規則

其實,對於實時領域的規則引擎,我們不想每次修改都要打包編碼,只希望簡單修改一下規則就讓它能執行。

當然,最好規則是sql 的形式,運營人員直接參與規則編寫而不是頻繁提需求,很麻煩。。。。此處,省略萬字。。

要知道flink CEP官網給出的API也還是很豐富的,雖然滴滴這比也給出了他們完善的內容。

 

flink官方的CEP文章,浪尖及浪尖組織的flink小團隊,已經翻譯過了。鏈接如下:

https://github.com/crestofwave1/oneFlink/blob/master/doc/CEP/FlinkCEPOfficeWeb.md

那么,為了實現動態規則編寫,滴滴的架構如下:

具體的規則實現如下:

可以看到,其規則還是要編碼成java代碼,然后再用groovy動態編譯解析,不知道效率如何。。。

對於規則引擎,當然很多人想到的是drools,這個跟flink結合也很簡單,但是效率不怎么苟同。

Flink CEP的SQL實現

熟悉flink的小伙伴肯定都知道Flink的SQL引擎是基於Calcite來實現的。那么細心的小伙伴,在calcite官網可以發現,calcite有個關鍵字MATCH_RECOGNIZE。可以在這個網頁搜索,找到MATCH_RECOGNIZE關鍵字使用。

http://calcite.apache.org/docs/reference.html

那么這時候可能會興沖沖寫個demo。

final String sql = "select frequency,word,timestamp1 "
    + "  from wc match_recognize "
    + "  ("
    + "       order by  timestamp1 "
    + "       measures A.timestamp1 as timestamp1  ,"
    + "       A.word as  word ,"
    + "       A.frequency as  frequency "
    + "       ONE ROW PER MATCH "
    + "    pattern (A B) "
    + "    within interval '5' second "
    + "    define "
    + "      A AS A.word = 'bob' , "
    + "      B AS B.word = 'kaka' "
    + "  ) mr";

很掃興的它報錯了:

 

那么問題來了,calcite支持而flink不支持,為啥?

趕緊發了個issue,然后迅速得到官方回復:

 

 

 

但是,翻翻阿里的blink使用手冊和華為的flink使用手冊發現兩者都支持。

好吧。其實,很不服氣,周末,除了健身就是加班

波折一番,解決了,需要修改flink-table相關的內容,執行計划,coden等。

最終,實現了。

 


 

滴滴是如何從零構建集中式實時計算平台的?

 

閱讀數:35242019 年 2 月 7 日 09:00

 

滴滴出行作為一家出行領域的互聯網公司,其核心業務是一個實時在線服務。因此具有豐富的實時數據和實時計算場景。本文將介紹滴滴實時計算發展之路以及平台架構實踐。

01 實時計算演進

隨着滴滴業務的發展,滴滴的實時計算架構也在快速演變。到目前為止大概經歷了三個階段,第一階段是業務方自建小集群;第二階段是集中式大集群、平台化;第三階段是 SQL 化。圖 1 標識了其中重要的里程碑,下面給出詳細闡述。

滴滴是如何從零構建集中式實時計算平台的?

在 2017 年以前滴滴並有沒有統一的實時計算平台,而是各個業務方自建小集群。其中用到的引擎有 Storm、JStorm、Spark Streaming、Samza 等。業務方自建小集群模式存在如下弊端:

(1)需要預先采購大量機器,由於單個業務獨占,資源利用率通常比較低;

(2)缺乏有效的監控報警體系;

(3)維護難度大,需要牽涉業務方大量精力來保障集群的穩定性;

(4)缺乏有效技術支持,且各自沉淀的東西難以共享。

為了有效解決以上問題,滴滴從 2017 年年初開始構建統一的實時計算集群及平台。技術選型上,我們基於滴滴現狀選擇了內部用以大規模數據清洗的 Spark Streaming 引擎,同時引入 On-YARN 模式。利用 YARN 的多租戶體系構建了認證、鑒權、資源隔離、計費等機制。相對於離線計算,實時計算任務對於穩定性有着更高的要求,為此我們構建了兩層資源隔離體系。

第一層是基於 CGroup 做進程(Container)級別的 CPU 及內存隔離。第二層是物理機器級別的隔離。我們通過改造 YARN 的 FairScheduler 使其支持 Node Label。達到的效果如圖 2 所示:普通業務的任務混跑在同一個 Label 機器上,而特殊業務的任務跑在專用 Label 的機器上。

滴滴是如何從零構建集中式實時計算平台的?

通過集中式大集群和平台化建設,基本消除了業務方自建小集群帶來的弊端,實時計算也進入了第二階段。伴隨着業務的發展,我們發現 Spark Streaming 的 Micro Batch 模式在一些低延時的報警業務及在線業務上顯得捉襟見肘。於是我們引入了基於 Native Streaming 模式的 Flink 作為新一代實時計算引擎。Flink 不僅延時可以做到毫秒級,而且提供了基於 Process Time/Event Time 豐富的窗口函數。基於 Flink 我們聯合業務方構架了滴滴流量最大的業務網關監控系統,並快速支持了諸如乘客位置變化通知、軌跡異常檢測等多個線上業務。

02 實時計算平台架構

為了最大程度方便業務方開發和管理流計算任務,我們構建了如圖 3 所示的實時計算平台。在流計算引擎基礎上提供了 StreamSQL IDE、監控報警、診斷體系、血緣關系、任務管控等能力。以下分別介紹各自的作用:

(1)StreamSQL IDE。下文會介紹,是一個 Web 化的 SQL IDE;

(2)監控報警。提供任務級的存活、延時、流量等監控以及基於監控的報警能力;

(3)診斷體系。包括流量曲線、Checkpoint、GC、資源使用等曲線視圖,以及實時日志檢索能力。

(4)血緣關系。我們在流計算引擎中內置了血緣上報能力,進而在平台上呈現流任務與上下游的血緣關系;

(5)任務管控。實現了多租戶體系下任務提交、啟停、資產管理等能力。通過 Web 化任務提交消除了傳統客戶機模式,使得平台入口完全可控,內置參數及版本優化得以快速上線。

滴滴是如何從零構建集中式實時計算平台的?

03 實時規則匹配服務建設

在滴滴內部有大量的實時運營場景,比如“某城市乘客冒泡后 10 秒沒有下單”。針對這類檢測事件之間依賴關系的場景,用 Fink 的 CEP 是非常合適的。但是社區版本的 CEP 不支持描述語言,每個規則需要開發一個應用,同時不支持動態更新規則。為了解決這些問題,滴滴做了大量功能擴展及優化工作。功能擴展方面主要改動有:

(1)支持 wait 算子。對於剛才例子中的運營規則,社區版本是表達不了的。滴滴通過增加 wait 算子,實現了這類需求;

(2)支持 DSL 語言。基於 Groovy 和 Aviator 解析引擎,我們實現了如圖 4 所示的 DSL 描述規則能力。

滴滴是如何從零構建集中式實時計算平台的?

(3)單任務多規則及規則動態更新。由於實時運營規則由一線運營同學來配置,所以規則數量,規則內容及規則生命周期會經常發生變化。這種情況每個規則一個應用是不太現實的。為此我們開發了多規則模式且支持了動態更新。

除了功能拓展之外,為了應對大規模運營規則的挑戰,滴滴在 CEP 性能上也做了大量優化,主要有:

(1)SharedBuffer 重構。基於 Flink MapState 重構 SharedBuffer,減少每次數據處理過程中的狀態交互。同時剝離規則和用戶數據極大降低每次匹配的時候從狀態中反序列化的數據量;

(2)增加訪問緩存(已貢獻社區)。緩存 SharedBuffer 數據中每次處理所需要更新的引用計數,延緩更新;

(3)簡化 event time 語義處理。避免 key 在很分散情況下每次 watermark 更新時要遍歷所有 key 的數據;

(4)復用 conditionContext(已貢獻社區)。減少條件查詢時對 partialMatch 元素的反復查詢。

以上優化將 CEP 性能提升了多個數量級。配合功能擴展,我們在滴滴內部提供了如圖 5 所示的服務模式。業務方只需要清洗數據並提供規則列表 API 即可具備負責規則的實時匹配能力。

滴滴是如何從零構建集中式實時計算平台的?

目前滴滴 CEP 已經在快車個性化運營、實時異常工單檢測等業務上落地,取得了良好的效果。

04 StreamSQL 建設

正如離線計算中 Hive 之於 MapReduce 一樣,流式 SQL 也是必然的發展趨勢。通過 SQL 化可以大幅度降低業務方開發流計算的難度,業務方不再需要學習 Java/Scala,也不需要理解引擎執行細節及各類參數調優。為此我們在 2018 年啟動了 StreamSQL 建設項目。我們在社區 Flink SQL 基礎上拓展了以下能力:

(1)擴展 DDL 語法。如圖 6 所示,打通了滴滴內部主流的消息隊列以及實時存儲系統。通過內置常見消息格式(如 json、binlog、標准日志)的解析能力,使得用戶可以輕松寫出 DDL 語法,並避免重復寫格式解析語句。

滴滴是如何從零構建集中式實時計算平台的?

(2)拓展 UDF。針對滴滴內部常見處理邏輯,內置了大量 UDF,包括字符串處理、日期處理、Map 對象處理、空間位置處理等。

(3)支持分流語法。單個輸入源多個輸出流在滴滴內部非常常見,為此我們改造了 Calcite 使其支持分流語義。

(4)支持基於 TTL 的 join 語義。傳統的 Window Join 因為存在 window 邊界數據突變情況,不能滿足滴滴內部的需求。為此我們引入了 TTL State,並基於此開發了基於 TTL Join 的雙流 join 以及維表 join。

(5)StreamSQL IDE。前文提到平台化之后我們沒有提供客戶機,而是通過 Web 提交和管控任務。因此我們也相應開發了 StreamSQL IDE,實現 Web 上開發 StreamSQL,同時提供了語法檢測、DEBUG、診斷等能力。

目前 StreamSQL 在滴滴已經成功落地,流計算開發成本得到大幅度降低。預期未來將承擔 80% 的流計算業務量。

05 總結

作為一家出行領域的互聯網公司,滴滴對實時計算有天然的需求。過去的一年多時間里,我們從零構建了集中式實時計算平台,改變了業務方自建小集群的局面。為滿足低延時業務的需求,成功落地了 Flink Streaming,並基於 Flink 構建了實時規則匹配(CEP)服務以及 StreamSQL,使得流計算開發能力大幅度降低。未來將進一步拓展 StreamSQL,並在批流統一、IoT、實時機器學習等領域探索和建設。

本文轉載自公眾號“滴滴技術“。

 

 


 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM