Elasticsearch Java Client連接池


按照Elasticsearch API,在Java端使用ES服務需要創建Java Client。但是如果每一次連接都實例化一個client,對系統的消耗很大;即使在使用完畢之後將client close掉,由於服務器不能及時回收socket資源,極端情況下仍會導致服務器達到最大連接數。

為了解決上述問題並提高client利用率,可以參考使用池化技術復用client。

 1 import java.io.IOException;
 2 import java.net.InetSocketAddress;
 3 import java.util.ArrayList;
 4 import java.util.HashMap;
 5 import java.util.List;
 6 import java.util.Map;
 7 import java.util.concurrent.ConcurrentHashMap;
 8 
 9 import org.elasticsearch.client.Client;
10 import org.elasticsearch.common.settings.Settings;
11 import org.elasticsearch.common.transport.InetSocketTransportAddress;
12 import org.elasticsearch.common.xcontent.XContentBuilder;
13 import org.elasticsearch.common.xcontent.XContentFactory;
14 import org.elasticsearch.index.mapper.Mapping;
15 import org.elasticsearch.transport.client.PreBuiltTransportClient;
16 
17 import com.chinadigitalvideo.esagent.servlet.WebServiceInit;
18 
19 /**
20  * Created by tgg on 16-3-17.
21  */
22 public class ClientHelper {
23     private static String ip;
24     private static int port;
25 
26     private Settings setting;
27     private Mapping mapping;
28     
29     private Map<String, Client> clientMap = new ConcurrentHashMap<String, Client>();
30 
31     private Map<String, Integer> ips = new HashMap<String, Integer>(); // hostname port
32 
33     private String clusterName = WebServiceInit.clusterName;
34 
35     private ClientHelper(String ip,Integer port) {
36         init(ip,port);
37         //TO-DO 添加你需要的client到helper
38     }
39 
40     public static final ClientHelper getInstance(String ipConf ,Integer portConf) {
41         ip=ipConf;
42         port=portConf;
43         return ClientHolder.INSTANCE;
44     }
45 
46     private static class ClientHolder {
47         private static final ClientHelper INSTANCE = new ClientHelper(ip,port);
48     }
49 
50     /**
51      * 初始化默認的client
52      */
53     public void init(String ip,int port) {
54         
55         ips.put(ip, port);
56         setting =Settings.builder()
57                 .put("client.transport.sniff",true)
58                 .put("cluster.name",clusterName).build();
59         addClient(setting, getAllAddress(ips));
60     }
61 
62     /**
63      * 獲得所有的地址端口
64      *
65      * @return
66      */
67     public List<InetSocketTransportAddress> getAllAddress(Map<String, Integer> ips) {
68         List<InetSocketTransportAddress> addressList = new ArrayList<InetSocketTransportAddress>();
69         for (String ip : ips.keySet()) {
70             addressList.add(new InetSocketTransportAddress(new InetSocketAddress(ip, ips.get(ip))));
71         }
72         return addressList;
73     }
74 
75     public Client getClient() {
76         return getClient(clusterName);
77     }
78 
79     public Client getClient(String clusterName) {
80         return clientMap.get(clusterName);//通過集群名稱得到一個Client
81     }
82 
83     public void addClient(Settings setting, List<InetSocketTransportAddress> transportAddress) {
84         Client client = new PreBuiltTransportClient(setting)
85                 .addTransportAddresses(transportAddress.toArray(new InetSocketTransportAddress[transportAddress.size()]));
86         
87         clientMap.put(setting.get("cluster.name"), client);
88     }
89 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM