logstash-時間戳轉自定義時間格式


關於logstash, 網上的博客資料還算不少, 但是之前在做項目的時候, 碰到個需求, 需要將kafka中讀到的數據放到elasticsearch中, 而這個index的值是根據kafka中創建時間來定的。

比如說kafka中傳遞的創建時間是2021-10-10的時間戳, 而我這條數據需要放入es中index為xxx_2021.10.10的表中, 這其中就需要將時間戳對象轉成自定義時間格式數據, 然后再將該數據賦值給index, 但是就是這么個簡單的事, 我查了半天資料才發現事情並不簡單, 網上基本都是時間格式類型轉時間戳, 基本沒見到有時間戳轉時間格式的, 所以這次就把自己做的發出來給各位小伙伴們作為參考, 以免都跟我一樣卡半天。

上代碼

filter {

    mutate{

        add_field => {"index_time"=>"error_timestamp"}

    }

    ruby{

        code=>"event.set('index_time',Time.at((event.get('create_time').to_i)/1000).strftime('%Y.%m.%d'))"

    }

}

output {

    elasticsearch {

        hosts => ["xxx:9092"]

        index => "xxxx_%{index_time}"

        document_type=>"log"

    }

}

重要代碼基本就這一行:event.set('index_time',Time.at((event.get('create_time').to_i)/1000).strftime('%Y.%m.%d'))

to_i的效果是將String轉成int或long類型, 因為這邊kafka傳遞的是字符串的, 所以進行轉化

Time.at是秒級時間戳轉時間格式的方法, /1000是除以1000, 這邊時間戳是毫秒級的

最后的strftime是時間格式轉換, 里面的是格式, 更詳細的可以參考ruby文檔

 

以上基本就是咋們處理這個小問題的方式, 希望對大家有用


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM