上一篇我們說了如何向elasticsearch中創建索引和插入數據(https://www.cnblogs.com/zpy1993-09/p/13380197.html)
今天在做一個延伸,上次說的是單行插入數據,如果數據量小的話還可以,如果大的話,那單行 插入就不能滿足需求了,必須要批量插入。如果做批量插入,一般都是做個緩存,到一定時間,或者一定條數然后批量插入一次,如果按一定時間那就是定時緩存,比如數據量很大是,10分鍾把緩存的數據批量插入。如果按條數那就是如果緩存1000條插入一次。具體情況要根據數據需求及時性了。
今天我們就說一下如何按一定條數批量插入。那么緩存應該怎么做呢,方式很多。你可以見一個消息隊列,或者棧,字典,數組都行。
為了方便理解,我今天就用大家最常見的List數組來做一個緩存。
由於我們是通過API接口往elasticsearch中批量送數據的,所以要做一個List數組的緩存,首先要考慮List的實例不能被覆蓋,也就是說每次調用一次接口插入數據,List數組的實例都要是同一個。不然每調用一個接口都要new 一個新的List對象,那還怎么做緩存 。所以我們首先要保證List實例一直唯一。所以我們都會想到單利,那就是單例模式。
由於接口一般都是通過json接收的,我們要解析json最好做一個模型的映射。
假設通過接口插入elasticsearch中的數據是學生信息。首先我們建一個簡單的學生類。
public class Student { public string name{get;set;}//姓名 public int number{get;set;}//學號 public int age{get;set;}//年齡 }
那我們就要定義一個Student的List數組的單例:
public class ListExample<Student> { private volatile static List<Student> instance = null; private static readonly object _lock = new object(); public static List<Student> GetInstance() { if (instance == null) { lock (_lock) { if (instance == null) { instance = new List<Student>(); } } } return instance; } }
這樣是不是就能保證List對象實例的唯一了,但是有些人會問了,這也太麻煩了,如果多了怎么辦,我總不能每一個接口我都要做一個單例吧,那豈不是太苦逼了。
的確是這樣,所以,我們要做一下優化,把這個單例方法做成通用模式,支持任何對象的實例。所以,我們自然而然的就想到泛型了。
public class ListExample<T> { private volatile static List<T> instance = null; private static readonly object _lock = new object(); public static List<T> GetInstance() { if (instance == null) { lock (_lock) { if (instance == null) { instance = new List<T>(); } } } return instance; } }
這樣不就通用了!
public static class ESHelper { public static readonly string url = "http://IP/"; /// <summary> /// 批量插入 /// </summary> /// <param name="obj">出入的數據</param> /// <param name="index">索引</param> public static void ManyInsert(List<Student> obj, string index) { //設置連接字符串,DefaultIndex中的表名要小寫 var settings = new ConnectionSettings(new Uri(url)).DefaultIndex(index); var client = new ElasticClient(settings); var ndexResponse = client.IndexMany<Student>(obj); } }
還是和單例一樣的問題,我們不可能不通用,所以我們要做的通用,還是泛型:
public static class ESHelper<T> where T : class { public static readonly string url = "http://IP/";//elasticsearch的IP /// <summary> /// 批量插入 /// </summary> /// <param name="obj">出入的數據</param> /// <param name="index">索引</param> public static void ManyInsert(List<T> obj, string index) { //設置連接字符串,DefaultIndex中的表名要小寫 var settings = new ConnectionSettings(new Uri(url)).DefaultIndex(index); var client = new ElasticClient(settings); var ndexResponse = client.IndexMany<T>(obj); } }
那我們可以通過向ES送入學生信息演示一下:
/// <summary> /// 向ES中送入數據 /// </summary> /// <param name="Data">json對象</param> /// <returns></returns> [HttpPost("GetLogList")] public string GetLogList([FromBody] object Data) { //json 數組格式:“{“name”:"張三”,"number",123456,"age":20}” JObject LogList = (JObject)JsonConvert.DeserializeObject(Data.ToString());
Student student=new Student{name=LogList["name"],number=LogList["number"],age=LogList["age"]}
ListExample<Student>.GetInstance().Add(student);
if(ListExample<Student>.GetInstance().Count==1000)
{
ESHelper<Student>.ManyInsert(ListExample<Student>.GetInstance(), "Student-" + DateTime.Now.ToString("yyyy-MM"));//批量插入ES中
ListExample<Student>.GetInstance().Clear();//清空緩存
}
}
這樣批量向ES數據完畢,是不是很簡單。但是通過一定條數批量插入式存在弊端的,比如2000條批量插入一次,如果另一邊調用接口,上傳了1000條數據,突然死掉的話,那么這緩存的1000條數據就送不到ES中了,所以最好用定時往ES中批量送數據,下一章我們說一下如何定時向ES中批量插入數據。