1. Define the data source
Write a configuration file, which can be named logstash.conf, with the following content:
input {
  file {
    path => "/data/web/logstash/logFile/*/*"
    start_position => "beginning"    # read from the beginning of the file
  }
  # stdin {}    # data can also be read from standard input
}
2. The data format
filter {
  # define the data format
  grok {
    match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|" }
  }
}
This is because the log lines are written in the following format:
2019-05-07-16:03:04|10.4.29.158|120.131.74.116|WEB|11299073|http://quxue.renren.com/shareApp?isappinstalled=0&userId=11299073&from=groupmessage|/shareApp|null|Mozilla/5.0 (iPhone; CPU iPhone OS 8_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12D508 MicroMessenger/6.1.5 NetType/WIFI|duringTime|98||
As the code above shows, the fields are defined one by one and matched against a single regular expression. DATA is grok's built-in pattern for arbitrary text (the non-greedy regex .*?), so each field simply captures everything up to the next | delimiter.
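As a rough illustration (not actual Logstash output), matching the sample line above against this pattern yields fields along these lines:

timestamp  => "2019-05-07-16:03:04"
serverIp   => "10.4.29.158"
clientIp   => "120.131.74.116"
logSource  => "WEB"
userId     => "11299073"
reqUrl     => "http://quxue.renren.com/shareApp?isappinstalled=0&userId=11299073&from=groupmessage"
reqUri     => "/shareApp"
refer      => "null"
device     => "Mozilla/5.0 (iPhone; CPU iPhone OS 8_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12D508 MicroMessenger/6.1.5 NetType/WIFI"
textDuring => "duringTime"
duringTime => 98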
filter {
  # define the data format
  grok {
    # same as above...
  }
  # define the timestamp format
  date {
    match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
    locale => "cn"
  }
}
Among the fields above, Logstash also needs to be told which one holds the client IP; it will then automatically look up the location information for that IP:
filter {
  # define the data format
  grok {
    # same as above
  }
  # define the timestamp format
  date {
    # same as above
  }
  # tell Logstash which field (from the data format defined above) holds the client IP
  geoip {
    source => "clientIp"
  }
}
The same goes for the client UA. Since UA strings come in many formats, Logstash will also analyze them automatically and extract the operating system and other related information:
# tell Logstash which field holds the client device (user agent)
useragent {
  source => "device"
  target => "userDevice"
}
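To see exactly which sub-fields the geoip and useragent filters add on a given Logstash version, it can help during development to temporarily print every enriched event with the rubydebug codec (the same codec that appears commented out in the output section below); a minimal sketch:

output {
  # temporary, for development only: print each enriched event in full
  stdout { codec => rubydebug }
}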
Logstash also needs to be told which fields are integers so that they can be sorted on later during analysis; in the data used here, only the duration needs this:
# fields that need type conversion; here the visit duration is converted to an integer before being sent to Elasticsearch
mutate {
  convert => ["duringTime", "integer"]
}
output {
  # save the output to Elasticsearch; events whose timestamp did not match are dropped,
  # because some of the URL parameters in the logs contain line breaks
  if [timestamp] =~ /^\d{4}-\d{2}-\d{2}/ {
    elasticsearch {
      hosts => ["localhost"]
    }
  }
  # print to stdout
  # stdout { codec => rubydebug }
  # user name and password for accessing the data
  # user => webService
  # password => 1q2w3e4r
}
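For reference, here is how the fragments above assemble into one logstash.conf (a sketch built only from the pieces in this section; the hosts value assumes an Elasticsearch node reachable on localhost):

input {
  file {
    path => "/data/web/logstash/logFile/*/*"
    start_position => "beginning"
  }
}
filter {
  grok {
    match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|" }
  }
  date {
    match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
    locale => "cn"
  }
  geoip {
    source => "clientIp"
  }
  useragent {
    source => "device"
    target => "userDevice"
  }
  mutate {
    convert => ["duringTime", "integer"]
  }
}
output {
  if [timestamp] =~ /^\d{4}-\d{2}-\d{2}/ {
    elasticsearch {
      hosts => ["localhost:9200"]
    }
  }
}

Depending on the Logstash version, the configuration can be syntax-checked before starting, e.g. with bin/logstash -f logstash.conf -t.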
Case 5: Use Logstash to collect data from a specified file, print the results to the console, and also send them to a Kafka message queue.

Core configuration file: logstash2kafka.properties

input {
  file {
    # the path value can be a specific file or a directory (for a directory, name the files under it or use wildcards)
    path => "/home/root/data/access_log"
    # check every 5 seconds for new data to collect from the file
    discover_interval => 5
    # the default is "end": data is added to the file in append fashion and only newly added lines are collected;
    # if the number of lines in the source file does not change, even changes to existing content go unnoticed and no collection is triggered.
    # with "beginning", collection starts from the top of the file each time; any change to the source file (content or line count) is noticed and triggers collection
    start_position => "beginning"
  }
}
output {
  kafka {
    topic_id => "accesslogs"
    # customize the output format, e.g. message formatting and character set
    codec => plain {
      format => "%{message}"
      charset => "UTF-8"
    }
    bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092"
  }
  stdout {}
}

Note:
0. Prerequisite: create the topic:
[root@JANSON03 ~]# kafka-topics.sh --create --topic accesslogs --zookeeper JANSON01:2181 --partitions 3 --replication-factor 3
Created topic "accesslogs".
[root@JANSON02 soft]# kafka-topics.sh --list --zookeeper JANSON01:2181
Hbase
Spark
__consumer_offsets
accesslogs
bbs
gamelogs
gamelogs-rt
hadoop
hive
spark
test
test2
① In real projects, the three Logstash components are assembled through a configuration file rather than written inline after the -e parameter:
./logstash -f <configuration file>
To start it as a background process:
nohup ./logstash -f <configuration file> > /dev/null 2>&1 &
② To collect every file (with a .log suffix) in all subdirectories of a given directory:
path => "/home/mike/data/*/*.log"
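To confirm that events are actually landing in the topic, a console consumer can be attached to it (a usage sketch; kafka-console-consumer.sh ships with the same Kafka distribution used above, and very old Kafka releases may need the --zookeeper form instead of --bootstrap-server):

[root@JANSON01 ~]# kafka-console-consumer.sh --bootstrap-server JANSON01:9092 --topic accesslogs --from-beginning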
Case 6: How Logstash hands data off in a real project

Steps:
1. Use Logstash to collect the log data under a specified directory and, once collected, send it straight to a Kafka message queue.
Reason: if the directory holds a huge number of files and the collected data were sent directly to ES, ES might go down because it cannot handle the load.
The common solution is to collect the log data into a Kafka message queue first, and then use Logstash to read the messages back out of Kafka and send them to ES.
2. Use Logstash to consume the data from the Kafka message queue and send it to ES.

Implementation:

Step 1: use Logstash to collect every file in every subdirectory of the game-log directory into the Kafka message queue (dir2kafka.properties)

input {
  file {
    codec => plain {
      charset => "GB2312"
    }
    path => "/home/mike/data/basedir/*/*.txt"
    discover_interval => 5
    start_position => "beginning"
  }
}
output {
  kafka {
    topic_id => "gamelogs"
    codec => plain {
      format => "%{message}"
      charset => "GB2312"
    }
    bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092"
  }
}

Step 2: use Logstash to consume the data from the Kafka message queue and write it to the ES cluster (kafka2es.properties)

auto_offset_reset values:
earliest: if a partition has a committed offset, consume from that offset; otherwise consume from the beginning
latest: if a partition has a committed offset, consume from that offset; otherwise consume only data newly produced to that partition
none: if every partition of the topic has a committed offset, consume from after those offsets; if any partition lacks a committed offset, throw an exception

input {
  kafka {
    client_id => "logstash-1-1"
    type => "accesslogs"
    codec => "plain"
    auto_offset_reset => "earliest"
    group_id => "elas1"
    topics => "accesslogs"    # older Logstash versions use the topic_id parameter instead
    bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092"    # older Logstash versions use zk_connect => "JANSON01:2181,xx" instead
  }
  kafka {
    client_id => "logstash-1-2"
    type => "gamelogs"
    auto_offset_reset => "earliest"
    codec => "plain"
    group_id => "elas2"
    topics => "gamelogs"
    bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092"
  }
}
filter {
  if [type] == "accesslogs" {
    json {
      source => "message"
      remove_field => [ "message" ]
      target => "access"
    }
  }
  if [type] == "gamelogs" {
    mutate {
      split => { "message" => " " }
      add_field => {
        "event_type" => "%{message[3]}"
        "current_map" => "%{message[4]}"
        "current_X" => "%{message[5]}"
        "current_y" => "%{message[6]}"
        "user" => "%{message[7]}"
        "item" => "%{message[8]}"
        "item_id" => "%{message[9]}"
        "current_time" => "%{message[12]}"
      }
      remove_field => [ "message" ]
    }
  }
}
output {
  if [type] == "accesslogs" {
    elasticsearch {
      index => "accesslogs"
      codec => "json"
      hosts => ["JANSON01:9200", "JANSON02:9200", "JANSON03:9200"]
    }
  }
  if [type] == "gamelogs" {
    elasticsearch {
      index => "gamelogs"
      codec => plain {
        charset => "UTF-16BE"
      }
      hosts => ["JANSON01:9200", "JANSON02:9200", "JANSON03:9200"]
    }
  }
}

Notes:
1. Start the Logstash processes in the background:
[mike@JANSON01 logstash]$ nohup ./bin/logstash -f config/dir2kafka.properties > /dev/null 2>&1 &
[mike@JANSON01 logstash]$ nohup ./bin/logstash -f config/kafka2es.properties > /dev/null 2>&1 &
2. Logstash error "Logstash could not be started because there is already another instance usin...":
[mike@JANSON01 data]$ ls -alh
total 4.0K
drwxr-xr-x 5 mike mike 84 Mar 21 19:31 .
drwxrwxr-x 13 mike mike 267 Mar 21 16:32 ..
drwxrwxr-x 2 mike mike 6 Mar 21 16:32 dead_letter_queue
-rw-rw-r-- 1 mike mike 0 Mar 21 19:31 .lock
drwxrwxr-x 3 mike mike 20 Mar 21 17:45 plugins
drwxrwxr-x 2 mike mike 6 Mar 21 16:32 queue
-rw-rw-r-- 1 mike mike 36 Mar 21 16:32 uuid
Delete the hidden file: rm .lock
3. Logstash reports an error while consuming Kafka messages: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0
When the input section has more than one kafka source, client_id (e.g. client_id => "logstash-1-1") must be set and must differ between the sources, for example:
kafka {
  client_id => "logstash-1-2"
  type => "gamelogs"
  auto_offset_reset => "earliest"
  codec => "plain"
  group_id => "elas2"
  topics => "gamelogs"
  bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092"
}
4. Changing the data in one of the game log files is enough for Logstash to notice the change and collect the data.
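Once both pipelines are running, a quick way to check that documents are reaching the cluster is to query Elasticsearch's standard REST endpoints directly (a usage sketch; any of the three nodes can be queried):

[mike@JANSON01 ~]$ curl 'http://JANSON01:9200/_cat/indices?v'
[mike@JANSON01 ~]$ curl 'http://JANSON01:9200/gamelogs/_search?size=1&pretty'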
Two commonly used configurations:
1. Collect logs into Kafka:
input { file { codec => plain { charset => "UTF-8" } path => "/root/logserver/gamelog.txt" //tmp/log/* 路徑下所有 discover_interval => 5 start_position => "beginning" } } output { kafka { topic_id => "gamelogs" codec => plain { format => "%{message}" charset => "UTF-8" } bootstrap_servers => "node01:9092,node02:9092,node03:9092" } }
2. Save the logs from Kafka into ES:
input {
  kafka {
    client_id => "logstash-1-1"    # required and must be unique when there are multiple kafka inputs (see note 3 of case 6)
    type => "accesslogs"
    codec => "plain"
    auto_offset_reset => "earliest"
    group_id => "elas1"
    topics => "accesslogs"
    bootstrap_servers => "node01:9092,node02:9092,node03:9092"
  }
  kafka {
    client_id => "logstash-1-2"
    type => "gamelogs"
    auto_offset_reset => "earliest"
    codec => "plain"
    group_id => "elas2"
    topics => "gamelogs"
    bootstrap_servers => ["192.168.18.129:9092"]
  }
}
filter {
  if [type] == "accesslogs" {
    json {
      source => "message"
      remove_field => [ "message" ]
      target => "access"
    }
  }
  if [type] == "gamelogs" {
    mutate {
      split => { "message" => "|" }
      add_field => {
        "event_type" => "%{message[0]}"
        "current_time" => "%{message[1]}"
        "user_ip" => "%{message[2]}"
        "user" => "%{message[3]}"
      }
      remove_field => [ "message" ]
    }
  }
}
output {
  if [type] == "accesslogs" {
    elasticsearch {
      index => "accesslogs"
      codec => "json"
      hosts => ["node01:9200", "node02:9200", "node03:9200"]
    }
  }
  if [type] == "gamelogs" {
    elasticsearch {
      index => "gamelogs"
      codec => plain {
        charset => "UTF-16BE"
      }
      hosts => ["node01:9200", "node02:9200", "node03:9200"]
    }
  }
}