1. Logstash概述
Logstash的官網地址為:https://www.elastic.co/cn/products/logstash,以下是官方對Logstash的描述。
Logstash是與Flume類似,也是一種數據采集工具,區別在於組件和特性兩大方面。常用的數據采集工具有Sqoop、Flume、Logstash,計划將單獨寫一篇博文論述它們之間的區別,所以這里就不贅述,感興趣可關注后期的博文。
2. Kafka概述
Kafka的官網是:http://kafka.apache.org/,官方的介紹如下圖:
總結來說,Kafka是一個分布式消息隊列,具有生產者和消費者的功能,它依賴Zookeeper集群來保存meta數據,根據Topic來歸類存儲的消息,Kafka集群由多個實例組成,每個實例稱為broker。
3. ElasticSearch概述
ElasticSearch是一個分布式的搜索和數據分析引擎。它的官網是:https://www.elastic.co/cn/products/elasticsearch,官方對ElasticSearch的描述如下,通過官方的描述能夠對ElasticSearch有一個整體的了解。
3. 編程實戰
3.1 小項目介紹
在VM的linux本地logserver目錄下存有模擬數據data.log,啟動一個logstash監視Linux的logserver目錄的data.log日志文件,當日志文件發生了修改,將日志文件采集到Kafka消息隊列的名為logs的Topic中,另啟動一個logstash將Kafka的消息采集到ElashticSearch,使用ElasticSearch檢索數據。
3.2 開發環境
系統環境: VM中存在三台Linux機器(bigdata12,bigdata14,bigdata15)
軟件環境:kafka_2.11-0.9.0.1、zookeeper-3.4.10、elasticsearch-2.4.4、logstash-2.3.1
3.3 環境准備
1. 首先在三台機器開啟zookeeper,各機器運行zkServer.sh start,Linux下查看是否有然后使用zkServer.sh status查看zookeeper的狀態,如果看到leader和follower角色的出現就代表運行正常。
2. 三台啟動Kafka,到kafka目錄下,運行 nohup bin/kafka-server-start.sh conf/serverproperties.conf。使用
3. 使用非root用戶啟動elasticsearch,使用非root用戶進入elasticsearch目錄執行: bin/elasticsearch -d
【注意】,必須是非root用戶,否則會報錯。如果沒有,就創建一個用戶。
例如創建一個用戶為zhou的話,執行:
(1) 添加用戶:useradd bigdata,
(2) 為用戶添加密碼 :echo 123456 | passwd --stdin zhou,
(3) 將zhou添加到sudoers: echo "bigdata ALL = (root) NOPASSWD:ALL" | tee /etc/sudoers.d/zhou
(4) 修改權限: chmod 0440 /etc/sudoers.d/zhou
(5) 從root切換成zhou: su - zhou
(6) 然后再執行啟動elasticsearch命令
4. 檢查進程運行情況
在Linux環境下執行jps命令查看進程是否正常啟動,每台機器查看是否有以下進程
在elasticsearch安裝了head的前提下,在windows環境開啟瀏覽器,在地址欄輸入http://ip地址:9200/_plugin/head ,例如,根據我的配置,輸入了http://192.168.243.11:9200/_plugin/head。出現以下界面,表示Elasticsearch啟動正常
在以上環節確認后,就代表環境啟動運行正常,可以進行正常開發程序。
3.4 開發
3.4.1 編寫logstash配置
在bigdata12機器中進入logstash的conf目錄:
vi dataTokafka.conf
1 input { 2 file { 3 codec => plain { 4 charset => "UTF-8" 5 } 6 path => "/root/logserver/supernova.log" 7 discover_interval => 5 8 start_position => "beginning" 9 } 10 } 11 12 output { 13 kafka { 14 topic_id => "supernova" 15 codec => plain { 16 format => "%{message}" 17 charset => "UTF-8" 18 } 19 bootstrap_servers => "bigdata12:9092,bigdata14:9092,bigdata15:9092" 20 } 21 }
在bigdata14機器中進入logstash的conf目錄:
vi dataToElastic.conf
input { kafka { type => "supernova" auto_offset_reset => "smallest" codec => "plain" group_id => "elas2" topic_id => "supernova" zk_connect => "bigdata12:2181,bigdata14:2181,bigdata15:2181" } } filter { if [type] == "supernova" { mutate { split => { "message" => "|" } add_field => { "id" => "%{message[0]}" "time" => "%{message[1]}" "ip" => "%{message[2]}" "user" => "%{message[3]}" } remove_field => [ "message" ] } } } output { if [type] == "supernova" { elasticsearch { index => "supernova" codec => plain { charset => "UTF-16BE" } hosts => ["bigdata12:9200", "bigdata14:9200", "bigdata15:9200"] } } }
3.4.2 運行
(1) 在bigdata12機器中,使用3.4.1中的dataTokakfa.conf啟動logstash。執行:bin/logstash -f conf/dataTokakfa.conf,監聽supernova.log文件
(2) 在bigdata14機器中,使用3.4.1中的dataToElastic.conf啟動logstach。執行:bin/logstash -f conf/dataToElastic.conf,將Kafka數據采集到Elasticsearch。
(3) 為了便於觀察,在bigdata15機器中,啟動kafka消費者,查看Topic中的數據。執行:bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic logs,用於消費Kafka中Topic名為logs的消息。
(4) 編輯修改Logstash監聽的supernova.log文件。
啟動】:
【修改】在bigdata15中修改了數據(右下角窗口)
【監視過程】:bigdata15中(右上),kafka的consumer消費到了supernova.log文件中的數據,在bigdata14中,可以看到將數據傳至ElasticSearch的數據(左下)
【ElasticSeach結果】
可以看到Elastic集群中,產生了一個supernova的type(類似關系數據庫中的table)
【查看ElasticSearch數據】
3.4.2 ElasticSearch檢索
使用Junit單元測試的方法來編寫測試方法,代碼如下:
EalsticSearch.java
package novaself; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.junit.Before; import org.junit.Test; import java.net.InetAddress; import java.util.Iterator; /** * @author Supernova * @date 2018/06/22 */ public class ElasticSearch { private Client client; /** * 獲取客戶端 */ @Before public void getClient() throws Exception { // ElasticSearch服務默認端口9300 Settings settings = Settings.settingsBuilder() .put("cluster.name", "bigdata").build(); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress( InetAddress.getByName("bigdata12"), 9300)) .addTransportAddress(new InetSocketTransportAddress( InetAddress.getByName("bigdata14"), 9300)) .addTransportAddress(new InetSocketTransportAddress( InetAddress.getByName("bigdata15"), 9300)); } /** * 詞條查詢: 用戶名中有"新"字的數據 */ @Test public void testTermQuery(){ /* * termQuery詞條查詢: 只匹配指定字段中含有該詞條的文檔 * 查詢user字段為超新星的記錄 */ SearchResponse response = client.prepareSearch("supernova") .setTypes("supernova") .setQuery(QueryBuilders.termQuery("user","新")) .get(); // 獲取結果集對象、命中數 SearchHits hits = response.getHits(); // 使用迭代器遍歷數據 Iterator<SearchHit> iter = hits.iterator(); while(iter.hasNext()){ SearchHit hit = iter.next(); // 以Json格式輸出 String result = hit.getSourceAsString(); System.out.println(result); } //關閉客戶端 client.close(); } /** * 模糊查詢: 星期四的數據 */ @Test public void testWildcardQuery() throws Exception{ /* * wildcardQuery模糊查詢,time字段中包含"四"的數據 */ SearchResponse response = client.prepareSearch("supernova") .setTypes("supernova") .setQuery(QueryBuilders.wildcardQuery("time","四")) .get(); // 獲取結果集對象、命中數 SearchHits hits = response.getHits(); // 使用迭代器遍歷數據 Iterator<SearchHit> iter = hits.iterator(); while(iter.hasNext()){ SearchHit hit = iter.next(); // 以Json格式輸出 String result = hit.getSourceAsString(); System.out.println(result); } //關閉客戶端 client.close(); } }
【檢索結果】:
詞條查詢:testTermQuery( )方法的運行結果:
模糊查詢:testWildcardQuery ( )方法的運行結果: