go操作elasticsearch


簡介:

  es包地址:https://github.com/olivere/elastic/ 這個版本被廣泛使用,我們也用這個。

  注意:es版本不同,要導入不同的包。6.0版本導入“github.com/olivere/elastic”

 

連接es:

var host  = "http://xxx.com:9201"
func es_init() {
   client, err := elastic.NewClient(
      elastic.SetURL(host),
      elastic.SetSniff(false),
      elastic.SetHealthcheckInterval(10*time.Second),
      elastic.SetGzip(true),
      elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)),
      elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags)))
   if err!= nil{
      panic(err)
   }
   fmt.Println("conn es succ",client)
}

  

連接參數詳解:

 1、SetHttpClient(*http.Client)允許您配置自己的http.Client和/或http.Transport(默認為http.DefaultClient);在許多彈性實例中使用相同的http.Client(即使使用http.DefaultClient)是一個好主意,以便有效地使用打開的TCP連接。

2、Seturl(…字符串)允許您指定要連接的URL(默認值是http://127.0.0.1:9200)。
3、StasBaseCuthe(用戶名,密碼字符串)允許您指定HTTP基本身份驗證詳細信息。使用這個,例如用盾牌。
4、SETSNIFF(BOOL)允許您指定彈性是否應該定期檢查集群(默認為真)。
5、StSnIFFEffTimeOUT(時間。持續時間)是嗅探節點彈出時間之前的時間(默認為2秒)。
6、StnSnFiffer-TimeOutExpLoT(時間。持續時間)是創建新客戶端時使用的嗅探超時。它通常比嗅探器超時大,並且證明對慢啟動有幫助(默認為5秒)。
7、StnSnIFFER間隔(時間。持續時間)允許您指定兩個嗅探器進程之間的間隔(默認為15分鍾)。
8、SetHealthcheck(bool)允許您通過嘗試定期連接到它的節點(默認為true)來指定Elastic是否將執行健康檢查。
9、SethalthCuffTimeExt(時間。持續時間)是健康檢查的超時時間(默認值為1秒)。
10、SethalthCuffTimeOutExtudio(時間。持續時間)是創建新客戶端時使用的健康檢查超時。它通常大於健康檢查超時,並可能有助於慢啟動(默認為5秒)。
11、sethealthcheckinterval(time.duration)指定間隔之間的兩個健康檢查(默認是60秒)。
12、SetDecoder(.ic.Decoder)允許您為來自Elasticsearch的JSON消息設置自己的解碼器(默認為&.ic.DefaultDecoder{})。
13、StError日志(*Log.LoggER)將日志記錄器設置為用於錯誤消息(默認為NIL)。錯誤日志將包含例如關於加入群集的節點或標記為“死亡”的消息。
14、SETIN FLUOG(*Log.LoggER)將記錄器設置為用於信息性消息(默認為NIL)。信息日志將包含例如請求和它們的響應時間。
15、StReTraceLoG(*Log.LoggER)設置用於打印HTTP請求和響應(默認為NIL)的記錄器。這有助於調試有線上正在發生的事情
16、StestRealdPuelin(插件…字符串)設置需要注冊的插件列表。彈性將設法在啟動時找到它們。如果沒有找到其中一個,則在啟動時會發現一個類型的彈性錯誤。
17、StReReTrice(…)設置用於處理失敗請求的重試策略。詳情請參閱重試和退避
18、SETGZIP(BOOL)啟用或禁用請求端的壓縮。默認情況下禁用。
 

打印queryLog

 我們在寫代碼調試的時候,有時候需要打印es的query log 看下請求json到底長什么樣。在new的時候我們需要傳一個 實現了 elasticsearch 這個包  Logger 接口的結構體進來。 下面看代碼:
type Dao struct {
	*elastic.Client
}
//tracelog 實現 elastic.Logger 接口
type tracelog struct{}
//實現輸出
func (tracelog) Printf(format string, v ...interface{}) {
	fmt.Printf(format, v...)
}
func NewDao(cfg *elasticsearch.ElasticConfig) (d *Dao) {
	var (
		err error
	)
	//實例化 es client
	instance, err := elasticsearch.New(cfg,elastic.SetTraceLog(new(tracelog)))
	if err != nil {
		panic(err)
	}

	d = &Dao{
		instance,
	}
	return
}

  

 

創建個Mapping:

 

client := es_init()
mapping := `{
   "settings":{
      "number_of_shards":1,
      "number_of_replicas":0
   },
   "mappings":{
      "tweet":{
         "properties":{
            "tags":{
               "type":"string"
            },
            "location":{
               "type":"geo_point"
            }
         }
      }
   }
}`
ctx := context.Background()
createIndex,err := client.CreateIndex("twitter").BodyString(mapping).Do(ctx)
if err != nil {
   panic(err)
}
if !createIndex.Acknowledged {
   fmt.Println("!createIndex.Acknowledged")
} else {
   fmt.Println("createIndex.Acknowledged")
}

  

簡單搜索

 

client := es_init()
ctx := context.Background()
var res *elastic.SearchResult
var err error
//實例化一個bool搜索器
boolQ := elastic.NewBoolQuery()
boolQ.Must(elastic.NewMatchQuery("rootCategory","鞋類"))  //一級類目必須是鞋類
boolQ.Filter(elastic.NewRangeQuery("sellPoint").Gt("0")) //銷量大於0
//打印查詢語句
q,_ := boolQ.Source()
PrintQuery(q)
//組裝查詢,查2條,對應的index 和 type
res,err = client.Search("shihuo_goods").Type("goods_v4").Query(boolQ).From(0).Size(2).Do(ctx)
if err != nil {
   panic(err)
}
//循環查到的數據並且以json方式輸出
for _,item := range res.Hits.Hits {
   //fmt.Printf("%+v",*item.Source)
   fmt.Println(string(*item.Source))
}

//自定義打印函數
func PrintQuery(src interface{}) {
   fmt.Println("*****")
   data, err := json.MarshalIndent(src, "", "  ")
   if err != nil {
      panic(err)
   }
   fmt.Println(string(data))
}

  

批量操作bulk:

數據庫都要支持批量執行的操作,如批量寫入。否則設想有一億條數據,如果一個一個插入並發滿了效率太低,並發高了數據庫負載扛不住。作為開發者好的習慣是在需要的時候應該一次性的寫入一批數據,減少對數據庫寫入頻率。在es里面也支持批量操作:這個「批量」定義要更泛化,不止是指一次多寫,還可以刪除更新等!

  

subjects := []Subject{
    Subject{
        ID:     1,
        Title:  "肖恩克的救贖",
        Genres: []string{"犯罪", "劇情"},
    },
    Subject{
        ID:     2,
        Title:  "千與千尋",
        Genres: []string{"劇情", "喜劇", "愛情", "戰爭"},
    },
}

bulkRequest := client.Bulk()
for _, subject := range subjects {
    doc := elastic.NewBulkIndexRequest().Index(indexName).Id(strconv.Itoa(subject.ID)).Doc(subject)
    bulkRequest = bulkRequest.Add(doc)
}

response, err := bulkRequest.Do(ctx)
if err != nil {
    panic(err)
}
failed := response.Failed()
l := len(failed)
if l > 0 {
    fmt.Printf("Error(%d)", l, response.Errors)
}

  

這樣就可以一次性的把2個記錄寫到es里面。再看一個復雜的例子:

 

subject3 := Subject{
    ID:     3,
    Title:  "這個殺手太冷",
    Genres: []string{"劇情", "動作", "犯罪"},
}
subject4 := Subject{
    ID:     4,
    Title:  "阿甘正傳",
    Genres: []string{"劇情", "愛情"},
}

subject5 := subject3
subject5.Title = "這個殺手不太冷"

index1Req := elastic.NewBulkIndexRequest().Index(indexName).Id("3").Doc(subject3)
index2Req := elastic.NewBulkIndexRequest().OpType("create").Index(indexName).Id("4").Doc(subject4)
delete1Req := elastic.NewBulkDeleteRequest().Index(indexName).Id("1")
update2Req := elastic.NewBulkUpdateRequest().Index(indexName).Id("3").
            Doc(subject5)

bulkRequest = client.Bulk()
bulkRequest = bulkRequest.Add(index1Req)
bulkRequest = bulkRequest.Add(index2Req)
bulkRequest = bulkRequest.Add(delete1Req)
bulkRequest = bulkRequest.Add(update2Req)

_, err = bulkRequest.Refresh("wait_for").Do(ctx)
if err != nil {
    panic(err)
}

if bulkRequest.NumberOfActions() == 0 {
    fmt.Println("Actions all clear!")
}

searchResult, err := client.Search().
    Index(indexName).
    Sort("id", false). // 按id升序排序
    Pretty(true).
    Do(ctx) // 執行
if err != nil {
    panic(err)
}
var subject Subject
for _, item := range searchResult.Each(reflect.TypeOf(subject)) {
    if t, ok := item.(Subject); ok {
        fmt.Printf("Found: Subject(id=%d, title=%s)\n", t.ID, t.Title)
    }
}

  

這個批量操作里面做了4件事:添加subject3(ID為3)、添加subject4(ID為4)、刪除ID為1的記錄、更新ID為三的記錄(subject5,在原來的subject3中Title故意寫錯了)。完成bulk操作之后通過搜索(無term條件,表示全部)驗證下當前es里面的全部文檔:

 

❯ go run bulk.go
Actions all clear!
Found: Subject(id=4, title=阿甘正傳)
Found: Subject(id=3, title=這個殺手不太冷)
Found: Subject(id=2, title=千與千尋)

  

可以看到ID3和ID4這2個文檔插入了,而ID3的條目標題被更新成正確的,ID1的條目被刪除了:這就是批量操作的效果。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM