ELK+kafka+filebeat搭建生產ELFK集群


文章原文

ELK 架構介紹

集群服務版本

服務 版本
java 1.8.0_221
elasticsearch 7.10.1
filebeat 7.10.1
kibana 7.10.1
logstash 7.10.1
cerebro 0.9.2-1
kafka 2.12-2.3.0
zookeeper 3.5.6

服務器環境說明

IP地址 主機名 配置 角色
10.0.11.172 elk-master 4C16G es-master、kafka+zookeeper1
10.0.21.117 elk-node1 4C16G es-node1、kafka+zookeeper2
10.0.11.208 elk-node2 4C16G es-node2、kafka+zookeeper3
10.0.10.242 elk-kibana 4C16G logstash、kibana、cerebro

系統參數優化

{{< notice warning "注意" >}}
三個節點都需要執行
{{< /notice >}}

修改主機名

hostnamectl set-hostname elk-master
hostnamectl set-hostname elk-node1
hostnamectl set-hostname elk-node2

增加文件描述符

cat >>/etc/security/limits.conf<< EOF
*               soft      nofile          65536
*               hard      nofile          65536
*               soft      nproc           65536
*               hard      nproc           65536
*               hard      memlock         unlimited
*               soft      memlock         unlimited
EOF

修改默認限制內存

cat >>/etc/systemd/system.conf<< EOF
DefaultLimitNOFILE=65536
DefaultLimitNPROC=32000
DefaultLimitMEMLOCK=infinity
EOF

優化內核,對es支持

cat >>/etc/sysctl.conf<< EOF
# 關閉交換內存
vm.swappiness =0
# 影響java線程數量,建議修改為262144或者更高
vm.max_map_count= 262144
# 優化內核listen連接
net.core.somaxconn=65535
# 最大打開文件描述符數,建議修改為655360或者更高
fs.file-max=655360
# 開啟ipv4轉發
net.ipv4.ip_forward= 1
EOF

修改Hostname配置文件

cat >>/etc/hosts<< EOF
elk-master  10.0.11.172
elk-node1   10.0.21.117
elk-node2   10.0.11.208
EOF

重啟使配置生效

reboot

部署Zookeeper

{{< notice warning "注意" >}}
三個節點都需要執行
{{< /notice >}}

創建Zookeeper項目目錄

#存放快照日志
mkdir zkdata 
#存放事物日志
mkdir zklogs 

下載解壓zookeeper

wget  http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz 
mv apache-zookeeper-3.5.6-bin zookeeper

修改配置文件

[root@elk-master zookeeper]# cat conf/zoo.cfg  |grep  -v ^#
# 服務器之間或客戶端與服務器之間維持心跳的時間間隔
# tickTime以毫秒為單位。
tickTime=2000 
# 集群中的follower服務器(F)與leader服務器(L)之間的初始連接心跳數
initLimit=10
# 集群中的follower服務器與leader服務器之間請求和應答之間能容忍的最多心跳數
syncLimit=5
# 數據保存目錄
dataDir=../zkdata
# 日志保存目錄
dataLogDir=../zklogs
# 客戶端連接端口
clientPort=2181
# 客戶端最大連接數。# 根據自己實際情況設置,默認為60個
maxClientCnxns=60
# 三個接點配置,格式為: server.服務編號=服務地址、LF通信端口、選舉端口
server.1=10.0.11.172:2888:3888
server.2=10.0.21.117:2888:3888
server.3=10.0.11.208:2888:3888

寫入節點標記

{{< notice warning "注意" >}}
分別在三個節點/home/tools/zookeeper/zkdata/myid寫入節點標記
{{< /notice >}}

{{< tabs master節點 node1節點 node2節點 >}}
{{< tab >}}

master的操作
echo "1" > /home/tools/zookeeper/zkdata/myid

{{< /tab >}}
{{< tab >}}

node1的操作
echo "2" > /home/tools/zookeeper/zkdata/myid

{{< /tab >}}
{{< tab >}}

node2的操作
echo "3" > /home/tools/zookeeper/zkdata/myid

{{< /tab >}}
{{< /tabs >}}

啟動zookeeper集群

[root@elk-master zookeeper]# cd /home/tools/zookeeper/bin/
[root@elk-master bin]# ./zkServer.sh start 
ZooKeeper JMX enabled by default
Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

檢查集群狀態

[root@elk-master bin]# sh /home/tools/zookeeper/bin/zkServer.sh status 
ZooKeeper JMX enabled by default
Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

設置全局變量

cat >>/etc/profile<< EOF
export ZOOKEEPER_INSTALL=/home/tools/zookeeper/
export PATH=$PATH:$ZOOKEEPER_INSTALL/bin
export PATH
EOF
  • 使配置生效
source /etc/profile

這樣就可以全局使用zkServer.sh命令了

部署 Kafka

{{< notice warning "注意" >}}
三個節點都需要執行
{{< /notice >}}

下載解壓kafka壓縮包

[root@elk-master tools]# mkdir kafka
[root@elk-master tools]# cd kafka/
[root@elk-master kafka]# wget https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
[root@elk-master kafka]# tar xf kafka_2.12-2.3.0.tgz 
[root@elk-master kafka]# mv kafka_2.12-2.3.0 kafka
[root@elk-master kafka]# cd kafka/config/

配置kafka

[root@elk-master config]# cat /home/tools/kafka/kafka/config/server.properties
############################# Server Basics ############################# 
# broker的id,值為整數,且必須唯一,在一個集群中不能重復
broker.id=1

############################# Socket Server Se:ttings ############################# 
# kafka默認監聽的端口為9092 (默認與主機名進行連接)
listeners=PLAINTEXT://:9092

# 處理網絡請求的線程數量,默認為3個
num.network.threads=3

# 執行磁盤IO操作的線程數量,默認為8個 
num.io.threads=8

# socket服務發送數據的緩沖區大小,默認100KB
socket.send.buffer.bytes=102400

# socket服務接受數據的緩沖區大小,默認100KB
socket.receive.buffer.bytes=102400

# socket服務所能接受的一個請求的最大大小,默認為100M
socket.request.max.bytes=104857600

############################# Log Basics ############################# 
# kafka存儲消息數據的目錄
log.dirs=../kfkdata

# 每個topic默認的partition數量
num.partitions=3

# 在啟動時恢復數據和關閉時刷新數據時每個數據目錄的線程數量
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy ############################# 

# 消息刷新到磁盤中的消息條數閾值
#log.flush.interval.messages=10000

# 消息刷新到磁盤中的最大時間間隔,1s
#log.flush.interval.ms=1000

############################# Log Retention Policy ############################# 

# 日志保留小時數,超時會自動刪除,默認為7天
log.retention.hours=168

# 日志保留大小,超出大小會自動刪除,默認為1G
#log.retention.bytes=1073741824

# 日志分片策略,單個日志文件的大小最大為1G,超出后則創建一個新的日志文件
log.segment.bytes=1073741824

# 每隔多長時間檢測數據是否達到刪除條件,300s
log.retention.check.interval.ms=300000

############################# Zookeeper ############################# 
# Zookeeper連接信息,如果是zookeeper集群,則以逗號隔開
zookeeper.connect=10.0.11.172,10.0.21.117,10.0.11.208

# 連接zookeeper的超時時間,6s
zookeeper.connection.timeout.ms=6000

創建數據存儲的目錄

[root@elk-master config]# mkdir ../kfkdata

修改broker.id

{{< notice warning "注意" >}}
分別在三個節點依次修改/home/tools/kafka/kafka/config/server.properties配置文件
{{< /notice >}}
{{< tabs master節點 node1節點 node2節點 >}}
{{< tab >}}

master的配置
broker.id=1

{{< /tab >}}
{{< tab >}}

node1的配置
broker.id=2

{{< /tab >}}
{{< tab >}}

node2的配置
broker.id=3

{{< /tab >}}
{{< /tabs >}}

啟動kafka集群

cd /home/tools/kafka/kafka/bin/
#啟動測試
./kafka-server-start.sh ../config/server.properties
#放入后台
./kafka-server-start.sh -daemon ../config/server.properties

測試

{{< notice warning "注意" >}}
任意節點均可執行
{{< /notice >}}
在創建topic在集群中的任意節點 發布消息訂閱消息驗證結果

{{< tabs 創建topic 消息發布 topic消息訂閱 >}}
{{< tab >}}

[root@elk-master bin]# ./kafka-topics.sh \
--create \
--zookeeper 10.0.11.172:2181,10.0.21.117:2181,10.0.11.208:2181 \
--partitions 3 \
--replication-factor 1 \
--topic logs

{{< /tab >}}
{{< tab >}}

[root@elk-master bin]# ./kafka-console-producer.sh \
--broker-list 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \
--topic logs

{{< /tab >}}
{{< tab >}}

[root@elk-master bin]#  ./kafka-console-consumer.sh \
--bootstrap-server 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \
--topic logs \
--from-beginning

{{< /tab >}}
{{< /tabs >}}

部署elasticsearch

{{< notice warning "注意" >}}
三個節點都需要執行
{{< /notice >}}

下載安裝elasticsearch

wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.1-x86_64.rpm
[root@elk-master package]#  rpm -ivh elasticsearch-7.10.1-x86_64.rpm 

備份配置文件

cd /etc/elasticsearch
cp elasticsearch.yml  elasticsearch.yml.bak

修改配置文件

cat >/etc/elasticsearch/elasticsearch.yml << EOF
#集群名
cluster.name: elk-cluster

#node名
node.name: elk-1

#數據存儲路徑
path.data: /home/elasticsearch/esdata

#數據快照路徑
path.repo: /home/backup/essnapshot

#日志存儲路徑
path.logs: /home/elasticsearch/eslogs

#es綁定的ip地址,根據自己機器ip進行修改
network.host: 0.0.0.0

#服務端口
http.port: 9200

#集群master需要和node名設置一致
discovery.seed_hosts: ["10.0.11.172", "10.0.21.117", "10.0.11.208"]
cluster.initial_master_nodes: ["10.0.11.172","10.0.21.117","10.0.11.208"]

#允許跨域請求
http.cors.enabled: true
#* 表示支持所有域名
http.cors.allow-origin: "*"
#添加請求header
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type

#生產必須為true,內存鎖定檢查,目的是內存地址直接映射,減少一次copy時間
bootstrap.memory_lock: true
#系統過濾檢查,防止數據損壞,考慮集群安全,生產設置成false
bootstrap.system_call_filter: false

#xpack配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/elastic-certificates.p12
EOF

修改JVM

  • jvm.options文件中22-23行的8g設置為你的服務內存的一半
[root@elk-node1 elasticsearch]# cat -n  jvm.options |grep 8g
    22  -Xms8g
    23  -Xmx8g

修改其他節點配置

{{< notice warning "注意" >}}
分別在三個節點修改/etc/elasticsearch/elasticsearch.yml配置文件
{{< /notice >}}

{{< tabs master節點 node1節點 node2節點 >}}
{{< tab >}}

master的配置
node.name: "es-master"

{{< /tab >}}
{{< tab >}}

node1的配置
node.name: "es-node1"

{{< /tab >}}
{{< tab >}}

node2的配置
node.name: "es-node2"

{{< /tab >}}
{{< /tabs >}}
最終展示

分配權限

因為自定義數據、日志存儲目錄,所以要把權限給到目錄

