logstash 小結(2) Logstash結合Kafka、ES數據對接


3.31.8 Logstash配置講解

  1. 定義數據源 寫一個配置文件,可命名為logstash.conf,輸入以下內容:

  2. input {
            file {
                    path => "/data/web/logstash/logFile/*/*"
                    start_position => "beginning" #從文件開始處讀寫
            }
    #       stdin {}  #可以從標准輸入讀數據
    }

    定義的數據源,支持從文件、stdin、kafka、twitter等來源,甚至可以自己寫一個input plugin。如果像上面那樣用通配符寫file,如果有新日志文件拷進來,它會自動去掃描。

    2、數據的格式 根據打日志的格式,用正則表達式進行匹配     

  3. filter {
    
      #定義數據的格式
      grok {
        match => { "message" => "%{DATA:timestamp}\|%{IP:serverIp}\|%{IP:clientIp}\|%{DATA:logSource}\|%{DATA:userId}\|%{DATA:reqUrl}\|%{DATA:reqUri}\|%{DATA:refer}\|%{DATA:device}\|%{DATA:textDuring}\|%{DATA:duringTime:int}\|\|"}
      }
    
    }

    由於打日志的格式是這樣的:

2019-05-07-16:03:04|10.4.29.158|120.131.74.116|WEB|11299073|http://quxue.renren.com/shareApp?isappinstalled=0&userId=11299073&from=groupmessage|/shareApp|null|Mozilla/5.0
(iPhone; CPU iPhone OS 8_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) Mobile/12D508 MicroMessenger/6.1.5 NetType/WIFI|duringTime|98||

以|符號隔開,第一個是訪問時間,timestamp,作為logstash的時間戳,接下來的依次為:服務端IP,客戶端的IP,機器類型(WEB/APP/ADMIN),用戶的ID(沒有用0表示),請求的完整網址,請求的控制器路徑,reference,設備的信息,duringTime,請求所花的時間。

如上面代碼,依次定義字段,用一個正則表達式進行匹配,DATA是logstash定義好的正則,其實就是(.*?),並且定義字段名。

我們將訪問時間作為logstash的時間戳,有了這個,我們就可以以時間為區分,查看分析某段時間的請求是怎樣的,如果沒有匹配到這個時間的話,logstash將以當前時間作為該條記錄的時間戳。需要再filter里面定義時間戳的格式,即打日志用的格式:

filter {

  #定義數據的格式
  grok {#同上... }

  #定義時間戳的格式
  date {
    match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]
    locale => "cn"
  }

}

在上面的字段里面需要跟logstash指出哪個是客戶端IP,logstash會自動去抓取該IP的相關位置信息:

filter {

  #定義數據的格式
  grok {#同上}

  #定義時間戳的格式
  date {#同上}

  #定義客戶端的IP是哪個字段(上面定義的數據格式)
  geoip {
    source => "clientIp"
  }
}

同樣地還有客戶端的UA,由於UA的格式比較多,logstash也會自動去分析,提取操作系統等相關信息

#定義客戶端設備是哪一個字段
  useragent {
    source => "device"
    target => "userDevice"
  }

哪些字段是整型的,也需要告訴logstash,為了后面分析時可進行排序,使用的數據里面只有一個時間

#需要進行轉換的字段,這里是將訪問的時間轉成int,再傳給Elasticsearch
  mutate {
    convert => ["duringTime", "integer"]
  }

3, 輸出配置

最后就是輸出的配置,將過濾扣的數據輸出到elasticsearch

output {
  #將輸出保存到elasticsearch,如果沒有匹配到時間就不保存,因為日志里的網址參數有些帶有換行
  if [timestamp] =~ /^\d{4}-\d{2}-\d{2}/ {
        elasticsearch { host => localhost }
  }

   #輸出到stdout
#  stdout { codec => rubydebug }

   #定義訪問數據的用戶名和密碼
#  user => webService
#  password => 1q2w3e4r
}

我們將上述配置,保存到logstash.conf,然后運行logstash

在logstash啟動完成之后,輸入上面的那條訪問記錄,logstash將輸出過濾后的數據:

 

 

3.31.9 Logstash結合Kafka、ES數據對接

 

 

案例5:使用logstash收集指定文件中的數據,將結果輸出到控制台上;且輸出到kafka消息隊列中。
    核心配置文件:logstash2kafka.properties 
        input {
         file {
            # 將path參數對應的值:可以是具體的文件,也可以是目錄(若是目錄,需要指定目錄下的文件,或者以通配符的形式指定)
            path => "/home/root/data/access_log" 
            #  每間隔5秒鍾從文件中采集一次數據
            discover_interval => 5
            #  默認是end,以追加的形式在文件中添加新數據,只會采集新增的數據;若源文件中數據條數沒有發生變化,即使數據內容發生了變更,也感知不到,不會觸發采集操作
            # 若指定beginning,每次都從頭開始采集數據。若源文件中數據發生了變化(內容或是條數),都會感知到,都會觸發采集操作
            start_position => "beginning"
         }
        }


        output {
            kafka {
              topic_id => "accesslogs"
              # 用於定制輸出的格式,如:對消息格式化,指定字符集等等
              codec => plain { 
                format => "%{message}"
                charset => "UTF-8"
              }
              bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092"
            }
            
            stdout{}
        }

    注意:
        0,前提:
          創建主題: 
                [root@JANSON03 ~]# kafka-topics.sh --create  --topic accesslogs --zookeeper JANSON01:2181 --partitions 3 --replication-factor 3
                Created topic "accesslogs".
                
                [root@JANSON02 soft]# kafka-topics.sh --list --zookeeper JANSON01:2181
                Hbase
                Spark
                __consumer_offsets
                accesslogs
                bbs
                gamelogs
                gamelogs-rt
                hadoop
                hive
                spark
                test
                test2
    
        ①在真實項目中,logstash三大組件通過配置文件進行組裝。不是直接通過-e參數書寫在其后。
        ./logstash -f  配置文件名
        以后台進程的方式啟動:
        nohup ./logstash -f  配置文件名 > /dev/null 2>&1 &
    
         ②需要將指定目錄下所有子目錄中的所有文件都采集到(后綴是.log)
          path => "/home/mike/data/*/*.log" 

 

案例6:真實項目中logstash進行數據對接的過程
步驟: 
    1,使用logstash對指定目錄下的日志信息進行采集,采集到之后,直接輸出到kafka消息隊列中。
         原因:若目錄下的文件是海量的,將數據采集后,直接發送給es的話,es因為承載不了壓力可能會宕機。
                    通用的解決方案是:
                    先將日志信息采集到kafka消息隊列中,然后,再使用logstash從kafka消息隊列中讀取出消息,發送給es
        
    2, 使用logstash從kafka消息隊列中采集數據,發送給es.
    

實施:
    步驟1:游戲日志目錄中所有子目錄下所有的file使用logstash采集到kafka消息隊列 (dir2kafka.properties)
        input {
          file {
            codec => plain { charset => "GB2312" } 
            path => "/home/mike/data/basedir/*/*.txt"
            discover_interval => 5
            start_position => "beginning"
          }
        }

        output {
            kafka {
              topic_id => "gamelogs"
              codec => plain {
                format => "%{message}"
                charset => "GB2312"
              }
              bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092"
            }
        }
           
    步驟2:使用logstash從kafka消息隊列中采集數據,輸出到es集群中 (kafka2es.properties)

    earliest 
    當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 
    latest 
    當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分區下的數據 
    none 
    topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset,則拋出異常
    __

    input {
      kafka {
        client_id => "logstash-1-1"
        type => "accesslogs"
        codec => "plain"
        auto_offset_reset => "earliest"
        group_id => "elas1"
        topics  => "accesslogs"  -- 舊版本的logstash需要使用參數:topic_id
        bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092" -- 舊版本的logstash需要使用參數:zk_connect=>"JANSON01:2181,xx"
      }

      kafka {
        client_id => "logstash-1-2"
        type => "gamelogs"
        auto_offset_reset => "earliest"
        codec => "plain"
        group_id => "elas2"
        topics => "gamelogs"
        bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092"
      }
    }

    filter {
      if [type] == "accesslogs" {
        json {
          source => "message"
          remove_field => [ "message" ]
          target => "access"
        }
      }

      if [type] == "gamelogs" {
        mutate {
          split => { "message" => "    " }
          add_field => {
            "event_type" => "%{message[3]}"
            "current_map" => "%{message[4]}"
            "current_X" => "%{message[5]}"
            "current_y" => "%{message[6]}"
            "user" => "%{message[7]}"
            "item" => "%{message[8]}"
            "item_id" => "%{message[9]}"
            "current_time" => "%{message[12]}"
         }
         remove_field => [ "message" ]
       }
      }
    }

    output {

      if [type] == "accesslogs" {
        elasticsearch {
          index => "accesslogs"
          codec => "json"
          hosts => ["JANSON01:9200", "JANSON02:9200", "JANSON03:9200"]
        } 
      }

      if [type] == "gamelogs" {
        elasticsearch {
          index => "gamelogs"
          codec => plain {
            charset => "UTF-16BE"
          }
          hosts => ["JANSON01:9200", "JANSON02:9200", "JANSON03:9200"]
        } 
      }
    }


以后台進程的方式啟動: 
注意點: 
    1, 以后台的方式啟動Logstash進程:
         [mike@JANSON01 logstash]$ nohup ./bin/logstash -f config/dir2kafka.properties > /dev/null 2>&1 &
         [mike@JANSON01 logstash]$ nohup ./bin/logstash -f  config/kafka2es.properties > /dev/null 2>&1 &
     
     2, LogStash 錯誤:Logstash could not be started because there is already another instance usin...
                             
            [mike@JANSON01 data]$ ls -alh
            total 4.0K
            drwxr-xr-x  5 mike mike  84 Mar 21 19:31 .
            drwxrwxr-x 13 mike mike 267 Mar 21 16:32 ..
            drwxrwxr-x  2 mike mike   6 Mar 21 16:32 dead_letter_queue
            -rw-rw-r--  1 mike mike   0 Mar 21 19:31 .lock
            drwxrwxr-x  3 mike mike  20 Mar 21 17:45 plugins
            drwxrwxr-x  2 mike mike   6 Mar 21 16:32 queue
            -rw-rw-r--  1 mike mike  36 Mar 21 16:32 uuid
            
            刪除隱藏的文件:rm  .lock
            
    3,  logstash消費Kafka消息,報錯:javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=logstash-0
           當input里面有多個kafka輸入源時,client_id => "logstash-1-1",必須添加且需要不同
           如: 
                  kafka {
                    client_id => "logstash-1-2"
                    type => "gamelogs"
                    auto_offset_reset => "earliest"
                    codec => "plain"
                    group_id => "elas2"
                    topics => "gamelogs"
                    bootstrap_servers => "JANSON01:9092,JANSON02:9092,JANSON03:9092"
                  }
                  
     4, 將游戲日志文件中的數據變化一下,就可以被logstash感知到,進而采集數據。

 

 

 

常用兩個配置:

 1. 將日志采集到kafka :

input {
  file {
    codec => plain {
      charset => "UTF-8"
    }
    path => "/root/logserver/gamelog.txt"   //tmp/log/*  路徑下所有
    discover_interval => 5
    start_position => "beginning"
  }
}

output {
    kafka {
      topic_id => "gamelogs"
      codec => plain {
        format => "%{message}"
        charset => "UTF-8"
      }
      bootstrap_servers => "node01:9092,node02:9092,node03:9092"
    }
}

 

 

2.將kafka  的日志保存es :

input {
  kafka {
    type => "accesslogs"
    codec => "plain"
    auto_offset_reset => "smallest"
    group_id => "elas1"
    topics => "accesslogs"
     bootstrap_servers => "node01:9092,node02:9092,node03:9092"
  }

  kafka {
        client_id => "logstash-1-2"
        type => "gamelogs"
        auto_offset_reset => "earliest"
        codec => "plain"
        group_id => "elas2"
        topics => "gamelogs"
        bootstrap_servers => ["192.168.18.129:9092"]
      }
} filter { if [type] == "accesslogs" { json { source => "message" remove_field => [ "message" ] target => "access" } } if [type] == "gamelogs" { mutate { split => { "message" => "|" } add_field => { "event_type" => "%{message[0]}" "current_time" => "%{message[1]}" "user_ip" => "%{message[2]}" "user" => "%{message[3]}" } remove_field => [ "message" ] } } } output { if [type] == "accesslogs" { elasticsearch { index => "accesslogs" codec => "json" hosts => ["node01:9200", "node02:9200", "node03:9200"] } } if [type] == "gamelogs" { elasticsearch { index => "gamelogs" codec => plain { charset => "UTF-16BE" } hosts => ["node01:9200", "node02:9200", "node03:9200"] } } }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM