【大數據實戰】Logstash采集->Kafka->ElasticSearch檢索


 1. Logstash概述

  Logstash的官網地址為:https://www.elastic.co/cn/products/logstash,以下是官方對Logstash的描述。

 Logstash是與Flume類似,也是一種數據采集工具,區別在於組件和特性兩大方面。常用的數據采集工具有Sqoop、Flume、Logstash,計划將單獨寫一篇博文論述它們之間的區別,所以這里就不贅述,感興趣可關注后期的博文。

2. Kafka概述

  Kafka的官網是:http://kafka.apache.org/,官方的介紹如下圖:

  總結來說,Kafka是一個分布式消息隊列,具有生產者和消費者的功能,它依賴Zookeeper集群來保存meta數據,根據Topic來歸類存儲的消息,Kafka集群由多個實例組成,每個實例稱為broker。

3. ElasticSearch概述

 ElasticSearch是一個分布式的搜索和數據分析引擎。它的官網是:https://www.elastic.co/cn/products/elasticsearch,官方對ElasticSearch的描述如下,通過官方的描述能夠對ElasticSearch有一個整體的了解。

3. 編程實戰

 3.1 小項目介紹

 在VM的linux本地logserver目錄下存有模擬數據data.log,啟動一個logstash監視Linux的logserver目錄的data.log日志文件,當日志文件發生了修改,將日志文件采集到Kafka消息隊列的名為logs的Topic中,另啟動一個logstash將Kafka的消息采集到ElashticSearch,使用ElasticSearch檢索數據。

  

 3.2 開發環境

 系統環境: VM中存在三台Linux機器(bigdata12,bigdata14,bigdata15)

 軟件環境:kafka_2.11-0.9.0.1、zookeeper-3.4.10、elasticsearch-2.4.4、logstash-2.3.1

 3.3 環境准備

 1. 首先在三台機器開啟zookeeper,各機器運行zkServer.sh start,Linux下查看是否有然后使用zkServer.sh status查看zookeeper的狀態,如果看到leader和follower角色的出現就代表運行正常。 

 2. 三台啟動Kafka,到kafka目錄下,運行 nohup bin/kafka-server-start.sh conf/serverproperties.conf。使用

 3. 使用非root用戶啟動elasticsearch,使用非root用戶進入elasticsearch目錄執行: bin/elasticsearch -d

 注意,必須是非root用戶,否則會報錯。如果沒有,就創建一個用戶。

    例如創建一個用戶為zhou的話,執行:

  (1) 添加用戶:useradd bigdata,

  (2) 為用戶添加密碼 :echo 123456 | passwd --stdin zhou,

  (3) 將zhou添加到sudoers: echo "bigdata ALL = (root) NOPASSWD:ALL" | tee /etc/sudoers.d/zhou

  (4) 修改權限: chmod 0440 /etc/sudoers.d/zhou

  (5) 從root切換成zhou: su - zhou 

  (6) 然后再執行啟動elasticsearch命令

 4. 檢查進程運行情況

  在Linux環境下執行jps命令查看進程是否正常啟動,每台機器查看是否有以下進程

  

 在elasticsearch安裝了head的前提下,在windows環境開啟瀏覽器,在地址欄輸入http://ip地址:9200/_plugin/head ,例如,根據我的配置,輸入了http://192.168.243.11:9200/_plugin/head。出現以下界面,表示Elasticsearch啟動正常

 

 在以上環節確認后,就代表環境啟動運行正常,可以進行正常開發程序。

 3.4 開發

 3.4.1 編寫logstash配置

 在bigdata12機器中進入logstash的conf目錄:

 vi dataTokafka.conf

 1 input {
 2   file {
 3         codec => plain {
 4         charset => "UTF-8"
 5     }
 6     path => "/root/logserver/supernova.log"
 7     discover_interval => 5
 8     start_position => "beginning"
 9   }
10 }
11 
12 output {
13     kafka {
14           topic_id => "supernova"
15           codec => plain {
16           format => "%{message}"
17           charset => "UTF-8"
18       }
19           bootstrap_servers => "bigdata12:9092,bigdata14:9092,bigdata15:9092"
20     }
21 }

 在bigdata14機器中進入logstash的conf目錄:

  vi dataToElastic.conf

input {
  kafka {
    type => "supernova"
    auto_offset_reset => "smallest"
    codec => "plain"
    group_id => "elas2"
    topic_id => "supernova"
    zk_connect => "bigdata12:2181,bigdata14:2181,bigdata15:2181"
  }
}
filter {
  if [type] == "supernova" {
    mutate {
      split => { "message" => "|" }
      add_field => {
                "id" => "%{message[0]}"
                "time" => "%{message[1]}"
                "ip" => "%{message[2]}"
                "user" => "%{message[3]}"
     }
     remove_field => [ "message" ]
   }
  }
}
output {
  if [type] == "supernova" {
    elasticsearch {
      index => "supernova"
      codec => plain {
        charset => "UTF-16BE"
      }
      hosts => ["bigdata12:9200", "bigdata14:9200", "bigdata15:9200"]
    }
  }
}

 3.4.2 運行

 (1) 在bigdata12機器中,使用3.4.1中的dataTokakfa.conf啟動logstash。執行:bin/logstash -f conf/dataTokakfa.conf,監聽supernova.log文件

 (2) 在bigdata14機器中,使用3.4.1中的dataToElastic.conf啟動logstach。執行:bin/logstash -f conf/dataToElastic.conf,將Kafka數據采集到Elasticsearch。

 (3) 為了便於觀察,在bigdata15機器中,啟動kafka消費者,查看Topic中的數據。執行:bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic logs,用於消費Kafka中Topic名為logs的消息。

 (4) 編輯修改Logstash監聽的supernova.log文件。

 啟動】:

 

 【修改】在bigdata15中修改了數據(右下角窗口)

 

【監視過程】:bigdata15中(右上),kafka的consumer消費到了supernova.log文件中的數據,在bigdata14中,可以看到將數據傳至ElasticSearch的數據(左下)

【ElasticSeach結果】

  可以看到Elastic集群中,產生了一個supernova的type(類似關系數據庫中的table)

 

 【查看ElasticSearch數據】

 3.4.2 ElasticSearch檢索

    使用Junit單元測試的方法來編寫測試方法,代碼如下:

 EalsticSearch.java

package novaself;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.junit.Before;
import org.junit.Test;

import java.net.InetAddress;
import java.util.Iterator;

/**
 * @author Supernova
 * @date 2018/06/22
 */
public class ElasticSearch  {

    private Client client;

    /**
     * 獲取客戶端
     */
    @Before
    public void getClient() throws Exception {
        // ElasticSearch服務默認端口9300
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "bigdata").build();
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("bigdata12"), 9300))
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("bigdata14"), 9300))
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("bigdata15"), 9300));
    }

    /**
     * 詞條查詢: 用戶名中有"新"字的數據
     */
    @Test
    public void testTermQuery(){
        /*
         * termQuery詞條查詢: 只匹配指定字段中含有該詞條的文檔
         * 查詢user字段為超新星的記錄
         */
        SearchResponse response = client.prepareSearch("supernova")
                .setTypes("supernova")
                .setQuery(QueryBuilders.termQuery("user","新"))
                .get();

        // 獲取結果集對象、命中數
        SearchHits hits = response.getHits();
        // 使用迭代器遍歷數據
        Iterator<SearchHit> iter = hits.iterator();
        while(iter.hasNext()){
            SearchHit hit = iter.next();
            // 以Json格式輸出
            String result = hit.getSourceAsString();
            System.out.println(result);
        }

        //關閉客戶端
        client.close();
    }
    /**
     * 模糊查詢: 星期四的數據
     */
    @Test
    public void testWildcardQuery() throws Exception{
        /*
         * wildcardQuery模糊查詢,time字段中包含"四"的數據
         */
        SearchResponse response = client.prepareSearch("supernova")
                .setTypes("supernova")
                .setQuery(QueryBuilders.wildcardQuery("time","四"))
                .get();


        // 獲取結果集對象、命中數
        SearchHits hits = response.getHits();
        // 使用迭代器遍歷數據
        Iterator<SearchHit> iter = hits.iterator();
        while(iter.hasNext()){
            SearchHit hit = iter.next();
            // 以Json格式輸出
            String result = hit.getSourceAsString();
            System.out.println(result);
        }

        //關閉客戶端
        client.close();
    }
}

【檢索結果】:

  詞條查詢:testTermQuery( )方法的運行結果:

 模糊查詢:testWildcardQuery ( )方法的運行結果:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM