簡介:
es包地址:https://github.com/olivere/elastic/ 這個版本被廣泛使用,我們也用這個。
注意:es版本不同,要導入不同的包。6.0版本導入“github.com/olivere/elastic”
連接es:
var host = "http://xxx.com:9201" func es_init() { client, err := elastic.NewClient( elastic.SetURL(host), elastic.SetSniff(false), elastic.SetHealthcheckInterval(10*time.Second), elastic.SetGzip(true), elastic.SetErrorLog(log.New(os.Stderr, "ELASTIC ", log.LstdFlags)), elastic.SetInfoLog(log.New(os.Stdout, "", log.LstdFlags))) if err!= nil{ panic(err) } fmt.Println("conn es succ",client) }
連接參數詳解:
1、SetHttpClient(*http.Client)允許您配置自己的http.Client和/或http.Transport(默認為http.DefaultClient);在許多彈性實例中使用相同的http.Client(即使使用http.DefaultClient)是一個好主意,以便有效地使用打開的TCP連接。
2、Seturl(…字符串)允許您指定要連接的URL(默認值是http://127.0.0.1:9200)。
3、StasBaseCuthe(用戶名,密碼字符串)允許您指定HTTP基本身份驗證詳細信息。使用這個,例如用盾牌。
4、SETSNIFF(BOOL)允許您指定彈性是否應該定期檢查集群(默認為真)。
5、StSnIFFEffTimeOUT(時間。持續時間)是嗅探節點彈出時間之前的時間(默認為2秒)。
6、StnSnFiffer-TimeOutExpLoT(時間。持續時間)是創建新客戶端時使用的嗅探超時。它通常比嗅探器超時大,並且證明對慢啟動有幫助(默認為5秒)。
7、StnSnIFFER間隔(時間。持續時間)允許您指定兩個嗅探器進程之間的間隔(默認為15分鍾)。
8、SetHealthcheck(bool)允許您通過嘗試定期連接到它的節點(默認為true)來指定Elastic是否將執行健康檢查。
9、SethalthCuffTimeExt(時間。持續時間)是健康檢查的超時時間(默認值為1秒)。
10、SethalthCuffTimeOutExtudio(時間。持續時間)是創建新客戶端時使用的健康檢查超時。它通常大於健康檢查超時,並可能有助於慢啟動(默認為5秒)。
11、sethealthcheckinterval(time.duration)指定間隔之間的兩個健康檢查(默認是60秒)。
12、SetDecoder(.ic.Decoder)允許您為來自Elasticsearch的JSON消息設置自己的解碼器(默認為&.ic.DefaultDecoder{})。
13、StError日志(*Log.LoggER)將日志記錄器設置為用於錯誤消息(默認為NIL)。錯誤日志將包含例如關於加入群集的節點或標記為“死亡”的消息。
14、SETIN FLUOG(*Log.LoggER)將記錄器設置為用於信息性消息(默認為NIL)。信息日志將包含例如請求和它們的響應時間。
15、StReTraceLoG(*Log.LoggER)設置用於打印HTTP請求和響應(默認為NIL)的記錄器。這有助於調試有線上正在發生的事情
16、StestRealdPuelin(插件…字符串)設置需要注冊的插件列表。彈性將設法在啟動時找到它們。如果沒有找到其中一個,則在啟動時會發現一個類型的彈性錯誤。
17、StReReTrice(…)設置用於處理失敗請求的重試策略。詳情請參閱重試和退避
18、SETGZIP(BOOL)啟用或禁用請求端的壓縮。默認情況下禁用。
打印queryLog
我們在寫代碼調試的時候,有時候需要打印es的query log 看下請求json到底長什么樣。在new的時候我們需要傳一個 實現了 elasticsearch 這個包 Logger 接口的結構體進來。 下面看代碼:
type Dao struct { *elastic.Client } //tracelog 實現 elastic.Logger 接口 type tracelog struct{} //實現輸出 func (tracelog) Printf(format string, v ...interface{}) { fmt.Printf(format, v...) } func NewDao(cfg *elasticsearch.ElasticConfig) (d *Dao) { var ( err error ) //實例化 es client instance, err := elasticsearch.New(cfg,elastic.SetTraceLog(new(tracelog))) if err != nil { panic(err) } d = &Dao{ instance, } return }
創建個Mapping:
client := es_init() mapping := `{ "settings":{ "number_of_shards":1, "number_of_replicas":0 }, "mappings":{ "tweet":{ "properties":{ "tags":{ "type":"string" }, "location":{ "type":"geo_point" } } } } }` ctx := context.Background() createIndex,err := client.CreateIndex("twitter").BodyString(mapping).Do(ctx) if err != nil { panic(err) } if !createIndex.Acknowledged { fmt.Println("!createIndex.Acknowledged") } else { fmt.Println("createIndex.Acknowledged") }
簡單搜索
client := es_init() ctx := context.Background() var res *elastic.SearchResult var err error //實例化一個bool搜索器 boolQ := elastic.NewBoolQuery() boolQ.Must(elastic.NewMatchQuery("rootCategory","鞋類")) //一級類目必須是鞋類 boolQ.Filter(elastic.NewRangeQuery("sellPoint").Gt("0")) //銷量大於0 //打印查詢語句 q,_ := boolQ.Source() PrintQuery(q) //組裝查詢,查2條,對應的index 和 type res,err = client.Search("shihuo_goods").Type("goods_v4").Query(boolQ).From(0).Size(2).Do(ctx) if err != nil { panic(err) } //循環查到的數據並且以json方式輸出 for _,item := range res.Hits.Hits { //fmt.Printf("%+v",*item.Source) fmt.Println(string(*item.Source)) } //自定義打印函數 func PrintQuery(src interface{}) { fmt.Println("*****") data, err := json.MarshalIndent(src, "", " ") if err != nil { panic(err) } fmt.Println(string(data)) }
批量操作bulk:
數據庫都要支持批量執行的操作,如批量寫入。否則設想有一億條數據,如果一個一個插入並發滿了效率太低,並發高了數據庫負載扛不住。作為開發者好的習慣是在需要的時候應該一次性的寫入一批數據,減少對數據庫寫入頻率。在es里面也支持批量操作:這個「批量」定義要更泛化,不止是指一次多寫,還可以刪除更新等!
subjects := []Subject{ Subject{ ID: 1, Title: "肖恩克的救贖", Genres: []string{"犯罪", "劇情"}, }, Subject{ ID: 2, Title: "千與千尋", Genres: []string{"劇情", "喜劇", "愛情", "戰爭"}, }, } bulkRequest := client.Bulk() for _, subject := range subjects { doc := elastic.NewBulkIndexRequest().Index(indexName).Id(strconv.Itoa(subject.ID)).Doc(subject) bulkRequest = bulkRequest.Add(doc) } response, err := bulkRequest.Do(ctx) if err != nil { panic(err) } failed := response.Failed() l := len(failed) if l > 0 { fmt.Printf("Error(%d)", l, response.Errors) }
這樣就可以一次性的把2個記錄寫到es里面。再看一個復雜的例子:
subject3 := Subject{ ID: 3, Title: "這個殺手太冷", Genres: []string{"劇情", "動作", "犯罪"}, } subject4 := Subject{ ID: 4, Title: "阿甘正傳", Genres: []string{"劇情", "愛情"}, } subject5 := subject3 subject5.Title = "這個殺手不太冷" index1Req := elastic.NewBulkIndexRequest().Index(indexName).Id("3").Doc(subject3) index2Req := elastic.NewBulkIndexRequest().OpType("create").Index(indexName).Id("4").Doc(subject4) delete1Req := elastic.NewBulkDeleteRequest().Index(indexName).Id("1") update2Req := elastic.NewBulkUpdateRequest().Index(indexName).Id("3"). Doc(subject5) bulkRequest = client.Bulk() bulkRequest = bulkRequest.Add(index1Req) bulkRequest = bulkRequest.Add(index2Req) bulkRequest = bulkRequest.Add(delete1Req) bulkRequest = bulkRequest.Add(update2Req) _, err = bulkRequest.Refresh("wait_for").Do(ctx) if err != nil { panic(err) } if bulkRequest.NumberOfActions() == 0 { fmt.Println("Actions all clear!") } searchResult, err := client.Search(). Index(indexName). Sort("id", false). // 按id升序排序 Pretty(true). Do(ctx) // 執行 if err != nil { panic(err) } var subject Subject for _, item := range searchResult.Each(reflect.TypeOf(subject)) { if t, ok := item.(Subject); ok { fmt.Printf("Found: Subject(id=%d, title=%s)\n", t.ID, t.Title) } }
這個批量操作里面做了4件事:添加subject3(ID為3)、添加subject4(ID為4)、刪除ID為1的記錄、更新ID為三的記錄(subject5,在原來的subject3中Title故意寫錯了)。完成bulk操作之后通過搜索(無term條件,表示全部)驗證下當前es里面的全部文檔:
❯ go run bulk.go Actions all clear! Found: Subject(id=4, title=阿甘正傳) Found: Subject(id=3, title=這個殺手不太冷) Found: Subject(id=2, title=千與千尋)
可以看到ID3和ID4這2個文檔插入了,而ID3的條目標題被更新成正確的,ID1的條目被刪除了:這就是批量操作的效果。