Go操作Elasticsearch


 

Elasticsearch


Elasticsearch

下載

  https://www.elastic.co/cn/start

運行

  解壓后cd到解壓目錄 ./bin/elasticsearch

介紹

Elasticsearch(ES)是一個基於Lucene構建的開源、分布式、RESTful接口的全文搜索引擎。Elasticsearch還是一個分布式文檔數據庫,其中每個字段均可被索引,而且每個字段的數據均可被搜索,ES能夠橫向擴展至數以百計的服務器存儲以及處理PB級的數據。可以在極短的時間內存儲、搜索和分析大量的數據。通常作為具有復雜搜索場景情況下的核心發動機。

Elasticsearch能做什么

  1. 當你經營一家網上商店,你可以讓你的客戶搜索你賣的商品。在這種情況下,你可以使用ElasticSearch來存儲你的整個產品目錄和庫存信息,為客戶提供精准搜索,可以為客戶推薦相關商品。
  2. 當你想收集日志或者交易數據的時候,需要分析和挖掘這些數據,尋找趨勢,進行統計,總結,或發現異常。在這種情況下,你可以使用Logstash或者其他工具來進行收集數據,當這引起數據存儲到ElasticsSearch中。你可以搜索和匯總這些數據,找到任何你感興趣的信息。
  3. 對於程序員來說,比較有名的案例是GitHub,GitHub的搜索是基於ElasticSearch構建的,在github.com/search頁面,你可以搜索項目、用戶、issue、pull request,還有代碼。共有40~50個索引庫,分別用於索引網站需要跟蹤的各種數據。雖然只索引項目的主分支(master),但這個數據量依然巨大,包括20億個索引文檔,30TB的索引文件。

Elasticsearch基本概念

  Near Realtime(NRT) 幾乎實時

Elasticsearch是一個幾乎實時的搜索平台。意思是,從索引一個文檔到這個文檔可被搜索只需要一點點的延遲,這個時間一般為毫秒級。

  Cluster 集群

群集是一個或多個節點(服務器)的集合, 這些節點共同保存整個數據,並在所有節點上提供聯合索引和搜索功能。一個集群由一個唯一集群ID確定,並指定一個集群名(默認為“elasticsearch”)。該集群名非常重要,因為節點可以通過這個集群名加入群集,一個節點只能是群集的一部分。

確保在不同的環境中不要使用相同的群集名稱,否則可能會導致連接錯誤的群集節點。例如,你可以使用logging-dev、logging-stage、logging-prod分別為開發、階段產品、生產集群做記錄。

  Node節點

節點是單個服務器實例,它是群集的一部分,可以存儲數據,並參與群集的索引和搜索功能。就像一個集群,節點的名稱默認為一個隨機的通用唯一標識符(UUID),確定在啟動時分配給該節點。如果不希望默認,可以定義任何節點名。這個名字對管理很重要,目的是要確定你的網絡服務器對應於你的ElasticSearch群集節點。

我們可以通過群集名配置節點以連接特定的群集。默認情況下,每個節點設置加入名為“elasticSearch”的集群。這意味着如果你啟動多個節點在網絡上,假設他們能發現彼此都會自動形成和加入一個名為“elasticsearch”的集群。

在單個群集中,你可以擁有盡可能多的節點。此外,如果“elasticsearch”在同一個網絡中,沒有其他節點正在運行,從單個節點的默認情況下會形成一個新的單節點名為”elasticsearch”的集群。

  Index索引

索引是具有相似特性的文檔集合。例如,可以為客戶數據提供索引,為產品目錄建立另一個索引,以及為訂單數據建立另一個索引。索引由名稱(必須全部為小寫)標識,該名稱用於在對其中的文檔執行索引、搜索、更新和刪除操作時引用索引。在單個群集中,你可以定義盡可能多的索引。

  Type類型

