Logstash利用ruby將有用的日志放到一個ES_INDEX將無用的日志放到另一個ES_INDEX


input{
    kafka {
        bootstrap_servers => "127.0.0.1:9092"
        client_id => "nginxlog"
        auto_offset_reset => "latest"
        consumer_threads => 5
        decorate_events => true
        topics => ["nginx_log"]
        codec => "json"
        type => "nginx_log"
    }
}
filter{
   mutate {
        gsub => ["message", "\\x22", '"']
        gsub => ["message", "\\x09", '']
    }
    json {
        source => "message"
        remove_field=>["message","beat","@version","@timestamp"]
    }
    if [type] == "nginx_log" {
        ruby {
                code => '
                    -- 獲取白名單
                    file = File.open("/usr/local/logstash/config/white.txt", "r")
                    text = file.read
                    file.close
                    -- 判斷日志中request_uri屬性是否在白名單中 -- 也可直接將不在白名單的日志排除 event.cancel if !text.include?(event.get("request_uri"))
                    if !text.include?(event.get("request_uri")) then
                        -- 如果不存在就增加一個屬性es_flag=0表示該日志沒用
                        event.set("es_flag","0")
                    else
                        -- 如果不存在就增加一個屬性es_flag=1表示該日志有用
                        event.set("es_flag","1")
                    end
                '
            }
        }
    }
}
output {
    if [type] == "nginx_log" {
        -- 判斷es_flag=1放到nginx-log-yes索引中
        if [es_flag] =="1" {
            elasticsearch {
                    hosts => "127.0.0.1:9200"
                    index => "nginx-log-yes"
            }
        }
        -- 判斷es_flag=0放到nginx-log-no索引中
        else {
            elasticsearch {
                    hosts => "127.0.0.1:9200"
                    index => "nginx-log-no"
            }
        }
    }
}

 

Lostash event API說明

除了基本的get和set外,還提供了豐富的接口。我們能用到的方法包括:
刪除事件:cancel
取消刪除事件:uncancel
是否刪除:cancelled?
是否包含字段:include?
刪除字段:remove
事件轉字符串:to_s
事件轉hash字典(不含metadata字段):to_hash
事件轉hash字典(含metadata字段):to_hash_with_metadata
事件轉json字符串:to_json
增加tag:tag
取事件時間戳:timestamp

測試配置文件

input{
    stdin{
        codec=>json
    }
}filter{
    ruby{
        code=>'
            event.cancel
            event.set("cancelled",event.cancelled?)
            event.uncancel
            event.set("include",event.include?("hello"))
            event.remove("hello")
            event.set("to_s",event.to_s)
            event.set("to_hash",event.to_hash)
            event.set("to_hash_with_metadata",event.to_hash_with_metadata)
            event.set("to_json",event.to_json)
            event.tag("_test_tag")
            event.set("timestamp",event.timestamp)
        '
    }
}output{
    stdout{
        codec=>rubydebug
    }
}

啟動logstash,然后輸入如下,查看結果

{"hello":"world"}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM