search(4)- elastic4s-ElasticDsl


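Last time we looked at the execution framework of elastic4s. I had planned to move straight on to demonstrating actual function calls, but after reading through all of the elastic4s usage documentation I felt that would be moving a little too fast. The main reason is that Elasticsearch changed significantly in 7.0. The elastic4s source code has kept pace with ES, but its documentation has not been updated, so learning elastic4s from the documentation is not really possible; you have to work it out from the source.

I spent some time going through the elastic4s source and came away thinking the library will be quite useful. First, generating JSON requests programmatically is flexible, and the compiler can check that the requests are well formed. Second, it helps with handling search results: they come back as long, messy, complex JSON, and I would not want to parse that by hand just to reach the right values. elastic4s provides a fairly complete set of response classes that should make this convenient. The programming model of elastic4s and its use of Scala are also worth studying in their own right.

So I think building a complete set of examples with elastic4s, covering index creation, index maintenance, search, and aggregation, would go a long way toward understanding and mastering it. Before that, let's review how elastic4s works. Its job is actually simple: combine DSL statements to produce a JSON request, send it to the ES REST endpoint, then process the returned JSON and pick out the answer.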

In the previous post we discussed the basic execution framework of elastic4s:

    client.execute(
      createIndex("company")
        .shards(2).replicas(3)
    )

    ...

    val bookschema = putMapping("books")
      .as(
        KeywordField("isbn"),
        textField("title"),
        doubleField("price")
      )

    client.execute(
      bookschema
    )

    ...

    val futAccts = client.execute(
      search("bank").termQuery("city" -> "dante")
    )
    futAccts.onComplete {
      case Success(esresp) =>
        esresp.result.hits.hits.foreach(h => println(h.sourceAsMap))
      case Failure(err) => println(s"search error: ${err.getMessage}")
    }

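In fact, the T in execute(T) stands for any of the ES operation types that elastic4s supports. This works because of Scala's typeclass pattern. Let's first look at the definition of execute: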

    // Executes the given request type T, and returns an effect of Response[U]
    // where U is particular to the request type.
    // For example a search request will return a Response[SearchResponse].
    def execute[T, U, F[_]](t: T)(implicit
                                  executor: Executor[F],
                                  functor: Functor[F],
                                  handler: Handler[T, U],
                                  manifest: Manifest[U]): F[Response[U]] = {
      val request = handler.build(t)
      val f = executor.exec(client, request)
      functor.map(f) { resp =>
        handler.responseHandler.handle(resp) match {
          case Right(u) => RequestSuccess(resp.statusCode, resp.entity.map(_.content), resp.headers, u)
          case Left(error) => RequestFailure(resp.statusCode, resp.entity.map(_.content), resp.headers, error)
        }
      }
    }
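To make the typeclass dispatch concrete, here is a minimal sketch in plain Scala with no elastic4s dependency. Everything in it (HttpRequest, CreateIndex, Search, send, and the simplified two-method Handler) is a hypothetical stand-in rather than the library's own types; it only illustrates how a single execute can serve every request type by resolving an implicit handler from T.

```scala
object TypeclassSketch {
  // Hypothetical stand-ins for elastic4s' ElasticRequest and its request types.
  final case class HttpRequest(method: String, endpoint: String, body: String)
  final case class CreateIndex(name: String, shards: Int)
  final case class Search(index: String, term: (String, String))

  // The typeclass: how to turn a T into an HTTP request,
  // and how to read the raw response back into a U.
  trait Handler[T, U] {
    def build(t: T): HttpRequest
    def handle(status: Int, raw: String): Either[String, U]
  }

  implicit object CreateIndexHandler extends Handler[CreateIndex, Boolean] {
    def build(t: CreateIndex): HttpRequest =
      HttpRequest("PUT", "/" + t.name, s"""{"settings":{"number_of_shards":${t.shards}}}""")
    def handle(status: Int, raw: String): Either[String, Boolean] =
      if (status == 200 || status == 201) Right(true) else Left(s"HTTP $status: $raw")
  }

  implicit object SearchHandler extends Handler[Search, List[String]] {
    def build(t: Search): HttpRequest = {
      val (field, value) = t.term
      HttpRequest("POST", s"/${t.index}/_search", s"""{"query":{"term":{"$field":"$value"}}}""")
    }
    def handle(status: Int, raw: String): Either[String, List[String]] =
      if (status == 200) Right(raw.split(',').toList) else Left(s"HTTP $status: $raw")
  }

  // A fake transport that echoes the request instead of calling a server.
  def send(req: HttpRequest): (Int, String) = (200, s"${req.method},${req.endpoint}")

  // One execute for every request type; the compiler picks the handler from T,
  // and the handler in turn fixes the result type U.
  def execute[T, U](t: T)(implicit handler: Handler[T, U]): Either[String, U] = {
    val (status, raw) = send(handler.build(t))
    handler.handle(status, raw)
  }
}
```

With `import TypeclassSketch._` in scope, `execute(CreateIndex("company", 2))` is typed `Either[String, Boolean]`, while `execute(Search("bank", "city" -> "dante"))` is typed `Either[String, List[String]]`. A request type with no handler instance is rejected at compile time rather than failing at run time.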

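As mentioned last time, Handler[T, U] is a typeclass that represents how the JSON request is built for each request type T. elastic4s supplies the handlers for these T types as follows: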

    trait ElasticDsl
        extends ElasticApi
        with Logging
        with ElasticImplicits
        with BulkHandlers
        with CatHandlers
        with CountHandlers
        with ClusterHandlers
        with DeleteHandlers
        with ExistsHandlers
        with ExplainHandlers
        with GetHandlers
        with IndexHandlers
        with IndexAdminHandlers
        with IndexAliasHandlers
        with IndexStatsHandlers
        with IndexTemplateHandlers
        with LocksHandlers
        with MappingHandlers
        with NodesHandlers
        with ReindexHandlers
        with RoleAdminHandlers
        with RoleHandlers
        with RolloverHandlers
        with SearchHandlers
        with SearchTemplateHandlers
        with SearchScrollHandlers
        with SettingsHandlers
        with SnapshotHandlers
        with UpdateHandlers
        with TaskHandlers
        with TermVectorHandlers
        with UserAdminHandlers
        with UserHandlers
        with ValidateHandlers {

      implicit class RichRequest[T](t: T) {
        def request(implicit handler: Handler[T, _]): ElasticRequest = handler.build(t)
        def show(implicit handler: Handler[T, _]): String            = Show[ElasticRequest].show(handler.build(t))
      }
    }

    object ElasticDsl extends ElasticDsl
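The RichRequest class at the end of ElasticDsl is an extension-method wrapper whose methods only compile when a handler for T is available. The following is a minimal sketch of that idea in plain Scala; ElasticRequest here is a simplified three-field case class, and DeleteIndex, the one-parameter Handler, and the output format of show are all assumptions made for illustration, not the library's definitions.

```scala
object RichRequestSketch {
  // Hypothetical, simplified request model; not the elastic4s classes.
  final case class ElasticRequest(method: String, endpoint: String, body: String)
  final case class DeleteIndex(name: String)

  trait Handler[T] { def build(t: T): ElasticRequest }

  implicit val deleteIndexHandler: Handler[DeleteIndex] = new Handler[DeleteIndex] {
    def build(t: DeleteIndex): ElasticRequest = ElasticRequest("DELETE", "/" + t.name, "")
  }

  // Adds request/show to any value, but a call compiles only if a Handler[T] is in scope.
  implicit class RichRequest[T](t: T) {
    def request(implicit handler: Handler[T]): ElasticRequest = handler.build(t)

    // Renders the request as text without sending it anywhere.
    def show(implicit handler: Handler[T]): String = {
      val r = handler.build(t)
      s"${r.method} ${r.endpoint}" + (if (r.body.isEmpty) "" else "\n" + r.body)
    }
  }
}
```

After `import RichRequestSketch._`, `DeleteIndex("books").show` renders the request as a string, which is handy for checking what a DSL expression would send before involving a cluster.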

All of the operation APIs are gathered here:

    trait ElasticApi
        extends AliasesApi
        with ElasticImplicits
        with AggregationApi
        with AnalyzerApi
        with BulkApi
        with CatsApi
        with CreateIndexApi
        with ClearRolesCacheApi
        with ClusterApi
        with CollapseApi
        with CountApi
        with CreateRoleApi
        with CreateUserApi
        with DeleteApi
        with DeleteIndexApi
        with DeleteRoleApi
        with DeleteUserApi
        with DynamicTemplateApi
        with ExistsApi
        with ExplainApi
        with ForceMergeApi
        with GetApi
        with HighlightApi
        with IndexApi
        with IndexAdminApi
        with IndexRecoveryApi
        with IndexTemplateApi
        with LocksApi
        with MappingApi
        with NodesApi
        with NormalizerApi
        with QueryApi
        with PipelineAggregationApi
        with ReindexApi
        with RoleApi
        with ScriptApi
        with ScoreApi
        with ScrollApi
        with SearchApi
        with SearchTemplateApi
        with SettingsApi
        with SnapshotApi
        with SortApi
        with SuggestionApi
        with TaskApi
        with TermVectorApi
        with TokenizerApi
        with TokenFilterApi
        with TypesApi
        with UpdateApi
        with UserAdminApi
        with UserApi
        with ValidateApi {

      implicit class RichFuture[T](future: Future[T]) {
        def await(implicit duration: Duration = 60.seconds): T = Await.result(future, duration)
      }
    }

    object ElasticApi extends ElasticApi
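The RichFuture helper above simply blocks on a Future with a default timeout. Here is a small self-contained sketch of the same shape using only the standard library; fakeSearch and demo are hypothetical stand-ins for client.execute(...) and a caller, so nothing here talks to Elasticsearch.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitSketch {
  // Same shape as RichFuture in ElasticApi: block on a Future, 60 seconds by default.
  implicit class RichFuture[T](future: Future[T]) {
    def await(implicit duration: Duration = 60.seconds): T = Await.result(future, duration)
  }

  // Hypothetical stand-in for client.execute(...); any Future works here.
  def fakeSearch(city: String): Future[List[String]] =
    Future(List(s"account in $city"))

  // Blocks the calling thread until the result is ready, then returns it.
  def demo(): List[String] = fakeSearch("dante").await
}
```

Blocking like this is convenient in scripts and tests. In application code it is usually better to keep composing the Future, as the onComplete example earlier in this post does.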

With import ElasticDsl._ every API method and every handler instance is brought into scope. Here are the API methods used in the example above:

    trait CreateIndexApi {
      def createIndex(name: String): CreateIndexRequest = CreateIndexRequest(name)
      ...
    }

    trait MappingApi {
      ...
      def putMapping(indexes: Indexes): PutMappingRequest = PutMappingRequest(IndexesAndType(indexes))
    }

    trait SearchApi {
      def search(index: String): SearchRequest = search(Indexes(index))
      ...
    }

The CreateIndexRequest, PutMappingRequest, and SearchRequest types each come with an implicit handler instance:

    trait IndexAdminHandlers {
      ...
      implicit object CreateIndexHandler extends Handler[CreateIndexRequest, CreateIndexResponse] {

        override def responseHandler: ResponseHandler[CreateIndexResponse] = new ResponseHandler[CreateIndexResponse] {
          override def handle(response: HttpResponse): Either[ElasticError, CreateIndexResponse] =
            response.statusCode match {
              case 200 | 201 => Right(ResponseHandler.fromResponse[CreateIndexResponse](response))
              case 400 | 500 => Left(ElasticError.parse(response))
              case _         => sys.error(response.toString)
            }
        }

        override def build(request: CreateIndexRequest): ElasticRequest = {

          val endpoint = "/" + URLEncoder.encode(request.name, "UTF-8")

          val params = scala.collection.mutable.Map.empty[String, Any]
          request.waitForActiveShards.foreach(params.put("wait_for_active_shards", _))
          request.includeTypeName.foreach(params.put("include_type_name", _))

          val body   = CreateIndexContentBuilder(request).string()
          val entity = HttpEntity(body, "application/json")

          ElasticRequest("PUT", endpoint, params.toMap, entity)
        }
      }
    }

    ...

    trait MappingHandlers {
      ...
      implicit object PutMappingHandler extends Handler[PutMappingRequest, PutMappingResponse] {

        override def build(request: PutMappingRequest): ElasticRequest = {

          val endpoint = s"/${request.indexesAndType.indexes.mkString(",")}/_mapping${request.indexesAndType.`type`.map("/" + _).getOrElse("")}"

          val params = scala.collection.mutable.Map.empty[String, Any]
          request.updateAllTypes.foreach(params.put("update_all_types", _))
          request.ignoreUnavailable.foreach(params.put("ignore_unavailable", _))
          request.allowNoIndices.foreach(params.put("allow_no_indices", _))
          request.expandWildcards.foreach(params.put("expand_wildcards", _))
          request.includeTypeName.foreach(params.put("include_type_name", _))

          val body   = PutMappingBuilderFn(request).string()
          val entity = HttpEntity(body, "application/json")

          ElasticRequest("PUT", endpoint, params.toMap, entity)
        }
      }
    }

    ...

    trait SearchHandlers {
      ...
      implicit object SearchHandler extends Handler[SearchRequest, SearchResponse] {

        override def build(request: SearchRequest): ElasticRequest = {

          val endpoint =
            if (request.indexes.values.isEmpty) "/_all/_search"
            else
              "/" + request.indexes.values
                .map(URLEncoder.encode(_, "UTF-8"))
                .mkString(",") + "/_search"

          val params = scala.collection.mutable.Map.empty[String, String]
          request.requestCache.map(_.toString).foreach(params.put("request_cache", _))
          request.searchType
            .filter(_ != SearchType.DEFAULT)
            .map(SearchTypeHttpParameters.convert)
            .foreach(params.put("search_type", _))
          request.routing.map(_.toString).foreach(params.put("routing", _))
          request.pref.foreach(params.put("preference", _))
          request.keepAlive.foreach(params.put("scroll", _))
          request.allowPartialSearchResults.map(_.toString).foreach(params.put("allow_partial_search_results", _))
          request.batchedReduceSize.map(_.toString).foreach(params.put("batched_reduce_size", _))

          request.indicesOptions.foreach { opts =>
            IndicesOptionsParams(opts).foreach { case (key, value) => params.put(key, value) }
          }

          request.typedKeys.map(_.toString).foreach(params.put("typed_keys", _))

          val body = request.source.getOrElse(SearchBodyBuilderFn(request).string())
          ElasticRequest("POST", endpoint, params.toMap, HttpEntity(body, "application/json"))
        }
      }
    }
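The handlers above share a common shape: derive an endpoint from the request, turn only the options that were set into query parameters, and map HTTP status codes to an Either. Below is a small sketch of those three steps using only the standard library. SearchReq and the function names are hypothetical, and the request is trimmed to two optional fields, so treat it as an illustration of the pattern rather than the library's code.

```scala
import java.net.URLEncoder

object SearchBuildSketch {
  // Hypothetical, trimmed-down request; the real SearchRequest has many more options.
  final case class SearchReq(indexes: Seq[String],
                             routing: Option[String] = None,
                             typedKeys: Option[Boolean] = None)

  // Mirrors the endpoint rule in SearchHandler.build: no index means /_all.
  def endpoint(req: SearchReq): String =
    if (req.indexes.isEmpty) "/_all/_search"
    else req.indexes.map(URLEncoder.encode(_, "UTF-8")).mkString("/", ",", "/_search")

  // Only options that were actually set become query parameters.
  def params(req: SearchReq): Map[String, String] =
    List(
      req.routing.map("routing" -> _),
      req.typedKeys.map(b => "typed_keys" -> b.toString)
    ).flatten.toMap

  // Mirrors the status handling in CreateIndexHandler:
  // 200/201 yield a result, 400/500 yield an error value, anything else throws.
  def handle(status: Int, body: String): Either[String, String] = status match {
    case 200 | 201 => Right(body)
    case 400 | 500 => Left(s"elastic error: $body")
    case other     => sys.error(s"unexpected status $other")
  }
}
```

Modeling every option as an Option and flattening is an immutable alternative to the mutable Map used in the source; both approaches produce a parameter map that contains only the settings the caller supplied.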

 