在索引中,可以定義一個或多個類型。類型是索引的邏輯類別/分區,其語義完全取決於你。一般來說,類型定義為具有公共字段集的文檔。例如,假設你運行一個博客平台,並將所有數據存儲在一個索引中。在這個索引中,你可以為用戶數據定義一種類型,為博客數據定義另一種類型,以及為注釋數據定義另一類型。

  Document文檔

文檔是可以被索引的信息的基本單位。例如,你可以為單個客戶提供一個文檔,單個產品提供另一個文檔,以及單個訂單提供另一個文檔。本文件的表示形式為JSON(JavaScript Object Notation)格式,這是一種非常普遍的互聯網數據交換格式。

在索引/類型中,你可以存儲盡可能多的文檔。請注意,盡管文檔物理駐留在索引中,文檔實際上必須索引或分配到索引中的類型。

  Shards & Replicas分片與副本

索引可以存儲大量的數據,這些數據可能超過單個節點的硬件限制。例如,十億個文件占用磁盤空間1TB的單指標可能不適合對單個節點的磁盤或可能太慢服務僅從單個節點的搜索請求。

為了解決這一問題,Elasticsearch提供細分你的指標分成多個塊稱為分片的能力。當你創建一個索引,你可以簡單地定義你想要的分片數量。每個分片本身是一個全功能的、獨立的“指數”,可以托管在集群中的任何節點。

  Shards分片的重要性主要體現在以下兩個特征:

  1. 分片允許你水平拆分或縮放內容的大小
  2. 分片允許你分配和並行操作的碎片(可能在多個節點上)從而提高性能/吞吐量 這個機制中的碎片是分布式的以及其文件匯總到搜索請求是完全由ElasticSearch管理,對用戶來說是透明的。

在同一個集群網絡或雲環境上,故障是任何時候都會出現的,擁有一個故障轉移機制以防分片和節點因為某些原因離線或消失是非常有用的,並且被強烈推薦。為此,Elasticsearch允許你創建一個或多個拷貝,你的索引分片進入所謂的副本或稱作復制品的分片,簡稱Replicas。

  Replicas的重要性主要體現在以下兩個特征:

  1. 副本為分片或節點失敗提供了高可用性。為此,需要注意的是,一個副本的分片不會分配在同一個節點作為原始的或主分片,副本是從主分片那里復制過來的。
  2. 副本允許用戶擴展你的搜索量或吞吐量,因為搜索可以在所有副本上並行執行。

ES基本概念與關系型數據庫的比較

ES概念 關系型數據庫
Index(索引)支持全文檢索 Database(數據庫)
Type(類型) Table(表)
Document(文檔),不同文檔可以有不同的字段集合 Row(數據行)
Field(字段) Column(數據列)
Mapping(映射) Schema(模式)

ES API

以下示例使用curl演示。

查看健康狀態

curl -X GET 127.0.0.1:9200/_cat/health?v

輸出:

epoch      timestamp cluster       status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1564726309 06:11:49  elasticsearch yellow          1         1      3   3    0    0        1             0                  -                 75.0%

查詢當前es集群中所有的indices

curl -X GET 127.0.0.1:9200/_cat/indices?v

輸出:

health status index                uuid                   pri rep docs.count docs.deleted store.size pri.store.size
green  open   .kibana_task_manager LUo-IxjDQdWeAbR-SYuYvQ   1   0          2            0     45.5kb         45.5kb
green  open   .kibana_1            PLvyZV1bRDWex05xkOrNNg   1   0          4            1     23.9kb         23.9kb
yellow open   user                 o42mIpDeSgSWZ6eARWUfKw   1   1          0            0       283b           283b

創建索引

curl -X PUT 127.0.0.1:9200/www

輸出:

{"acknowledged":true,"shards_acknowledged":true,"index":"www"}

刪除索引

curl -X DELETE 127.0.0.1:9200/www

輸出:

{"acknowledged":true}

插入記錄

curl -H "ContentType:application/json" -X POST 127.0.0.1:9200/user/person -d '
{
	"name": "dsb",
	"age": 9000,
	"married": true
}'

輸出:

{
    "_index": "user",
    "_type": "person",
    "_id": "MLcwUWwBvEa8j5UrLZj4",
    "_version": 1,
    "result": "created",
    "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
    },
    "_seq_no": 3,
    "_primary_term": 1
}

也可以使用PUT方法,但是需要傳入id

curl -H "ContentType:application/json" -X PUT 127.0.0.1:9200/user/person/4 -d '
{
	"name": "sb",
	"age": 9,
	"married": false
}

檢索

Elasticsearch的檢索語法比較特別,使用GET方法攜帶JSON格式的查詢條件。

全檢索:

curl -X GET 127.0.0.1:9200/user/person/_search

按條件檢索:

curl -H "ContentType:application/json" -X PUT 127.0.0.1:9200/user/person/4 -d '
{
	"query":{
		"match": {"name": "sb"}
	}	
}

ElasticSearch默認一次最多返回10條結果,可以像下面的示例通過size字段來設置返回結果的數目。

curl -H "ContentType:application/json" -X PUT 127.0.0.1:9200/user/person/4 -d '
{
	"query":{
		"match": {"name": "sb"},
		"size": 2
	}	
}

Go操作Elasticsearch

elastic client

我們使用官方包 https://github.com/elastic/go-elasticsearch 來連接ES並進行操作。

注意下載與你的ES相同版本的client,例如我們這里使用的ES是7.2.1的版本,那么我們下載的client也要與之對應為github.com/olivere/elastic/v7

$ go get github.com/elastic/go-elasticsearch/v7   // 7.x

 

連接ES

默認連接

package main

import (
	"github.com/elastic/go-elasticsearch"
	"log"
)

// es操作

// 創建客戶端
func main() {
	// 連接es 
	
	// NewDefaultClient 默認客戶端 127.0.0.1:9200
	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	// 獲取 當前es的詳細信息
	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()
	log.Printf("%#v\n",res)
	log.Printf("%v\n",res)
}

  

配置文件連接

package main

import (
	"github.com/elastic/go-elasticsearch"
	"log"
)

// es操作

// 創建客戶端
func main() {
	// 連接es

	// elasticsearch.Config 配置ES連接屬性
	/*
		type Config struct {
				Addresses []string // 要使用的Elasticsearch節點列表.
				Username  string   // HTTP基本身份驗證的用戶名.
				Password  string   // HTTP基本認證密碼.

				CloudID string // 彈性服務的端點 (https://elastic.co/cloud).
				APIKey  string // 用於授權的Base64編碼令牌;如果設置,則覆蓋用戶名和密碼.

				RetryOnStatus        []int // 重試狀態代碼列表. 默認: 502, 503, 504.
				DisableRetry         bool  // 默認: false.
				EnableRetryOnTimeout bool  // 默認: false.
				MaxRetries           int   // 默認: 3.

				DiscoverNodesOnStart  bool          // 初始化客戶端時發現節點. Default: false.
				DiscoverNodesInterval time.Duration // 定期發現節點. Default: disabled.

				EnableMetrics     bool // 啟用指標收集.
				EnableDebugLogger bool // 啟用調試日志記錄.

				RetryBackoff func(attempt int) time.Duration // 可選的退避時間. 默認: nil.

				Transport http.RoundTripper    // HTTP傳輸對象.
				Logger    estransport.Logger   // 記錄器對象.
				Selector  estransport.Selector // 選擇器對象.

				// 自定義ConnectionPool的可選構造函數. Default: nil.
				ConnectionPoolFunc func([]*estransport.Connection, estransport.Selector) estransport.ConnectionPool
			}
	*/
	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://localhost:9200",
			"http://localhost:9201",
		},
		// ...
	}

	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}
	// 獲取 當前es的詳細信息
	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()
	log.Printf("%#v\n", res)
	log.Printf("%v\n", res)
}

  

增刪改查

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"github.com/elastic/go-elasticsearch"
	"github.com/elastic/go-elasticsearch/esapi"
	"log"
	"strconv"
	"strings"
	"sync"
)

// es操作

// 創建客戶端
func main() {
	// 連接es
	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://localhost:9200",
			"http://localhost:9201",
		},
		// ...
	}
	// 創建客戶端
	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}
	// 獲取 es集群消息
	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	log.Println(res)
	defer res.Body.Close()

	// 檢查回應狀態
	if res.IsError() {
		log.Fatalf("Error: %s", res.String())
	}

	// 將響應轉化為map
	var (
		r map[string]interface{}
		//wg sync.WaitGroup
	)
	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		log.Fatalf("Error parsing the response body: %s", err)
	}
	// 打印序列化的map r
	log.Printf("%#v", r)
	// 打印客戶端和服務器版本號.
	log.Printf("Client: %s", elasticsearch.Version)
	log.Printf("Server: %v", r["version"].(map[string]interface{})["number"])
	log.Println(strings.Repeat("~", 37))

	// 創建索引 && 文件
	var wg sync.WaitGroup
	// 循環list
	for i, title := range []string{"Test One", "Test Two"} {
		wg.Add(1)

		go func(i int, title string) {
			defer wg.Done()

			// 建立請求主體.
			// Builder創建json串
			var b strings.Builder
			b.WriteString(`{"title" : "`)
			b.WriteString(title)
			b.WriteString(`"}`)

			// 設置請求對象.
			req := esapi.IndexRequest{
				Index:      "test",
				DocumentID: strconv.Itoa(i + 1),
				Body:       strings.NewReader(b.String()),
				Refresh:    "true",
			}

			// 與客戶端執行請求.

			res, err := req.Do(context.Background(), es)
			if err != nil {
				log.Fatalf("Error getting response: %s", err)
			}
			defer res.Body.Close()

			if res.IsError() {
				log.Printf("[%s] Error indexing document ID=%d", res.Status(), i+1)
			} else {
				// Deserialize the response into a map.
				var r map[string]interface{}
				if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
					log.Printf("Error parsing the response body: %s", err)
				} else {
					// 打印響應狀態和索引文檔版本.
					log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
				}
			}
		}(i, title)
	}
	wg.Wait()
	log.Println(strings.Repeat("-", 37))

	// 搜索索引文件

	// 建立請求主體.
	var buf bytes.Buffer
	query := map[string]interface{}{
		"query": map[string]interface{}{
			"match": map[string]interface{}{
				"title": "test",
			},
		},
	}
	if err := json.NewEncoder(&buf).Encode(query); err != nil {
		log.Fatalf("Error encoding query: %s", err)
	}

	// 執行搜索請求.
	res, err = es.Search(
		es.Search.WithContext(context.Background()),
		es.Search.WithIndex("test"),
		es.Search.WithBody(&buf),
		es.Search.WithTrackTotalHits(true),
		es.Search.WithPretty(),
	)
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		var e map[string]interface{}
		if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
			log.Fatalf("Error parsing the response body: %s", err)
		} else {
			// Print the response status and error information.
			log.Fatalf("[%s] %s: %s",
				res.Status(),
				e["error"].(map[string]interface{})["type"],
				e["error"].(map[string]interface{})["reason"],
			)
		}
	}

	if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
		log.Fatalf("Error parsing the response body: %s", err)
	}
	// 打印響應狀態,結果數和請求持續時間.
	log.Printf(
		"[%s] %d hits; took: %dms",
		res.Status(),
		int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)),
		int(r["took"].(float64)),
	)
	// 打印每次匹配的ID和文檔來源.
	for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) {
		log.Printf(" * ID=%s, %s", hit.(map[string]interface{})["_id"], hit.(map[string]interface{})["_source"])
	}
	log.Println(strings.Repeat("=", 37))

	// 更新
	//es.Update()
	
	// 刪除
	//es.Delete()
}

  

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM