本次博文發兩塊,前部分是怎樣搭建一個Elastic集群,后半部分是基於Java對數據進行寫入和聚合統計。
一、Elastic集群搭建
1. 環境准備。
該集群環境基於VMware虛擬機、CentOS 7系統,公司目前用的服務器系統基本全是CentOS系統,因此就選了這個。Elasticsearch需要依賴的最低環境就是JDK8,且要配置好環境變量JAVA_HOME. Elasticsearch的安裝也可以查看官網給出的安裝說明。
虛擬機系統采用的是最小化安裝,沒有安裝桌面程序。安裝完程序再安裝JDK,配置環境變量即可。
2. 集群搭建。
2.1 安裝包解壓
下載完成后的Elastic包為elasticsearch-6.3.2.tar.gz,對其解壓。
# 將elastic包加壓到目錄 /data/elastic 下 tar zxvf elasticsearch-6.3.2.tar.gz -C /data/elastic
2.2 配置文件修改
解壓后的路徑為/data/elastic/elasticsearch-6.3.2,在/data/elastic目錄下新增兩個文件夾,為data,logs,其中data用來存儲節點數據,logs用來存儲日志,下面在修改配置文件中需要用到。修改config/elasticsearch.yml如下。
# 集群名稱 cluster.name: elastic_test # 節點名稱 node.name: node-1 # 數據目錄,剛才創建的data目錄 path.data: /data/elastic/data #日志路徑 ,剛才創建的logs目錄 path.logs: /data/elastic/logs #綁定地址,修改為任何機器都能訪問 network.host: 0.0.0.0 #端口,默認9200,不做修改 #http.port: 9200 # 集群節點,當節點啟動后平台就會發現 discovery.zen.ping.unicast.hosts: ["172.16.106.190", "172.16.106.191", "172.16.106.192"] # 最小主節點數量,配置2
# 該配置告訴ELasticsearch當沒有足夠的master候選節點的時候,不進行master節點選舉,等master節點足夠了才進行選舉 discovery.zen.minimum_master_nodes: 2
2.3 其他機器修改
修改完一台機器后,同樣其他兩台機器類似修改,注意把節點名稱改為不一樣的就可以了。
2.4 集群啟動
啟動說明:elasticsearch的啟動不能使用root用戶,所以要新建一個普通用戶。以下是具體操作。
# 新建一個用戶組為elasticgp groupadd elasticgp # 新建一個用戶名為elastic的用戶,並且歸屬到elasticgp用戶組 useradd -g elasticgp elastic # 給用戶設置密碼 passwd elastic # 上面已經減了一個文件夾,/data/elastic,該文件夾存儲了elastic軟件和數據目錄data及日志目錄logs # 現在將elastic目錄的歸屬組修改成elastgp chgrp -R elasticgp elastic/ # 將文件目錄/data/elastic所屬用戶修改為用戶elastic chown -R elastic elastic/
用戶配置好后切換到elastic用戶進行啟動程序。
# 切換到elastic用戶 su elastic # 切換到程序目錄下 cd /data/elastic/elasticsearch-6.3.2 # 后台啟動程序 ./bin/elasticsearch -d # 查看輸出日志 tailf ../logs/elastic_test.log
2.5 問題排查
啟動的時候可能會出現以下兩個問題

問題1:將當前用戶的軟硬限制調大
修改文件 /etc/security/limits.conf
# elastic用戶的軟限制 當然也可用*代替,標識修改所有用戶 elastic soft nofile 65535 # elastic用戶的硬限制 當然也可用*代替,標識修改所有用戶 elastic hard nofile 65537
問題2:修改/etc/sysctl.conf
vm.max_map_count=262144
問題3: 啟動內存設置
在內存不充足的情況下,可以修改elastic的初始內存,在/data/elastic/elasticsearch-6.3.2/config目錄下有配置文件
# 將內存使用設置為512M -Xms512M -Xmx512M
問題4:端口是否開放
elastic需要用到9200和9300兩個端口,可以用telnet來查看端口是否開放,以下是修改防火牆打開端口的命令。
集群中的節點通過端口 9300 彼此通信。如果這個端口沒有打開,節點將無法形成一個集群。
# 永久開放9200端口 firewall-cmd --permanent --zone=public --add-port=9200/tcp # 永久開放9300端口 firewall-cmd --permanent --zone=public --add-port=9300/tcp #重新加載防火牆配置,使開放端口生效 firewall-cmd --reload
2.6 集群狀態查看
如下圖,通過訪問某一個節點,查看所有的節點,其中node-1為主節點。

如下圖,查看集群健康狀態

以上為elasticsearch集群具體安裝過程。具體的API調用說明可以查看官網CAT_API和Cluster_APIs等等。
2.7 kibana 使用
集群搭建好之后,可用通過kibana來訪問集群的一個節點,然后做一下簡單的測試。先去官網下載kibana安裝包
https://www.elastic.co/downloads/kibana
我是下載的mac客戶端,其他客戶端應該也是一樣的。
解壓kibana安裝包后,在bin目錄下執行
# 查看kibana命令幫助
./bin/kibana -h
會看到如下提示:
Usage: bin/kibana [command=serve] [options] Kibana is an open source (Apache Licensed), browser based analytics and search dashboard for Elasticsearch. Commands: serve [options] Run the kibana server help <command> Get the help for a specific command "serve" Options: -h, --help output usage information -e, --elasticsearch <uri> Elasticsearch instance -c, --config <path> Path to the config file, can be changed with the CONFIG_PATH environment variable as well. Use multiple --config args to include multiple config files. -p, --port <port> The port to bind to -q, --quiet Prevent all logging except errors -Q, --silent Prevent all logging --verbose Turns on verbose logging -H, --host <host> The host to bind to -l, --log-file <path> The file to log to --plugin-dir <path> A path to scan for plugins, this can be specified multiple times to specify multiple directories --plugin-path <path> A path to a plugin which should be included by the server, this can be specified multiple times to specify multiple paths --plugins <path> an alias for --plugin-dir
其中我們需要用的就是 -e 參數,來連接指定的elasticsearch
# 啟動kibana,連接到制定的Elastic服務節點 ./bin/kibana -e http://172.16.106.201:9200
啟動成功后可以訪問localhost:5601,如下圖,點擊監控菜單看到集群的一些狀態信息。indices是索引數量,除了自己新建的索引Elasticsearch本身也有一些索引。

如下圖,點擊DevTools菜單,可以對集群節點上的數據進行查詢了。

二、Java客戶端對數據存儲和查詢
1. 客戶端配置,可以查看官網詳細配置
采用maven管理,添加依賴的pom配置即可
<!-- Java High Level REST Client --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>6.3.2</version> <exclusions> <exclusion> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </exclusion> </exclusions> </dependency> <!-- Client 包缺少一些東西,因此引入此包 可以具體查看ISSUE https://github.com/elastic/elasticsearch/issues/26959 --> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>6.3.2</version> </dependency>
2. 以下是測試主要代碼
package com.woasis.elastic.demo; import org.apache.http.HttpHost; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Random; @RestController public class IndexController { private static RestHighLevelClient client = new RestHighLevelClient( RestClient.builder( new HttpHost("172.16.106.201",9200, "http"), new HttpHost("172.16.106.202",9200, "http"), new HttpHost("172.16.106.203",9200, "http") ) ); /** * 向索引下增加數據 * @param indexName * @param type * @return */ @GetMapping("/putdata") public String putDataForIndex(String indexName, String type){ if (StringUtils.isEmpty(indexName)){ return "請指定索引名稱"; } SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd"); StringBuilder indexBuilder = new StringBuilder(); indexBuilder.append(indexName); indexBuilder.append("-"); indexBuilder.append(simpleDateFormat.format(new Date())); String fullIndexName = indexBuilder.toString(); System.out.println("索引名稱是:"+fullIndexName); Random random = new Random(); try { XContentBuilder contentBuilder = XContentFactory.jsonBuilder(); contentBuilder.startObject(); contentBuilder.field("name", "people"+System.currentTimeMillis()); contentBuilder.field("age", random.nextInt(30)); contentBuilder.field("createDate", new Date()); contentBuilder.endObject(); //索引請求 IndexRequest indexRequest = new IndexRequest(fullIndexName, type).source(contentBuilder); IndexResponse indexResponse = client.index(indexRequest); System.out.println(indexResponse.getIndex()); // client.close(); } catch (IOException e) { e.printStackTrace(); } return "SUCCESS"; } /** * 根據索引名稱,type,id獲取數據 * @return */ @GetMapping("/getdata") public String getData(){ //Get請求 GetRequest getRequest = new GetRequest("people-2018-07-31","student", "DfLL72QBGxN1JyvW1KG4"); try { GetResponse response = client.get(getRequest); System.out.println("index:"+response.getIndex()); System.out.println("type:"+response.getType()); System.out.println("id:"+response.getId()); System.out.println("sourceString:"+response.getSourceAsString()); } catch (IOException e) { e.printStackTrace(); } return "SUCCESS"; } /** * 搜索數據 * @return */ @GetMapping("/searchdata") public String searchData(){ //Search請求 SearchRequest searchRequest = new SearchRequest("people-2018-07-31"); //查詢過濾條件 SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(QueryBuilders.termQuery("name","people1533031470255")); searchRequest.source(searchSourceBuilder); try { SearchResponse searchResponse = client.search(searchRequest); SearchHits searchHits = searchResponse.getHits(); for (SearchHit hit : searchHits){ System.out.println("index:"+hit.getIndex()); System.out.println("type:"+hit.getType()); System.out.println("id:"+hit.getId()); System.out.println("sourceString:"+hit.getSourceAsString()); } } catch (IOException e) { e.printStackTrace(); } return "SUCCESS"; } }
官方在各個API使用方式上都有詳細的講解,有用到的可以在官網查看。跳轉地址

該demo程序使用spring boot搭建,可以查看Github源碼https://github.com/liuzwei/elastic-demo
