ES transport client批量導入


從 bulk.txt 文件中按行讀取數據,然後通過 bulk 批量導入。首先調用 client.prepareBulk() 實例化一個 BulkRequestBuilder 對象,再調用該對象的 add 方法逐條添加數據,最後調用 execute().actionGet() 提交。實現代碼:

import java.io.BufferedReader; import java.io.File; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; public class ElasticSearchBulkIn { public static void main(String[] args) { try { Settings settings = Settings.settingsBuilder() .put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml中配置 Client client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress( InetAddress.getByName("127.0.0.1"), 9300)); File article = new File("files/bulk.txt"); FileReader fr=new FileReader(article); BufferedReader bfr=new BufferedReader(fr); String line=null; BulkRequestBuilder bulkRequest=client.prepareBulk(); int count=0; while((line=bfr.readLine())!=null){ bulkRequest.add(client.prepareIndex("test","article").setSource(line)); if (count%10==0) { bulkRequest.execute().actionGet(); } count++; //System.out.println(line); } bulkRequest.execute().actionGet(); bfr.close(); fr.close(); } catch (UnknownHostException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }

setSource 的參數其實就是 JSON 字符串,即 bulk.txt 中的每一行都應該是一個完整的 JSON 文檔。
見:http://www.cnblogs.com/bonelee/p/6956138.html

// Bulk-indexes every row of the result set `rs` into index "ryxx", type "tweet",
// using the row's "id" column as the document id.
// `rs` and `jsonBuilder()` are defined outside this snippet.
// NOTE(review): ImmutableSettings and `new TransportClient(settings)` belong to the
// older (pre-2.0) client API, unlike the 2.x-style example earlier in this post.
Settings settings = ImmutableSettings.settingsBuilder()
        .put("client.transport.sniff", true)
        .put("cluster.name", "myelasticsearch")
        .build();
// Connect the client to the cluster over the transport protocol.
Client client = new TransportClient(settings).addTransportAddress(
        new InetSocketTransportAddress("192.168.1.100", 9300));
// Builder that accumulates the index requests for a single bulk submission.
BulkRequestBuilder bulkRequest = client.prepareBulk();
while (rs.next()) {
    // Build one JSON document per row and queue it.
    bulkRequest.add(client.prepareIndex("ryxx", "tweet", rs.getString("id"))
            .setSource(jsonBuilder().startObject()
                    .field("name", rs.getString("name"))
                    .field("age", rs.getString("age"))
                    .field("address", rs.getString("address"))
                    .field("phone", rs.getString("phone"))
                    .endObject()));
}
// Submit only when at least one row was queued: a bulk request without any
// actions is rejected by request validation.
if (bulkRequest.numberOfActions() > 0) {
    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
    // The bulk call itself succeeds even when individual documents fail,
    // so item-level failures have to be checked explicitly.
    if (bulkResponse.hasFailures()) {
        System.out.println(bulkResponse.buildFailureMessage());
    }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM