Bulk-Importing Large Volumes of MySQL Data into Elasticsearch


Source: https://blog.csdn.net/u013850277/article/details/88904303

Note: the author's environment is ES 6.6.2, Linux CentOS 6.9, MySQL 8.0, a three-node cluster, with 64 GB of RAM and 8 CPU cores per node.
Scenario:

The MySQL database currently holds about 1 billion rows, with several large tables of around 100 million rows each. Querying MySQL directly runs into all kinds of performance problems, so the plan is to copy the data into ES to get fast search over the large dataset.
Bulk-loading the data with the Logstash plugin ran into all sorts of odd problems: ES memory filling up, memory pressure on the MySQL server, and, above all, a single import run could not handle too large a data volume. After repeatedly probing and tuning Logstash imports, I finally gave up and wrote the import directly against the APIs provided by ES.
In straight bulk-import benchmarks, both Logstash and the ES API can peak at roughly 100,000 documents per second. The code below is put together from the official client APIs (RestHighLevelClient and BulkProcessor). Because the rows read from MySQL need some processing, it sustains about 35,000+ documents per second. There is still plenty of room for optimization: with a small change to the code, starting a few threads that bulk several tables at the same time (a sketch follows below), the Kibana dashboard showed throughput scaling up proportionally. Since that already met my needs, I did not optimize the code much further.
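As a rough illustration of that multi-table speed-up, here is a minimal sketch (not from the original post) that runs the per-table import on a small thread pool. It assumes the writeMysqlDataToES(String tableName) method from the code below is made accessible, and the table names are placeholders:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelBulkDemo {
    public static void main(String[] args) throws InterruptedException {
        // Placeholder table names; each task builds its own BulkProcessor inside writeMysqlDataToES.
        List<String> tables = Arrays.asList("testTable1", "testTable2", "testTable3");
        ExecutorService pool = Executors.newFixedThreadPool(tables.size());
        for (String table : tables) {
            pool.submit(() -> BulkProcessDemo.writeMysqlDataToES(table));
        }
        pool.shutdown();                           // stop accepting new tasks
        pool.awaitTermination(12, TimeUnit.HOURS); // wait for all imports to finish
    }
}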
1. Maven configuration
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>ElasticSearchDemo</groupId>
<artifactId>ElasticSearchDemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>ElasticSearchDemo</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.6.2</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>

</dependencies>
</project>

2. The code
package service;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

import org.apache.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

import utils.DBHelper;

/**
* @author Ye
* @time 2019-03-29
*
* Class description: bulk-import MySQL data into Elasticsearch via BulkProcessor
*/

public class BulkProcessDemo {

private static final Logger logger = LogManager.getLogger(BulkProcessDemo.class);

public static void main(String[] args) {
try {
long startTime = System.currentTimeMillis();
String tableName = "testTable";
createIndex(tableName);
writeMysqlDataToES(tableName);

logger.info(" use time: " + (System.currentTimeMillis() - startTime) / 1000 + "s");
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}

/**
* Create the index
* @param indexName
* @throws IOException
*/
public static void createIndex(String indexName) throws IOException {
RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("es01", 9200, "http")));
// ES index names must be lowercase, so the table name is converted here
CreateIndexRequest requestIndex = new CreateIndexRequest(indexName.toLowerCase());
// Note: setting the replica count to 0 and the refresh interval to -1 gives a sizeable boost when bulk-indexing large volumes of data
requestIndex.settings(Settings.builder().put("index.number_of_shards", 5)
.put("index.number_of_replicas", 0)
.put("index.refresh_interval", "-1"));

CreateIndexResponse createIndexResponse = client.indices().create(requestIndex, RequestOptions.DEFAULT);
logger.info("Index created, acknowledged: " + createIndexResponse.isAcknowledged());
client.close();
}

/**
* Query the MySQL data, assemble it into the map format ES expects, and bulk-write it into ES
*
* @param tableName
*/
private static void writeMysqlDataToES(String tableName) {

RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(new HttpHost("eshost", 9200, "http")));// initialize the client
BulkProcessor bulkProcessor = getBulkProcessor(client);
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;

try {
conn = DBHelper.getConn();
logger.info("Start handle data :" + tableName);

String sql = "SELECT * from " + tableName;

ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ps.setFetchSize(Integer.MIN_VALUE);
rs = ps.executeQuery();

ResultSetMetaData colData = rs.getMetaData();

ArrayList<HashMap<String, String>> dataList = new ArrayList<HashMap<String, String>>();

// BulkProcessor accepts only a few input formats; its API supports map key/value pairs, so each row from the query is converted into a HashMap here
HashMap<String, String> map = null;
int count = 0;
String c = null;
String v = null;
while (rs.next()) {
count++;
map = new HashMap<String, String>(128);
for (int i = 1; i <= colData.getColumnCount(); i++) {
c = colData.getColumnName(i);
v = rs.getString(c);
map.put(c, v);
}
dataList.add(map);
// hand data over every 100,000 rows; the final partial batch is submitted after the loop
if (count % 100000 == 0) {
logger.info("Mysql handle data number : " + count);
// add the documents to the bulkProcessor
for (HashMap<String, String> hashMap2 : dataList) {
bulkProcessor.add(new IndexRequest(tableName.toLowerCase(), "gzdc", hashMap2.get("S_GUID"))
.source(hashMap2));
}
// clear the map and list after each hand-over
map.clear();
dataList.clear();
}
}

// submit whatever is left over from the last count % 100000 batch
for (HashMap<String, String> hashMap2 : dataList) {
bulkProcessor.add(
new IndexRequest(tableName.toLowerCase(), "gzdc", hashMap2.get("S_GUID")).source(hashMap2));
}

logger.info("-------------------------- Finally insert number total : " + count);
// flush the remaining requests to ES; note this does not take effect immediately, it depends on the flush interval configured on the bulkProcessor
bulkProcessor.flush();

} catch (Exception e) {
logger.error(e.getMessage());
} finally {
try {
rs.close();
ps.close();
conn.close();
boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
client.close();
logger.info(terminatedFlag);
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}

/**
* Create and initialize the BulkProcessor
* @param client
* @return
*/
private static BulkProcessor getBulkProcessor(RestHighLevelClient client) {

BulkProcessor bulkProcessor = null;
try {

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
logger.info("Try to insert data number : " + request.numberOfActions());
}

@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
logger.info("************** Success insert data number : " + request.numberOfActions() + " , id: "
+ executionId);
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId);
}
};

BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

// bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build();
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(5000);
builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
builder.setConcurrentRequests(10);
builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
// Gotcha: the official example does not include this step, and I missed it too at first; without calling build() on the configured builder, none of the settings above take effect
bulkProcessor = builder.build();

} catch (Exception e) {
e.printStackTrace();
try {
bulkProcessor.awaitClose(100L, TimeUnit.SECONDS);
client.close();
} catch (Exception e1) {
logger.error(e1.getMessage());
}
}
return bulkProcessor;
}
}
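Since createIndex above disables refresh (refresh_interval = -1) and replicas for the duration of the import, it is usually worth restoring those settings once the bulk load has finished. A minimal sketch under the ES 6.x high-level client (not part of the original post; the host and index name are placeholders):

import org.apache.http.HttpHost;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public class RestoreIndexSettingsDemo {
    public static void main(String[] args) throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("eshost", 9200, "http")));
        // Re-enable periodic refresh and add replicas back after the import.
        UpdateSettingsRequest request = new UpdateSettingsRequest("testtable"); // placeholder index name
        request.settings(Settings.builder()
                .put("index.refresh_interval", "1s")
                .put("index.number_of_replicas", 1));
        client.indices().putSettings(request, RequestOptions.DEFAULT);
        client.close();
    }
}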

Database connection helper class

package utils;

import java.sql.Connection;
import java.sql.DriverManager;

public class DBHelper {

public static final String url = "jdbc:mysql://xx.xx.xx.xx:3306/xxdemo?useSSL=true";
public static final String name = "com.mysql.cj.jdbc.Driver";
public static final String user = "xxx";
public static final String password = "xxxx";

public static Connection conn = null;

public static Connection getConn() {
try {
Class.forName(name);
conn = DriverManager.getConnection(url, user, password); // obtain the connection
} catch (Exception e) {
e.printStackTrace();
}
return conn;
}
}


Log configuration file: log4j2.properties

property.filePath=logs
property.filePattern=logs/%d{yyyy}/%d{MM}
# output pattern
property.layoutPattern=%-d{yyyy-MM-dd HH:mm:ss SSS} [ %p ] [ %c ] %m%n
rootLogger.level = info


appender.console.type = Console
appender.console.name = STDOUT
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = ${layoutPattern}
rootLogger.appenderRef.stdout.ref = STDOUT

appender.I.type = RollingFile
appender.I.name = InfoRollingFile
appender.I.fileName = ${filePath}/es-info.log
appender.I.filePattern = ${filePattern}/es_info.log
appender.I.layout.type = PatternLayout
appender.I.layout.pattern = ${layoutPattern}
appender.I.policies.type = Policies
appender.I.policies.time.type = TimeBasedTriggeringPolicy
appender.I.policies.time.interval = 1
appender.I.policies.time.modulate = true
appender.I.policies.size.type = SizeBasedTriggeringPolicy
appender.I.policies.size.size=20M
appender.I.strategy.type = DefaultRolloverStrategy
appender.I.strategy.max = 100
# deny WARN and above so this file only receives INFO-level messages
appender.I.filter.threshold.type = ThresholdFilter
appender.I.filter.threshold.level = WARN
appender.I.filter.threshold.onMatch = DENY
appender.I.filter.threshold.onMisMatch=NEUTRAL

rootLogger.appenderRef.I.ref = InfoRollingFile
rootLogger.appenderRef.I.level=INFO


appender.E.type = RollingFile
appender.E.name = ErrorRollingFile
appender.E.fileName = ${filePath}/es-error.log
appender.E.filePattern = ${filePattern}/es_error.log
appender.E.layout.type = PatternLayout
appender.E.layout.pattern = ${layoutPattern}
appender.E.policies.type = Policies
appender.E.policies.time.type = TimeBasedTriggeringPolicy
appender.E.policies.time.interval = 1
appender.E.policies.time.modulate = true
appender.E.policies.size.type = SizeBasedTriggeringPolicy
appender.E.policies.size.size=20M
appender.E.strategy.type = DefaultRolloverStrategy
appender.E.strategy.max = 100
# deny FATAL so this file only receives ERROR-level messages
appender.E.filter.threshold.type = ThresholdFilter
appender.E.filter.threshold.level = FATAL
appender.E.filter.threshold.onMatch = DENY
appender.E.filter.threshold.onMisMatch=NEUTRAL

rootLogger.appenderRef.E.ref = ErrorRollingFile
rootLogger.appenderRef.E.level=ERROR

The core of the code is explained through the inline comments above.

3. Verifying the results of the code above
1. At first I ran the code directly from Eclipse on my local machine; limited by the desktop's hardware, the write speed into ES only reached roughly 5,000 docs/s.

2. After packaging the code as a runnable jar and running it on Linux with java -jar xxx.jar, it reached roughly 25,000 docs/s.

Note: the run below imports a single table of about 54 million rows (60+ columns) into ES in one pass (screenshot omitted).

Part of the run log:
2019-03-29 17:31:34 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2679
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Mysql handle data number : 13500000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2681
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2683
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2682
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:36 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2685
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2686
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2687
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2684
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2688
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2689
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2691
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2693
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2692
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2690
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] Try to insert data number : 5000
2019-03-29 17:31:37 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2696
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2694
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2695
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2697
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2698
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2699
2019-03-29 17:31:38 [ INFO ] [ service.BulkProcessDemo ] ************** Success insert data number : 5000 , id: 2700
Tweaking the parameters and re-running the bulk import
Changing the upload to 10,000 documents per batch, setBulkSize to 300 MB, and the row-handling batch to 200,000 rows (if (count % 200000 == 0)), with everything else unchanged, and importing another large table gave the result below (the changed settings are sketched after the timing figures):
Table size (the table has 27 columns):
SELECT count(1) FROM xxxxxxxxx; -- 171769567

Bulk-loading the full table into ES took 5,113 s:
2019-04-01 11:39:04.453 INFO 28080 — [nio-8080-exec-1] s.ElasticServiceImpl : use time: 5113s
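For reference, the changed settings described above amount to the following (the same builder code as before, only the numbers differ; the values are taken from the description above):

// Bulk settings used for the second run.
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(10000);                                  // 10,000 documents per bulk request
builder.setBulkSize(new ByteSizeValue(300L, ByteSizeUnit.MB));  // or flush once the batch reaches 300 MB
builder.setConcurrentRequests(10);
builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
bulkProcessor = builder.build();

// And in writeMysqlDataToES, hand rows over every 200,000 instead of 100,000:
// if (count % 200000 == 0) { ... }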

ES monitoring during the import (screenshot omitted).
4. Summary
From my admittedly limited experience with it, Logstash may not adapt well to one-off imports of very large data volumes into ES.
For bulk imports into ES I personally recommend BulkProcessor: whatever the data is, converting each record into a Map of key/value pairs is enough to stream it in through bulkProcessor.
Actual import throughput depends on the cluster environment, the batch settings, and the relevant ES tuning options.
5. Pitfalls encountered
Problem 1
Because the tables are so large, the original plan was to query the data in time-based slices (the source tables have time-related columns) and import each slice into ES. That is how the Logstash import was done, but the data is distributed very unevenly over time, and slices with too much data regularly brought the machine down when importing through Logstash.
Solution: set fetchSize on the JDBC statement so the result set is streamed.
See: [Using MySQL JDBC setFetchSize() correctly to avoid out-of-memory errors with large result sets]; a minimal sketch follows below.
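A minimal sketch of the fetchSize fix (the first option is exactly what the code above does; the useCursorFetch alternative is an assumption on my part, not something the original post uses):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class StreamingQueryDemo {
    // Option 1 (used in the code above): MySQL streaming mode.
    // With TYPE_FORWARD_ONLY, CONCUR_READ_ONLY and fetchSize = Integer.MIN_VALUE,
    // Connector/J streams rows one at a time instead of buffering the whole result set in memory.
    static ResultSet streamAll(Connection conn, String tableName) throws Exception {
        PreparedStatement ps = conn.prepareStatement(
                "SELECT * FROM " + tableName,
                ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ps.setFetchSize(Integer.MIN_VALUE);
        return ps.executeQuery();
    }

    // Option 2 (assumption): server-side cursor fetch. Requires useCursorFetch=true on the
    // JDBC URL, e.g. jdbc:mysql://host:3306/db?useCursorFetch=true, after which an ordinary
    // positive fetch size such as ps.setFetchSize(1000) is honoured.
}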

Problem 2
Following the code provided on the official site, I set the builder properties (for example how many documents to submit per batch), but at runtime these settings did not take effect.
The official example looks like this:
BulkProcessor bulkProcessor =
BulkProcessor.builder(bulkConsumer, listener).build();
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) ->
client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder =
BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(500);
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
builder.setConcurrentRequests(0);
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
builder.setBackoffPolicy(BackoffPolicy
.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
Solution: in fairness, this pitfall was mostly my own carelessness, combined with trusting the official sample a little too much.
Looking at the code above a bit more carefully, the builder is configured, but the configured builder is never actually used to build the processor, so bulkProcessor keeps running with the default settings.

BulkProcessor bulkProcessor =
BulkProcessor.builder(bulkConsumer, listener).build();
This step can instead be replaced with the following (the bulkProcessor must be built only after all the builder properties have been set):
BulkProcessor bulkProcessor = builder.build();

# The complete code:
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
builder.setBulkActions(5000);
builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));
builder.setConcurrentRequests(10);
builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
bulkProcessor = builder.build();
You might wonder here how the bulkProcessor gets connected to the listener; peeking under the hood of builder.build() shows that it ultimately calls the following method:

/**
* Builds a new bulk processor.
*/
public BulkProcessor build() {
return new BulkProcessor(consumer, backoffPolicy, listener, concurrentRequests, bulkActions,
bulkSize, flushInterval, scheduler, onClose, createBulkRequestWithGlobalDefaults());
}
The complete code is available at: https://github.com/yechunbo/BigdataSearchPro.git

References:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.6/java-rest-high-document-bulk.html
https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.6/_log4j_2_logger.html
https://docs.oracle.com/cd/E11882_01/java.112/e16548/resltset.htm#JJDBC28622
Using MySQL JDBC setFetchSize() correctly to avoid out-of-memory errors with large result sets
Optimizing JDBC reads: fetch size
————————————————
Copyright notice: this is an original article by CSDN blogger 在屋頂聽歌, licensed under CC 4.0 BY-SA; please include the original link and this notice when reposting.
Original link: https://blog.csdn.net/u013850277/article/details/88904303

