ES Client


For background on Elasticsearch itself, see: ELK | wjcx_sqh
This post covers working with ES from .Net and from Java:

.Net

There are currently two mainstream .Net clients:

  • PlainElastic.Net
  • Elasticsearch.Net.dll and Nest.dll

A brief look at PlainElastic.Net is enough; for details see: https://www.cnblogs.com/eggTwo/p/4039779.html

Elasticsearch.Net + Nest

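Download the matching package from NuGet or the official site and reference it in the project. Check which .Net framework versions each version of the .dll depends on.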

Index

Creating a connection (connection pool)

var nodes = new Uri[] { new Uri("http://ip1:9200"), new Uri("http://ip2:9200") };
var pool = new StaticConnectionPool(nodes); // recommended
var settings = new ConnectionSettings(pool);
var ESClient = new ElasticClient(settings);
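A static pool holds a fixed list of nodes and spreads requests across them. The Java sketch below is only a mental model of round-robin selection over a fixed node list. It is not NEST's implementation, and it leaves out dead-node detection and resurrection. The class name is hypothetical.

```java
import java.net.URI;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Conceptual sketch of a static pool: a fixed node list, picked round-robin.
// Not NEST's implementation; failure detection and resurrection are omitted.
class StaticPoolSketch {
    private final List<URI> nodes;
    private final AtomicInteger cursor = new AtomicInteger();

    StaticPoolSketch(List<URI> nodes) {
        if (nodes.isEmpty()) throw new IllegalArgumentException("at least one node is required");
        this.nodes = List.copyOf(nodes);
    }

    // Returns the next node; floorMod keeps the index valid even after int overflow.
    URI next() {
        return nodes.get(Math.floorMod(cursor.getAndIncrement(), nodes.size()));
    }

    public static void main(String[] args) {
        StaticPoolSketch pool = new StaticPoolSketch(List.of(
                URI.create("http://ip1:9200"), URI.create("http://ip2:9200")));
        for (int i = 0; i < 4; i++) System.out.println(pool.next());
    }
}
```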

Other settings:

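settings.DefaultIndex("defaultidx"); // default index
settings.DefaultFieldNameInferrer(name => name); // keep field names identical to the model's property names (NEST camelCases them by default)
settings.BasicAuthentication("username", "password"); // user authentication
settings.RequestTimeout(TimeSpan.FromMilliseconds(10000)); // request timeout
settings.MaximumRetries(2); // maximum number of retries
settings.MaxRetryTimeout(TimeSpan.FromMilliseconds(50000)); // overall timeout across retries; defaults to RequestTimeout
settings.DisableDirectStreaming(true); // buffer request/response bytes for debugging; better turned off in production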
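One pitfall with timeouts: the single-argument .Net constructor `TimeSpan(long)` takes ticks, and one tick is 100 ns. So `new TimeSpan(10000)` means 1 ms, not 10 s. The arithmetic is checked below in Java with `java.time.Duration`. The helper name `fromTicks` is our own.

```java
import java.time.Duration;

class TicksDemo {
    // One .NET tick is 100 nanoseconds.
    static final long NANOS_PER_TICK = 100;

    // Converts a .NET tick count to a java.time.Duration.
    static Duration fromTicks(long ticks) {
        return Duration.ofNanos(Math.multiplyExact(ticks, NANOS_PER_TICK));
    }

    public static void main(String[] args) {
        System.out.println(fromTicks(10_000).toMillis());         // 1 (ms), not 10 seconds
        System.out.println(fromTicks(50_000).toMillis());         // 5 (ms)
        // 10 seconds needs 100,000,000 ticks, i.e. TimeSpan.FromSeconds(10) in C#.
        System.out.println(fromTicks(100_000_000L).getSeconds()); // 10
    }
}
```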

Callback hooks

settings.OnRequestCompleted(apiCallDetails => { /* called when a request completes; receives apiCallDetails */ });
settings.OnRequestDataCreated(requestData => { /* called once the request data has been built; receives requestData */ });

Checking for and creating an index

var descriptor = new CreateIndexDescriptor("idxname") // index names must be lowercase
    .Settings(s => s.NumberOfShards(5).NumberOfReplicas(1));
client.CreateIndex(descriptor);

// Create the index (with the mapping) only if it does not exist yet
if (!client.IndexExists(_indexName).Exists) {
	client.CreateIndex(_indexName, p => p.InitializeUsing(_indexState)
	      .Mappings(m => m.Map<DefClassName>(mp => mp.AutoMap())));
}
protected static IIndexState _indexState = new IndexState() { // index settings
	Settings = new IndexSettings() {
		NumberOfReplicas = 1, NumberOfShards = 5
	}};

client.IndexExists("index_name"); // check whether the index exists
client.DeleteIndex("index_name"); // delete the index
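The create-index request carries the shard and replica counts in its body. The Java sketch below builds that JSON body by hand so its shape is visible. The class and method names are hypothetical; a real client serializes this for you.

```java
class CreateIndexBody {
    // Builds the body of PUT /{index} with the shard and replica settings used above.
    static String body(int shards, int replicas) {
        if (shards < 1 || replicas < 0) throw new IllegalArgumentException("invalid shard/replica count");
        return String.format(
                "{\"settings\":{\"number_of_shards\":%d,\"number_of_replicas\":%d}}",
                shards, replicas);
    }

    public static void main(String[] args) {
        // Sent as: PUT /idxname  (index names must be lowercase)
        System.out.println(body(5, 1));
    }
}
```

`number_of_shards` is fixed once the index exists. `number_of_replicas` can still be changed later through the index settings API.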

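Once a field has been mapped, its mapping cannot be changed; the index has to be deleted and recreated. DefClassName is a model class such as: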

[ElasticsearchType(Name = "TYPE_NAME")]
public class AFVInfo {
	[Keyword(Name = "field1", Index = true, IgnoreAbove = 20)]
	public string field1 { get; set; }

	[Text(Name = "field2",Index= false)]
	public string field2 { get; set; }

	public DateTime dt { get; set; }
}

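Setting the Keyword or Text attribute on each field defines the mapping used when the index is created (see: Elasticsearch.net client NEST).
New fields, however, can still be added to an existing mapping: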

var result = _client.Map<AFVInfo>(m => m.Index(indexName).Properties(p => p
  .Keyword(s => s.Name("field_name1").Index(true))
  .Text(s=>s.Name("field_name2").Index(false))
));
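The rule is that existing field mappings are fixed while new fields may be added. It can be modelled with a toy in-memory mapping. This only illustrates the behaviour and is not how Elasticsearch merges mappings internally. In a real cluster, a conflicting change comes back as an error from the put-mapping call. The class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of a mapping: field name -> field type.
class MappingSketch {
    private final Map<String, String> fields = new LinkedHashMap<>();

    // A new field is accepted; re-declaring a field with the same type is a no-op;
    // changing the type of an existing field is rejected.
    void putField(String name, String type) {
        String existing = fields.get(name);
        if (existing != null && !existing.equals(type)) {
            throw new IllegalArgumentException("field [" + name + "] is already mapped as ["
                    + existing + "], cannot change to [" + type + "]");
        }
        fields.put(name, type);
    }

    Map<String, String> fields() {
        return Map.copyOf(fields);
    }

    public static void main(String[] args) {
        MappingSketch m = new MappingSketch();
        m.putField("field1", "keyword");
        m.putField("field2", "text");
        m.putField("field_name1", "keyword");   // new field: accepted
        try {
            m.putField("field1", "text");        // type change: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```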

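Debugging: capture the request and response exchanged with ES (these byte arrays are only populated when DisableDirectStreaming is enabled)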

var requestStr = System.Text.Encoding.UTF8.GetString(result.ApiCall.RequestBodyInBytes);
var responseStr = System.Text.Encoding.UTF8.GetString(result.ApiCall.ResponseBodyInBytes);

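Search

Queries can be written either in object initializer syntax or with the Fluent API.
TermQuery searches for the exact term, without analyzing it; MatchQuery first runs the input through the field's analyzer. Both can be combined with from and size to return a given number of hits starting at a given offset.
Note the difference between match and match_phrase.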
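A toy Java model makes the difference concrete for a text field. Assumption: the analyzer only lowercases and splits on non-alphanumeric characters, as a crude stand-in for the standard analyzer. In this model:

  • term compares its input unchanged against the indexed tokens.
  • match analyzes the input and, with the default OR operator, matches if any token is present.
  • match_phrase additionally requires the tokens to appear consecutively and in order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

class QueryKinds {
    // Crude analyzer stand-in: lowercase, split on anything that is not a letter or digit.
    static List<String> analyze(String text) {
        List<String> tokens = new ArrayList<>();
        for (String t : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{Nd}]+")) {
            if (!t.isEmpty()) tokens.add(t);
        }
        return tokens;
    }

    // term: the query value is NOT analyzed; it must equal an indexed token exactly.
    static boolean term(String fieldText, String value) {
        return analyze(fieldText).contains(value);
    }

    // match (default operator OR): analyze the query, match if any of its tokens is indexed.
    static boolean match(String fieldText, String query) {
        List<String> indexed = analyze(fieldText);
        return analyze(query).stream().anyMatch(indexed::contains);
    }

    // match_phrase: the analyzed query tokens must appear consecutively and in order.
    static boolean matchPhrase(String fieldText, String query) {
        List<String> phrase = analyze(query);
        return !phrase.isEmpty() && Collections.indexOfSubList(analyze(fieldText), phrase) >= 0;
    }

    public static void main(String[] args) {
        String doc = "Quick Brown Fox";
        System.out.println(term(doc, "Quick"));              // false: the indexed token is "quick"
        System.out.println(term(doc, "quick"));              // true
        System.out.println(match(doc, "brown dog"));         // true: "brown" is present
        System.out.println(matchPhrase(doc, "brown dog"));   // false
        System.out.println(matchPhrase(doc, "Quick brown")); // true
    }
}
```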

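Search After
Retrieves the next page from the results of the previous page. When search_after is used, from must be set to 0 or -1: search_after

  • from and size: deep pagination, or a very large size, gets expensive. As a safeguard, index.max_result_window defaults to 10000, so keep from + size <= 10,000
  • scroll: represents a snapshot at a point in time, so it is unsuitable for real-time queries. Every scroll request carries a keep-alive timeout, and issuing scroll requests frequently brings its own set of problems

search_after avoids scroll's non-real-time problem: it provides a live cursor and avoids the storage and time cost of keeping scroll contexts alive: search_after performance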
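The mechanics of search_after can be simulated in memory. Results are sorted on a field plus a unique tiebreaker; here these are hypothetical `timestamp` and `id` fields. Each page starts strictly after the sort values of the last hit of the previous page, so no offset has to be skipped. This models the idea only. In a real request, the last hit's `sort` array is passed back as `search_after`, with `from` set to 0.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SearchAfterSketch {
    // A hit with a sort field and a unique tiebreaker (hypothetical fields).
    record Hit(long timestamp, String id) {}

    // Same sort for every page: timestamp, then id as tiebreaker, so the order is total.
    static final Comparator<Hit> ORDER =
            Comparator.comparingLong(Hit::timestamp).thenComparing(Hit::id);

    // index.max_result_window defaults to 10,000, so from + size must stay within it.
    static boolean withinResultWindow(int from, int size) {
        return (long) from + size <= 10_000;
    }

    // One page of up to `size` hits that sort strictly after `after` (null = first page).
    static List<Hit> page(List<Hit> all, Hit after, int size) {
        List<Hit> out = new ArrayList<>();
        all.stream()
                .sorted(ORDER)
                .filter(h -> after == null || ORDER.compare(h, after) > 0)
                .limit(size)
                .forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<Hit> all = List.of(new Hit(3, "c"), new Hit(1, "a"), new Hit(2, "b"),
                new Hit(2, "a"), new Hit(5, "e"));
        Hit after = null;
        List<Hit> batch;
        while (!(batch = page(all, after, 2)).isEmpty()) {
            System.out.println(batch);
            after = batch.get(batch.size() - 1);   // plays the role of the last hit's "sort" values
        }
    }
}
```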
Source Filter
Recommended in a SearchRequest, so that only the fields you need are returned

sr.Source = new SourceFilter() {
   Includes = new string[] { "xx", "zz" }, Excludes = new string[] { "vv" }
};
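Conceptually, `_source` filtering keeps the included fields and drops the excluded ones, so a field listed in both is dropped. The simplified Java sketch below works over a flat map. It assumes exact top-level field names only; Elasticsearch also supports wildcards and dotted paths, which this ignores. The class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SourceFilterSketch {
    // Empty includes means "all fields"; a field in excludes is always dropped.
    static Map<String, Object> filter(Map<String, Object> source,
                                      List<String> includes, List<String> excludes) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : source.entrySet()) {
            boolean included = includes.isEmpty() || includes.contains(e.getKey());
            if (included && !excludes.contains(e.getKey())) out.put(e.getKey(), e.getValue());
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("xx", 1);
        doc.put("zz", "two");
        doc.put("vv", 3.0);
        doc.put("ww", true);
        System.out.println(filter(doc, List.of("xx", "zz"), List.of("vv"))); // {xx=1, zz=two}
    }
}
```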

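fielddata
Fielddata is loaded lazily, so its memory has to be controlled: it is an uninverted (forward) index, held in columnar form on the heap, and bounded by eviction (the cache size) and by circuit breakers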

# elasticsearch.yml example
indices.fielddata.cache.size: 20%
# indices.breaker.total.limit is the parent breaker: it caps the combined usage tracked by
# the child breakers, indices.breaker.fielddata.limit and indices.breaker.request.limit
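These settings are percentages of the JVM heap, so it helps to turn them into bytes. The worked example below assumes a hypothetical 8 GiB heap, a fielddata breaker limit of 40% and a parent limit of 70%. These numbers are assumptions, so check the defaults of your version. The usual guidance is to keep `indices.fielddata.cache.size` below the fielddata breaker limit. Otherwise the breaker trips before old fielddata is ever evicted.

```java
class FielddataBudget {
    // Converts a percentage of the heap to bytes.
    static long percentOfHeap(long heapBytes, double percent) {
        return (long) (heapBytes * percent / 100.0);
    }

    // Eviction (cache size) must kick in before the fielddata breaker trips.
    static boolean cacheBelowBreaker(long cacheBytes, long fielddataBreakerBytes) {
        return cacheBytes < fielddataBreakerBytes;
    }

    // The parent breaker checks the combined estimate of the child breakers.
    static boolean parentAllows(long fielddataUsed, long requestUsed, long totalLimit) {
        return fielddataUsed + requestUsed <= totalLimit;
    }

    public static void main(String[] args) {
        long heap = 8L * 1024 * 1024 * 1024;            // hypothetical 8 GiB heap
        long cache = percentOfHeap(heap, 20);             // indices.fielddata.cache.size: 20%
        long fielddataLimit = percentOfHeap(heap, 40);    // hypothetical fielddata breaker limit
        long totalLimit = percentOfHeap(heap, 70);        // hypothetical parent (total) limit
        System.out.println("cache MiB: " + (cache >> 20));
        System.out.println("cache below fielddata breaker: " + cacheBelowBreaker(cache, fielddataLimit));
        // 1.5 GiB of fielddata plus 5 GiB of request memory exceeds a 70% parent limit (5.6 GiB)
        System.out.println("parent allows: " + parentAllows(1536L << 20, 5L << 30, totalLimit));
    }
}
```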

Monitoring memory usage

GET /_stats/fielddata?fields=* // fielddata memory used per shard and per index
GET /_nodes/stats/indices/fielddata?fields=* // fielddata memory used per node
GET /_nodes/stats/indices/fielddata?level=indices&fields=* // fielddata memory used per index on each node

_explain and _analyze
_explain: shows how a document's relevance score was computed
_analyze: shows how a field, or a given analyzer/tokenizer, analyzes and indexes a piece of text

Utility class

First, the initialization part:

using System;
using System.Collections.Generic;
using Elasticsearch.Net;
using Nest;
using Nest.JsonNetSerializer;

public class NestEsUtil<T> where T : class {
    private readonly List<Uri> _nodes = new List<Uri>();
    private readonly string _indexName;
    private readonly ElasticClient _elasticClient;

    // Usage: var client = NestEsUtil<ModelVo>.GetClient();
    public static ElasticClient GetClient() {
        string indexName = typeof(T).Name.ToLowerInvariant(); // index names must be lowercase
        var nodes = new Uri[] { new Uri(""), new Uri("") }; // fill in the node addresses
        var connectionPool = new StaticConnectionPool(nodes);
        var connectionConfig = new ConnectionSettings(connectionPool, sourceSerializer: (builtin, settings) => new JsonNetSerializer(builtin, settings));
        return new ElasticClient(connectionConfig.DefaultIndex(indexName).DefaultFieldNameInferrer(name => name));
    }

    // Usage: var esUtil = new NestEsUtil<ModelVo>("xxx", "ip1:9200;ip2:9200");
    public NestEsUtil(string indexName, string ips) {
        this._indexName = indexName.ToLowerInvariant(); // index names must be lowercase
        foreach (var ip in ips.Split(new[] { ';' }, StringSplitOptions.RemoveEmptyEntries)) { _nodes.Add(new Uri("http://" + ip + "/")); }
        var connectionPool = new StaticConnectionPool(_nodes);
        var connectionConfig = new ConnectionSettings(connectionPool, sourceSerializer: (builtin, settings) => new JsonNetSerializer(builtin, settings));
        this._elasticClient = new ElasticClient(connectionConfig.DefaultIndex(this._indexName).DefaultFieldNameInferrer(name => name));
    }

    /// ips is passed as "ip:port;ip:port;ip:port"; the index name defaults to the class name
    public NestEsUtil(string ips) : this(typeof(T).Name, ips) { }
}
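The constructor expects hosts in the form `ip:port;ip:port;...`. The Java sketch below does the same parsing and also derives a lowercase default index name from the model class, because Elasticsearch rejects index names containing uppercase letters. The class and method names are hypothetical.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

class EsHosts {
    // Parses "ip:port;ip:port" into http URIs, skipping empty segments (e.g. a trailing ';').
    static List<URI> parse(String ips) {
        List<URI> nodes = new ArrayList<>();
        for (String part : ips.split(";")) {
            String hostPort = part.trim();
            if (!hostPort.isEmpty()) nodes.add(URI.create("http://" + hostPort + "/"));
        }
        if (nodes.isEmpty()) throw new IllegalArgumentException("no hosts in: " + ips);
        return nodes;
    }

    // Index names must be lowercase, so derive the default name from the class name in lowercase.
    static String defaultIndexName(Class<?> modelType) {
        return modelType.getSimpleName().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(parse("10.0.0.1:9200;10.0.0.2:9200;"));
        System.out.println(defaultIndexName(EsHosts.class)); // eshosts
    }
}
```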

Insert

public void InsertOne(T data) {
    var result = _elasticClient.Index<T>(data, s => s.Index(_indexName));
}

Delete

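public void DeleteOne(T data) {
    // Assumes T has a PhoneNumber property that identifies the document(s) to delete
    var phoneNumber = typeof(T).GetProperty("PhoneNumber")?.GetValue(data);
    if (phoneNumber == null) return;
    _elasticClient.DeleteByQuery<T>(s => s.Index(this._indexName)
        .Query(q => q.Term(tm => tm.Field("PhoneNumber").Value(phoneNumber))));
}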

Search


Update


Paging, aggregations, grouping, scan/scroll and so on are still on my to-study list...
Elasticsearch utility class listing

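Troubleshooting

Problem 1: Kibana shows times 8 hours later than the time actually inserted into ES
Cause: NEST's serializer writes DateTime values without time zone information, ES treats a date without an offset as UTC, and Kibana then displays it in UTC+8
Fix: pass serializer settings when creating the client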

var settings = new ConnectionSettings(pool, 
	sourceSerializer: (builtin, setting) => new JsonNetSerializer(builtin, setting, 
		() => new Newtonsoft.Json.JsonSerializerSettings { 
			DateTimeZoneHandling = Newtonsoft.Json.DateTimeZoneHandling.Local 
}));
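The 8-hour shift is easy to reproduce. A wall-clock time in UTC+8 is written without an offset, read back as UTC, and then displayed in UTC+8 again. The illustration below uses Java's `java.time`; the timestamp is arbitrary.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;

class TimeZoneShift {
    static final ZoneOffset UTC_PLUS_8 = ZoneOffset.ofHours(8);   // as in the Kibana setting

    // What a reader sees when an offset-less time is assumed to be UTC, then shown in UTC+8.
    static LocalDateTime displayedAfterUtcAssumption(LocalDateTime localWallClock) {
        return localWallClock.atOffset(ZoneOffset.UTC)
                .withOffsetSameInstant(UTC_PLUS_8)
                .toLocalDateTime();
    }

    public static void main(String[] args) {
        LocalDateTime written = LocalDateTime.of(2020, 1, 1, 9, 0);     // 09:00 local time
        LocalDateTime shown = displayedAfterUtcAssumption(written);
        System.out.println(shown);                                      // 2020-01-01T17:00
        System.out.println(Duration.between(written, shown).toHours()); // 8

        // Writing the offset explicitly (what the Local handling above aims for) avoids the shift.
        OffsetDateTime withOffset = written.atOffset(UTC_PLUS_8);
        System.out.println(withOffset);                                 // 2020-01-01T09:00+08:00
    }
}
```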

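Problem 2: retry automatically on timeout
Fix: set the two parameters max_retries and retry_on_timeout (shown here with the Python client, elasticsearch-py)

from elasticsearch import Elasticsearch
es = Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}], timeout=60, max_retries=3, retry_on_timeout=True)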
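Those two options describe one behaviour: retry a call up to N extra times, but only when the failure is a timeout. It can be sketched generically in Java. This is not the client's implementation. Treating `java.net.SocketTimeoutException` as "the timeout" is an assumption made for the sketch.

```java
import java.net.SocketTimeoutException;
import java.util.concurrent.Callable;

class RetryOnTimeout {
    // Runs `action`, retrying up to maxRetries extra times when it times out.
    // Any other exception propagates immediately without a retry.
    static <T> T call(Callable<T> action, int maxRetries) throws Exception {
        if (maxRetries < 0) throw new IllegalArgumentException("maxRetries must be >= 0");
        SocketTimeoutException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return action.call();
            } catch (SocketTimeoutException e) {
                last = e;   // timed out: try again if attempts remain
            }
        }
        throw last;         // every attempt timed out
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String result = call(() -> {
            if (++attempts[0] < 3) throw new SocketTimeoutException("simulated timeout");
            return "ok";
        }, 3);
        System.out.println(result + " after " + attempts[0] + " attempts"); // ok after 3 attempts
    }
}
```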

Serialization

Elasticsearch exposes a RESTful API, so JSON comes up constantly.
Recommended reading: Custom Serialization
JSON then has to be parsed; two approaches are available: JsonPath and Linq to JSON

log4net.ElasticSearch

Besides calling the ES API, you can also write logs straight into ES: log4net.ElasticSearch

<appender name="ElasticSearchAppender" type="log4net.ElasticSearch.ElasticSearchAppender, log4net.ElasticSearch">
  <layout type="log4net.Layout.PatternLayout,log4net">
	<param name="ConversionPattern" value="%d{ABSOLUTE} %-5p %c{1}:%L - %m%n" />
  </layout>
  <connectionString value="Server=xxx.xxx.xxx.xxx;Index=logsqh;Port=9200;rolling=false"/>
  <lossy value="false" />
  <evaluator type="log4net.Core.LevelEvaluator">
	<threshold value="ALL" />
  </evaluator>
  <bufferSize value="1" />
</appender>

The rolling attribute controls whether a new index is created each day; for details see: log4net.ElasticSearch + Kibana log recording and display

Java

Native APIs

  • transport: TCP, Java only
  • rest: HTTP, language-agnostic

rest is recommended: the transport client is deprecated in v7.0 and removed in v8.0.

SpringBoot + Elasticsearch

Spring Boot can integrate with Elasticsearch in 4 ways:

  • REST Client: HTTP; the Java Low Level Rest Client, and the recommended Java High Level Rest Client
  • Jest: HTTP, a community Java client
  • Spring Data: Spring's Elasticsearch integration package
  • Spring Data Repositories

Spring Data Elasticsearch

Part of the Spring Data family; a Maven project can be bootstrapped quickly. See the reference examples on the official site.

  • SpringBoot:v2.2.2
  • Elasticsearch:v6.8.0

Make sure the Spring Boot and Elasticsearch versions match; see the compatibility table

High Level Rest Client

Versions used: Spring Boot 2.2.2 + ES 6.6.2

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.6.2</version>
</dependency>

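Referencing v6.1.4 at first failed with: org.elasticsearch.client.Request.<init>(Ljava/lang/String;Ljava/lang/String;)V. An error naming a missing constructor like this usually means the Elasticsearch client jars on the classpath come from different versions (Spring Boot manages its own Elasticsearch version), so align them.
A sample ES client configuration: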

import java.util.ArrayList;
import java.util.List;
import javax.annotation.PostConstruct;
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowire;
import org.springframework.context.annotation.Bean;

// Fields of the configuration class; host and port are injected elsewhere (not shown)
private String schema = "http";
private int connectTimeOut = 1000;          // ms
private int socketTimeOut = 30000;          // ms
private int connectionRequestTimeOut = 500; // ms
private int maxConnectNum = 100;
private int maxConnectPerRoute = 100;
private boolean uniqueConnectTimeConfig = true;
private boolean uniqueConnectNumConfig = true;
private final List<HttpHost> httpHosts = new ArrayList<>();
private RestClientBuilder builder;
private RestHighLevelClient client;

// Statements cannot live in a class body, so the host list is filled after injection
@PostConstruct
public void initHosts() {
	httpHosts.add(new HttpHost(host, port, schema));
}

@Bean(autowire = Autowire.BY_NAME, name = "restHighLevelClient")
public RestHighLevelClient client() {
	builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));
	if (uniqueConnectTimeConfig) { setConnectTimeOutConfig(); }
	if (uniqueConnectNumConfig) { setMultiConnectConfig(); }
	client = new RestHighLevelClient(builder);
	return client;
}

/**
 * Connection timeouts of the async HTTP client
 */
public void setConnectTimeOutConfig() {
	builder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
		@Override
		public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
			requestConfigBuilder.setConnectTimeout(connectTimeOut);
			requestConfigBuilder.setSocketTimeout(socketTimeOut);
			requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut);
			return requestConfigBuilder;
		}
	});
}
/**
 * Connection pool sizes of the async HTTP client
 */
public void setMultiConnectConfig() {
	builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
		@Override
		public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
			httpClientBuilder.setMaxConnTotal(maxConnectNum);
			httpClientBuilder.setMaxConnPerRoute(maxConnectPerRoute);
			return httpClientBuilder;
		}
	});
}

If authentication is required, the client can be registered like this:

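import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.http.Header;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;

@Bean(autowire = Autowire.BY_NAME, name = "restHighLevelClientNew")
public RestHighLevelClient newClient() {
	final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
	credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(newESUserName, newESUserPassWord));
	newBuilder = RestClient.builder(httpHosts.toArray(new HttpHost[0]))
		.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
			@Override
			public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
				return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
			}
		});
	// Also send the credentials preemptively: the header is Base64-*encoded* "user:password"
	String auth = Base64.getEncoder().encodeToString((newESUserName + ":" + newESUserPassWord).getBytes(StandardCharsets.UTF_8));
	newBuilder.setDefaultHeaders(new Header[]{new BasicHeader("Authorization", "Basic " + auth)});

	// Timeout and pool-size helpers for newBuilder (not shown). If a helper calls
	// setHttpClientConfigCallback again, it replaces the credentials callback above.
	if (uniqueConnectTimeConfig) { setNewConnectTimeOutConfig(); }
	if (uniqueConnectNumConfig) { setNewMutiConnectConfig(); }
	newClient = new RestHighLevelClient(newBuilder);
	return newClient;
}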
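The `Authorization` value for Basic authentication is `Basic ` followed by the Base64 encoding of `username:password`, as defined in RFC 7617. The credentials must be encoded, not decoded. The sketch below needs only the JDK, and the class name is hypothetical. The test pair is the example from the RFC.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuth {
    // Builds the value of the HTTP Authorization header for Basic authentication.
    static String headerValue(String user, String password) {
        String pair = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(pair.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(headerValue("Aladdin", "open sesame")); // Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==
    }
}
```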

References: Example 1, Example 2
Bulk, Multi-Get, Reindex, Update by query, Delete by query, Rethrottle
Multi-record operations - RestClient
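For multi-record operations, the `_bulk` body is newline-delimited JSON. Each action line is followed, for index/create/update, by a source line, and the whole body must end with a newline. The hand-rolled Java builder below covers index actions only and follows the 7.x form; 6.x also expects a `_type` in the action line. The class name is hypothetical. Documents are assumed to be already-serialized single-line JSON, and ids and index names are assumed not to need JSON escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class BulkBody {
    // Builds a _bulk body that indexes each (id -> JSON source) pair into `index`.
    static String indexAll(String index, Map<String, String> docsById) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : docsById.entrySet()) {
            if (e.getValue().contains("\n")) {
                throw new IllegalArgumentException("source for id " + e.getKey() + " must be a single line");
            }
            // Action/metadata line.
            sb.append("{\"index\":{\"_index\":\"").append(index)
              .append("\",\"_id\":\"").append(e.getKey()).append("\"}}\n");
            // Source line.
            sb.append(e.getValue()).append('\n');
        }
        return sb.toString();   // ends with '\n', as the bulk API requires
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "{\"field1\":\"a\"}");
        docs.put("2", "{\"field1\":\"b\"}");
        // Sent as POST /_bulk with Content-Type: application/x-ndjson
        System.out.print(indexAll("afvinfo", docs));
    }
}
```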

