Golang封裝Elasticsearch常用功能


前言(為什么要寫這篇文章)

首先看過我博客的都應該知道,我去年發了一篇Python封裝Elasticsearch的文章。但那是去年了,今年我將我的檢索服務后端用Golang全部重寫了一波,相當於用Go重構了以前的Python代碼,不過我個人感覺Golang的效率還是高於Python的,而且我還加了一些異常判斷和處理,這次的代碼只會比以前更好更牛逼,為了紀念這一個多月的重構歷程,我將關鍵功能記錄下來,方便自己復習和各位兄弟姐妹查看。

使用的Go包

我的Elasticsearch版本是6.3.2,6系列了,現在(2020-05-13)最新版本應該是7,不過新版本和舊版本應該就是少了Type,我的6版本代碼,請各位自己自行斟酌使用。
安裝指定的Go包 olivere/elastic,現在有官方驅動的包了,但是我這篇文章用的包是 olivere/elastic,所以一切的代碼都是以olivere為主。

go get github.com/olivere/elastic

基礎的使用(Simple的使用)

接下來我舉幾個例子來說下這個golang 如何驅動 elasticsearch的

連接Elasticserach

package elasticdb

import (
	"context"
	"fmt"

	"github.com/olivere/elastic"
)

func main() {
    //連接127.0.0.1
    client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
    if err != nil {
        fmt.Println(err)
        return
    }
    //檢查健康的狀況,ping指定ip,不通報錯
    _, _, err = client.Ping(ip).Do(context.Background())
    if err != nil {
        fmt.Println(err)
        return
    }
}

創建一個index索引

這里我默認你們都有一定的Es基礎,其實你把index想成Mysql里面的表就可以了。

//indexname 你可以想成表名
func CreateIndex(indexname string) {
    client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
    if err != nil {
        fmt.Println(err)
        return
    }
    client.CreateIndex(indexname).Do(context.Background())
}

刪除一個index索引

想成刪除一張表

//indexname 你可以想成表名
func CreateIndex(indexname string) {
    client, err := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
    if err != nil {
        fmt.Println(err)
        return
    }
    client.DeleteIndex(indexname).Do(context.Background())
}

往指定的index當中導一條數據

想成往一張表里面導入一條數據,在Golang中,我們可以導入json的字符串,我們也可以導入golang的struct類型,例如

//結構體
type Task struct {
    Taskid  string  `json:"taskid"`
    Taskname  string  `json:"taskname"`
}
//字符串
jsonmsg := `{"taskid":"123456", "taskname":"lwb"}`
//導入數據,你需要index名,index的type,導入的數據
func PutData(index string, typ string, bodyJSON interface{}) bool {
    client, _ := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
	_, err := client.Index().
		Index(index).
		Type(typ).
		BodyJson(bodyJSON).
		Do(context.Background())
	if err != nil {
		//驗證是否導入成功
		fmt.Sprintf("<Put> some error occurred when put.  err:%s", err.Error())
		return false
	}
	return true
}
func main() {
    //json字符串導入
    jsonmsg = `{"taskid":"123456", "taskname":"hahah"}`
    status := PutData("test", "doc", jsonmsg)
    //struct結構體導入
    task := Task{Taskid: "123", Taskname: "hahah"}
    status := PutData("test", "doc", task)
}

刪除一條數據

刪除數據需要ID,這個ID是個啥玩意兒呢。。。就是咱們不是剛導了一條數據進去么,你可以設置這數據的唯一ID,也可以讓Elasticsearch幫你自動生成一個,一般沒事兒干誰自己設置啊,還容易重復,一重復就報錯。。我在這里把這個刪除的方法教給大家,記住這個ID一定是唯一的

func DeleteData(index, typ, id string) {
    client, _ := elastic.NewClient(elastic.SetURL("http://127.0.0.1:9200"))
    _, err := client.Delete().Index(index).Type(typ).Id(id).Do(context.Background())
    if err != nil {
        fmt.Println(err)
        return
    }
}

高階使用(條件查詢/封裝等)

簡單講了一下增刪改,現在我們來講一下高階用法,高級增刪改查吧,其實官方的文檔講的還算是比較清楚,不過我等大中華程序狗的姿勢水平。。。至少我是圖樣圖森破,看英文也算是廢了九牛二虎之力,算是捋出來了一些高階用法,順手自己造了個輪子,現在我就來挑幾點來說下吧。

自動選擇可用的Es節點

配合olivere的ping機制,可以做一個自動檢測Es ip 是否可用的邏輯,這樣可以增加我們put,updatge時候的穩定性

自動檢測節點Elasticseach的IP是否可用

olivere的Elasticserach sdk 限定了只能用一個ip,類似“http://127.0.0.1:9200”這樣,我對原本的邏輯進行了一點改造,改成支持一個ip list,依次檢測Es ip 是否可用

package elasticdb

import (
	"context"
	"fmt"

	"github.com/olivere/elastic"
)

//Elastic es的連接,
type Elastic struct {
	Client *elastic.Client
	host   string
}

//Connect 基礎的連接代碼
func Connect(ip string) (*Elastic, error) {
    //引入IP
	client, err := elastic.NewClient(elastic.SetURL(ip))
	if err != nil {
		return nil, err
    }
    //Ping 的方式檢查是否可用
	_, _, err = client.Ping(ip).Do(context.Background())
	if err != nil {
		return nil, err
    }
    //輸出一個struct類型,可以被繼承
	es := &Elastic{
		Client: client,
		host:   ip,
	}
	return es, nil
}

//InitES 初始化Es連接
func InitES() (*Elastic, error) {
    //host是一個列表
    host := []string{"http://10.0.6.245:9200","http://10.0.6.246:9200","http://10.0.6.247:9200"}
    //統計host的數量
    Eslistsnum := len(host)
    //如果為零就不繼續接下來的邏輯
	if Eslistsnum == 0 {
		return nil, fmt.Errorf("Cluster Not Es Node")
	}
    //創建新的連接
	for i, ip := range host {
		//判斷是不是最后一個節點ip
		if (Eslistsnum - 1) != i {
			es, err := Connect(ip)
			//如果連接出錯,則跳過
			if err != nil {
				fmt.Println(err)
				continue
			}
            return es, nil
            //如果是最后一個節點
		} else {
            es, err := Connect(ip)
            //輸出錯誤
			if err != nil {
				return nil, err
			}
			return es, nil
		}
	}
	return nil, nil
}

后續我們可以采用繼承的方法調用Es的client的連接,這個在后面我就不詳細說了,聰明的你,看代碼一定能整明白,再整不明白,你就直接上Git拷貝我的代碼就得了。

條件查詢

以前我寫過一個Python的Elasticsearch Sdk,那里面的查詢基本都用了query,簡單來說,就是你,給Es的api發一個query,es給你返回一個查詢結果。這里我會舉幾個常用的條件查詢例子,然后用golang封裝一波。
這里我先定義一下數據結構,假設我們的Elasticsearch中,有一個叫做Task的index(索引),其中存儲着很多task的運行日志,它們的數據格式如下:

{
    "taskid": "081c255b-936c-11ea-8001-000000000000",
    "starttime": "2020/05/13 18:38:21",
    "endtime": "2020/05/13 18:38:47",
    "name": "cifs01",
    "status": 1,
    "count": 365
}

我們現在要做的就是圍繞task這個index和其中的數據做條件查找的例子,我說的很明白了吧?開工了!

查詢時間范圍/年齡大小的條件查詢方法

在業務需求中,我們經常會檢索各種各樣的數據,其中,范圍查找應該是用的比較多的,所以我把它放到了最前面。

