Log Collection and Processing with Kafka + ELK


This document is a rough write-up from memory, kept for the record.

Environment

CentOS     V7.6.1810

JDK        V1.8.0_171

Rsyslog    V8.24.0-34.el7

Kafka      V2.12-0.10.2.1

ZooKeeper  V3.4.10

ELK        V6.2.3

 

Server Allocation

Use the highest specs you can; in this deployment both Kafka + ZooKeeper and ES run in cluster mode.

 

Hostname  IP address       Specs                          Notes
node1     192.168.101.55   CPU: 2C  RAM: 4G  Disk: 100G
node2     192.168.101.56   CPU: 2C  RAM: 4G  Disk: 100G
node3     192.168.101.57   CPU: 2C  RAM: 4G  Disk: 100G

 

This document focuses on the deployment itself; plenty of problems came up along the way, but I forgot to write them down.

I. Environment Setup (same steps on all three machines)

If you have simply disabled the firewall, skip this. Otherwise, configure the following policy.

1. Firewall
Add one rule on every machine:
[root@node1 home]# firewall-cmd --permanent --add-rich-rule="rule family="ipv4" source address="192.168.101.1/24" accept"
# this opens up the 101 subnet so the nodes can reach each other
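The same rule is needed on every node. The sketch below is a dry run that just prints the command per node; the ssh loop is my addition (the original runs firewall-cmd locally on each box), and note that --permanent rules only take effect after a --reload. Drop the leading echo to actually execute.

```shell
# Dry run: print the firewall command for each node (remove "echo" to execute over ssh).
for ip in 192.168.101.55 192.168.101.56 192.168.101.57; do
  echo ssh root@$ip "firewall-cmd --permanent --add-rich-rule='rule family=ipv4 source address=192.168.101.1/24 accept' && firewall-cmd --reload"
done
```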

Check the firewall:
[root@node1 home]# firewall-cmd --list-all
public (active)
  target: default
  icmp-block-inversion: no
  interfaces: eth0
  sources:
  services: ssh dhcpv6-client
  ports:
  protocols:
  masquerade: no
  forward-ports:
  source-ports:
  icmp-blocks:
  rich rules:
        rule family="ipv4" source address="192.168.101.1/24" accept

Note: to avoid surprises during deployment, it's best to verify with telnet that the rule has actually taken effect.

2. Disable SELinux (takes effect after a reboot; run setenforce 0 to disable it immediately)

[root@node1 home]# vim /etc/sysconfig/selinux
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of three values:
# targeted - Targeted processes are protected,
# minimum - Modification of targeted policy. Only selected processes are protected.
# mls - Multi Level Security protection.
SELINUXTYPE=targeted

3. Set the JAVA environment variables

[root@node1 home]# mkdir /home/jdk  # personal habit — I like keeping things under /home

[root@node1 home]# tar xf jdk-8u171-linux-x64.tar.gz -C /home/jdk/

[root@node1 home]# vim /etc/profile
... append these lines at the bottom

export JAVA_HOME=/home/jdk/jdk1.8.0_171
export JRE_HOME=/home/jdk/jdk1.8.0_171/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$PATH

[root@node1 home]# source /etc/profile

# verify the environment variables took effect
[root@node1 opt]# java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)

4. Add two system tuning parameters (the ES service needs them later, so just add them now)

[root@node1 opt]# vim /etc/sysctl.conf
vm.max_map_count=262144
[root@node1 opt]# vim /etc/security/limits.conf

... append at the bottom
* soft nofile 65536
* hard nofile 65536
* soft nproc 65536
* hard nproc 65536

II. Kafka + ZooKeeper Cluster Deployment

1. ZooKeeper
[root@node1 opt]# mkdir /home/zookeeper
[root@node1 opt]# tar xf zookeeper-3.4.10.tar.gz -C /home/zookeeper/ && cd /home/zookeeper/zookeeper-3.4.10
[root@node1 zookeeper-3.4.10]# vim conf/zoo.cfg

  tickTime=2000
  initLimit=10
  syncLimit=10
  dataDir=/home/zookeeper/data
  dataLogDir=/home/zookeeper/log
  clientPort=2181
  server.1=192.168.101.55:2888:3888
  server.2=192.168.101.56:2888:3888
  server.3=192.168.101.57:2888:3888

 Note: run echo "1" > /home/zookeeper/data/myid — the myid file must be created on all three machines, and the node ID is different on each one (1–3, matching the server.* lines in the config above).

   # bulk-copy the config file to each node
   [root@node1 zookeeper-3.4.10]# for i in 55 56 57;do scp conf/zoo.cfg root@192.168.101.$i:/home/zookeeper/zookeeper-3.4.10/conf/ ;done

Important enough to say three times: every machine needs its own myid.
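A sketch of the node-to-myid mapping, demonstrated against a scratch directory (the scratch layout is only for illustration; on the real nodes the file is /home/zookeeper/data/myid):

```shell
# myid must equal the N in the matching server.N= line of zoo.cfg.
# Scratch-directory demo; on node N the real command is: echo "N" > /home/zookeeper/data/myid
base=$(mktemp -d)
id=1
for ip in 192.168.101.55 192.168.101.56 192.168.101.57; do
  mkdir -p "$base/$ip/data"
  echo "$id" > "$base/$ip/data/myid"
  id=$((id + 1))
done
cat "$base/192.168.101.56/data/myid"   # node2's myid -> 2
```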

 Start the ZooKeeper service on all three machines

 # ignore whatever it prints for now; start all nodes first, then check
 [root@node1 zookeeper-3.4.10]# bin/zkServer.sh start

 # run this on every machine; exactly one node should end up as leader (no errors means startup succeeded; if there are errors, check the logs... the logs... the logs)
 [root@node1 zookeeper-3.4.10]# bin/zkServer.sh status
 ZooKeeper JMX enabled by default
 Using config: /home/zookeeper/zookeeper-3.4.10/bin/../conf/zoo.cfg
 Mode: follower

 2. Kafka
 [root@node1 opt]# mkdir /home/kafka
  # make a backup first
 [root@node1 opt]# tar xf kafka_2.12-0.10.2.1.tgz -C /home/kafka/ && cd /home/kafka/kafka_2.12-0.10.2.1/

  # Edit config/server.properties. broker.id and listeners must be changed on every node (these were the red-highlighted lines in the original) — don't forget.
  [root@node1 kafka_2.12-0.10.2.1]# vim config/server.properties
  broker.id=1
  delete.topic.enable=true
  listeners=PLAINTEXT://192.168.101.55:9092
  num.network.threads=4
  num.io.threads=8
  socket.send.buffer.bytes=102400
  socket.receive.buffer.bytes=102400
  socket.request.max.bytes=104857600
  log.dirs=/home/kafka/kafka-logs
  num.partitions=3
  num.recovery.threads.per.data.dir=1
  log.retention.hours=168
  log.segment.bytes=1073741824
  log.retention.check.interval.ms=300000
  zookeeper.connect=192.168.101.55:2181,192.168.101.56:2181,192.168.101.57:2181
  zookeeper.connection.timeout.ms=6000 
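For clarity, these are the only lines that change on the other brokers (node2 shown; by the same pattern node3 would use broker.id=3 and 192.168.101.57):

```properties
# node2 (192.168.101.56) — everything else identical to node1's server.properties
broker.id=2
listeners=PLAINTEXT://192.168.101.56:9092
```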

 # time to start it up
 [root@node1 kafka_2.12-0.10.2.1]# nohup bin/kafka-server-start.sh config/server.properties &

 # create a topic for testing
 bin/kafka-topics.sh --create --topic tg_system_log --zookeeper 192.168.101.55:2181,192.168.101.56:2181,192.168.101.57:2181 --partitions 3 --replication-factor 1

 # start a console producer

 bin/kafka-console-producer.sh --broker-list 192.168.101.57:9092 --topic tg_system_log
 # start a console consumer

 bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.57:9092 --topic tg_system_log

 Note: send a message from the producer; if the consumer receives it, this part of the stack is deployed and working. (If there are problems, check the logs... the logs... the logs)

III. Configure Logstash (data collection)

1. Check the installation packages (both packages are required)

[root@node1 kafka_2.12-0.10.2.1]# tar xf logstash-6.2.3.tar.gz -C /home && cd /home

# create this collection config; this example collects /var/log/messages and the Docker logs (the per-input type and topic settings below were the red-highlighted parts in the original)
[root@node1 logstash-6.2.3]# vim conf/system_up.conf

input {
  file {
    path => "/var/log/messages"
    start_position => "beginning"
    type => "system-log"
    discover_interval => 2
  }
  file {
    path => "/var/lib/docker/containers/*/*-json.log"
    start_position => "beginning"
    type => "docker-log"
    discover_interval => 2
  }
}
output {
  if [type] == "system-log" {
    kafka {
      bootstrap_servers => "192.168.101.55:9092"
      topic_id => "tg_system_log"
      compression_type => "snappy"
    }
  }
  else if [type] == "docker-log" {
    kafka {
      bootstrap_servers => "192.168.101.55:9092"
      topic_id => "tg_docker_log"
      compression_type => "snappy"
    }
  }
}
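Two steps are implied here but never written down: the docker-log branch publishes to a tg_docker_log topic that part II never created, and Logstash itself has to be started with this file. A dry-run sketch below prints both commands (paths assume the directory layout used above; drop the leading echo to execute):

```shell
# Dry run: print the two implied commands (remove "echo" to execute).
echo '/home/kafka/kafka_2.12-0.10.2.1/bin/kafka-topics.sh --create --topic tg_docker_log --zookeeper 192.168.101.55:2181,192.168.101.56:2181,192.168.101.57:2181 --partitions 3 --replication-factor 1'
echo 'cd /home/logstash-6.2.3 && nohup bin/logstash -f conf/system_up.conf > /dev/null 2>&1 &'
```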

[root@node1 kafka_2.12-0.10.2.1]# systemctl start rsyslog.service

# functional test

[root@node1 kafka_2.12-0.10.2.1]# bin/kafka-console-consumer.sh --bootstrap-server 192.168.101.55:9092 --topic tg_system_log

# for a quick check, generate some log entries by hand

Write some data into /var/log/messages and check whether it shows up in the topic; if it does, the collection config works.

IV. Configure ES (required on node1–node3)

Note: create a regular (non-root) user and put the package under that user — ES refuses to run as root.
1. Be sure to install x-pack

[cube@node1 es]$ elasticsearch-6.2.3/bin/elasticsearch-plugin install file:///home/cube/es/x-pack-6.2.3.zip   # install on all three machines


[cube@node1 es]$ vim elasticsearch-6.2.3/config/elasticsearch.yml

cluster.name: master-cluster
node.name: node1                 # change on each machine
node.master: true
node.data: true
path.data: /home/cube/es/elasticsearch-6.2.3/data
path.logs: /home/cube/es/elasticsearch-6.2.3/log
network.host: 192.168.101.55     # change on each machine
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.101.55", "192.168.101.56", "192.168.101.57"]

# minimum number of master-eligible nodes needed to elect a master
discovery.zen.minimum_master_nodes: 2
# how often a node is pinged, default 1s
discovery.zen.fd.ping_interval: 1s
# how long to wait for a ping response, default 30s
discovery.zen.fd.ping_timeout: 10s
# ping retries before a node is considered failed, default 3
discovery.zen.fd.ping_retries: 3
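As with the Kafka config, only two lines differ per node (node2 shown; node3 uses node3 / 192.168.101.57):

```yaml
# node2 (192.168.101.56) — the only elasticsearch.yml lines that differ from node1
node.name: node2
network.host: 192.168.101.56
```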

 2. Start and verify (start all three)

[cube@node1 elasticsearch-6.2.3]$ bin/elasticsearch -d

# check master-cluster.log; no errors means startup went fine

 3. Set passwords

[cube@node1 ~]$ elasticsearch-6.2.3/bin/x-pack/setup-passwords interactive

V. Configure Kibana

 1. Install x-pack

[root@node1 kibana]# bin/kibana-plugin install file:///home/kibana/x-pack-6.2.3.zip

 2. Edit kibana.yml

[root@node1 kibana]# vim config/kibana.yml

server.port: 5601
server.host: "192.168.101.55"
elasticsearch.url: "http://192.168.101.55:9200"
elasticsearch.username: "elastic"
elasticsearch.password: "elastic"

 3. Start Kibana

[root@node1 kibana]# bin/kibana

# open in a browser: http://192.168.101.55:5601

# after logging in, go to Monitoring >> Nodes: 3 to see the ES node count

VI. Configure Logstash (data aggregation middleware)

1. Create a conf directory, then create a kafka_to_es.conf file inside it
[root@node1 logstash-6.2.3]# vim conf/kafka_to_es.conf

input {
  kafka {
    bootstrap_servers => ["192.168.101.55:9092"]
    topics => ["tg_system_log"]
    codec => "json"
    type => "system_log"
    consumer_threads => 5
    decorate_events => true
  }
  kafka {
    bootstrap_servers => ["192.168.101.55:9092"]
    topics => ["tg_docker_log"]
    codec => "json"
    type => "docker_log"
    consumer_threads => 5
    decorate_events => true
  }
}
output {
  if [type] == "system_log"{
    elasticsearch {
    hosts => ["192.168.101.55:9200","192.168.101.56:9200","192.168.101.57:9200"]
    index => "systems-logs-%{+YYYY.MM.dd}"
    user => elastic
    password => elastic
    }
  }
  else if [type] == "docker_log" {
    elasticsearch {
    hosts => ["192.168.101.55:9200","192.168.101.56:9200","192.168.101.57:9200"]
    index => "dockers-logs-%{+YYYY.MM.dd}"
    user => elastic
    password => elastic
   }
  }
}

 Now just start Logstash with this config.
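A dry-run sketch of that start-up. Logstash 6 supports --config.test_and_exit to validate the file first, and if the collection pipeline from part III is still running on the same host, this second instance needs its own --path.data (both flags are my additions, not in the original notes). Drop the leading echo to execute:

```shell
# Dry run: validate, then start the Kafka -> ES pipeline (remove "echo" to execute).
echo 'bin/logstash -f conf/kafka_to_es.conf --config.test_and_exit'
echo 'nohup bin/logstash -f conf/kafka_to_es.conf --path.data /home/logstash-6.2.3/data-kafka-es > /dev/null 2>&1 &'
```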

VII. Open the Kibana UI

Go to Management >> Index Patterns and create a new index pattern; the index names defined in the middleware output config will appear there. Create the pattern and the setup is complete.

 

 Appendix:

I originally wanted to use Fluentd to ship Docker's output logs to Kafka, but didn't get it working, so here they go straight to ES; I'll look into it later. If someone else has pulled it off, please share a write-up so I can learn from it.

1. Configure the Fluentd service

[root@node1 ~]# rpm -qa | grep td-agent
td-agent-3.4.0-0.el7.x86_64

 2. Install fluent-plugin-elasticsearch first (requires Ruby 2.5 — see the reference below)

[root@node1 ~]# gem install fluent-plugin-elasticsearch

[root@node1 ~]# vim /etc/td-agent/td-agent.conf

<source>
  @type forward            # Docker's fluentd log driver speaks the forward protocol
  @id input_forward
  bind 127.0.0.1
  port 24224               # must match fluentd-address in the docker run below
</source>
<match docker.**>
  @type stdout
</match>
<match nginx-test.**>
  @type elasticsearch
  host 192.168.101.55
  port 9200
  user elastic
  password elastic
  logstash_format true
  logstash_prefix docker
  logstash_dateformat %Y_%m
  index_name docker_log
  flush_interval 5s
  type_name docker
  include_tag_key true
</match>

3. Start the Docker container

docker run -d --log-driver fluentd --log-opt fluentd-address=localhost:24224 --log-opt tag="nginx-test" --log-opt fluentd-async-connect --name nginx-test -p 9080:80 nginx

Everything else follows step VII.

 

Ruby upgrade guide: https://blog.csdn.net/qq_26440803/article/details/82717244


