ELK 架構介紹
集群服務版本
服務 | 版本 |
---|---|
java | 1.8.0_221 |
elasticsearch | 7.10.1 |
filebeat | 7.10.1 |
kibana | 7.10.1 |
logstash | 7.10.1 |
cerebro | 0.9.2-1 |
kafka | 2.12-2.3.0 |
zookeeper | 3.5.6 |
服務器環境說明
IP地址 | 主機名 | 配置 | 角色 |
---|---|---|---|
10.0.11.172 | elk-master | 4C16G | es-master、kafka+zookeeper1 |
10.0.21.117 | elk-node1 | 4C16G | es-node1、kafka+zookeeper2 |
10.0.11.208 | elk-node2 | 4C16G | es-node2、kafka+zookeeper3 |
10.0.10.242 | elk-kibana | 4C16G | logstash、kibana、cerebro |
系統參數優化
{{< notice warning "注意" >}}
三個節點都需要執行
{{< /notice >}}
修改主機名
hostnamectl set-hostname elk-master
hostnamectl set-hostname elk-node1
hostnamectl set-hostname elk-node2
增加文件描述符
cat >>/etc/security/limits.conf<< EOF
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536
* hard memlock unlimited
* soft memlock unlimited
EOF
修改默認限制內存
cat >>/etc/systemd/system.conf<< EOF
DefaultLimitNOFILE=65536
DefaultLimitNPROC=32000
DefaultLimitMEMLOCK=infinity
EOF
優化內核,對es支持
cat >>/etc/sysctl.conf<< EOF
# 關閉交換內存
vm.swappiness =0
# 影響java線程數量,建議修改為262144或者更高
vm.max_map_count= 262144
# 優化內核listen連接
net.core.somaxconn=65535
# 最大打開文件描述符數,建議修改為655360或者更高
fs.file-max=655360
# 開啟ipv4轉發
net.ipv4.ip_forward= 1
EOF
修改Hostname配置文件
cat >>/etc/hosts<< EOF
elk-master 10.0.11.172
elk-node1 10.0.21.117
elk-node2 10.0.11.208
EOF
重啟使配置生效
reboot
部署Zookeeper
{{< notice warning "注意" >}}
三個節點都需要執行
{{< /notice >}}
創建Zookeeper項目目錄
#存放快照日志
mkdir zkdata
#存放事物日志
mkdir zklogs
下載解壓zookeeper
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz
mv apache-zookeeper-3.5.6-bin zookeeper
修改配置文件
[root@elk-master zookeeper]# cat conf/zoo.cfg |grep -v ^#
# 服務器之間或客戶端與服務器之間維持心跳的時間間隔
# tickTime以毫秒為單位。
tickTime=2000
# 集群中的follower服務器(F)與leader服務器(L)之間的初始連接心跳數
initLimit=10
# 集群中的follower服務器與leader服務器之間請求和應答之間能容忍的最多心跳數
syncLimit=5
# 數據保存目錄
dataDir=../zkdata
# 日志保存目錄
dataLogDir=../zklogs
# 客戶端連接端口
clientPort=2181
# 客戶端最大連接數。# 根據自己實際情況設置,默認為60個
maxClientCnxns=60
# 三個接點配置,格式為: server.服務編號=服務地址、LF通信端口、選舉端口
server.1=10.0.11.172:2888:3888
server.2=10.0.21.117:2888:3888
server.3=10.0.11.208:2888:3888
寫入節點標記
{{< notice warning "注意" >}}
分別在三個節點/home/tools/zookeeper/zkdata/myid
寫入節點標記
{{< /notice >}}
{{< tabs master節點 node1節點 node2節點 >}}
{{< tab >}}
master的操作
echo "1" > /home/tools/zookeeper/zkdata/myid
{{< /tab >}}
{{< tab >}}
node1的操作
echo "2" > /home/tools/zookeeper/zkdata/myid
{{< /tab >}}
{{< tab >}}
node2的操作
echo "3" > /home/tools/zookeeper/zkdata/myid
{{< /tab >}}
{{< /tabs >}}
啟動zookeeper集群
[root@elk-master zookeeper]# cd /home/tools/zookeeper/bin/
[root@elk-master bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
檢查集群狀態
[root@elk-master bin]# sh /home/tools/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /home/tools/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
設置全局變量
cat >>/etc/profile<< EOF
export ZOOKEEPER_INSTALL=/home/tools/zookeeper/
export PATH=$PATH:$ZOOKEEPER_INSTALL/bin
export PATH
EOF
- 使配置生效
source /etc/profile
這樣就可以全局使用zkServer.sh
命令了
部署 Kafka
{{< notice warning "注意" >}}
三個節點都需要執行
{{< /notice >}}
下載解壓kafka壓縮包
[root@elk-master tools]# mkdir kafka
[root@elk-master tools]# cd kafka/
[root@elk-master kafka]# wget https://www-eu.apache.org/dist/kafka/2.3.0/kafka_2.12-2.3.0.tgz
[root@elk-master kafka]# tar xf kafka_2.12-2.3.0.tgz
[root@elk-master kafka]# mv kafka_2.12-2.3.0 kafka
[root@elk-master kafka]# cd kafka/config/
配置kafka
[root@elk-master config]# cat /home/tools/kafka/kafka/config/server.properties
############################# Server Basics #############################
# broker的id,值為整數,且必須唯一,在一個集群中不能重復
broker.id=1
############################# Socket Server Se:ttings #############################
# kafka默認監聽的端口為9092 (默認與主機名進行連接)
listeners=PLAINTEXT://:9092
# 處理網絡請求的線程數量,默認為3個
num.network.threads=3
# 執行磁盤IO操作的線程數量,默認為8個
num.io.threads=8
# socket服務發送數據的緩沖區大小,默認100KB
socket.send.buffer.bytes=102400
# socket服務接受數據的緩沖區大小,默認100KB
socket.receive.buffer.bytes=102400
# socket服務所能接受的一個請求的最大大小,默認為100M
socket.request.max.bytes=104857600
############################# Log Basics #############################
# kafka存儲消息數據的目錄
log.dirs=../kfkdata
# 每個topic默認的partition數量
num.partitions=3
# 在啟動時恢復數據和關閉時刷新數據時每個數據目錄的線程數量
num.recovery.threads.per.data.dir=1
############################# Log Flush Policy #############################
# 消息刷新到磁盤中的消息條數閾值
#log.flush.interval.messages=10000
# 消息刷新到磁盤中的最大時間間隔,1s
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# 日志保留小時數,超時會自動刪除,默認為7天
log.retention.hours=168
# 日志保留大小,超出大小會自動刪除,默認為1G
#log.retention.bytes=1073741824
# 日志分片策略,單個日志文件的大小最大為1G,超出后則創建一個新的日志文件
log.segment.bytes=1073741824
# 每隔多長時間檢測數據是否達到刪除條件,300s
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# Zookeeper連接信息,如果是zookeeper集群,則以逗號隔開
zookeeper.connect=10.0.11.172,10.0.21.117,10.0.11.208
# 連接zookeeper的超時時間,6s
zookeeper.connection.timeout.ms=6000
創建數據存儲的目錄
[root@elk-master config]# mkdir ../kfkdata
修改broker.id
{{< notice warning "注意" >}}
分別在三個節點依次修改/home/tools/kafka/kafka/config/server.properties
配置文件
{{< /notice >}}
{{< tabs master節點 node1節點 node2節點 >}}
{{< tab >}}
master的配置
broker.id=1
{{< /tab >}}
{{< tab >}}
node1的配置
broker.id=2
{{< /tab >}}
{{< tab >}}
node2的配置
broker.id=3
{{< /tab >}}
{{< /tabs >}}
啟動kafka集群
cd /home/tools/kafka/kafka/bin/
#啟動測試
./kafka-server-start.sh ../config/server.properties
#放入后台
./kafka-server-start.sh -daemon ../config/server.properties
測試
{{< notice warning "注意" >}}
任意節點均可執行
{{< /notice >}}
在創建topic在集群中的任意節點 發布消息訂閱消息驗證結果
{{< tabs 創建topic 消息發布 topic消息訂閱 >}}
{{< tab >}}
[root@elk-master bin]# ./kafka-topics.sh \
--create \
--zookeeper 10.0.11.172:2181,10.0.21.117:2181,10.0.11.208:2181 \
--partitions 3 \
--replication-factor 1 \
--topic logs
{{< /tab >}}
{{< tab >}}
[root@elk-master bin]# ./kafka-console-producer.sh \
--broker-list 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \
--topic logs
{{< /tab >}}
{{< tab >}}
[root@elk-master bin]# ./kafka-console-consumer.sh \
--bootstrap-server 10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092 \
--topic logs \
--from-beginning
{{< /tab >}}
{{< /tabs >}}
部署elasticsearch
{{< notice warning "注意" >}}
三個節點都需要執行
{{< /notice >}}
下載安裝elasticsearch
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.10.1-x86_64.rpm
[root@elk-master package]# rpm -ivh elasticsearch-7.10.1-x86_64.rpm
備份配置文件
cd /etc/elasticsearch
cp elasticsearch.yml elasticsearch.yml.bak
修改配置文件
cat >/etc/elasticsearch/elasticsearch.yml << EOF
#集群名
cluster.name: elk-cluster
#node名
node.name: elk-1
#數據存儲路徑
path.data: /home/elasticsearch/esdata
#數據快照路徑
path.repo: /home/backup/essnapshot
#日志存儲路徑
path.logs: /home/elasticsearch/eslogs
#es綁定的ip地址,根據自己機器ip進行修改
network.host: 0.0.0.0
#服務端口
http.port: 9200
#集群master需要和node名設置一致
discovery.seed_hosts: ["10.0.11.172", "10.0.21.117", "10.0.11.208"]
cluster.initial_master_nodes: ["10.0.11.172","10.0.21.117","10.0.11.208"]
#允許跨域請求
http.cors.enabled: true
#* 表示支持所有域名
http.cors.allow-origin: "*"
#添加請求header
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
#生產必須為true,內存鎖定檢查,目的是內存地址直接映射,減少一次copy時間
bootstrap.memory_lock: true
#系統過濾檢查,防止數據損壞,考慮集群安全,生產設置成false
bootstrap.system_call_filter: false
#xpack配置
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: /etc/elasticsearch/elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: /etc/elasticsearch/elastic-certificates.p12
EOF
修改JVM
- 將
jvm.options
文件中22-23行的8g
設置為你的服務內存的一半
[root@elk-node1 elasticsearch]# cat -n jvm.options |grep 8g
22 -Xms8g
23 -Xmx8g
修改其他節點配置
{{< notice warning "注意" >}}
分別在三個節點修改/etc/elasticsearch/elasticsearch.yml
配置文件
{{< /notice >}}
{{< tabs master節點 node1節點 node2節點 >}}
{{< tab >}}
master的配置
node.name: "es-master"
{{< /tab >}}
{{< tab >}}
node1的配置
node.name: "es-node1"
{{< /tab >}}
{{< tab >}}
node2的配置
node.name: "es-node2"
{{< /tab >}}
{{< /tabs >}}
最終展示
分配權限
因為自定義數據、日志存儲目錄,所以要把權限給到目錄
mkdir -p /home/elasticsearch/{esdata,eslogs}
chown elasticsearch:elasticsearch /home/elasticsearch/*
mkdir -p /home/backup/essnapshot
chown elasticsearch:elasticsearch /home/backup/essnapshot
啟動服務
三個節點全部啟動並加入開機啟動
systemctl start elasticsearch
systemctl enable elasticsearch
使用xpack進行安全認證
xpack的安全功能
- TLS 功能。 可對通信進行加密
- 文件和原生 Realm。 可用於創建和管理用戶
- 基於角色的訪問控制。 可用於控制用戶對集群 API 和索引的訪問權限
- 通過針對 Kibana Spaces 的安全功能,還可允許在Kibana 中實現多租戶。
在我配置過程中,發現集群認證需要首先配置秘鑰才行,否則在給內置用戶創建秘鑰的時候將會報錯。
{{< notice warning "error" >}}
Cause: Cluster state has not been recovered yet, cannot write to the [null] index
{{< /notice >}}
Unexpected response code [503] from calling PUT http://10.0.11.172:9200/_security/user/apm_system/_password?pretty
Cause: Cluster state has not been recovered yet, cannot write to the [null] index
Possible next steps:
* Try running this tool again.
* Try running with the --verbose parameter for additional messages.
* Check the elasticsearch logs for additional error details.
* Use the change password API manually.
ERROR: Failed to set password for user [apm_system].
申請證書
{{< notice warning "注意" >}}
下面的操作,在其中一個節點操作即可
{{< /notice >}}
/usr/share/elasticsearch/bin/elasticsearch-certutil ca
/usr/share/elasticsearch/bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12
兩條命令均一路回車即可,不需要給秘鑰再添加密碼
證書創建完成之后,默認在es的數據目錄。
將證書拷貝到etc下,並給上權限。
[root@elk-master ~]# ls /usr/share/elasticsearch/elastic-*
/usr/share/elasticsearch/elastic-certificates.p12
/usr/share/elasticsearch/elastic-stack-ca.p12
cp /usr/share/elasticsearch/elastic-* /etc/elasticsearch/
chown elasticsearch.elasticsearch /etc/elasticsearch/elastic*
做完之后,將證書拷貝到其他節點
為內置賬號添加密碼
ES中內置了幾個管理其他集成組件的賬號apm_system, beats_system, elastic, kibana, logstash_system, remote_monitoring_user
使用之前,首先需要設置下密碼。
/usr/share/elasticsearch/bin/elasticsearch-setup-passwords interactive
Initiating the setup of passwords for reserved users elastic,apm_system,kibana,logstash_system,beats_system,remote_monitoring_user.
You will be prompted to enter passwords as the process progresses.
Please confirm that you would like to continue [y/N]y
Enter password for [elastic]:
Reenter password for [elastic]:
Enter password for [apm_system]:
Reenter password for [apm_system]:
Enter password for [kibana]:
Reenter password for [kibana]:
Enter password for [logstash_system]:
Reenter password for [logstash_system]:
Enter password for [beats_system]:
Reenter password for [beats_system]:
Enter password for [remote_monitoring_user]:
Reenter password for [remote_monitoring_user]:
Changed password for user [apm_system]
Changed password for user [kibana]
Changed password for user [logstash_system]
Changed password for user [beats_system]
Changed password for user [remote_monitoring_user]
Changed password for user [elastic]
部署Curator
下載安裝
wget https://github.com/lmenezes/cerebro/releases/download/v0.9.2/cerebro-0.9.2-1.noarch.rpm
rpm -ivh cerebro-0.9.2-1.noarch.rpm
修改配置文件
修改/etc/cerebro/application.conf
配置文件
找到對應配置修改為以下內容
{{< codes 修改內容一 修改內容二>}}
{{}}
data.path: "/var/lib/cerebro/cerebro.db"
#data.path = "./cerebro.db"
{{
}}
{{}}
hosts = [
#{
# host = "http://localhost:9200"
# name = "Localhost cluster"
# headers-whitelist = [ "x-proxy-user", "x-proxy-roles", "X-Forwarded-For" ]
#}
# Example of host with authentication
{
host = "http://10.0.11.172:9200"
name = "elk-cluster"
auth = {
username = "elastic"
password = "123"
}
}
]
{{
}}
{{}}
報錯
{{< notice warning "error" >}}
cerebro[8073]: No java installations was detected.
{{< /notice >}}
啟動服務后報錯No java
,但是我的環境是有JAVA
的。也做了全局變量
感覺很奇怪...
解決方法
{{< notice success "解決方法" >}}
在啟動服務文件中加入JAVA_HOME
{{< /notice >}}
- 找到服務啟動文件
/usr/share/cerebro/bin/cerebro
- 修改
/usr/share/cerebro/bin/cerebro
中的JAVA_HOME
具體如下,根據自己的JAVA_HOME
填寫路徑
{{< codes 修改前 修改后>}}
{{}}
if [[ -n "$bundled_jvm" ]]; then
echo "$bundled_jvm/bin/java"
elif [[ -n "$JAVA_HOME" ]] && [[ -x "$JAVA_HOME/bin/java" ]]; then
echo "$JAVA_HOME/bin/java"
else
echo "java"
fi
{{
}}
{{}}
if [[ -n "$bundled_jvm" ]]; then
echo "$bundled_jvm/bin/java"
elif [[ -n "/home/tools/jdk1.8.0_221" ]] && [[ -x "/home/tools/jdk1.8.0_221/bin/java" ]]; then
echo "/home/tools/jdk1.8.0_221/bin/java"
else
echo "java"
fi
{{
}}
{{}}
啟動服務
systemctl start cerebro.service
systemctl enable cerebro.service
systemctl status cerebro.service
可以看到監聽的是9000
端口
訪問試下
部署Kibana
下載安裝
https://artifacts.elastic.co/downloads/kibana/kibana-7.10.1-x86_64.rpm
rpm -ivh kibana-7.10.1-x86_64.rpm
修改備份配置文件
- 備份配置文件
cd /etc/kibana/
mv kibana.yml kibana.yml.bak
- 修改配置文件
vim kibana.yml
server.port: 5601
server.host: 0.0.0.0
elasticsearch.hosts: ["http://10.0.11.172:9200/","http://10.0.21.117:9200/","http://10.0.11.208:9200/"]
elasticsearch.username: "elastic"
elasticsearch.password: "123"
i18n.locale: "zh-CN"
啟動服務器
systemctl start kibana.service
systemctl enable kibana.service
systemctl status kibana.service
訪問WEB
部署filebeat
wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.10.1-x86_64.rpm
rpm -ivh filebeat-7.10.1-x86_64.rpm
cd /etc/filebeat/
cp filebeat.yml filebeat.yml.bak
修改配置文件
修改filebeat配置文件,把日志推送到kafka
#=========================== Filebeat inputs =============================
max_procs: 1 #限制filebeat的進程數量,其實就是內核數,避免過多搶占業務資源
queue.mem.events: 256 # 存儲於內存隊列的事件數,排隊發送 (默認4096)
queue.mem.flush.min_events: 128 # 小於 queue.mem.events ,增加此值可提高吞吐量 (默認值2048)
filebeat.inputs: # inputs為復數,表名type可以有多個
- type: log # 輸入類型
enable: true # 啟用這個type配置
paths:
- /home/homeconnect/logs/AspectLog/aspect.log # 監控tomcat 的業務日志
json.keys_under_root: true #默認Flase,還會將json解析的日志存儲至messages字段
json.overwrite_keys: true #覆蓋默認的key,使用自定義json格式的key
max_bytes: 20480 # 單條日志的大小限制,建議限制(默認為10M,queue.mem.events * max_bytes 將是占有內存的一部)
fields: # 額外的字段
source: test-prod-tomcat-aspect-a # 自定義source字段,用於es建議索引(字段名小寫,我記得大寫好像不行)
- type: log # 輸入類型
enable: true # 啟用這個type配置
paths:
- /home/tools/apache-tomcat-8.5.23/logs/localhost_access_log.*.log # 監控tomcat access日志
json.keys_under_root: true #默認Flase,還會將json解析的日志存儲至messages字段
json.overwrite_keys: true #覆蓋默認的key,使用自定義json格式的key
max_bytes: 20480 # 單條日志的大小限制,建議限制(默認為10M,queue.mem.events * max_bytes 將是占有內存的一部分)
fields: # 額外的字段
source: test-prod-tomcat-access-a # 自定義source字段,用於es建議索引
# 自定義es的索引需要把ilm設置為false
setup.ilm.enabled: false
#=============================== output ===============================
output.kafka: # 輸出到kafka
enabled: true # 該output配置是否啟用
hosts: ["10.0.11.172:9092","10.0.21.117:9092","10.0.11.208:9092"] # kafka節點列表
topic: 'logstash-%{[fields.source]}' # kafka會創建該topic,然后logstash(可以過濾修改)會傳給es作為索引名稱
partition.hash:
reachable_only: true # 是否只發往可達分區
compression: gzip # 壓縮
max_message_bytes: 1000000 # Event最大字節數。默認1000000。應小於等於kafka broker message.max.bytes值
required_acks: 1 # kafka ack等級
worker: 1 # kafka output的最大並發數
bulk_max_size: 2048 # 單次發往kafka的最大事件數
logging.to_files: true # 輸出所有日志到file,默認true, 達到日志文件大小限制時,日志文件會自動限制替換
#=============================== other ===============================
close_older: 30m # 如果文件在某個時間段內沒有發生過更新,則關閉監控的文件handle。默認1h
force_close_files: false # 這個選項關閉一個文件,當文件名稱的變化。只在window建議為true
# 沒有新日志采集后多長時間關閉文件句柄,默認5分鍾,設置成1分鍾,加快文件句柄關閉
close_inactive: 1m
# 傳輸了3h后荏沒有傳輸完成的話就強行關閉文件句柄,這個配置項是解決以上案例問題的key point
close_timeout: 3h
# 這個配置項也應該配置上,默認值是0表示不清理,不清理的意思是采集過的文件描述在registry文件里永不清理,在運行一段時間后,registry會變大,可能會帶來問題
clean_inactive: 72h
# 設置了clean_inactive后就需要設置ignore_older,且要保證ignore_older < clean_inactive
ignore_older: 70h
啟動服務
systemctl start filebeat.service
systemctl enable filebeat.service
systemctl status filebeat.service
部署logstash
下載安裝
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.10.1-x86_64.rpm
rpm -ivh filebeat-7.10.1-x86_64.rpm
mv filebeat.yml filebeat.yml.bak
修改配置文件
修改logstash.yml
vim logstash.yml
http.host: "0.0.0.0"
# 指發送到Elasticsearch的批量請求的大小,值越大,處理則通常更高效,但增加了內存開銷
pipeline.batch.size: 3000
# 指調整Logstash管道的延遲,過了該時間則logstash開始執行過濾器和輸出
pipeline.batch.delay: 200
修改配置文件,從kafka獲取日志
[root@elk-kibana conf.d]# cat /etc/logstash/conf.d/get-kafka-logs.conf
input { # 輸入組件
kafka { # 從kafka消費數據
bootstrap_servers => ["10.0.11.172:9092,10.0.21.117:9092,10.0.11.208:9092"]
codec => "json" # 數據格式
#topics => ["3in1-topi"] # 使用kafka傳過來的topic
topics_pattern => "logstash-.*" # 使用正則匹配topic
consumer_threads => 3 # 消費線程數量
decorate_events => true # 可向事件添加Kafka元數據,比如主題、消息大小的選項,這將向logstash事件中添加一個名為kafka的字段
auto_offset_reset => "latest" # 自動重置偏移量到最新的偏移量
#group_id => "logstash-node" # 消費組ID,多個有相同group_id的logstash實例為一個消費組
#client_id => "logstash1" # 客戶端ID
fetch_max_wait_ms => "1000" # 指當沒有足夠的數據立即滿足fetch_min_bytes時,服務器在回答fetch請求之前將阻塞的最長時間
}
}
filter{
# 當非業務字段時,無traceId則移除
#if ([message] =~ "traceId=null") { # 過濾組件,這里只是展示,無實際意義,根據自己的業務需求進行過濾
# drop {}
#}
mutate {
convert => ["Request time", "float"]
}
if [ip] != "-" {
geoip {
source => "ip"
target => "geoip"
# database => "/usr/share/GeoIP/GeoIPCity.dat"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float"]
}
}
}
output { # 輸出組件
elasticsearch { # Logstash輸出到es
hosts => ["10.0.11.172:9200","10.0.21.117:9200","10.0.11.208:9200"]
index => "logstash-%{[fields][source]}-%{+YYYY-MM-dd}" # 直接在日志中匹配
#index => "%{[@metadata][topic]}-%{+YYYY-MM-dd}" # 以日期建索引
user => "elastic"
password => "123"
}
#stdout {
# codec => rubydebug
#}
}
測試接收日志
測試是否能接收到數據
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/get-kafka-logs.conf
下邊把logstash
設置為使用systemd
啟動
修改/etc/systemd/system/logstash.service
文件
[Unit]
Description=root
[Service]
Type=simple
User=root
Group=root
# Load env vars from /etc/default/ and /etc/sysconfig/ if they exist.
# Prefixing the path with '-' makes it try to load, but if the file doesn't
# exist, it continues onward.
EnvironmentFile=-/etc/default/logstash
EnvironmentFile=-/etc/sysconfig/logstash
ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"
Restart=alway
WorkingDirectort=/
Nice=19
LimitNOFILE=16384
[Install]
WantedBy=multi-user.target
在啟動程序/usr/share/logstash/bin/logstash.lib.sh
中加入JAVA_HOME
在文件86行左右的if [ -z "$JAVACMD" ]; then
代碼上方插入一行JAVACMD="/home/tools/jdk1.8.0_221/bin/java"
具體的路徑需要你根據自己的JAVA
來修改。
[root@elk-kibana ~]# cat -n /usr/share/logstash/bin/logstash.lib.sh |grep JAVACMD
85 # set the path to java into JAVACMD which will be picked up by JRuby to launch itself
86 JAVACMD="/home/tools/jdk1.8.0_221/bin/java"
87 if [ -z "$JAVACMD" ]; then
啟動服務
systemctl reload logstash.service
systemctl restart logstash.service
systemctl enable logstash.service
最后檢查
登錄kibana創建索引
選擇管理
--索引模式
--創建索引模式
輸入索引名稱
--下一步
選擇@timestamp
--創建索引模式
然后就可以看到日志了