如何利用Flink實現超大規模用戶行為分析
各位晚上好,首先感謝大家參與我的這次主題分享,同時也感謝 InfoQ AI 前線組織這次瀚思科技主題月!
瀚思科技成立於 2014 年,按行業划分我們是一家安全公司。但和大家熟知的賣殺毒軟件或者防火牆的傳統安全公司不同。瀚思科技幫助各種中大型企業搭建安全大數據的分析平台,平台上應用的安全分析策略深度結合了多種機器學習算法,最終幫助企業定位與揭示各種安全問題。所以我們自己定位是一家安全 + 大數據 +AI 的公司。
言歸正傳,今天的分享主題是:基於 Flink 流處理的動態實時大規模用戶行為分析
今天的分享主要包括四大部分:
1)網絡安全中的用戶行為分析(簡稱 UBA);
2)實時超大規模用戶行為分析的技術挑戰 ;
3)Drools 規則引擎在 CEP 中的應用 ;
4)Flink 原生 CEP 組件。
首先,我們先明確一個概念,什么是網絡安全中的用戶行為分析?簡而言之,用戶行為分析是通過分析用戶數據(例如交易數據,用戶登錄數據),找出異常行為以檢測外部及內部人士的攻擊活動。舉例來說,外部攻擊通常是由外部黑客通過破解 VPN 密碼並奪取員工帳戶的方式實現。而內部攻擊則往往表現為心存不滿的或者即將離職的員工對敏感信息的竊取。從右圖可以看出,我們需要分析的源數據通常表現為多種數據類型,例如服務器數據、網絡數據、數據庫數據、應用程序數據、安全數據等。這些數據被送入用戶行為分析系統,輸出用戶正常行為和異常行為(內 / 外部攻擊)。
傳統的用戶行為分析系統通常以離線批處理模式根據既定規則對這些數據進行分析。比如每天定時跑前一天的數據。這種方式適合對實時性要求不高甚至沒有要求的業務場景。而面對強調實時性的場景,例如反欺詐場景,需要對實時交易數據進行分析,及時應對不正常的交易。在這種情況下,離線的方式顯然不適用。我們必須利用在線 / 流式處理框架,並添加必要的機器學習算法,對實時數據進行 7/24 監控,以區分威脅行為與正常行為。
然而大規模用戶行為分析並沒有想象中那么簡單,尤其是對實時性要求高的場景下。實際應用中,會存在很多細節問題以及技術挑戰。我們可以看到這里列舉出了四點主要的技術挑戰。第一點,輸入信息規模過大。舉例來說:在一個有 10k+ 員工的大型企業部署用戶行為分析系統時,往往需要對這 10k+ 用戶在十余個維度上做分析。面對這樣體量的數據。分析性能往往受到巨大的挑戰。第二點也就是我們先前談到的實際需求往往需要以實時方式檢測攻擊活動。第三點講的是檢測邏輯往往不是單一的,實際應用中需要將黑名單、業務邏輯規則以及機器學習算法加以結合。一旦規則數量過大,單個規則邏輯復雜時,整個系統的吞吐量就會受到挑戰。第四點講的是規則不是靜態的,而是需要被實時更新的,也就是規則的動態部署。
為了應對以上問題,我們通過技術調研選擇了 Flink 作為底層的流處理框架。主要出於以下幾點原因:
第一,Flink 是一個純流式系統,吞吐量實際測試可達 100K EPS。而不像某些框架是用 mini batch 的模式來達到所謂的流式處理的;
第二,面對不同的用戶數據格式,我們必須支持多種數據源,這一點上 Flink 內置的對多種數據源的支持(CSV,Kafka,Hbase,Text,Socket 數據等)也為用戶數據的接入提供了便利;
第三,Flink 強大的窗口機制(包括翻轉窗口,滑動窗口,session 窗口,全窗口以及允許用戶自定義窗口)可以滿足復雜的業務邏輯,使得用戶可以編寫復雜的業務規則;
第四,Flink 內置的 RocksDB 數據存儲格式使其數據處速度快且資源消耗少,在 Checkpoint 上起到了至關重要的作用;
第五,Flink 對算子(operator)的高可控性,使得用戶可以靈活添加刪除或更改算子行為。這一點對於動態部署有着至關重要的意義。
下面我們來看一下整個系統的架構。上圖中左邊為數據源,右邊為監測結果輸出,我們在之前已經提過。中間是整個系統的核心部分。可以看到,底層的 Flink 是整個實時系統的根基,上方綠色部分為三種 ETL 類型:統計指標、實體關系與序列。我們可以將這些 ETL 類型轉換為由 Drools 規則引擎解析並配合機器學習算法的 Scenario 規則。同時對於規則和告警事件,我們需要提供良好的交互界面。此圖中省去了數據源輸入所依賴的一些技術,比如 Kafka,Elasticsearch。而每一條數據在進入 Flink 時會按 user key 做 partition。同時我們根據場景特點對 Drool 規則引擎做了必要的修改,定義了規則所包含的一些基本概念。這一點會在下面具體展開。
規則引擎方面我們有兩個選擇:Flink 原生 CEP 組件和 Drools 規則引擎。那么兩者各有什么優勢和劣勢呢?首先我們看一下 Flink CEP。當前穩定的 Flink1.3 版本的 CEP 是一套極具通用性、易於使用的實時流式事件處理方案。作為 Flink 的原生組件,省去了第三方庫與 Flink 配合使用時可能會導致的各種問題。但其功能現階段看來還比較基礎,不能表達復雜的業務場景,同時它不能夠做到動態更新(這是一個痛點)。具體如何解決我們稍后會看到。
接下來我們看看什么是 Drools。Drools 是一套基於 JVM 的,實現了 RETE 算法的規則引擎。它可以將多變的規則從硬編碼中解放出來,以規則腳本的形式存在。右邊圖中顯示的是一個典型的 Drools 規則的定義方式。可以看到,其語義與 Java 非常類似。既可以導入既有的 Java POJO(圖中 Person 類),也可以在規則文件中直接定義類(EventA)。when 語句中是具體的判斷條件,then 語句中是滿足判斷條件之后所做的操作。操作可以是任意的,不僅限於對滿足條件的那個對象進行操作。比如你可以在 then 里調用某個 Java 類的方法,或者調用某個全局變量。總之,可以在 Drools 規則文件中 import Java 類,然后對其進行操作。
那么 Drools 有些什么優缺點呢?它最大優勢在於語法規則簡單,類似 Java,編寫門檻不高、能夠無縫化與 Java 集成,且用戶可以對 Drools 規則進行動態配置。但這套方案也存在着自己的不足之處:例如其內置聚合功能速度緩慢,不適合我們自身或者客戶使用場景下的大量聚合操作任務。另外,其內置事件序列處理機制也需要消耗大量內存資源。
具體如何攻克這些缺點,待會兒我們會提到。接下來我們先看看如何將業務場景轉變成 Drools 規則。
作為常用的業務場景,我們需要將三種 ETL 類型翻譯成對應的 Drools 規則。具體來講,由 Flink 對源數據進行預處理生成的事件數據中的每一行都需要由三種 ETL 類型進行處理:統計指標、實體關系與序列,並借此將內容轉換為實際行為。統計指標是指對某個或某幾個字段在特定窗口內做聚合,例如一小時內的某用戶的登錄次數或者某個時間段內連續登錄失敗的次數。實體關系關注的是兩個實體之間的關聯關系。實體可以是用戶,部門,設備,郵件,地理位置等。那么哪個用戶使用了哪台設備就是一個實體關系。哪封郵件發給了哪個用戶也是一個實體關系。
下面我們來看一個具體的例子。
可以看到我們這里有一條檢測 VPN 可疑行為的規則。規則當中包含三條判斷條件。第一條 metric 用來判斷一小時能登錄失敗的次數。第二條演示的是用戶與設備之間的實體關系,表達式 expression == “[vpn.user, vpn.device]”說明了這一點。第三條演示的是在序列算法下異常值大於 50 的行為。最后會將滿足條件的三個行為收集起來發送給下游的模塊。下游模塊可以是另一個算子,或者是持久化結果的 DB。
有了 Flink 作為流計算引擎,有了 Drools 作為規則引擎,那么我們如何將兩者結合放到一個系統里發揮作用呢。我們需要做的是將源數據輸入到 Flink 生成所謂的事件流,同時將 Drools 規則文本讀取到 Flink 生成所謂的規則流。而 Flink 中提供了一個 CoFlatMapFunction 可以將兩個流結合起來進行分析。在這個 function 里我們所要做的就是將在 Flink 里結合機器學習算法計算出來的結果與 Drools 規則進行匹配。
但事實上,這個方案在實際運行當中會有一些性能上的問題。這些問題主要表現在長周期行為的分析上。比如,機器學習算法需要對長周期行為(數據往往跨越三個月)進行計算,得出異常值。那么這種情況下我們需要維護算法生成的長周期行為的狀態。具體方法可以是直接保存在 Drools Engine 中,或者將其保存在外部 DB 中,再或者可以利用 Flink 的 stateful operator 來維護狀態。但現有情況下,每種方法都多多少少會有一些問題。接下來我們看看具體問題都有哪些。
需要保存過往窗口的狀態,作為中間結果送入 Drools 規則引擎進行計算。Flink 內置的窗口機制在窗口結束時會清除窗口狀態。 Flink 內置的 RocksDB 存儲結構在窗口清理時會自動刪除數據。 Flink 產生的長周期聚合結果被送入 Drool 規則引擎進行匹配的時候往往會消耗大量內存。可以看到,主要的痛點就在於中間結果的維護和資源消耗的問題。面對這些問題我們可以嘗試以下的做法。
首先想到的是用 redis,memcached 之類的 KV store 來保存中間結果。但實際測試結果表明,它們的性能趕不上 Flink 的速度。所以在追求高吞吐量的情況下,此方法行不通。其次,可以通過修改 Flink RockDB backend 的源碼來解決窗口清理時自動刪除數據的問題。同時為了保證過期數據不擠壓,需要引入“TTL”(time to live)屬性,是的 rocksdb 在超時的時候自動刪除過期數據。內存問題主要是由 Drools 引擎引起的。因為每一條事件與規則匹配都會生成一個 Fact,默認情況下 fact 無論是否匹配,Drools 都不會立刻刪除它。你必須手動的刪除它。但當事件數量過大或者規則數量過大時,即使你手動刪除沒有匹配的 fact,可能也會出現某一時間段大量 fact 存在於內存中的情況。所以可行的辦法是設定閾值來控制內存中允許同時存在的 fact 的數量,同時清理失效的 fact。或者也可以盡量保持規則簡單化。復雜的聚合規則交給 Flink 去做。
可以看到,以上方案所產生的性能問題主要在於 Drools。其實除了以上的方案,我們還有一個 Plan B。Flink1.4 Snapshot 版本增加了一些新功能。利用這些新功能,我們可以直接使用 Flink CEP 並做到動態更新。這些功能主要包括:新版本加入了對算子粒度的操作。我們可以 checkpoint 某一個算子的狀態。同時 Flink CEP 中新增了 pattern group 的概念。可以將多個規則 pattern 歸為同一個 group。這樣增加了規則的表達能力。利用這些功能,我們重新設計了一個系統來實現規則的動態更新。下面我們來看一下新設計的工作流程。
簡單來講,整個工作流程就是用戶更新規則,新規則被翻譯成 Java 源碼,然后編譯並打包成可執行 jar,這個時候系統將觸發 Flink 的 Savepoint,保存當前 operator 的狀態,然后 cancel 當前運行的 Flink Job,然后把新生成的 jar 發布到 Flink 上去,同時讀取最新的 operator 狀態,恢復整個系統的運行。值得提出的一點是,根據規則文件里規則的數量和復雜度。我們可以划分規則生成多個 jar 發布到 Flink 上。這樣單個 job 的負載就不至於過高。這種動態生成規則代碼的方式擴展性和並發性更出色,不存在單一大負載算子。缺陷在於從 Savepoint 到整個流程恢復會有數秒延遲。
具體實踐過程中我們發現,如果一個 pattern 對應一條規則流的話,當 pattern 數量過大,程序初始化時就會內存溢出。那么自然而然就想到多個 pattern 對應一條規則流。這就需要用到新版本中的 GroupPattern 的概念。右邊圖中可以看到 下方的 patterns 是五個 pattern 的組合。這樣就解決了內存溢出的問題。另外提一點,此方案下提到的規則並非 Drools 規則,而是根據 Flink CEP 重新定義的一套語法規則。
接下來會有一個視頻 demo,我會對 demo 流程做簡要說明。
https://v.qq.com/x/page/g0553z9fa6n.html
首先我們登錄系統,大家可以看到這里我已經預置了一條規則。此規則中包含兩個判斷條件:轉賬數額大於 19000 和小於 300 的。我們現在發布這條規則到 Flink 來看看它是否生效。大家可以看到,在警告界面已經有滿足規則的事件被告出。
回到規則界面,我們先關閉這條規則並新增一條按時間窗口聚合的規則。在 10 秒的翻轉窗口中設置 5s 的滑動窗口來計算轉賬數額的總和,一旦總和大於 100000,就被視為可疑事件被告警。同樣我們發布這條規則到 Flink 再去告警界面看是否有滿足條件的事件。
很顯然新規則已經生效,相關告警已經顯示在告警界面。同時第一條規則已失效,大家可以看到沒有滿足第一條規則的事件被告出。讓我們再一次回到規則配置界面,讓第一條規則再次生效並發布它。這樣我們就有兩條規則同時在運行。回到告警界面我們可以看到兩條規則都已生效。
問答環節
Q(1)這個規則引擎支持自定義變量或者簡單邏輯運算么? Q(2)drools 的性能指標大概是多少?
A:1、無論是 drools 規則還是我們自定義的 CEP 規則都支持自定義變量或者簡單邏輯運算。
2、drools 加入到 Flink 里之后,整體的吞吐量大致在 100k 到 200k EPS。
Q:請問翻轉窗口是什么意思?
A:翻轉窗口即 Tumbling Window,是指有固定時間大小的窗口。比如 1 分鍾的翻轉窗口。當 1 分鍾時間到的時候。對應的窗口函數會被觸發,這個函數里會包含這個窗口內的所有記錄。函數執行完后會翻轉到下一個時間窗口。兩個時間窗口不重疊。
Q: 請問 flink 做實時計算,穩定性如何?
A:Flink 的穩定性很高。即使出現由於性能問題引起的程序 halt,你也可以通過 flink 提供的后台界面操控正在運行的 job。只要當前 job 是做了 checkpoint 的,你都可以停止當前 job,重啟后會從最近的一次 checkpoint 運行。
Q:我想了解下瀚思的老師有沒有調研過 storm+esper 這種方式。另外 spark structured streaming 和 esper 都支持類 SQL 的規則,感覺更方便一些
A:esper 我們有調研過,但是從我們當前已有的場景規則來看,esper 的吞吐量不能和 Flink 相媲美。且 esper 對與硬件的要求也比 Flink 高,相對來說更加吃內存。至於 spark structured streaming,從它給出的官方文檔來看,有些我們需要的場景它是無法支持的。比如多個流的聚合操作。事實上,我們也正在開發一套類似 SQL 語法的規則引擎。
講師介紹
吳昊,瀚思科技高級軟件架構師,畢業於 University of Waterloo 計算機系。
精通領域:大數據處理,大型流式處理系統架構設計