type Task struct {
	TaskID    string `json:"taskid"`
	StartTime string `json:"starttime"`
	EndTime   string `json:"endtime"`
	Name      string `json:"name"`
	Status    int    `json:"status"`
	Count     int    `json:"count"`
}
//查找時間范圍大於2020/05/13 18:38:21,並且小於2020/05/14 18:38:21的數據
func (Es *Elastic) FindTime() {
    var typ Task
    boolQ := elastic.NewBoolQuery()
    //生成查詢語句,篩選starttime字段,查找大於2020/05/13 18:38:21,並且小於2020/05/14 18:38:21的數據
    boolQ.Filter(elastic.NewRangeQuery("starttime").Gte("2020/05/13 18:38:21"), elastic.NewRangeQuery("starttime").Lte("2020/05/14 18:38:21"))
    res, _ := Es.Client.Search("task").Type("doc").Query(boolQ).Do(context.Background())
        //從搜索結果中取數據的方法
        for _, item := range res.Each(reflect.TypeOf(typ)) {
            if t, ok := item.(Task); ok {
                fmt.Println(t)
            }
        }
}

查詢包含關鍵字的查詢方法

我們經常遇到那種,搜那么一兩個字,讓你展示所有包含這一兩個字的結果,就像百度,你搜個"開發",就能搜出來例如"軟件開發","硬件開發"等。接下來咱們也實現一個這個功能

//查找包含"cifs"的所有數據
func (Es *Elastic) FindKeyword() {
    //因為不確定cifs如何出現,可能是cifs01,也可能是01cifs,所以采用這種方法
    keyword := "cifs"
    keys := fmt.Sprintf("name:*%s*", keyword)
    boolQ.Filter(elastic.NewQueryStringQuery(keys))
     res, _ := Es.Client.Search("task").Type("doc").Query(boolQ).Do(context.Background())
        //從搜索結果中取數據的方法
        for _, item := range res.Each(reflect.TypeOf(typ)) {
            if t, ok := item.(Task); ok {
                fmt.Println(t)
            }
        }
}

多條件查詢

如果說我們現在不僅僅需要找到符合時間的,也需要找到符合關鍵字的查詢,那么就需要在查詢條件上做文章。

func (Es *Elastic) FindAll() {
    //因為不確定cifs如何出現,可能是cifs01,也可能是01cifs,所以采用這種方法
    keyword := "cifs"
    keys := fmt.Sprintf("name:*%s*", keyword)
    boolQ.Filter(elastic.NewRangeQuery("starttime").Gte("2020/05/13 18:38:21"), elastic.NewRangeQuery("starttime").Lte("2020/05/14 18:38:21"), elastic.NewQueryStringQuery(keys))
	res, err := Es.Client.Search("task").Type("doc").Query(boolQ).Do(context.Background())
        //從搜索結果中取數據的方法
        for _, item := range res.Each(reflect.TypeOf(typ)) {
            if t, ok := item.(Task); ok {
                fmt.Println(t)
            }
        }
}

統計數量/多條件統計數量

有些時候我們需要去統計符合查詢條件的結果數量,做統計用,這里也有直接可用的Sdk

func (Es *Elastic) GetTaskLogCount() (int, error) {
	boolQ := elastic.NewBoolQuery()
	boolQ.Filter(elastic.NewRangeQuery("starttime").Gte("2020/05/13 18:38:21"), elastic.NewRangeQuery("starttime").Lte("2020/05/14 18:38:21"))
	//統計count
	count, err := Es.Client.Count("task").Type("doc").Query(boolQ).Do(context.Background())
	if err != nil {
		return 0, nil
	}
	return int(count), nil
}

總結

我這邊完成了幾個查詢/導入的基礎功能,當然,我的代碼大部分都放置在了github當中
放置在: https://github.com/Alexanderklau/Go_poject/tree/master/Go-Elasticdb/Elasticsearch_sdk
最近項目比較忙,我打算月中寫一篇我開發的時候使用的一些Go特性,或者高級用法。如果喜歡的話麻煩Star我!最近壓力頗大,想要換一個地方生活,所以也要准備離開了。如果大家有什么問題,可以直接給我提問,我看到了就會幫助大家的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM