golang 操作es 批量索引數據 Bulk


減少開銷 提高效率 現有130萬條數據 一條一條索引的話需要不停的跑需要兩天左右 如果使用bulk 五分鍾就完事兒了 

 

func IndexPrice() {
	es := tool.ES{
		Index: "financials.us.gama",
		Type:  "esstockprice",
	}
	var MaxId int
	MaxId = 0
	var price *model.PriceRecord
	// 每次取兩萬條數據索引兩萬條 記錄最大id 下一次根據maxid查詢
	for {
		bulkRequest := tool.Client.Bulk()
		// 每次只取2萬條數據
		priceList := price.GetPrice2(MaxId)
		if len(priceList) == 0 {
			break
		}
		var upDateList []int
		for _, i := range priceList {
			upDateList = append(upDateList, i.ID)
			if i.ID > MaxId {
				MaxId = i.ID
			}
			id := i.Symbol + "_" + i.DateTime
			var esPrice *ESPrice
			esPrice = &ESPrice{
				ShortName:    i.Symbol,
				TradeDate:    i.DateTime,
				StockPriceId: id,
				IsTradeDate:  1,
				Open:         i.Open,
				Close:        i.Close,
				Volume:       i.Volume,
				High:         i.High,
				Low:          i.Low,
			}
			req := elastic.NewBulkIndexRequest().
				Index(es.Index).
				Type(es.Type).
				Id(id).
				Doc(esPrice)
			bulkRequest = bulkRequest.Add(req)
		}
		bulkResponse, err := bulkRequest.Do(context.Background())
		if err != nil {
			fmt.Println(err)
		}else{
			// 索引成功就修改數據庫狀態 is_index = 1
			if !price.UpdatePriceList(upDateList){
				fmt.Println("更新數據庫狀態錯誤")
			}
		}
		fmt.Println("耗時:",bulkResponse.Took, "索引了:",len(bulkResponse.Items))
	}
}

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM