如何把數據快速批量添加到Elasticsearch中
問題來源
最近新做一個項目,有部分搜索比較頻繁的數據,而且量級比較大,預計一兩年時間很可能達到100G,項目要求不要存在數據庫中,最終出來有兩個方案,一個是使用Protocol Buffers存儲在文件上,另外就是存在Elasticsearch中,也方便搜索,但這兩個方案需要驗證,到底哪個方案好,從存儲速度,搜索響應,占用空間方面做對比,而我負責給出Elasticsearch的部分技術建議!
驗證需求
1、數據量:初步只算52億條
2、寫數據速度:需要超過1W條每秒
遇到問題以及解決辦法
而在驗證過程中遇到了無論是使用Elasticsearch.Net或者PlainElastic.Net來寫數據,並且是使用了Bulk的api,加上多線程,都是太慢了,粗略算了一下,大概一秒插入3千條左右,這樣的話,52億條數據,得插到何年何月啊,太慢了,根據查閱資料,網上也有人說插入數據還是挺快 的,一秒可以插入18w條,但具體也沒說是用什么辦法插入的,所以只能到官方看看了,發現用REST API的_bulk來批量插入,這樣速度明顯快了,可以達到5到10w條每秒,速度還可以,但問題是這方法是先定義一定格式的json文件,然后再用curl命令去執行Elasticsearch的_bulk來批量插入,所以得把數據寫進json文件,然后再通過批處理,執行文件插入數據,另外在生成json文件,文件不能過大,過大會報錯,所以建議生成10M一個文件,然后分別去執行這些小文件就可以了,說了這么多都是文字,真的有點暈乎乎的,看圖吧!

json數據文件內容的定義
|
1
2
3
4
5
6
7
8
9
10
|
{
"index"
:{
"_index"
:
"meterdata"
,
"_type"
:
"autoData"
}}
{
"Mfid "
:1,
"TData"
:172170,
"TMoney"
:209,
"HTime"
:
"2016-05-17T08:03:00"
}
{
"index"
:{
"_index"
:
"meterdata"
,
"_type"
:
"autoData"
}}
{
"Mfid "
:1,
"TData"
:172170,
"TMoney"
:209,
"HTime"
:
"2016-05-17T08:04:00"
}
{
"index"
:{
"_index"
:
"meterdata"
,
"_type"
:
"autoData"
}}
{
"Mfid "
:1,
"TData"
:172170,
"TMoney"
:209,
"HTime"
:
"2016-05-17T08:05:00"
}
{
"index"
:{
"_index"
:
"meterdata"
,
"_type"
:
"autoData"
}}
{
"Mfid "
:1,
"TData"
:172170,
"TMoney"
:209,
"HTime"
:
"2016-05-17T08:06:00"
}
{
"index"
:{
"_index"
:
"meterdata"
,
"_type"
:
"autoData"
}}
{
"Mfid "
:1,
"TData"
:172170,
"TMoney"
:209,
"HTime"
:
"2016-05-17T08:07:00"
}
|
批處理內容的定義
|
1
2
3
4
5
6
7
|
cd E:\curl-7.50.3-win64-mingw\bin
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\437714060.json
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\743719428.json
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\281679894.json
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\146257480.json
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\892018760.json
pause
|
工具代碼
1 private void button1_Click(object sender, EventArgs e)
2 {
3 //Application.StartupPath + "\\" + NextFile.Name
4 Task.Run(() => { CreateDataToFile(); });
5 }
6 public void CreateDataToFile()
7 {
8 StringBuilder sb = new StringBuilder();
9 StringBuilder sborder = new StringBuilder();
10 int flag = 1;
11 sborder.Append(@"cd E:\curl-7.50.3-win64-mingw\bin" + Environment.NewLine);
12 DateTime endDate = DateTime.Parse("2016-10-22");
13 for (int i = 1; i <= 10000; i++)//1w個點
14 {
15 DateTime startDate = DateTime.Parse("2016-10-22").AddYears(-1);
16 this.Invoke(new Action(() => { label1.Text = "生成第" + i + "個"; }));
17
18 while (startDate <= endDate)//每個點生成一年數據,每分鍾一條
19 {
20 if (flag > 100000)//大於10w分割一個文件
21 {
22 string filename = new Random(GetRandomSeed()).Next(900000000) + ".json";
23
24 FileStream fs3 = new FileStream(Application.StartupPath + "\\testdata\\" + filename, FileMode.OpenOrCreate);
25 StreamWriter sw = new StreamWriter(fs3, Encoding.GetEncoding("GBK"));
26 sw.WriteLine(sb.ToString());
27 sw.Close();
28 fs3.Close();
29 sb.Clear();
30 flag = 1;
31 sborder.Append(@"curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\" + filename + Environment.NewLine);
32
33 }
34 else
35 {
36 sb.Append("{\"index\":{\"_index\":\"meterdata\",\"_type\":\"autoData\"}}" + Environment.NewLine);
37 sb.Append("{\"Mfid \":" + i + ",\"TData\":" + new Random().Next(1067500) + ",\"TMoney\":" + new Random().Next(1300) + ",\"HTime\":\"" + startDate.ToString("yyyy-MM-ddTHH:mm:ss") + "\"}" + Environment.NewLine);
38 flag++;
39 }
40 startDate = startDate.AddMinutes(1);//
41 }
42
43 }
44 sborder.Append("pause");
45 FileStream fs1 = new FileStream(Application.StartupPath + "\\testdata\\order.bat", FileMode.OpenOrCreate);
46 StreamWriter sw1 = new StreamWriter(fs1, Encoding.GetEncoding("GBK"));
47 sw1.WriteLine(sborder.ToString());
48 sw1.Close();
49 fs1.Close();
50 MessageBox.Show("生成完畢");
51
52 }
53 static int GetRandomSeed()
54 {//隨機生成不重復的編號
55 byte[] bytes = new byte[4];
56 System.Security.Cryptography.RNGCryptoServiceProvider rng = new System.Security.Cryptography.RNGCryptoServiceProvider();
57 rng.GetBytes(bytes);
58 return BitConverter.ToInt32(bytes, 0);
59 }
總結
本次測試結果,發現Elasticsearch的搜索速度是挺快的,生成過程中,在17億數據時查了一下,根據Mid和時間在幾個月范圍的數據,查十條數據兩秒多完成查詢,而且同一查詢條件查詢越多,查詢就越快,應該是Elasticsearch緩存了,52億條數據,大概占用500G空間左右,還是挺大的,相比Protocol Buffers存儲的數據,要大三倍左右,但搜索速度還是比較滿意的。

(有問題,大家多多交流,加群163259145,加群請記得說:博客園)
你只看到我在不停的忙碌,卻沒看到我奮斗的熱情。你有朝九晚五,我有通宵達旦。你否定我的現在,我決定我的未來。你可以輕視我的存在,我會用代碼證明這是誰的時代!Coding是注定痛苦的旅行,路上少不了Bug和Change,但!那又怎樣?哪怕執行不了,也要編譯得漂亮!我是屌絲程序猿,我為自己代言.

