關於logstash, 網上的博客資料還算不少, 但是之前在做項目的時候, 碰到個需求, 需要將kafka中讀到的數據放到elasticsearch中, 而這個index的值是根據kafka中創建時間來定的。
比如說kafka中傳遞的創建時間是2021-10-10的時間戳, 而我這條數據需要放入es中index為xxx_2021.10.10的表中, 這其中就需要將時間戳對象轉成自定義時間格式數據, 然后再將該數據賦值給index, 但是就是這么個簡單的事, 我查了半天資料才發現事情並不簡單, 網上基本都是時間格式類型轉時間戳, 基本沒見到有時間戳轉時間格式的, 所以這次就把自己做的發出來給各位小伙伴們作為參考, 以免都跟我一樣卡半天。
上代碼
filter {
mutate{
add_field => {"index_time"=>"error_timestamp"}
}
ruby{
code=>"event.set('index_time',Time.at((event.get('create_time').to_i)/1000).strftime('%Y.%m.%d'))"
}
}
output {
elasticsearch {
hosts => ["xxx:9092"]
index => "xxxx_%{index_time}"
document_type=>"log"
}
}
重要代碼基本就這一行:event.set('index_time',Time.at((event.get('create_time').to_i)/1000).strftime('%Y.%m.%d'))
to_i的效果是將String轉成int或long類型, 因為這邊kafka傳遞的是字符串的, 所以進行轉化
Time.at是秒級時間戳轉時間格式的方法, /1000是除以1000, 這邊時間戳是毫秒級的
最后的strftime是時間格式轉換, 里面的是格式, 更詳細的可以參考ruby文檔
以上基本就是咋們處理這個小問題的方式, 希望對大家有用
