背景
- 數據量不斷增加,企業需要靈活快速地處理這些數據。
- 處理器主頻和散熱遇到瓶頸,多核處理器成為主流,並行化計算應用不斷增加。
- 開源軟件的成功使得大數據技術得以興起。
互聯網技術的發展讓大多數企業能夠積累大量的數據,而企業需要靈活快速地從這些數據中提取出有價值的信息來服務用戶或幫助企業自身決策。然而處理器的主頻和散熱遇到了瓶頸,CPU難以通過縱向優化來提升性能,所以多核這種橫向擴展成為了主流。也因此,開發者需要利用多核甚至分布式架構技術來提高企業的大數據處理能力。這些技術隨着開源軟件的成功而在業界得到廣泛應用。
下面我稍微介紹一些大數據應用中通常出現的一些原理或者說是特征。
基本原理
分布式:將數據分布到不同的節點(機器),從而存儲大量的數據。而分布式同時為並行讀寫和計算提供了基礎,這樣就能提高數據的處理能力。
為什么不直接使用分布式的關系型數據庫,比如主從模式的mysql?這主要是效率的問題。分布式關系型數據庫為了實現分布式事務、線性一致性、維護自身索引結構等功能會對性能造成影響。而正如剛剛背景所提到,大數據需求重點是快速處理大量數據,幫助用戶和企業的決策。這個決策就包括推薦、監控、數據分析等。這些場景並不一定需要數據庫這種嚴格的約束,是OLAP而非OLTP。所以大數據技術會通過解除這些限制而提升性能。
除了分布式外,還可以利用
- 批量處理:單位是上百MB的數據塊而非一條條數據,這樣在數據讀寫時能夠整體操作,減少IO尋址的時間消耗。
- 最終線性一致性:大數據技術很多都放棄線性一致性,這主要是跨行/文檔(關系型模型叫行,文檔型模型叫文檔)時非原子操作,在一行/一個文檔內還是會做到原子的。為了讀寫性能而允許跨行/文檔出現讀寫延遲。
- 增加數據冗余:規范化的數據能夠減少數據量,但在使用時需要關聯才能獲得完整數據,而在大數據下進行多次關聯的操作是十分耗時的。為此,一些大數據應用通過合並寬表減少關聯來提高性能。
- 列式存儲:讀取數據時只讀取業務所關心的列而不需要把整行數據都取出再做進行截取,而且列式的壓縮率更高,因為一列里一般都是同類的數據。
可靠性相關
- 副本:大數據存儲通常都會有副本設置,這樣即便其中一份出現丟失,數據也能從副本找到。
- 高可用:大數據應用通常都會考慮高可用,即某個節點掛了,會有其他的節點來繼續它的工作。
由於這個分享會的標題起得有點大,包括存儲、搜索、計算三大塊,而且篇幅有限,所以我就只根據這三塊中我了解且比較流行的開源組件來分享,而且只講解大概的原理。畢竟下面的每個組件的原理和實戰都可以各自出一本五六百頁的書了,而且還沒涉及源碼細節的。下面首先來介紹分布式文件系統,就是把我們單台計算機的文件系統擴展到多台。
HDFS(Hadoop Distributed File System)
架構原理
圖中有8台機器或者容器,兩個client、5個DataNode、1個NameNode。一個分布式文本系統,組成:NameNode、DataNode和secondary namenode
-
NameNode:作為master,管理元數據,包括文件名、副本數、數據塊存儲的位置,響應client的請求,接收workers的heartbeating和blockreport。
-
DataNode:管理數據(data block,存儲在磁盤,包括數據本身和元數據)和處理master、client端的請求。定期向namenode發送它們所擁有的塊的列表。
-
secondary namenode:NameNode的輔助檢查點。通過定時查詢NameNode上的edit logs,把logs更新到fsimage,最后還要把fsimage復制回NameNode的fsimage上。這樣避免NN重啟時要執行大量的logs。
-
Block:默認128MB,但小於一個block的文件只會占用相應大小的磁盤空間。設置100+MB是為了盡量減少尋址時間占整個數據讀取時間的比例,但如果block過大,又不適合數據的分散存儲或計算。將數據抽象成block,還有其他好處,比如分離元數據和數據的存儲、存儲管理(block大小固定方便計算)、容錯等。
讀寫流程
寫入:client端調用filesystem的create方法,后者通過RPC調用NN的create方法,在其namespace中創建新的文件。NN會檢查該文件是否存在、client的權限等。成功時返回FSDataOutputStream對象。client對該對象調用write方法,這個對象會選出合適存儲數據副本的一組datanode,並以此請求DN分配新的block。這組DN會建立一個管線,例如從client node到最近的DN_1,DN_1傳遞自己接收的數據包給DN_2。DFSOutputStream自己還有一個確認隊列。當所有的DN確認寫入完成后,client關閉輸出流,然后告訴NN寫入完成。
讀取:client端通過DistributedFileSystem對象調用open方法,同樣通過RPC調用遠程的NN方法獲取所要查詢的文件所涉及的blocks所存儲的DN位置,而且這些位置是按照距離排序的。返回的結果是一個FSDataInputStream對象,對輸入流對象調用read方法。輸入流會從距離最近的DN中讀取數據,將數據傳遞到client,讀取結束后關閉流。
這個機制看上去是很笨重的,有了這個分布式文件系統的基礎,其他組件就能利用這個系統提供的 API 來對數據的存儲進行優化。在介紹下一個組件前,先對主要的主鍵索引作簡單的介紹。
索引
類型 | 哈希 | SSTables/LSM樹 | BTree/B+Tree |
---|---|---|---|
大致原理 | 數據結構:哈希表。 | 內存:有序集合,例如紅黑樹、平衡二叉樹、跳躍表。 磁盤:一個個獨立文件,里面包含一個個數據塊。 寫入:內存維護一個有序集合,數據大小達到一定閾值寫入磁盤。后台會按照特定策略合並segment。 讀取:先查詢內存,然后磁盤中的最新segment,然后第二新,以此類推。 |
數據結構:平衡多叉樹。寫入:通過二分查找找到相應的葉子結點進行修改。讀取:同上。 |
優勢 | 適合數據經常更新 | 寫入快, 順序讀取快, 容易壓縮 | 讀取快,更時間可控 |
劣勢 | 必須存儲在內存;范圍查詢效率低 | 隨機讀取,讀取舊數據較慢 | 寫入較慢 |
涉及數據庫 | Mysql、Redis | MongoDB、Elasticsearch、HBase | Mysql、MongoDB |
主要的主鍵索引有哈希、LSM、BTree。下面主要涉及到LSM樹,所以哈希和BTree這里就不多說了。LSM樹有內存和磁盤兩個部分....,以跳躍表為例,大致的模型如下圖
內存的 MemStore 是一個有序集合,數據寫入會先寫入這里,當大小達到閾值就會 flush 到磁盤。而后台會有程序按一定策略對這些文件進行合並。合並的原因有:減少小文件,進而減少讀取時IO來提升讀性能。數據合並,比如圖中第二個file有數據a,但現在客戶端發送請求要把它刪掉或進行修改,如果每次刪改都要把數據找到再調整,就會有大量的磁盤IO,所以這些操作一般只做標記,等到后續文件合並時才真正對數據進行修改。還有一個原因是調整排序,因為flush后數據只在file內部有序,合並能夠調整整體排序。正因為這種結構,所以LSM的寫入是很快的,范圍讀取也快,因為數據已經有序。而為了保證不讀取到舊版本的數據,所以讀取需要從最新的開始遍歷,這也導致讀取舊數據的效率較低。當然,這里面還能優化,但細節就不說了。
HBase
簡介
HBase 就是基於 HDFS API 構建的一個可以在線低延遲訪問大數據的NoSQL數據庫。本質上就是給 HDFS 加上一個 LSM Tree 索引,從而提高讀寫性能。當然,即便優化了,這個高性能也是相對大數據量而言。實際上“HBase並不快,只是當數據量很大的時候它慢的不明顯”。由於是 NoSQL 數據庫,所以它有文檔型數據庫的弱項,即基本不支持表關聯。
特點
- 適合:
- 數據量大,單表至少超千萬。對稀疏數據尤其適用,因為文檔型數據庫的 null 就相當於整個字段沒有,是不需要占用空間的。
- 高並發寫入,正如上面 LSM 樹所說。
- 讀取近期小范圍數據,效率較高,大范圍需要計算引擎支持。
- 數據多版本
- 不適合:
- 小數據
- 復雜數據分析,比如關聯、聚合等,僅支持過濾
- 不支持全局跨行事務,僅支持單行事務
場景
- 對象存儲:新聞、網頁、圖片
- 時序數據:HBase之上有OpenTSDB模塊,可以滿足時序類場景的需求
- 推薦畫像:特別是用戶的畫像,是一個比較大的稀疏矩陣,螞蟻的風控就是構建在HBase之上
- 消息/訂單等歷史數據:在電信領域、銀行領域,不少的訂單查詢底層的存儲,另外不少通信、消息同步的應用構建在HBase之上
- Feeds流:典型的應用就是xx朋友圈類似的應用
更多適用場景可以根據HBase的特點判斷
架構原理
這里大概有10台機器或節點,5個DataNode、兩個RegionServer、一個Client、Master、ZooKeeper
-
Client:發送DML、DDL請求,即數據的增刪改查和表定義等操作。
-
ZooKeeper(類似微服務中的注冊中心)
- 實現Master的高可用:當active master宕機,會通過選舉機制選取出新master。
- 管理系統元數據:比如正常工作的RegionServer列表。
- 輔助RS的宕處理:發現宕機,通知master處理。
- 分布式鎖:方式多個client對同一張表進行表結構修改而產生沖突。
-
Master
- 處理 client 的 DDL 請求
- RegionServer 數據的負載均衡、宕機恢復等
- 清理過期日志
-
RegionServer:處理 client 和 Master 的請求,由 WAL、BlockCache 以及多個 Region 構成。
- HLog(WAL):提高數據可靠性。寫入數據時先按順序寫入HLog,然后異步刷新落盤。這樣即便 MemoStore 的數據丟失,也能通過HLog恢復。而HBase數據的主從復制也是通過HLog回放實現的。
- BlockCache
- Region:數據表的一個分片,當數據表大小達到一定閾值后會“水平切分”成多個Region,通常同一張表的Region會分不到不同的機器上。
- Store:一個Store存儲一個列簇,即一組列。
- MemStore和HFile:寫緩存,閾值為128M,達到閾值會flush成HFile文件。后台有程序對這些HFile進行合並。
- Store:一個Store存儲一個列簇,即一組列。
讀寫過程
- client 根據待寫入數據的主鍵(rowkey)尋找合適的 RegionServer 地址,如果沒有符合的,就向 zookeeper 查詢存儲HBase元數據表的 RegionServer 地址。
- client 從第一步找到的 RegionServer 查詢HBase元數據表,找出合適的寫入地址。
- 將數據寫入對應的 RegionServer 的 Region。
寫入和讀取的流程類似。
ElasticSearch
簡介
Elastic Stack 是以 Elasticsearch 為中心開發的一組組件,其中Kibana、Logstash、Beats使用較多。
Beats 是用 GO 實現的一個開源的用來構建輕量級數據匯集組件,可用於將各種類型的數據發送至 Elasticsearch 與 Logstash。
Logstash:流入、流出 Elasticsearch 的傳送帶。其他MQ或計算引擎也可以導入ES。
利用 Logstash 同步 Mysql 數據時並非使用 binlog,而且不支持同步刪除操作。
Kibana 是 ES 大數據的圖形化展示工具。集成了 DSL 命令行查看、數據處理插件、繼承了 x-pack(收費)安全管理插件等。
Elasticsearch 搜索引擎,它並不是基於 HDFS 建立的,而是自己實現了分布式存儲,並通過各種索引和壓縮技術來提高搜索的性能。當然,它作為文檔型數據庫,其在內存組織數據的方式也是類似LSM樹的。
特點
-
適合:
-
全文檢索,like "%word%"
-
一定寫入延遲的高效查詢
-
多維度數據分析
-
-
不合適:
- 關聯
- 數據頻繁更新
- 不支持全局跨行事務,僅支持單行事務
場景
- 數據分析場景
- 時序數據監控
- 搜索服務
框架原理
Cluster
Node:JVM進程
- Master:主要負責集群中索引的創建、刪除以及數據的Rebalance等操作。
- Data:存儲和檢索數據
- Coordinator:請求轉發和合並檢索結果
- Ingest:轉換輸入的數據
Index:一組形成邏輯數據存儲的分片的集合,數據庫
Shard:Lucene 索引,用於存儲和處理 Elasticsearch 索引的一部分。
Segment:Lucene 段,存儲了 Lucene 索引的一部分且不可變。結構為倒排索引。
Document:條記錄,用以寫入 Elasticsearch 索引並從中檢索數據。
增刪改查原理
Update = Delete + (Index - Ingest Pipeline)
細節補充
倒排索引
一般正向的就是通過文檔id找相應的值,而倒排索引則是通過值找文檔id。通過倒排這種結構,判斷哪些文檔包含某個關鍵詞時,就不需要掃描所有文檔里面的值,而是從這個關鍵詞列表中去搜索即可。而頻率主要是用來計算匹配程度的,默認使用TF-IDF算法。
為什么全文檢索中 ES 比 Mysql 快?
Mysql 的輔助索引對於只有一個單詞的字段,查詢效率就跟 ES 差距不大。
select field1, field2
from tbl1
where field2 = a
and field3 in (1,2,3,4)
這里如果 field2 和 field3 都建立了索引,理論上速度跟 es 差不多。es最多把 field2 和 field3 concat 起來,做到查詢時只走一次索引來提高查詢效率。
但如果該字段是有多個單詞,那么缺乏分詞的 Mysql 就無法建立有效的索引,且查詢局限於右模糊,對於“%word%”的搜索效率是極低的。而 ES 通過分詞,仍然可以構建出 term dictionary。
然而 Term Dictionary 和 Position 加起來是很大的,難以完全存儲在內存。因此,在查找 Term Dictionary 的過程會涉及磁盤IO,效率就會降低。為此,Luence 增加了 term index。這一層通過 Lucene 壓縮算法,使得整個 Term Index 存儲在內存成為可能。搜索時在內存找到相應的節點,然后再到 Term Dictionary 找即可,省去大量磁盤IO。
內存消耗大
ES 之所以快,很大程度是依賴 Lucene 的緩存以及緩存中的索引結構。而這些緩存只有被預先加載到內存才能做到快速的響應,查詢沒有被加載的數據通常都是比較慢的,這是 ES 需要大量內存的原因之一。所以有人建議 ES 僅作為內存索引庫,即與where、group by、in、sort等過濾、聚合相關的才存儲到 ES,而且其他字段並不能幫助查詢,只會浪費內存空間。而查詢得出的id將返回通過 Mysql 或者 Hbase 進行第二次的查詢。由於是主鍵的搜索,所以不會耗費太多時間。
而 ES 由於給了大部分內存到 Lucene 緩存,那自己聚合計算時用的內存空間就很有限了,這也是 ES 需要大量內存的原因。
從上面的介紹我們可以知道,ES 是不支持關聯的,而且聚合計算的資源很有限。那這時就用到計算引擎了。
計算引擎
計算引擎目前主流的兩個開源組件分別是 Spark 和 Flink。從兩個引擎的處理模型來看,Spark 的批處理更為高效,Flink 則善於流處理,盡管兩者都向着流批一體化的方向發展。當然,只要對弱項做優化還是可以跟另一方未做太多優化的強項比的,只是實現難度大些和效果上限可能略低。比如 Blink,阿里內部的 Flink,其 ML 模塊經過優化,在大部分常用模型的計算效率都能高於開源的 Spark 的。如果開源 Spark 也經過阿里那樣深度的優化,兩者的差距就難說了。
簡單提一下他們的特點
- 適合:大批量數據的靈活計算,包括關聯、機器學習、圖計算、實時計算等。
- 不適合:小量數據的交互式計算。
Spark
下面首先介紹 Spark,它是一個用於大規模數據處理的統一分析引擎,其內部主要由 Scala 實現。Spark 當初引起關注主要是它與 Hadoop 的三大件之一的 MapReduce 之間的比較。Hadoop 的三大組件包括 HDFS、Yarn 和 MapReduce。他們三個都是可以拆分開來單獨使用的。比如 Yarn 作為資源調度系統,傳統 Spark 和 Flink 都會借助它的功能實現任務的調度。而 MapReduce 作為計算引擎,其計算速度當時是弱於 Spark 的,主要是 Spark 減少了不必要的磁盤IO;增加迭代計算功能,從而更好支持機器學習;引入了一些自動優化功能。另外,Spark 廣泛的語言支持、API 更強的表達能力等優點都讓 Spark 在當時的離線計算領域中超越 MapReduce。
功能豐富
4大場景:Spark 的高層組件包括Spark SQL、Spark Streaming、Spark ML、GraphX。他們都是通過底層組件為 Spark Core 實現具體功能的。但是在使用 Spark 的時候,盡量是不要使用 Spark Core,因為高層組件的產生的 Spark Core一般會更高效,因為Spark做了不少優化,具體后面再說。
多種語言:支持 Java、Python、R 和 Scala 來編寫應用代碼。
多種部署模式:本地、獨立部署、Mesos、Yarn、K8S
多種數據源:HDFS、HBase、Hive、Cassandra、Kafka等
架構原理
Driver 是啟動 Spark 作業的JVM進程,它會運行作業(Application)里的main函數,並創建 SparkContext 對象。這個 SparkContext 里面包含這次 Spark 計算的各種配置信息。Spark 通過它實現與 Cluster Manager 通信來申請計算資源。這里的 Cluster Manager,在生產環境一般是 Mesos、Yarn 或者 K8s。這些 Manager 根據其管理的集群情況,給這個 Spark 任務分配相應的容器container,在容器中啟動 executor 進程。這些啟動后的 executor 會向 Driver 注冊,之后 Driver 就可以把它根據用戶計算代碼生成出的計算任務task發送給這些 executor 執行。計算結束后,結果可能輸出到 Driver,也可能輸出到當前 executor 的磁盤,或者其他存儲。
作業例子
object SparkSQLExample {
def main(args: Array[String]): Unit = {
// 創建 SparkSession,里面包含 sparkcontext
val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.getOrCreate()
import spark.implicits._
// 讀取數據
val df1 = spark.read.load("path1...")
val df2 = spark.read.load("path2...")
// 注冊表
df1.createOrReplaceTempView("tb1")
df2.createOrReplaceTempView("tb2")
// sql
val joinedDF = sql(
"""
|select tb1.id, tb2.field
|from tb1 inner join tb2
|on tb1.id = tb2.id
""".stripMargin)
// driver 終端顯示結果
joinedDF.show()
// 退出 spark
spark.stop()
}
}
SQL會經過一層層的解析然后生成對應的 Java 代碼來執行。
計算引擎的優勢
與 HBase、 es 和傳統數據庫查詢比較,計算引擎的優勢:1)數據量大時速度快,2)計算更加靈活。
以大數據關聯為例:
- 文檔型數據庫:大部分都不支持關聯,因為效率低。關聯基本都要全文檔掃描。因為文檔是 schemaless 的,並不確定某個文檔是否有關聯所需字段。而且個文檔的讀取都是整個對象的讀取,並不會只讀某個字段來減少內存開銷。另外,這兩個組件在內存中本身就有各自的數據結構來服務讀寫,所以額外的內存用於這類大開銷計算也是不現實的。因此,HBase 本身只支持簡單的過濾,不支持關聯。ES 即便支持過濾、聚合,但依然不支持關聯。
- 傳統關系型數據庫:可以完成較大數據關聯,然而效率低,這主要是受到其大量的磁盤 IO、自身服務(讀寫、事務等、數據同步)的干擾。在真正大數據情況下,這關聯還涉及數據在不同機器的移動,數據庫需要維持其數據結構,如 BTree,數據的移動效率較低。
- 計算引擎:
- 基於內存:計算引擎留有大量內存空間專門用於計算,盡量減少磁盤 IO。
- 計算並行化
- 算法優化
具體而言,Spark 提供了三種 Join 執行策略:
- BroadcastJoin:當一個大表和一個小表進行Join操作時,為了避免數據的Shuffle,可以將小表的全部數據分發到每個節點上。算法復雜度:O(n).
- ShuffledHashJoinExec:先對兩個表進行hash shuffle,然后把小表變成map完全存儲到內存,最后進行join。算法復雜度:O(n)。不適合兩個表都很大的情況,因為其中一個表的hash部分要全部放到內存。
- SortMergeJoinExec:先hash shuffle將兩表數據數據相同key的分到同一個分區,然后sort,最后join。由於排序的特性,每次處理完一條記錄后只需要從上一次結束的位置開始繼續查找。算法復雜度:O(nlogn),主要來源於排序。適合大表join大表。之所以適合大表,是因為 join 階段,可以只讀取一部分數據到內存,但其中一塊遍歷完了,再把下一塊加載到內存,這樣關聯的量就能突破內存限制了。
從上面的例子可以看出計算引擎相比於其他組件在計算方面的優勢。
數據流動
下面通過一張圖,從另一個角度了解 Spark 的運作。
這是一張簡單的數據流程圖。描述了一個 WorkCount 的數據流向。其主要代碼如下:
// 假設每個 block 里的數據如下
// a
// b
// a
val textFile = sc.textFile("hdfs://...")
val counts = textFile.map(word => (word, 1)) // a -> <a,1>
.reduceByKey(_ + _) // <a,<1,1>> -> <a,2>
counts.saveAsTextFile("hdfs://...")
圖中同一階段有多個數據流體現的是並行。中間的 shuffle 是在聚合、關聯、全局排序等操作時會出現的。比如這里的 reduceByKey 就是將相同 key 的數據移動到相同的 partition。這樣就能對所有的 a 進行加總,從而得出 a 的總數。
上圖的任務是一次性的,或者是周期性的,數據的驅動是拉取型的。如果將數據塊換成數據流,map 和 reduce 在啟動后就一直存在,並接受數據源不斷發送過來的信息,那就變成了流計算。即由周期性變為一直處理,從而變為實時處理,由主動拉取變為被動接收的形式。下面就來介紹 Flink 計算引擎。
Flink
Flink 同樣是分布式的計算引擎,主要基於Java實現,但它的特色主要體現在流式計算。這個引擎流行的主要推手是阿里。阿里在19年初開源了它修改過的 Flink,收購了 Flink 的母公司,並在各種線下技術論壇上推廣 Flink,讓 Flink 在 19 年的關注度極速上升。
除了在實時計算領域,Flink 在其他領域或許稍微落后於 Spark,畢竟 Spark 發展比較早,其生態比 Flink 要成熟更多。Flink 目前支持 Scala、Java 和 Python 來寫任務代碼。功能上同樣支持批計算、ML、Graph。部署工具、支持的數據源也 Spark 類似。
場景
- 實時分析/BI指標:比如某天搞活動或新版本上線,需要盡快根據用戶情況來調整策略或發現異常。
- 實時監控:通過實時統計日志數據來盡快發現線上問題。
- 實時特征/樣本:模型預測和訓練
架構原理
細節補充
和 Spark 一樣,Flink 也會根據 SQL 或者業務代碼生成 DAG 圖,然后將任務划分並發送給不同的節點執行。最大的不同正如之前所說,數據是實時地、一條條或一小批一小批地不斷流進這些節點,然后節點輸出響應的結果。而在這種場景下,Flink 在一定程度上解決了實時處理中的不少難點。
- 保證數據剛好被處理一次,即便在計算過程中出現網絡異常或者宕機。
- event-time處理,即按照數據中的時間作為計算引擎的時間,這樣即便數據上報出現一定的延遲,數據仍然可以被划分到對應的時間窗口。而且還能對一定時間內的數據順序進行修正。
- 在版本升級,修改程序並行度時不需要重啟。
- 反壓機制,即便數據量極大,Flink 也可以通過自身的機制減緩甚至拒絕接收數據,以免程序被壓垮。
與 Spark 比較
Spark:
- 拉模型
- 系統更加成熟,尤其是離線計算
- 生態更加完善
Flink:
- 推模型
- 實時計算更優秀
- 阿里推動,正在迅速發展
- 生態對國內更為友好
小紅書實時技術
小紅書舊的離線框架和我們現在的大數據體系有點類似,都是把埋點數據上報到日志服務,然后進入離線數倉,只是小紅書用 Hive,我們用 DataWorks。然后我們同樣也有 T+1 的用戶畫像、BI報表和推薦的訓練數據。
而后續的實時框架是這樣的
日志服務的埋點數據先進入 Kafka 這一消息隊列里面。不太清楚為什么要加上 Kafka 這一中間件,或許當時並沒有開源的 日志服務到Flink 的 connecter 吧。但總之,引入 Flink 之后就可以實時累計埋點中的數據,進而產生實時的畫像、BI指標和訓練數據了。下面介紹一下這個實時歸因
如上圖所以,用戶app屏幕展示了4個筆記,然后就會有4條曝光埋點,而如果點擊筆記、點贊筆記以及從筆記中退出都會有相應的埋點。通過這些埋點就可以得出右面兩份簡單的訓練或分析數據。這些數據跟原來已經積累的筆記/用戶畫像進行關聯就能得出一份維度更多的數據,用於實時的分析或模型預測。實時模型訓練這一塊至少小紅書在19年8月都還沒有實現。下圖是小紅書推薦預測模型的演進
那么如何進行實時訓練深度學習模型呢?以下是我的一些想法。借助一個阿里的開源框架flink-ai-extended。
如上圖所示,這是 flink 的數據流結構圖,左邊 source 為數據源,然后進過join、udf等算子進行訓練樣本數據的生成,然后傳遞給一個 UDTF/FlatMap 算子,這實際上也是一個 Flink 節點,但它里面包含的是 Tensorflow 的訓練 worker,而上下也是 Flink 的節點,都是包含了 Tensorflow 訓練所需的一些角色,這樣數據源源不斷地實時進入 TF 模型來完成實時訓練。TF 也可以因此借助 Flink 的分布式框架來完成分布式的學習。多台GPU或者CPU或許應該會比一台GPU的訓練效率更高。
這個框架同時適用於模型預測,只要把里面的訓練角色換成訓練完成的 model,也就可以進行實時的預測,而且這里借助 Flink 內部的通信機制,效率應該會比普通的 http 調用要快不少。
總結
本次分享由於時間有限,講的都是比較淺層的東西,實際上剛剛所說的每一個組件里面包含的內容都不少,都可以作為一個長遠的目標去研究和改造。說回分享的主題之一,使用場景。
首先是存儲,上述介紹的 HDFS、HBase、ES(ES雖然是搜索引擎,但它也可以在某些方面替代傳統關系型數據的功能) 都是適用於 OLAP 場景,即分析推薦而非事務。從公司目前的情況來看,HDFS 基本可以忽略,因為已經有 DataWork,數據的存儲暫時不是問題。更多的問題在於數據使用時的性能。HBase 和 ES 作為文檔型數據庫,適合一對多的數據模型,比如將帖子和其評論作為一個整體來存儲。對於多對一、多對多的模型,文檔型數據庫實際上並不合適,但可以通過合並寬表、應用層關聯等方式在一定程度上進行彌補。而如果多對多關系確實復雜、量大、文檔型數據庫性能無法滿足,比如一些大型社交網絡,那么可以考慮圖數據庫。
當決定嘗試文檔型數據庫時,HBase 的特點在於較為快速地查詢小范圍的新數據,而且這條數據可以很大。ES 的特點則在於快速的全文檢索、准實時的數據分析。當然,分析的復雜度是不能跟計算引擎比的,比如關聯、機器學習等。但通過合並寬表、各種where、group by操作,還是能滿足不少需求的,尤其是應用的搜索功能,ES 實現起來是比較簡單的。目前公司並沒有應用它的強項,最好由專人負責它的調試,尤其是搜索排序方面。
然后是計算引擎,目前公司用的 MaxCompute 已經能夠滿足離線計算的各種需求,或者就欠缺實時計算了。但公司目前實時性需求不多而且也不緊急,所以開發一直都沒有啟動。目前就看明年推薦是否有這樣的需求,而且有相應的prd出來了。而考慮到成本和靈活性,自建或許是更好的選擇,比如剛剛提到的 Flink + Tensorflow。
以上便是這次分享會的全部內容,謝謝大家的參與。
參考:
書籍:
- Martin Kleppmann: “Designing Data-Intensive Applications”, O’Reilly Media, March 2017
- Tom White: “Hadoop: The Definitive Guide”, 4th edition. O’Reilly Media, March 2015
- 胡爭, 范欣欣: “HBase原理與實踐”, 機械工業出版社, 2019年9月
- 朱鋒, 張韶全, 黃明: “Spark SQL 內核剖析”, 電子工業出版社, 2018年8月
- Fabian Hueske and Vasiliki Kalavri: “Stream Processing with Apache Flink”, O’Reilly Media, April 2019
文章:
- 再談 HBase 八大應用場景:https://cloud.tencent.com/developer/article/1369824
- Elasticsearch讀寫原理:https://blog.csdn.net/laoyang360/article/details/103545432
- ES文章集:https://me.csdn.net/wojiushiwo987
- MySQL和Lucene索引對比分析:https://www.cnblogs.com/luxiaoxun/p/5452502.html
- 深入淺出理解 Spark:環境部署與工作原理:https://mp.weixin.qq.com/s/IdrX4Hh1HQaJZx-VnB7XsQ
文檔:
- ES官方文檔:https://www.elastic.co/guide/index.html
- Spark官方文檔:http://spark.apache.org/docs/latest/
- Flink官方文檔:https://flink.apache.org/
分享:
- 基於Flink的高性能機器學習算法庫 https://www.bilibili.com/video/av57447841?p=4
- “Redefining Computation” https://www.bilibili.com/video/av42325467?p=3
- Flink 實時數倉的應用 https://www.bilibili.com/video/av66782142
- Flink runtime 核心機制剖析 https://www.bilibili.com/video/av42427050?p=4
- 小紅書大數據在推薦中的應用 https://mp.weixin.qq.com/s/o7JM7DDkUNuGZEGKBtAmIw
- TensorFlow 與 Apache Flink 的結合 https://www.bilibili.com/video/av60808586/