前言
我曾經面試安踏的技術崗,當時面試官問了我一個問題:如果你想使用某個新技術但是領導不願意,你怎么辦?
對於該問題我相信大家就算沒有面試被問到過,現實工作中同事之間的合作也會遇到。
因此從我的角度重新去回答這個問題,有以下幾點:
1.師出有名,在軟件工程里是針對問題場景提供解決方案的,如果脫離的實際問題(需求)去做技術選型,無疑是耍流氓。大家可以回顧身邊的“架構師”、“技術Leader”是不是拍拍腦袋做決定,問他們為什么這么做,可能連個冠冕堂皇的理由都給不出。
2.信任度,只有基於上面的條件,你才有理由建議引入新技術。領導願不願意引入新技術有很多原因:領導不了解這技術、領導偏保守、領導不是做技術的等。那么我認為這幾種都是信任度,這種信任度分人和事,人就是引入技術的提出者,事就是提出引入的技術。
3.盡人事,任何問題只是單純解決事都是簡單的,以我以往的做法,把基本資料收集全並以通俗易懂的方式歸納與講解,最好能提供一些能量化的數據,這樣更加有說服力。知識普及OK后,就可以嘗試寫方案與做個Demo,方案最好可以提供多個,可以分短期收益與長期收益的。完成上面幾點可以說已經盡人事了,如果領導還不答應那么的確有他的顧慮,就算無法落實,到目前為止的收獲也不錯。
4.復雜的是人,任何人都無法時刻站在理智與客觀的角度去看待問題,事是由人去辦的,所以同一件事由不同的人說出來的效果也不一樣。因此得學會向上管理、保持與同事之間合作融洽度,盡早的建立合作信任。本篇文章更多敘述的事,因此人方面不過多深究,有興趣的我可以介紹一本書《知行 技術人的管理之路》。
本篇我的實踐做法與上述一樣,除了4無法體現。那么下文我分了4大模塊:業務背景介紹、基礎概念講解、方案的選用與技術細節。
該篇文章不包含代碼有8000多千字,花了我3天時間寫,可能需要您花10分鍾慢慢閱讀,我承諾大家正文里面細節滿滿。
曾有朋友建議我拆開來寫,但是我的習慣還是希望以一篇文章,這樣更加系統化的展示給大家。當然大家有什么建議也可以在下方留言給我。
部分源碼,我放到了https://github.com/SkyChenSky/Sikiro 的Sikiro.ES.Api里
背景
本公司多年以來用SQL Server作為主存儲,隨着多年的業務發展,已經到了數千萬級的數據量。
而部分非核心業務原本應該超億的量級了,但是因為從物理表的設計優化上進行了數據壓縮,導致維持在一個比較穩定的數量。壓縮數據雖然能減少存儲量,優化提供一定的性能,但是同時帶來的損失了業務可擴展性。舉個例子:我們平台某個用戶擁有最后訪問作品記錄和總的閱讀時長,但是沒有某個用戶的閱讀明細,那么這樣的設計就會導致后續新增一個抽獎業務,需要在某個時間段內閱讀了多長時間或者章節數量的作品,才能參加與抽獎;或者運營想通過閱讀記錄統計或者分析出,用戶的愛好和受歡迎的作品。現有的設計對以上兩種業務情況都是無法滿足的。
此外我們平台還有作品搜索功能,like ‘%搜索%’查詢是不走索引的而走全表掃描,一張表42W全表掃描,數據庫服務器配置可以的情況下還是可以的,但是存在並發請求時候,資源消耗就特別厲害了,特別是在偶爾被爬蟲爬取數據。(我們平台API的並發峰值能達到8w/s,每天的接口在淡季請求次數達到了1億1千萬)
關系型數據庫擁有ACID特性,能通過金融級的事務達成數據的一致性,然而它卻沒有橫向擴展性,只要在海量數據場景下,單實例,無論怎么在關系型數據庫做優化,都是只是治標。而NoSQL的出現很好的彌補了關系型數據庫的短板,在馬丁福勒所著的《NoSQL精粹》對NoSQL進行了分類:文檔型、圖形、列式,鍵值,從我的角度其實可以把搜索引擎納入NoSQL范疇,因為它的確滿足的NoSQL的4大特性:易擴展、大數據量高性能、靈活的數據模型、高可用。我看過一些同行的見解,把Elasticsearch歸為文檔型NoSQL,我個人是沒有給他下過於明確的定義,這個上面說法大家見仁見智。
MongoDB作為文檔型數據庫也屬於我的技術選型范圍,它的讀寫性能高且平衡、數據分片與橫向擴展等都非常適合我們平台部分場景,最后我還是選擇Elasticsearch。原因有三:
- 我們運維相比於MongoDB更熟悉Elasticsearch。
- 我們接下來有一些統計報表類的需求,Elastic Stack的各種工具能很好滿足我們的需求。
- 我們目前着手處理的場景以非實時、純讀為主的業務,Elasticsearch近實時搜索已經能滿足我們。
Elasticsearch優缺點
百度百科 :
Elasticsearch是一個基於Lucene的搜索服務器。它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口。Elasticsearch由Java語言開發的,是一種流行的企業級搜索引擎。Elasticsearch用於雲計算中,能夠達到實時搜索,穩定,可靠,快速,安裝使用方便。官方客戶端在Java、.NET(C#)、PHP、Python、Apache Groovy、Ruby和許多其他語言中都是可用的。
對於滿足當下的業務需求和未來支持海量數據的搜索,我選擇了Elasticsearch,其實原因主要以下幾點:
優點 | 描述 |
橫向可擴展性 | 可單機、可集群,橫向擴展非常簡單方便,自動整理數據分片 |
快 | 索引被分為多個分片(Shard),利用多台服務器,使用了分而治之的思想提升處理效率 |
支持搜索多樣化 | 與傳統關系型數據庫相比,ES提供了全文檢索、同義詞處理、相關度排名、復雜數據分析、海量數據的近實時處理等功能 |
高可用 | 提供副本(Replica)機制,一個分片可以設置多個副本,假如某服務器宕機后,集群仍能正常工作。 |
開箱即用 | 簡易的運維部署,提供基於Restful API,多種語言的SDK |
那么我個人認為Elasticsearch比較大的缺點只有 吃內存,具體原因可以看下文內存讀取部分。
Elasticsearch為什么快?
我個人對於Elasticsearch快的原因主要總結三點:
- 內存讀取
- 多種索引
- 倒排索引
- doc values
- 集群分片
內存讀取
Elasticsearch是基於Lucene, 而Lucene被設計為可以利用操作系統底層機制來緩存內存數據結構,換句話說Elasticsearch是依賴於操作系統底層的 Filesystem Cache,查詢時,操作系統會將磁盤文件里的數據自動緩存到 Filesystem Cache 里面去,因此要求Elasticsearch性能足夠高,那么就需要服務器的提供的足夠內存給Filesystem Cache 覆蓋存儲的數據。
上一段最后一句話什么意思呢?假如:Elasticsearch 節點有 3 台服務器各64G內存,3台總內存就是 64 * 3 = 192G。每台機器給 Elasticsearch jvm heap 是 32G,那么每服務器留給 Filesystem Cache 的就是 32G(50%),而集群里的 Filesystem Cache 的就是 32 * 3 = 96G 內存。此時,在 3 台Elasticsearch服務器共占用了 1T 的磁盤容量,那么每台機器的數據量約等於 341G,意味着每台服務器只有大概10分之1數據是緩存在內存的,其余都得走硬盤。
說到這里大家未必會有一個直觀得認識,因此我從《大型網站技術架構:核心原理與案例分析》第36頁摳了一張表格下來:
操作 | 響應時間 |
打開一個網站 | 幾秒 |
在數據庫中查詢一條記錄(有索引) | 十幾毫秒 |
機械磁盤一次尋址定位 | 4毫秒 |
從機械磁盤順序讀取1MB數據 | 2毫秒 |
從SSD磁盤順序讀取1MB數據 | 0.3毫秒 |
從遠程分布式緩存Redis讀取一個數據 | 0.5毫秒 |
從內存中讀取1MB數據 | 十幾微秒 |
Java程序本地方法調用 | 幾微秒 |
網絡傳輸2KB數據 | 1微秒 |
從上圖加粗項看出,內存讀取性能是機械磁盤的200倍,是SSD磁盤約等於30倍,假如讀一次Elasticsearch走內存場景下耗時20毫秒,那么走機械硬盤就得4秒,走SSD磁盤可能約等於0.6秒。講到這里我相信大家對是否走內存的性能差異有一個直觀的認識。
對於Elasticsearch有很多種索引類型,但是我認為核心主要是倒排索引和doc values
倒排索引
Lucene將寫入索引的所有信息組織為倒排索引(inverted index)的結構形式。倒排索引是一種將分詞映射到文檔的數據結構,可以認為倒排索引是面向分詞的而不是面向文檔的。
假設在測試環境的Elasticsearch存放了有以下三個文檔:
- Elasticsearch Server(文檔1)
- Masterring Elasticsearch(文檔2)
- Apache Solr 4 Cookbook(文檔3)
以上文檔索引建好后,簡略顯示如下:
詞項 | 數量 | 文檔 |
4 | 1 | <3> |
Apache | 1 | <3> |
Cookbook | 1 | <3> |
Elasticsearch | 2 | <1><2> |
Mastering | 1 | <1> |
Server | 1 | <1> |
Solr | 1 | <3> |
如上表格所示,每個詞項指向該詞項所出現過的文檔位置,這種索引結構允許快速、有效的搜索出數據。
doc values
對於分組、聚合、排序等某些功能來說,倒排索引的方式並不是最佳選擇,這類功能操作的是文檔而不是詞項,這個時候就得把倒排索引逆轉過來成正排索引,這么做會有兩個缺點:
- 構建時間長
- 內存占用大,易OutOfMemory,且影響垃圾回收
Lucene 4.0之后版本引入了doc values和額外的數據結構來解決上面得問題,目前有五種類型的doc values:NUMERIC、BINARY、SORTED、SORTED_SET、SORTED_NUMERIC,針對每種類型Lucene都有特定的壓縮方法。
doc values是列式存儲的正排索引,通過docID可以快速讀取到該doc的特定字段的值,列式存儲存儲對於聚合計算有非常高的性能。
集群分片
Elasticsearch可以簡單、快速利用多節點服務器形成集群,以此分攤服務器的執行壓力。
此外數據可以進行分片存儲,搜索時並發到不同服務器上的主分片進行搜索。
這里可以簡單講述下Elasticsearch查詢原理,Elasticsearch的查詢分兩個階段:分散階段與合並階段。
任意一個Elasticsearch節點都可以接受客戶端的請求。接受到請求后,就是分散階段,並行發送子查詢給其他節點;
然后是合並階段,則從眾多分片中收集返回結果,然后對他們進行合並、排序、取長等后續操作。最終將結果返回給客戶端。
機制如下圖:
分頁深度陷阱
基於以上查詢的原理,擴展一個分頁深度的問題。
現需要查頁長為10、第100頁的數據,實際上是會把每個 Shard 上存儲的前 1000(10*100) 條數據都查到一個協調節點上。如果有 5 個 Shard,那么就有 5000 條數據,接着協調節點對這 5000 條數據進行一些合並、處理,再獲取到最終第 100 頁的 10 條數據。也就是實際上查的數據總量為pageSize*pageIndex*shard,頁數越深則查詢的越慢。因此ElasticSearch也會有要求,每次查詢出來的數據總數不會返回超過10000條。
那么從業務上盡可能跟產品溝通避免分頁跳轉,使用滾動加載。而Elasticsearch使用的相關技術是search_after、scroll_id。
ElasticSearch與數據庫基本概念對比
ElasticSearch |
RDBMS |
Index |
表 |
Document |
行 |
Field |
列 |
Mapping |
表結構 |
在Elasticsearch 7.0版本之前(<7.0),有type的概念,而Elasticsearch和關系型數據庫的關系是,index = database、type = table,但是在Elasticsearch 7.0版本后(>=7.0)弱化了type默認為_doc,而官方會在8.0之后會徹底移除type。
服務器選型
在官方文檔(https://www.elastic.co/guide/cn/elasticsearch/guide/current/heap-sizing.html)里建議Elasticsearch JVM Heap最大為32G,同時不超過服務器內存的一半,也就是說內存分別為128G和64G的服務器,JVM Heap最大只需要設置32G;而32G服務器,則建議JVM Heap最大16G,剩余的內存將會給到Filesystem Cache充分使用。如果不需要對分詞字符串做聚合計算(例如,不需要 fielddata )可以考慮降低JVM Heap。JVM Heap越小,會導致Elasticsearch的GC頻率更高,但Lucene就可以的使用更多的內存,這樣性能就會更高。
對於我們公司的未來新增業務還會有收集用戶的訪問記錄來統計PV(page view)、UV(user view),有一定的聚合計算,經過多方便的考慮與討論,平衡成本與需求后選擇了騰訊雲的三台配置為CPU 16核、內存64G,SSD雲硬盤的服務器,並給與Elasticsearch 配置JVM Heap = 32G。
需求場景選擇
Elasticsearch在本公司系統的可使用場景非常多,但是作為第一次引入因慎重選擇,給與開發與運維一定的時間熟悉與觀察。
經過商討,選擇了兩個業務場景,用戶閱讀作品的記錄明細與作品搜索,選擇這兩個業務場景原因如下:
- 寫場景
- 我們平台的用戶黏度比較高,閱讀作品是一個高頻率的調用,因此用戶閱讀作品的記錄明細可在短時間內造成海量數據的場景。(現一個月已達到了70G的數據量,共1億1千萬條)
- 讀場景
- 閱讀記錄需提供給未來新增的抽獎業務使用,可從閱讀章節數、閱讀時長等進行搜索計算。
- 作品搜索原有實現是通過關系型數據庫like查詢,已是具有潛在的性能問題與資源消耗的業務場景
對於上述兩個業務,用戶閱讀作品的記錄明細與抽獎業務屬於新增業務,對於在投入成本相對較少,也無需過多的需要兼容舊業務的壓力。
而作品搜索業務屬於優化改造,得保證兼容原有的用戶搜索習慣前提下,新增拼音搜索。同時最好以擴展的方式,盡可能的減少代碼修改范圍,如果使用效果不好,隨時可以回滾到舊的實現方式。
設計方案
共性設計
我使用.Net 5 WebApi將Elasticsearch封裝成ES業務服務API,這樣的做法主要用來隱藏技術細節(時區、分詞器、類型轉換等),暴露粗粒度的讀寫接口。這種做法在馬丁福勒所著的《NoSQL精粹》稱把數據庫視為“應用程序數據庫”,簡單來說就是只能通過應用間接的訪問存儲,對於這個應用由一個團隊負責維護開發,也只有這個團隊才知道存儲的結構。這樣通過封裝的API服務解耦了外部API服務與存儲,調用方就無需過多關注存儲的特性,像Mongodb與Elasticsearch這種無模式的存儲,無需優先定義結構,換而言之就是對於存儲已有結構可隨意修改擴展,那么“應用程序數據庫”的做法也避免了其他團隊無意侵入的修改。
考慮到現在業務需求復雜度相對簡單,MQ消費端也一起集成到ES業務服務,若后續MQ消費業務持續增多,再考慮把MQ消費業務抽離到一個(或多個的)消費端進程。
目前以同步讀、同步寫、異步寫的三種交互方式,進行與其他服務通信。
閱讀記錄明細
本需求是完全新增,因此引入相對簡單,只需要在【平台API】使用【RabbitMQ】進行解耦,使用異步方式寫入Elasticsearch,使用隊列除了用來解耦,還對此用來緩沖高並發寫壓力的情況。
對於后續新增的業務例如抽獎服務,則只需要通過RPC框架對接ES業務API,以同步讀取的方式查詢數據。
作品搜索
對於該業務,我第一反應采用CQRS的思想,原有的寫入邏輯我無需過多的關注與了解,因此我只需要想辦法把關系型數據庫的數據同步到Elasticsearch,然后提供業務查詢API替換原有平台API的數據源即可。
那么數據同步則一般都是分推和拉兩種方式。
推
推的實時性無疑是比拉要高,只需增量的推送做寫入的數據(增、刪、改)即可,無論是從性能、資源利用、時效各方面來看都比拉更有效。
實施該方案,可以選擇Debezium和SQL Server開啟CDC功能。
Debezium由RedHat開源的,同時需要依賴於kafka的,一個將多種數據源實時變更數據捕獲,形成數據流輸出的開源工具,同類產品有Canal, DataBus, Maxwell。
CDC全稱Change Data Capture,直接翻譯過來為變更數據捕獲,核心為監測服務捕獲數據庫的寫操作(插入,更新,刪除),將這些變更按發生的順序完整記錄下來。
我個人在我博客文章多次強調架構設計的輸入核心為兩點:滿足需求與組織架構,在滿足需求的前提應優先選擇簡單、合適的方案。技術選型應需要考慮自己的團隊是否可以支撐。在上述無論是額外加入Debezium和kafka,還是需要針對SQL Server開啟CDC都超出了我們運維所能承受的極限,引入新的中間件和技術是需要試錯的,而試錯是需要額外高的成本,在未知的情況下引入更多的未知,只會造成更大的成本和不可控。
拉
拉無疑是最簡單最合適的實現方式,只需要使用調度任務服務,每隔段時間定時去從數據庫拉取數據寫入到Elasticsearch就可。
然而拉取數據,分全量同步與增量同步:
對於增量同步,只需要每次查詢數據源Select * From Table_A Where RowVersion > LastUpdateVersion,則可以過濾出需要同步的數據。但是這個方式有點致命的缺點,數據源已被刪除的數據是無法查詢出來的,如果把Elasticsearch反向去跟SQL Server數據做對比又是一件比較愚蠢的方式,因此只能放棄該方式。
而全量同步,只要每次從SQL Server數據源全量新增到Elasticsearch,並替換舊的Elasticsearch的Index,因此該方案得全刪全增。但是這里又引申出新的問題,如果先刪后增,那么在刪除后再新增的這段真空期怎么辦?假如有5分鍾的真空期是沒有數據,用戶就無法使用搜索功能。那么只能先增后刪,先新增到一個Index_Temp,全量新增完后,把原有Index改名成Index_Delete,然后再把Index_Temp改成Index,最后把Index_Delete刪除。這么一套操作下來,有沒有覺得很繁瑣很費勁?Elasticsearch有一個叫別名(Aliases)的功能,別名可以一對多的指向多個Index,也可以以原子性的進行別名指向Index的切換,具體實現可以看下文。
閱讀記錄實現細節
實體定義
優先定義了個抽象類ElasticsearchEntity進行復用,對於實體定義有三個注意的細節點:
1.對於ElasticsearchEntity我定義兩個屬性_id與Timestamp,Elasticsearch是無模式的(無需預定義結構),如果實體本身沒有_id,寫入到Elasticsearch會自動生成一個_id,為了后續的使用便捷性,我仍然自主定義了一個。
2.基於上述的分頁深度的問題,因此在后續涉及的業務盡可能會以search_after+滾動加載的方式落實到我們的業務。原本我們只需要使用DateTime類型的字段用DateTime.Now記錄后,再使用search_after后會自動把DateTime類型字段轉換成毫秒級的Timestamp,但是我在實現demo的時候,去制造數據,在程序里以for循環new數據的時候,發現生成的速度會在微秒級之間,那么假設用毫秒級的Timestamp進行search_after過濾,同一個毫秒有4、5條數據,那么容易在使用滾動加載時候少加載了幾條數據,這樣就到導致數據返回不准確了。因此我擴展了個[DateTime.Now.DateTimeToTimestampOfMicrosecond()]生成微秒級的Timestamp,以此盡可能減少出現漏加載數據的情況。
3.對於Elasticsearch的操作實體的日期時間類型均以DateTimeOffset類型聲明,因為Elasticsearch存儲的是UTC時間,而且會因為Http請求的日期格式不同導致存放的日期時間也會有所偏差,為了避免日期問題使用DateTimeOffset類型是一種保險的做法。而對於WebAPI 接口或者MQ的Message接受的時間類型可以使用DateTime類型,DTO(傳輸對象)與DO(持久化對象)使用Mapster或者AutoMapper類似的對象映射工具進行轉換即可(注意DateTimeOffset轉DateTime得定義轉換規則 [TypeAdapterConfig<DateTimeOffset, DateTime>.NewConfig().MapWith(dateTimeOffset => dateTimeOffset.LocalDateTime)])。
如此一來,把Elasticsearch操作細節隱藏在WebAPI里,以友好、簡單的接口暴露給開發者使用,降低了開發者對技術細節認知負擔。
[ElasticsearchType(RelationName = "user_view_duration")] public class UserViewDuration : ElasticsearchEntity { /// <summary> /// 作品ID /// </summary> [Number(NumberType.Long, Name = "entity_id")] public long EntityId { get; set; } /// <summary> /// 作品類型 /// </summary> [Number(NumberType.Long, Name = "entity_type")] public long EntityType { get; set; } /// <summary> /// 章節ID /// </summary> [Number(NumberType.Long, Name = "charpter_id")] public long CharpterId { get; set; } /// <summary> /// 用戶ID /// </summary> [Number(NumberType.Long, Name = "user_id")] public long UserId { get; set; } /// <summary> /// 創建時間 /// </summary> [Date(Name = "create_datetime")] public DateTimeOffset CreateDateTime { get; set; } /// <summary> /// 時長 /// </summary> [Number(NumberType.Long, Name = "duration")] public long Duration { get; set; } /// <summary> /// IP /// </summary> [Ip(Name = "Ip")] public string Ip { get; set; } }
public abstract class ElasticsearchEntity { private Guid? _id; public Guid Id { get { _id ??= Guid.NewGuid(); return _id.Value; } set => _id = value; } private long? _timestamp; [Number(NumberType.Long, Name = "timestamp")] public long Timestamp { get { _timestamp ??= DateTime.Now.DateTimeToTimestampOfMicrosecond(); return _timestamp.Value; } set => _timestamp = value; } }
異步寫入
對於異步寫入有兩個細節點:
1.該數據從RabbtiMQ訂閱消費寫入到Elasticsearch,從下面代碼可以看出,我刻意以月的維度建立Index,格式為 userviewrecord-2021-12,這么做的目的是為了方便管理Index和資源利用,有需要的情況下會刪除舊的Index。
2.消息訂閱與WebAPI暫時集成到同一個進程,這樣做主要是開發、部署都方便,如果后續訂閱多了,在把消息訂閱相關的業務抽離到獨立的進程。
按需演變,避免過度設計
訂閱消費邏輯
public class UserViewDurationConsumer : BaseConsumer<UserViewDurationMessage> { private readonly ElasticClient _elasticClient; public UserViewDurationConsumer(ElasticClient elasticClient) { _elasticClient = elasticClient; } public override void Excute(UserViewDurationMessage msg) { var document = msg.MapTo<Entity.UserViewDuration>(); var result = _elasticClient.Create(document, a => a.Index(typeof(Entity.UserViewDuration).GetRelationName() + "-" + msg.CreateDateTime.ToString("yyyy-MM"))).GetApiResult(); if (result.Failed) LoggerHelper.WriteToFile(result.Message); } }
/// <summary> /// 訂閱消費 /// </summary> public static class ConsumerExtension { public static IApplicationBuilder UseSubscribe<T, TConsumer>(this IApplicationBuilder appBuilder, IHostApplicationLifetime lifetime) where T : EasyNetQEntity, new() where TConsumer : BaseConsumer<T> { var bus = appBuilder.ApplicationServices.GetRequiredService<IBus>(); var consumer = appBuilder.ApplicationServices.GetRequiredService<TConsumer>(); lifetime.ApplicationStarted.Register(() => { bus.Subscribe<T>(msg => consumer.Excute(msg)); }); lifetime.ApplicationStopped.Register(() => bus?.Dispose()); return appBuilder; } }
訂閱與注入
public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } public void ConfigureServices(IServiceCollection services) { ...... } public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IHostApplicationLifetime lifetime) { app.UseAllElasticApm(Configuration); app.UseHealthChecks("/health"); app.UseDeveloperExceptionPage(); app.UseSwagger(); app.UseSwaggerUI(c => { c.SwaggerEndpoint("/swagger/v1/swagger.json", "SF.ES.Api v1"); c.RoutePrefix = ""; }); app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); app.UseSubscribe<UserViewDurationMessage, UserViewDurationConsumer>(lifetime); } }
查詢接口
查詢接口此處有兩個細節點:
1.如果不確定月份,則使用通配符查詢userviewrecord-*,當然有需要的也可以使用別名處理。
2.因為Elasticsearch是記錄UTC時間,因此時間查詢得指定TimeZone。
[HttpGet]
[Route("record")] public ApiResult<List<UserMarkRecordGetRecordResponse>> GetRecord([FromQuery] UserViewDurationRecordGetRequest request) { var dataList = new List<UserMarkRecordGetRecordResponse>(); string dateTime; if (request.BeginDateTime.HasValue && request.EndDateTime.HasValue) { var month = request.EndDateTime.Value.DifferMonth(request.BeginDateTime.Value); if(month <= 0 ) dateTime = request.BeginDateTime.Value.ToString("yyyy-MM"); else dateTime = "*"; } else dateTime = "*"; var mustQuerys = new List<Func<QueryContainerDescriptor<UserViewDuration>, QueryContainer>>(); if (request.UserId.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.UserId).Value(request.UserId.Value))); if (request.EntityType.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value(request.EntityType))); if (request.EntityId.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityId).Value(request.EntityId.Value))); if (request.CharpterId.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.CharpterId).Value(request.CharpterId.Value))); if (request.BeginDateTime.HasValue) mustQuerys.Add(a => a.DateRange(dr => dr.Field(f => f.CreateDateTime).GreaterThanOrEquals(request.BeginDateTime.Value).TimeZone(EsConst.TimeZone))); if (request.EndDateTime.HasValue) mustQuerys.Add(a => a.DateRange(dr => dr.Field(f => f.CreateDateTime).LessThanOrEquals(request.EndDateTime.Value).TimeZone(EsConst.TimeZone))); var searchResult = _elasticClient.Search<UserViewDuration>(a => a.Index(typeof(UserViewDuration).GetRelationName() + "-" + dateTime) .Size(request.Size) .Query(q => q.Bool(b => b.Must(mustQuerys))) .SearchAfterTimestamp(request.Timestamp) .Sort(s => s.Field(f => f.Timestamp, SortOrder.Descending))); var apiResult = searchResult.GetApiResult<UserViewDuration, List<UserMarkRecordGetRecordResponse>>(); if (apiResult.Success) dataList.AddRange(apiResult.Data); return ApiResult<List<UserMarkRecordGetRecordResponse>>.IsSuccess(dataList); }
作品搜索實現細節
實體定義
SearchKey是原有SQL Server的數據,現需要同步到Elasticsearch,仍是繼承抽象類ElasticsearchEntity實體定義,同時這里有三個細節點:
1. public string KeyName,我定義的是Text類型,在Elasticsearch使用Text類型才會分詞。
2.在實體定義我沒有給KeyName指定分詞器,因為我會使用兩個分詞器:拼音和默認分詞,而我會在批量寫入數據創建Mapping時定義。
3.實體里的 public List<int> SysTagId 與SearchKey在SQL Server是兩張不同的物理表,是一對多的關系,在代碼表示如下,但是在關系型數據庫是無法與之對應和體現的,這就是咱們所說的“阻抗失配”,但是能在以文檔型存儲系統(MongoDB、Elasticsearch)里很好的解決這個問題,可以以一個聚合的方式寫入,避免多次查詢關聯。
[ElasticsearchType(RelationName = "search_key")] public class SearchKey : ElasticsearchEntity { [Number(NumberType.Integer, Name = "key_id")] public int KeyId { get; set; } [Number(NumberType.Integer, Name = "entity_id")] public int EntityId { get; set; } [Number(NumberType.Integer, Name = "entity_type")] public int EntityType { get; set; } [Text(Name = "key_name")] public string KeyName { get; set; } [Number(NumberType.Integer, Name = "weight")] public int Weight { get; set; } [Boolean(Name = "is_subsidiary")] public bool IsSubsidiary { get; set; } [Date(Name = "active_date")] public DateTimeOffset? ActiveDate { get; set; } [Number(NumberType.Integer, Name = "sys_tag_id")] public List<int> SysTagId { get; set; } }
數據同步
數據同步我采用了Quartz.Net定時調度任務框架,因此時效不高,所以每4小時同步一次即可,有42W多的數據,分批進行同步,每次查詢1000條數據同時進行一次批量寫入。全量同步一次的時間大概2分鍾。因此使用RPC調用[ES業務API服務]。
因為具體業務邏輯已經封裝在[ES業務API服務],因此同步邏輯也相對簡單,查詢出SQL Server數據源、聚合整理、調用[ES業務API服務]的批量寫入接口、重新綁定別名到新的Index。
[DisallowConcurrentExecution] public class SearchKeySynchronousJob : BaseJob { public override void Execute() { var rm = SFNovelReadManager.Instance(); var maxId = 0; var size = 1000; string indexName = ""; while (true) { //避免一次性全部查詢出來,每1000條一次寫入。 var searchKey = sm.searchKey.GetList(size, maxId); if (!searchKey.Any()) break; var entityIds = searchKey.Select(a => a.EntityID).Distinct().ToList(); var sysTagRecord = rm.Novel.GetSysTagRecord(entityIds); var items = searchKey.Select(a => new SearchKeyPostItem { Weight = a.Weight, EntityType = a.EntityType, EntityId = a.EntityID, IsSubsidiary = a.IsSubsidiary ?? false, KeyName = a.KeyName, ActiveDate = a.ActiveDate, SysTagId = sysTagRecord.Where(c => c.EntityID == a.EntityID).Select(c => c.SysTagID).ToList(), KeyID = a.KeyID }).ToList(); //以一個聚合寫入到ES var postResult = new SearchKeyPostRequest { IndexName = indexName, Items = items }.Excute(); if (postResult.Success) { indexName = (string)postResult.Data; maxId = searchKey.Max(a => a.KeyID); } } //別名從舊Index指向新的Index,最后刪除舊Index var renameResult = new SearchKeyRenameRequest { IndexName = indexName }.Excute(); } } }
業務API接口
批量新增接口這里有2個細節點:
1.在第一次有數據進來的時候需要創建Mapping,因為得對KeyName字段定義分詞器,其余字段都可以使用AutoMap即可。
2.新創建的Index名稱是精確到秒的 SearchKey-202112261121
/// <summary> /// 批量新增作品搜索列表(返回創建的indexName) /// </summary> /// <param name="request"></param> /// <returns></returns> [HttpPost] public ApiResult Post(SearchKeyPostRequest request) { if (!request.Items.Any()) return ApiResult.IsFailed("無傳入數據"); var date = DateTime.Now; var relationName = typeof(SearchKey).GetRelationName(); var indexName = request.IndexName.IsNullOrWhiteSpace() ? (relationName + "-" + date.ToString("yyyyMMddHHmmss")) : request.IndexName; if (request.IndexName.IsNullOrWhiteSpace()) { var createResult = _elasticClient.Indices.Create(indexName, a => a.Map<SearchKey>(m => m.AutoMap().Properties(p => p.Custom(new TextProperty { Name = "key_name", Analyzer = "standard", Fields = new Properties(new Dictionary<PropertyName, IProperty> { { new PropertyName("pinyin"),new TextProperty{ Analyzer = "pinyin"} }, { new PropertyName("standard"),new TextProperty{ Analyzer = "standard"} } }) })))); if (!createResult.IsValid && request.IndexName.IsNullOrWhiteSpace()) return ApiResult.IsFailed("創建索引失敗"); } var document = request.Items.MapTo<List<SearchKey>>(); var result = _elasticClient.BulkAll(indexName, document); return result ? ApiResult.IsSuccess(data: indexName) : ApiResult.IsFailed(); }
重新綁定別名接口這里有4個細節點:
1.別名使用searchkey,只會有一個Index[searchkey-yyyyMMddHHmmss]會跟searchkey綁定.
2.優先把已綁定的Index查詢出來,方便解綁與刪除。
3.別名綁定在Elasticsearch雖然是原子性的,但是不是數據一致性的,因此得先Add后Remove。
4.刪除舊得Index免得占用過多資源。
/// <summary> /// 重新綁定別名 /// </summary> /// <returns></returns> [HttpPut] public ApiResult Rename(SearchKeyRanameRequest request) { var aliasName = typeof(SearchKey).GetRelationName(); var getAliasResult = _elasticClient.Indices.GetAlias(aliasName); //給新index指定別名 var bulkAliasRequest = new BulkAliasRequest { Actions = new List<IAliasAction> { new AliasAddDescriptor().Index(request.IndexName).Alias(aliasName) } }; //移除別名里舊的索引 if (getAliasResult.IsValid) { var indeNameList = getAliasResult.Indices.Keys; foreach (var indexName in indeNameList) { bulkAliasRequest.Actions.Add(new AliasRemoveDescriptor().Index(indexName.Name).Alias(aliasName)); } } var result = _elasticClient.Indices.BulkAlias(bulkAliasRequest); //刪除舊的index if (getAliasResult.IsValid) { var indeNameList = getAliasResult.Indices.Keys; foreach (var indexName in indeNameList) { _elasticClient.Indices.Delete(indexName); } } return result != null && result.ApiCall.Success ? ApiResult.IsSuccess() : ApiResult.IsFailed(); }
查詢接口這里跟前面細節得差不多:
但是這里有一個得特別注意的點,可以看到這個查詢接口同時使用了should和must,這里得設置minimumShouldMatch才能正常像SQL過濾。
should可以理解成SQL的Or,Must可以理解成SQL的And。
默認情況下minimumShouldMatch是等於0的,等於0的意思是,should不命中任何的數據仍然會返回must命中的數據,也就是你們可能想搜索(keyname.pinyin=’chengong‘ or keyname.standard=’chengong‘) and id > 0,但是es里沒有存keyname='chengong'的數據,會把id> 0 而且 keyname != 'chengong' 數據給查詢出來。
因此我們得對minimumShouldMatch=1,就是should條件必須得任意命中一個才能返回結果。
在should和must混用的情況下必須得注意minimumShouldMatch的設置!!!
/// <summary> /// 作品搜索列表 /// </summary> /// <param name="request"></param> /// <returns></returns> [HttpPost] [Route("search")] public ApiResult<List<SearchKeyGetResponse>> Get(SearchKeyGetRequest request) { var shouldQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>>(); int minimumShouldMatch = 0; if (!request.KeyName.IsNullOrWhiteSpace()) { shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.pinyin").Query(request.KeyName))); shouldQuerys.Add(a => a.MatchPhrase(m => m.Field("key_name.standard").Query(request.KeyName))); minimumShouldMatch = 1; } var mustQuerys = new List<Func<QueryContainerDescriptor<SearchKey>, QueryContainer>> { a => a.Range(t => t.Field(f => f.Weight).GreaterThanOrEquals(0)) }; if (request.IsSubsidiary.HasValue) mustQuerys.Add(a => a.Term(t => t.Field(f => f.IsSubsidiary).Value(request.IsSubsidiary.Value))); if (request.SysTagIds != null && request.SysTagIds.Any()) mustQuerys.Add(a => a.Terms(t => t.Field(f => f.SysTagId).Terms(request.SysTagIds))); if (request.EntityType.HasValue) { if (request.EntityType.Value == ESearchKey.EntityType.AllNovel) { mustQuerys.Add(a => a.Terms(t => t.Field(f => f.EntityType).Terms(ESearchKey.EntityType.Novel, ESearchKey.EntityType.ChatNovel, ESearchKey.EntityType.FanNovel))); } else mustQuerys.Add(a => a.Term(t => t.Field(f => f.EntityType).Value((int)request.EntityType.Value))); } var sortDescriptor = new SortDescriptor<SearchKey>(); sortDescriptor = request.Sort == ESearchKey.Sort.Weight ? sortDescriptor.Field(f => f.Weight, SortOrder.Descending) : sortDescriptor.Field(f => f.ActiveDate, SortOrder.Descending); var searchResult = _elasticClient.Search<SearchKey>(a => a.Index(typeof(SearchKey).GetRelationName()) .From(request.Size * request.Page) .Size(request.Size) .Query(q => q.Bool(b => b.Should(shouldQuerys).Must(mustQuerys).MinimumShouldMatch(minimumShouldMatch))) .Sort(s => sortDescriptor)); var apiResult = searchResult.GetApiResult<SearchKey, List<SearchKeyGetResponse>>(); if (apiResult.Success) return apiResult; return ApiResult<List<SearchKeyGetResponse>>.IsSuccess("空集合數據"); }
APM監控
雖然在上面我做了足夠的實現准備,但是對於上生產后的實際使用效果我還是希望有一個直觀的體現。我之前寫了一篇文章《.Net微服務實戰之可觀測性》很好敘述了該種情況,有興趣的可以移步去看看。
在之前公司做微服務的時候的APM選型我們使用了Skywalking,但是現在這家公司的運維沒有接觸過,但是對於Elastic Stack他相對比較熟悉,如同上文所說架構設計的輸入核心為兩點:滿足需求與組織架構,秉着我的技術選型原則是基於團隊架構,我們采用了Elastic APM + Kibana(7.4版本),如下圖所示:
結尾
最后上生產的時候也是平滑無損的切換到Elasticsearch,總體情況都十分滿意。從上圖可以見Sql Server的負載從40%-90%降低到了20%左右。
本篇文章的信息量作為作者的我也認為非常的多,非常感謝各位讀者抽出您寶貴的時間閱讀完,如果您對此篇文章有任何疑問,隨時可以通過右上角的聯系方式與我交流(QQ或者微信)。