Bulk-indexing data into Elasticsearch with BulkProcessor


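When indexing data into ES, I previously used the REST approach. After a period of testing, I found that with data sets in the tens of millions of records, anywhere from about ten to several hundred records went missing.

Where data accuracy must be guaranteed, the REST approach could not guarantee correct results, so I switched to Elasticsearch's BulkProcessor for indexing.

The two approaches actually use different ES clients: the REST approach uses RestClient, which is based on HTTP, while BulkProcessor here uses TransportClient, which is based on TCP.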

 

 

Below are the concrete implementation steps under Spring:
1. Define a Student class and serialize it to JSON
2. The concrete JSON implementation
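
The Student class itself is not shown in this article. As a stand-in, here is a minimal sketch of what it might look like. The field names are inferred from the getters and setters used later (name, addr, age), and the int type for age is an assumption based on the test calling setAge(12). Jackson's ObjectMapper serializes a bean like this through its public getters by default, so the sketch needs no annotations.

```java
// Hypothetical Student bean: the original class is not shown in the article.
// In a real project it would normally be declared public in its own Student.java.
class Student {
    private String name;
    private String addr;
    private int age; // assumed to be an int, because the test calls setAge(12)

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddr() {
        return addr;
    }

    public void setAddr(String addr) {
        this.addr = addr;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}
```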
 

3. Build the BulkProcessor

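* setBulkActions(1000): execute a bulk operation every time 1000 requests have been added
* setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)): execute a bulk operation whenever the buffered requests reach 5 MB in size
* setFlushInterval(TimeValue.timeValueSeconds(5)): execute a bulk operation every 5 s
* setConcurrentRequests(1): the default is 1, meaning that accumulating bulk requests and sending them happen asynchronously; the value is the number of bulk requests allowed to be in flight concurrently, and 0 makes the two synchronous
* setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)): the retry policy used when ES rejects a bulk request with EsRejectedExecutionException because of insufficient resources; the default is (50 ms, 8)
* Policy algorithm: start + 10 * ((int) Math.exp(0.8d * (currentlyConsumed)) - 1)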
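
To make the policy formula above concrete, the following sketch evaluates it for each retry. It is plain Java rather than the Elasticsearch class itself: the class name BackoffDelays and the method delays are made up for illustration, and it assumes that currentlyConsumed starts at 0 and increases by 1 on every retry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Elasticsearch): evaluates the formula
// quoted above once per retry, assuming currentlyConsumed counts up from 0.
final class BackoffDelays {

    // Returns the wait time in milliseconds before each of maxRetries retries.
    static List<Long> delays(long startMillis, int maxRetries) {
        List<Long> result = new ArrayList<>();
        for (int currentlyConsumed = 0; currentlyConsumed < maxRetries; currentlyConsumed++) {
            long delay = startMillis + 10L * ((int) Math.exp(0.8d * currentlyConsumed) - 1);
            result.add(delay);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(delays(100, 3)); // settings used in this article: [100, 110, 130]
        System.out.println(delays(50, 8));  // default (50 ms, 8): [50, 60, 80, 150, 280, 580, 1250, 2740]
    }
}
```

Under these assumptions, the configuration in this article waits 100, 110 and 130 ms before giving up, while the default waits about 5.2 s in total across its eight retries.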

 

 
package es;

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetAddress;
import java.net.UnknownHostException;

@Configuration
public class ESConfiguration {
    public static final Logger logger = LoggerFactory.getLogger(ESConfiguration.class);

    @Bean
    public BulkProcessor bulkProcessor() throws UnknownHostException {

        Settings settings = Settings.builder().put("cluster.name", "elasticsearch").build();

        // Pass a bare host name or IP (no "http://" scheme); 9300 is the transport (TCP) port
        Client client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("192.168.10.33"), 9300));

        return BulkProcessor.builder(client, new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest bulkRequest) {
                // Called just before a bulk request is sent; nothing to do here
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                // Failures of individual items are reported here, not in the Throwable overload
                if (bulkResponse.hasFailures()) {
                    logger.error("Bulk {} had failures: {}", executionId, bulkResponse.buildFailureMessage());
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
                // The whole bulk request failed; a trailing Throwable makes SLF4J log the stack trace
                logger.error("Bulk {} with {} actions failed", executionId, bulkRequest.numberOfActions(), throwable);
            }

        }).setBulkActions(1000)
                .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
                .setFlushInterval(TimeValue.timeValueSeconds(5))
                .setConcurrentRequests(1)
                .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
                .build();
    }
}

  

4. Indexing code

package es;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.index.IndexRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

@Repository
public class StudentInsertDao {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private BulkProcessor bulkProcessor;

    private final ObjectMapper objectMapper = new ObjectMapper();

    public void insert(Student student) {
        // The age is used as the mapping type; String.valueOf also works if getAge() returns an int
        String type = String.valueOf(student.getAge());
        String id = student.getName() + student.getAddr() + student.getAge();
        try {
            byte[] json = objectMapper.writeValueAsBytes(student);

            // add() only buffers the request; it is sent once a bulk threshold or the flush interval is reached
            bulkProcessor.add(new IndexRequest("students", type, id).source(json));
        } catch (Exception e) {
            // A trailing exception argument makes SLF4J log the stack trace
            logger.error("Failed to add index request to BulkProcessor", e);
        }
    }
}
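
One thing worth checking in the insert method above is the document ID. It is built by concatenating name, addr and age with no separator, so different students can produce the same string, and an index request whose ID already exists overwrites that document instead of adding a new one. The sketch below shows such a collision and one possible way to avoid it. StudentIds, concatenated and delimited are hypothetical names, and the "|" separator is an arbitrary choice that only helps if the fields themselves cannot contain it.

```java
// Hypothetical helper (not from the original article) comparing two ways
// of building a document ID from the same three fields.
final class StudentIds {

    // Same scheme as the insert method: plain concatenation, no separator.
    static String concatenated(String name, String addr, int age) {
        return name + addr + age;
    }

    // Alternative: join the fields with a separator assumed not to occur in them.
    static String delimited(String name, String addr, int age) {
        return String.join("|", name, addr, String.valueOf(age));
    }

    public static void main(String[] args) {
        // Two different students whose concatenated IDs are both "AnnASH12"
        System.out.println(concatenated("Ann", "ASH", 12).equals(concatenated("AnnA", "SH", 12))); // true
        // With a separator the IDs are "Ann|ASH|12" and "AnnA|SH|12"
        System.out.println(delimited("Ann", "ASH", 12).equals(delimited("AnnA", "SH", 12)));       // false
    }
}
```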

  

5. Test code

 

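import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.web.WebAppConfiguration;

@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
@ContextConfiguration(locations = {"classpath:servlet-context.xml", "classpath:applicationContext.xml"})
public class StudentInsertDaoTest {
    @Autowired
    private StudentInsertDao insertDao;

    @Test
    public void insert() throws Exception {
        Student student = new Student();
        student.setAge(12);
        student.setAddr("SH");
        student.setName("Jack");

        // The request is only buffered here; it may not have been sent to ES when this method returns
        insertDao.insert(student);
    }
}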
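
Note that the test above adds a single document, far below the setBulkActions(1000) and setBulkSize(5 MB) thresholds configured earlier, so the request stays in the buffer until the 5 s flush interval fires or the processor is flushed or closed. The following sketch models only the count and size triggers to illustrate this. ThresholdBuffer is a made-up class, not Elasticsearch code, and it ignores the timer and concurrency entirely.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of count/size-triggered flushing.
// It is not the Elasticsearch implementation: no timer, no threads, no retries.
final class ThresholdBuffer {
    private final int maxActions;
    private final long maxBytes;
    private final List<byte[]> pending = new ArrayList<>();
    private long pendingBytes = 0;
    private int flushCount = 0;

    ThresholdBuffer(int maxActions, long maxBytes) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
    }

    // Buffers one document and flushes as soon as either threshold is reached.
    void add(byte[] doc) {
        pending.add(doc);
        pendingBytes += doc.length;
        if (pending.size() >= maxActions || pendingBytes >= maxBytes) {
            flush();
        }
    }

    // Stands in for sending one bulk request; does nothing if the buffer is empty.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushCount++;
        pending.clear();
        pendingBytes = 0;
    }

    int flushCount() {
        return flushCount;
    }

    int pendingActions() {
        return pending.size();
    }

    public static void main(String[] args) {
        ThresholdBuffer buffer = new ThresholdBuffer(1000, 5L * 1024 * 1024);
        buffer.add(new byte[64]);
        System.out.println(buffer.flushCount()); // 0: one small document triggers nothing
        for (int i = 0; i < 999; i++) {
            buffer.add(new byte[64]);
        }
        System.out.println(buffer.flushCount()); // 1: the 1000th document reaches the action limit
    }
}
```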

  

 



Original link: https://blog.csdn.net/wslyk606/article/details/79413980

 

A brief analysis of Elasticsearch BulkProcessor:

https://blog.csdn.net/baichoufei90/article/details/97117025

