logstash中將kafka數據直接存儲到es中


轉載自:https://help.aliyun.com/document_detail/169275.html

將消息隊列Kafka版接入阿里雲Elasticsearch

更新時間:2020-07-21 16:33:50

隨着時間的積累,消息隊列Kafka版中的日志數據會越來越多。當您需要查看並分析龐雜的日志數據時,可通過阿里雲Logstash將消息隊列Kafka版中的日志數據導入阿里雲Elasticsearch,然后通過Kibana進行可視化展示與分析。本文說明如何進行操作。

前提條件

在開始本教程前,請確保您已完成以下操作:

背景信息

通過阿里雲Logstash將數據從消息隊列Kafka版導入阿里雲Elasticsearch的過程如下圖所示。 elasticsearch
  • 消息隊列Kafka版

    消息隊列Kafka版是阿里雲提供的分布式、高吞吐、可擴展的消息隊列服務。消息隊列Kafka版廣泛用於日志收集、監控數據聚合、流式數據處理、在線和離線分析等大數據領域,已成為大數據生態中不可或缺的部分。詳情請參見什么是消息隊列Kafka版

  • 阿里雲Elasticsearch

    Elasticsearch簡稱ES,是一個基於Lucene的實時分布式的搜索與分析引擎,是遵從Apache開源條款的一款開源產品,是當前主流的企業級搜索引擎。它提供了一個分布式服務,可以使您快速的近乎於准實時的存儲、查詢和分析超大數據集,通常被用來作為構建復雜查詢特性和需求強大應用的基礎引擎或技術。阿里雲Elasticsearch支持5.5.3、6.3.2、6.7.0、6.8.0和7.4.0版本,並提供了商業插件X-Pack服務,致力於數據分析、數據搜索等場景服務。在開源Elasticsearch的基礎上提供企業級權限管控、安全監控告警、自動報表生成等功能。詳情請參見什么是阿里雲Elasticsearch

  • 阿里雲Logstash

    阿里雲Logstash作為服務器端的數據處理管道,提供了100%兼容開源的Logstash功能。Logstash能夠動態地從多個來源采集數據、轉換數據,並且將數據存儲到所選擇的位置。通過輸入、過濾和輸出插件,Logstash可以對任何類型的事件加工和轉換。詳情請參見什么是阿里雲Logstash

步驟一:獲取VPC環境接入點

阿里雲Logstash通過消息隊列Kafka版的接入點與消息隊列Kafka版在VPC環境下建立連接。

  1. 登錄消息隊列Kafka版控制台
  2. 在頂部菜單欄,選擇地域,例如華東1(杭州)。
  3. 在左側導航欄,單擊實例詳情。
  4. 實例詳情頁面,選擇要將數據導入阿里雲Elasticsearch的實例。
  5. 基本信息區域,獲取實例的VPC環境接入點。
    endpoint
    消息隊列Kafka版支持以下VPC環境接入點:
    • 默認接入點:端口號為9092。
    • SASL接入點:端口號為9094。如需使用SASL接入點,請開啟ACL。您可以提交工單申請開啟ACL。

    詳情請參見接入點對比

步驟二:創建Topic

創建用於存儲消息的Topic。

  1. 消息隊列Kafka版控制台的左側導航欄,單擊Topic管理。
  2. Topic管理頁面,單擊創建Topic。
  3. 創建Topic頁面,輸入Topic信息,然后單擊創建。
    create_topic

步驟三:發送消息

向創建的Topic發送消息。

  1. 消息隊列Kafka版控制台的Topic管理頁面,找到創建的Topic,在其右側操作列,單擊發送消息。
  2. 發送消息對話框,輸入消息信息,然后單擊發送。
    send_msg

步驟四:創建Consumer Group

創建阿里雲Elasticsearch所屬的Consumer Group。

  1. 消息隊列Kafka版控制台的左側導航欄,單擊Consumer Group管理。
  2. Consumer Group管理頁面,單擊創建Consumer Group。
  3. 創建Consumer Group頁面,輸入Consumer Group信息,然后單擊創建。
    create_cg

步驟五:創建索引

通過阿里雲Elasticsearch創建索引,接收消息隊列Kafka版的數據。

  1. 登錄阿里雲Elasticsearch控制台
  2. 在頂部菜單欄,選擇地域。
  3. 實例列表頁面,單擊創建的實例。
  4. 在左側導航欄,單擊可視化控制。
  5. Kibana區域,單擊進入控制台。
  6. 在Kibana登錄頁面,輸入Username和Password,然后單擊Log in。
     
    說明
    • Username為您創建阿里雲Elasticsearch實例時設置的用戶名。
    • Password為您創建阿里雲Elasticsearch實例時設置的密碼。
  7. 在Kibana控制台的左側導航欄,單擊Dev Tools。
  8. 執行以下命令創建索引。
     
    PUT /elastic_test
    {}

步驟六:創建管道

通過阿里雲Logstash創建管道。管道部署后,將源源不斷地從消息隊列Kafka版導入數據進阿里雲Elasticsearch。

  1. 登錄阿里雲Logstash控制台
  2. 在頂部菜單欄,選擇地域。
  3. 實例列表頁面,單擊創建的實例。
  4. 在左側導航欄,單擊管道管理。
  5. 管道列表區域,單擊創建管道。
  6. Config配置中,輸入配置。
    配置示例如下。
     
    input {
        kafka {
        bootstrap_servers => ["192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092"] group_id => "elastic_group" topics => ["elastic_test"] consumer_threads => 12 decorate_events => true } } output { elasticsearch { hosts => ["http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200"] index => "elastic_test" password => "XXX" user => "elastic" } }
    表 1. input參數說明
    參數 描述 示例值
    bootstrap_servers 消息隊列Kafka版的VPC環境接入點。 192.168.xx.xx:9092,192.168.xx.xx:9092,192.168.xx.xx:9092
    group_id Consumer Group的名稱。 elastic_group
    topics Topic的名稱。 elastic_test
    consumer_threads 消費線程數。建議與Topic的分區數保持一致。 12
    decorate_events 是否包含消息元數據。默認值為false。 true
    表 2. output參數說明
    參數 描述 示例值
    hosts 阿里雲Elasticsearch服務的訪問地址。您可在阿里雲Elasticsearch實例的基本信息頁面獲取。 http://es-cn-o40xxxxxxxxxxxxwm.elasticsearch.aliyuncs.com:9200
    index 索引的名稱。 elastic_test
    password 訪問阿里雲Elasticsearch服務的密碼。您在創建阿里雲Elasticsearch實例時設置的密碼。 XXX
    user 訪問阿里雲Elasticsearch服務的用戶名。您在創建阿里雲Elasticsearch實例時設置的用戶名。 elastic
  7. 管道參數配置中,輸入配置信息,然后單擊保存並部署。
  8. 提示對話框,單擊確認。

步驟七:搜索數據

您可以在Kibana控制台搜索通過管道導入阿里雲Elasticsearch的數據,確認數據是否導入成功。

  1. 登錄阿里雲Elasticsearch控制台
  2. 在頂部菜單欄,選擇地域。
  3. 實例列表頁面,單擊創建的實例。
  4. 在左側導航欄,單擊可視化控制。
  5. Kibana區域,單擊進入控制台。
  6. 在Kibana登錄頁面,輸入Username和Password,然后單擊Log in。
     
    說明
    • Username為您創建阿里雲Elasticsearch實例時設置的用戶名。
    • Password為您創建阿里雲Elasticsearch實例時設置的密碼。
  7. 在Kibana控制台的左側導航欄,單擊Dev Tools圖標。
  8. 執行以下命令搜索數據。
     
    GET /elastic_test/_search {}
    返回結果如下。 作為Input接入


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM