.NetCore下ES查詢驅動 PlainElastic .Net 升級官方驅動 Elasticsearch .Net


1.背景

由於歷史原因,筆者所在的公司原有的ES查詢驅動采用的是 PlainElastic.Net, 經過詢問原來是之前PlainElastic.Net在園子里文檔較多,上手比較容易,所以最初作者選用了該驅動,而發布也由於歷史原因都部署在 windows 服務器上,基於 .NET Framework開發。

后來由於遷移 .NET CORE 平台的需要,對代碼進行了升級,同時部署平台也遷移至 CentOS7 服務器,升級過程比較順利,由於沒有使用特殊API,所以幾乎沒有對業務代碼做更多的修改,同時測試階段由於沒有多余的機器,仍然放在了原有Windows服務器上做的測試,一切都沒有問題,完美上線。

事發突然,某天接到運維部門反饋,部署查詢服務的機器突然出現 TCP 連接數超高的問題,同時這台機器其他的TCP服務也無法建立新的連接,但已經建立的連接不受影響。聯想到 ElasticSearch 查詢服務是基於HTTP 請求的,腦子里馬上聯想到 .NET Core 下 HttpClient 如果每次訪問都創建新實例,則會每次都建立新的TCP連接,而 Linux 對已釋放端口回收的時間窗口,會導致在高並發情況下,客戶端機器端口占用持續增加,同時被調用服務端連接數也會持續增加。

基於此猜測,立馬去扒了一下PlainElastic.Net源代碼:

源碼地址:https://github.com/Yegoroff/PlainElastic.Net/blob/master/src/PlainElastic.Net/Connection/ElasticConnection.cs
果然如猜測的那樣,每次都創建了新的 HttpWebRequest 實例,看了作者的最后維護時間也已經是3年前了,可能是后來官方驅動日趨完善,作者也便停止了維護。

既然如此,那么讓我們看下官方最新驅動源碼是否如我們想象,是基於HttpClientFactory來解決這個問題的?

源碼地址:https://github.com/elastic/elasticsearch-net/blob/master/src/Elasticsearch.Net/Connection/HttpConnection.cs

上述代碼看來,官方驅動並非是采用微軟官方建議的 HttpClientFactory ,而是官方底層自己維護的一個線程安全的字典來管理 HttpClient 實例池,雖是自己實現,但效果一樣:相同地址的請求,是鏈接復用的,這樣就解決不斷開啟 TCP 連接的問題。

問題找到,立馬進行驅動升級:

2.驅動升級

說明: ElasticSearch.Net官方驅動地址:https://www.elastic.co/guide/en/elasticsearch/client/net-api/6.x/index.html

官方驅動分為 Low Level Client 和 NEST(Heigh Level Client),其中Low Level Client 僅僅做了最基本的封裝,幾乎等價於HTTP原生調用,帶來了極大的靈活性的同時,也帶來使用成本,而對於開發人員來說使用 NEST 提供的更加高級的API,可以更加快速的進行開發工作,也同時可以利用到 .NET 所提供的各種語法糖,比如 => 表達式。

話不多說,看示例代碼:

實例創建

public ElasticService()
{
    var uris = new Uri[] { new Uri("http://172.17.78.111:9200"), new Uri("http://172.17.78.112:9200") }; //支持多個節點
    var connectionPool = new SniffingConnectionPool(uris);
    var settings = new ConnectionSettings(connectionPool).DefaultIndex("testindex");//注意index不可以大寫
    settings.BasicAuthentication("", ""); //設置賬號密碼,沒有可以跳過
    this._client = new ElasticClient(settings);
}

插入待測試數據

public class People 
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public int Age { get; set; }
    public DateTime Birthday { get; set; }
    public bool Gender { get; set; }
    public string Address { get; set; }
    public DateTime CreateTime { get; set; } = DateTime.Now;
}

//批量插入
public async Task<IBulkResponse> AddPeopleAsync(People[] peoples)
{
    var descriptor = new BulkDescriptor();
    foreach (var p in peoples)
    {
        var response = await _client.IndexDocumentAsync(p);
        descriptor.Index<People>(op => op.Document(p));
    }
    return await _client.BulkAsync(descriptor);//批量插入
}

多查詢條件拼接

public QueryContainer BuildQueryContainer(SearchCondition condition)
{
    var queryCombin = new List<Func<QueryContainerDescriptor<People>, QueryContainer>>();
    if (!string.IsNullOrEmpty(condition.Name))
        queryCombin.Add(mt => mt.Match(m => m.Field(t => t.Name).Query(condition.Name))); //字符串匹配

    if (condition.Age.HasValue)
        queryCombin.Add(mt => mt.Range(m => m.Field(t => t.Address).GreaterThanOrEquals(condition.Age))); //數值區間匹配

    if (!string.IsNullOrEmpty(condition.Address))
        queryCombin.Add(mt => mt.MatchPhrase(m => m.Field(t => t.Address).Query(condition.Address))); //短語匹配

    if (!condition.Gender.HasValue)
        queryCombin.Add(mt => mt.Term(m => m.Field(t => t.Gender).Value(condition.Gender)));//精確匹配

    return Query<People>.Bool(b => b
        .Must(queryCombin)
        .Filter(f => f
            .DateRange(dr => dr.Field(t => t.CreateTime) //時間范圍匹配
                .GreaterThanOrEquals(DateMath.Anchored(condition.BeginCreateTime.ToString("yyyy-MM-ddTHH:mm:ss")))
                .LessThanOrEquals(DateMath.Anchored(condition.EndCreateTime.ToString("yyyy-MM-ddTHH:mm:ss"))))));
}

提示:Match 和 MatchPhrase 的區別,例如對於"長寧區"

  1. Match 會將"長寧區"進行分詞匹配,例如只要包含"區"的數據(比如靜安區),也會被查詢命中
  2. MatchPhrase 則可以理解為短語匹配,只有當數據包含“長寧區”完整短語的數據,才會被查詢命中

增加分頁查詢接口

public async Task<PagedResult<People[]>> QueryPeopleAsync(SearchCondition condition, int pageIndex, int pageSize)
{
    var query = this.BuildQueryContainer(condition);
    var response = await this._client.SearchAsync<People>(s => s
            .Index("testindex")
            .From(pageIndex * pageSize)
            .Size(pageSize)
            .Query(q => query)
            .Sort(st => st.Descending(d => d.CreateTime)));

    if (response.ApiCall.Success)
    {
        return new PagedResult<People[]>
        {
            PageIndex = pageIndex,
            PageSize = pageSize,
            Total = response.Total,
            ReturnObj = response.Hits.Select(s => s.Source).ToArray()
        };
    }

    return new PagedResult<People[]> { IsSuccess = false };
}

編寫單元測試

[TestMethod]
public async Task QueryPeopleTest()
{
    var condition = new SearchCondition
    {
        Address="長寧區",
        BeginCreateTime = DateTime.Now.AddDays(-1),
        EndCreateTime = DateTime.Now
    };

    var result = await this._elasticService.QueryPeopleAsync(condition, 0, 3);
    Assert.IsTrue(result.IsSuccess);
}

利用 Wireshark 抓包分析HTTP調用細節

將抓包的數據轉換為HTTP流,查看請求細節:

提示:通過wireshark抓包是排查錯誤很有效的方式,有時候通過查詢文檔進行分析,還不如先抓包查看請求數據來得直接,同時可以將抓包數據放在Kabana所提供的 Dev Tools中驗證自己的想法。

利用 Kibana 提供的 Dev Tools 驗證/測試 查詢條件

3.總結

從.NET Framework 平台轉向 .Net Core 平台,其實不僅僅是開發框架的升級,或者從 Windows 轉向 Linux 的遷移,而是需要我們有更多的開源思維,即:

  1. 由於會使用到更多的三方組件,開發人員需要更多關注社區的變化
  2. 開源代碼,意味着開發人員可以並且需要更多關注源代碼的底層實現

本文示例代碼地址:https://github.com/xBoo/articles/tree/master/src/ElasticSearchNetDemo


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM