logstash日志采集工具的安裝部署


1. Download the installation package from the official website and upload it to the cluster via Xftp5

Download logstash-6.2.3.tar.gz and upload it via Xftp5 to the /opt/uploads/ directory on node1, the first node of the Hadoop cluster:

2. Unpack logstash-6.2.3.tar.gz and move the extracted directory to /opt/app/

tar -zxvf logstash-6.2.3.tar.gz

mv logstash-6.2.3 /opt/app/ && cd /opt/app/

3. Edit /etc/profile to update the environment variables, then make them take effect. Enter the following command:

sudo vi /etc/profile

Add the following lines:

export LOGSTASH_HOME=/opt/app/logstash-6.2.3
export PATH=$PATH:$LOGSTASH_HOME/bin

Make the environment variables take effect: source /etc/profile
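
To confirm the setup, re-source the profile and check that the logstash command resolves. A quick sanity check, assuming the paths above:

source /etc/profile
echo $LOGSTASH_HOME      # should print /opt/app/logstash-6.2.3
logstash --version       # should report logstash 6.2.3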

4. Configuration file types

4.1 log-kafka configuration file

The input source is Nginx log files and the output target is Kafka:

input {
    file {
        path => "/var/logs/nginx/*.log"
        discover_interval => 5          # look for new files matching the glob every 5 seconds
        start_position => "beginning"   # read pre-existing files from the start on first run
    }
}

output {
    kafka {
        topic_id => "accesslog"
        codec => plain {
            format => "%{message}"      # send only the raw log line, without Logstash metadata
            charset => "UTF-8"
        }
        bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    }
}
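
Before starting this pipeline, the accesslog topic should already exist in Kafka (unless topic auto-creation is enabled). A sketch, assuming Kafka's CLI tools are on the PATH and ZooKeeper runs on hadoop1:2181; adjust partitions and replication to the cluster:

kafka-topics.sh --create --zookeeper hadoop1:2181 --replication-factor 2 --partitions 3 --topic accesslog

# watch messages arrive while Logstash runs
kafka-console-consumer.sh --bootstrap-server hadoop1:9092 --topic accesslog --from-beginning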

4.2 file-kafka configuration file

The input source is plain-text (.txt) files and the output target is Kafka:

input {
    file {
        codec => plain {
            charset => "GB2312"
        }
        path => "D:/GameLog/BaseDir/*/*.txt"   # on Windows, use forward slashes in the glob
        discover_interval => 30
        start_position => "beginning"
    }
}

output {
    kafka {
        topic_id => "gamelog"
        codec => plain {
            format => "%{message}"
            charset => "GB2312"
        }
        bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    }
}
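
A configuration file can be syntax-checked before the pipeline is started. For example, assuming the file above is saved as /opt/app/logstash-6.2.3/conf/file-kafka.conf (the path and file name are just this guide's convention):

logstash -f /opt/app/logstash-6.2.3/conf/file-kafka.conf --config.test_and_exit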

4.3 log-elasticsearch configuration file

The input source is Nginx log files and the output target is Elasticsearch:

input {
    file {
        type => "flow"
        path => "/var/logs/nginx/*.log"
        discover_interval => 5
        start_position => "beginning"
    }
}

output {
    if [type] == "flow" {
        elasticsearch {
            index => "flow-%{+YYYY.MM.dd}"   # one index per day
            hosts => ["hadoop1:9200", "hadoop2:9200", "hadoop3:9200"]
        }
    }
}
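
Once events flow, a daily flow-* index should appear in Elasticsearch. A quick check, assuming Elasticsearch answers on hadoop1:9200:

curl 'http://hadoop1:9200/_cat/indices/flow-*?v'
curl 'http://hadoop1:9200/flow-*/_search?size=1&pretty'   # inspect one sample document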

4.4 kafka-elasticsearch configuration file

The input sources are the Kafka topics accesslog and gamelog; each is filtered separately in the filter stage, and the output target is Elasticsearch. When the input section contains multiple Kafka sources, client_id => "es*" must be set and must be different for each source; otherwise Logstash reports javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0.

input {
    kafka {
        type => "accesslog"
        codec => "plain"
        auto_offset_reset => "earliest"
        client_id => "es1"              # must be unique across the kafka inputs
        group_id => "es1"
        topics => ["accesslog"]
        bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    }

    kafka {
        type => "gamelog"
        codec => "plain"
        auto_offset_reset => "earliest"
        client_id => "es2"              # must differ from the client_id above
        group_id => "es2"
        topics => ["gamelog"]
        bootstrap_servers => "hadoop1:9092,hadoop2:9092,hadoop3:9092"
    }
}

filter {
    if [type] == "accesslog" {
        json {
            source => "message"
            remove_field => ["message"]
            target => "access"
        }
    }

    if [type] == "gamelog" {
        mutate {
            # the separator below is a literal TAB character
            split => { "message" => "	" }
            add_field => {
                "event_type"   => "%{[message][3]}"
                "current_map"  => "%{[message][4]}"
                "current_x"    => "%{[message][5]}"
                "current_y"    => "%{[message][6]}"
                "user"         => "%{[message][7]}"
                "item"         => "%{[message][8]}"
                "item_id"      => "%{[message][9]}"
                "current_time" => "%{[message][12]}"
            }
            remove_field => ["message"]
        }
    }
}

output {
    if [type] == "accesslog" {
        elasticsearch {
            index => "accesslog"
            codec => "json"
            hosts => ["hadoop1:9200","hadoop2:9200","hadoop3:9200"]
        }
    }

    if [type] == "gamelog" {
        elasticsearch {
            index => "gamelog"
            codec => plain {
                charset => "UTF-16BE"
            }
            hosts => ["hadoop1:9200","hadoop2:9200","hadoop3:9200"]
        }
    }
}

Note: UTF-16BE is used here to fix garbled Chinese characters; UTF-8 did not solve the problem.
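
After the pipeline has been running for a while, both indices can be checked for documents; for example, assuming Elasticsearch answers on hadoop1:9200:

curl 'http://hadoop1:9200/accesslog/_count?pretty'
curl 'http://hadoop1:9200/gamelog/_count?pretty'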

5. Starting Logstash

logstash -f /opt/app/logstash-6.2.3/conf/flow-kafka.conf
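
For long-running collection, it is common to start Logstash in the background and let it pick up configuration changes automatically. One possible way, where the output file path is just an example:

nohup logstash -f /opt/app/logstash-6.2.3/conf/flow-kafka.conf --config.reload.automatic > /opt/app/logstash-6.2.3/logstash.out 2>&1 &
tail -f /opt/app/logstash-6.2.3/logstash.out    # follow the startup log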

 

6. Problems encountered with Logstash

1) When collecting logs with the file input, the same file cannot be used repeatedly for testing: the first run succeeds, but later runs appear to read nothing. This happens because the file input records how far it has read in a sincedb file and resumes from that position on restart instead of re-reading the file; see the workaround sketched below.
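
A common workaround for repeatable tests is to point the file input's sincedb at /dev/null so the read position is never persisted; a minimal sketch:

input {
    file {
        path => "/var/logs/nginx/*.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"   # nothing persisted, so every run re-reads the file
    }
}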

 