mkdir  -p /home/elasticsearch/{esdata,eslogs}
chown  elasticsearch:elasticsearch  /home/elasticsearch/*
mkdir  -p /home/backup/essnapshot
chown elasticsearch:elasticsearch /home/backup/essnapshot

啟動服務

三個節點全部啟動並加入開機啟動

systemctl start elasticsearch
systemctl enable  elasticsearch

使用xpack進行安全認證

xpack的安全功能

  • TLS 功能。 可對通信進行加密
  • 文件和原生 Realm。 可用於創建和管理用戶
  • 基於角色的訪問控制。 可用於控制用戶對集群 API 和索引的訪問權限
  • 通過針對 Kibana Spaces 的安全功能,還可允許在Kibana 中實現多租戶。
    在我配置過程中,發現集群認證需要首先配置秘鑰才行,否則在給內置用戶創建秘鑰的時候將會報錯。
    {{< notice warning "error" >}}
    Cause: Cluster state has not been recovered yet, cannot write to the [null] index
    {{< /notice >}}
Unexpected response code [503] from calling PUT http://10.0.11.172:9200/_security/user/apm_system/_password?pretty
Cause: Cluster state has not been recovered yet, cannot write to the [null] index

Possible next steps:
* Try running this tool again.
* Try running with the --verbose parameter for additional messages.
* Check the elasticsearch logs for additional error details.
* Use the change password API manually. 

ERROR: Failed to set password for user [apm_system].

申請證書

{{< notice warning "注意" >}}
下面的操作,在其中一個節點操作即可
{{< /notice >}}

/usr/share/elasticsearch/bin/elasticsearch-certutil ca
/usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12

兩條命令均一路回車即可,不需要給秘鑰再添加密碼
證書創建完成之后,默認在es的數據目錄。
將證書拷貝到etc下,並給上權限。

[root@elk-master ~]# ls /usr/share/elasticsearch/elastic-*
/usr/share/elasticsearch/elastic-certificates.p12
/usr/share/elasticsearch/elastic-stack-ca.p12
cp /usr/share/elasticsearch/elastic-* /etc/elasticsearch/
chown elasticsearch.elasticsearch /etc/elasticsearch/elastic*

做完之后,將證書拷貝到其他節點

為內置賬號添加密碼

ES中內置了幾個管理其他集成組件的賬號apm_system, beats_system, elastic, kibana, logstash_system, remote_monitoring_user使用之前,首先需要設置下密碼。

/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
Initiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.
You will be prompted to enter passwords as the process progresses.
Please confirm that you would like to continue [y/N]y
Enter password for [elastic]:
Reenter password for [elastic]:
Enter password for [apm_system]:
Reenter password for [apm_system]:
Enter password for [kibana]:
Reenter password for [kibana]:
Enter password for [logstash_system]:
Reenter password for [logstash_system]:
Enter password for [beats_system]:
Reenter password for [beats_system]:
Enter password for [remote_monitoring_user]:
Reenter password for [remote_monitoring_user]:
Changed password for user [apm_system]
Changed password for user [kibana]
Changed password for user [logstash_system]
Changed password for user [beats_system]
Changed password for user [remote_monitoring_user]
Changed password for user [elastic]

部署Curator

下載安裝

wget https://github.com/lmenezes/cerebro/releases/download/v0.9.2/cerebro-0.9.2-1.noarch.rpm
rpm -ivh cerebro-0.9.2-1.noarch.rpm 

修改配置文件

修改/etc/cerebro/application.conf配置文件
找到對應配置修改為以下內容
{{< codes 修改內容一 修改內容二>}}
{{}}

data.path: "/var/lib/cerebro/cerebro.db"
#data.path = "./cerebro.db"

{{}}
{{}}

hosts = [
  #{
  #  host = "http://localhost:9200"
  #  name = "Localhost cluster"
  #  headers-whitelist = [ "x-proxy-user", "x-proxy-roles", "X-Forwarded-For" ]
  #}
  # Example of host with authentication
  {
    host = "http://10.0.11.172:9200"
    name = "elk-cluster"
    auth = {
      username = "elastic"
      password = "123"
    }
  }
]

{{}}
{{}}

報錯

{{< notice warning "error" >}}
cerebro[8073]: No java installations was detected.
{{< /notice >}}
啟動服務后報錯No java,但是我的環境是有JAVA的。也做了全局變量
感覺很奇怪...

解決方法

{{< notice success "解決方法" >}}
在啟動服務文件中加入JAVA_HOME
{{< /notice >}}

  • 找到服務啟動文件/usr/share/cerebro/bin/cerebro
  • 修改/usr/share/cerebro/bin/cerebro中的JAVA_HOME
    具體如下,根據自己的JAVA_HOME填寫路徑
    {{< codes 修改前 修改后>}}
    {{}}
  if [[ -n "$bundled_jvm" ]];  then
    echo "$bundled_jvm/bin/java"
  elif [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]];  then
    echo "$JAVA_HOME/bin/java"
  else
    echo "java"
  fi

{{}}
{{}}

  if [[ -n "$bundled_jvm" ]];  then
    echo "$bundled_jvm/bin/java"
  elif [[ -n "/home/tools/jdk1.8.0_221" ]] && [[ -x "/home/tools/jdk1.8.0_221/bin/java" ]];  then
    echo "/home/tools/jdk1.8.0_221/bin/java"
  else
    echo "java"
  fi

{{}}
{{}}

啟動服務

systemctl  start cerebro.service
systemctl  enable cerebro.service
systemctl  status  cerebro.service

可以看到監聽的是9000端口
訪問試下

部署Kibana

下載安裝

https://artifacts.elastic.co/downloads/kibana/kibana-7.10.1-x86_64.rpm
rpm -ivh kibana-7.10.1-x86_64.rpm

修改備份配置文件

  • 備份配置文件
cd /etc/kibana/
mv kibana.yml kibana.yml.bak
  • 修改配置文件
vim kibana.yml
server.port: 5601
server.host: 0.0.0.0
elasticsearch.hosts: ["http://10.0.11.172:9200/","http://10.0.21.117:9200/","http://10.0.11.208:9200/"]
elasticsearch.username: "elastic"
elasticsearch.password: "123"
i18n.locale: "zh-CN"

啟動服務器

systemctl  start   kibana.service
systemctl  enable  kibana.service
systemctl  status  kibana.service

訪問WEB

訪問http://IP:5601

部署filebeat

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.10.1-x86_64.rpm
rpm -ivh filebeat-7.10.1-x86_64.rpm 
cd /etc/filebeat/
cp filebeat.yml filebeat.yml.bak

修改配置文件

修改filebeat配置文件,把日志推送到kafka

#=========================== Filebeat inputs =============================
max_procs: 1                     #限制filebeat的進程數量,其實就是內核數,避免過多搶占業務資源
queue.mem.events: 256            # 存儲於內存隊列的事件數,排隊發送 (默認4096)
queue.mem.flush.min_events: 128  # 小於 queue.mem.events ,增加此值可提高吞吐量 (默認值2048)
filebeat.inputs:                 # inputs為復數,表名type可以有多個
- type: log                      # 輸入類型
  enable: true                   # 啟用這個type配置
  paths:
    - /home/homeconnect/logs/AspectLog/aspect.log  # 監控tomcat  的業務日志
  json.keys_under_root: true     #默認Flase,還會將json解析的日志存儲至messages字段
  json.overwrite_keys: true      #覆蓋默認的key,使用自定義json格式的key
  max_bytes: 20480               # 單條日志的大小限制,建議限制(默認為10M,queue.mem.events * max_bytes 將是占有內存的一部)
  fields:                        # 額外的字段
     source: test-prod-tomcat-aspect-a    # 自定義source字段,用於es建議索引(字段名小寫,我記得大寫好像不行)

- type: log      # 輸入類型
  enable: true   # 啟用這個type配置
  paths:
    - /home/tools/apache-tomcat-8.5.23/logs/localhost_access_log.*.log  # 監控tomcat access日志
  json.keys_under_root: true   #默認Flase,還會將json解析的日志存儲至messages字段
  json.overwrite_keys: true    #覆蓋默認的key,使用自定義json格式的key
  max_bytes: 20480             # 單條日志的大小限制,建議限制(默認為10M,queue.mem.events * max_bytes 將是占有內存的一部分)
  fields:                      # 額外的字段
     source: test-prod-tomcat-access-a    # 自定義source字段,用於es建議索引

# 自定義es的索引需要把ilm設置為false
setup.ilm.enabled: false

#=============================== output ===============================
output.kafka:         # 輸出到kafka
    enabled: true     # 該output配置是否啟用
    hosts: ["10.0.11.172:9092","10.0.21.117:9092","10.0.11.208:9092"] # kafka節點列表
    topic: 'logstash-%{[fields.source]}'  # kafka會創建該topic,然后logstash(可以過濾修改)會傳給es作為索引名稱
    partition.hash:
      reachable_only: true # 是否只發往可達分區
    compression: gzip      # 壓縮
    max_message_bytes: 1000000  # Event最大字節數。默認1000000。應小於等於kafka broker message.max.bytes值
    required_acks: 1  # kafka ack等級
    worker: 1  # kafka output的最大並發數
    bulk_max_size: 2048    # 單次發往kafka的最大事件數
    logging.to_files: true   # 輸出所有日志到file,默認true, 達到日志文件大小限制時,日志文件會自動限制替換

#=============================== other ===============================
close_older: 30m         # 如果文件在某個時間段內沒有發生過更新,則關閉監控的文件handle。默認1h
force_close_files: false # 這個選項關閉一個文件,當文件名稱的變化。只在window建議為true
# 沒有新日志采集后多長時間關閉文件句柄,默認5分鍾,設置成1分鍾,加快文件句柄關閉
close_inactive: 1m
# 傳輸了3h后荏沒有傳輸完成的話就強行關閉文件句柄,這個配置項是解決以上案例問題的key point
close_timeout: 3h
# 這個配置項也應該配置上,默認值是0表示不清理,不清理的意思是采集過的文件描述在registry文件里永不清理,在運行一段時間后,registry會變大,可能會帶來問題
clean_inactive: 72h
# 設置了clean_inactive后就需要設置ignore_older,且要保證ignore_older < clean_inactive
ignore_older: 70h                                                                                         

啟動服務

systemctl  start  filebeat.service
systemctl  enable  filebeat.service
systemctl  status  filebeat.service

部署logstash

下載安裝

wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.1-x86_64.rpm
rpm -ivh filebeat-7.10.1-x86_64.rpm 
mv filebeat.yml filebeat.yml.bak

修改配置文件

修改logstash.yml

vim logstash.yml
http.host: "0.0.0.0"
# 指發送到Elasticsearch的批量請求的大小,值越大,處理則通常更高效,但增加了內存開銷
pipeline.batch.size: 3000
# 指調整Logstash管道的延遲,過了該時間則logstash開始執行過濾器和輸出
pipeline.batch.delay: 200

修改配置文件,從kafka獲取日志

[root@elk-kibana conf.d]# cat /etc/logstash/conf.d/get-kafka-logs.conf 
input {                                   # 輸入組件
  kafka {                                  # 從kafka消費數據
    bootstrap_servers => ["10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092"]
    codec => "json"                        # 數據格式
    #topics => ["3in1-topi"]                   # 使用kafka傳過來的topic
    topics_pattern => "logstash-.*"             # 使用正則匹配topic
    consumer_threads => 3                  # 消費線程數量
    decorate_events => true                # 可向事件添加Kafka元數據,比如主題、消息大小的選項,這將向logstash事件中添加一個名為kafka的字段
    auto_offset_reset => "latest"          # 自動重置偏移量到最新的偏移量
    #group_id => "logstash-node"            # 消費組ID,多個有相同group_id的logstash實例為一個消費組
    #client_id => "logstash1"               # 客戶端ID
    fetch_max_wait_ms => "1000"            # 指當沒有足夠的數據立即滿足fetch_min_bytes時,服務器在回答fetch請求之前將阻塞的最長時間
  }
}

filter{
   # 當非業務字段時,無traceId則移除
   #if ([message] =~ "traceId=null") {          # 過濾組件,這里只是展示,無實際意義,根據自己的業務需求進行過濾
   #   drop {}
   #}
mutate {
    convert => ["Request time", "float"]
    }
        if [ip] != "-" {
        geoip {
                       source => "ip"
                        target => "geoip"
                       # database => "/usr/share/GeoIP/GeoIPCity.dat"
                        add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
                        add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
                }
                   mutate {
                        convert => [ "[geoip][coordinates]", "float"]
                }
        }
 }


output {           # 輸出組件
  elasticsearch {   # Logstash輸出到es
    hosts => ["10.0.11.172:9200","10.0.21.117:9200","10.0.11.208:9200"]
    index => "logstash-%{[fields][source]}-%{+YYYY-MM-dd}"  # 直接在日志中匹配
    #index => "%{[@metadata][topic]}-%{+YYYY-MM-dd}"  # 以日期建索引
    user => "elastic"
    password => "123"
  }
  #stdout {
  #    codec => rubydebug
 #}
}

測試接收日志

測試是否能接收到數據

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/get-kafka-logs.conf

下邊把logstash設置為使用systemd啟動
修改/etc/systemd/system/logstash.service文件

[Unit]
Description=root

[Service]
Type=simple
User=root
Group=root
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.
# Prefixing the path with '-' makes it try to load, but if the file doesn't
# exist, it continues onward.
EnvironmentFile=-/etc/default/logstash
EnvironmentFile=-/etc/sysconfig/logstash
ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"
Restart=alway
WorkingDirectort=/
Nice=19
LimitNOFILE=16384

[Install]
WantedBy=multi-user.target

在啟動程序/usr/share/logstash/bin/logstash.lib.sh中加入JAVA_HOME
在文件86行左右的if [ -z "$JAVACMD" ]; then代碼上方插入一行JAVACMD="/home/tools/jdk1.8.0_221/bin/java" 具體的路徑需要你根據自己的JAVA來修改。

[root@elk-kibana ~]# cat  -n /usr/share/logstash/bin/logstash.lib.sh |grep  JAVACMD
    85    # set the path to java into JAVACMD which will be picked up by JRuby to launch itself
    86    JAVACMD="/home/tools/jdk1.8.0_221/bin/java"
    87    if [ -z "$JAVACMD" ]; then

啟動服務

systemctl reload logstash.service
systemctl restart  logstash.service
systemctl enable  logstash.service

最后檢查

登錄kibana創建索引

選擇管理--索引模式--創建索引模式


輸入索引名稱--下一步

選擇@timestamp--創建索引模式

然后就可以看到日志了

文章原文


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM