关于logstash, 网上的博客资料还算不少, 但是之前在做项目的时候, 碰到个需求, 需要将kafka中读到的数据放到elasticsearch中, 而这个index的值是根据kafka中创建时间来定的。
比如说kafka中传递的创建时间是2021-10-10的时间戳, 而我这条数据需要放入es中index为xxx_2021.10.10的表中, 这其中就需要将时间戳对象转成自定义时间格式数据, 然后再将该数据赋值给index, 但是就是这么个简单的事, 我查了半天资料才发现事情并不简单, 网上基本都是时间格式类型转时间戳, 基本没见到有时间戳转时间格式的, 所以这次就把自己做的发出来给各位小伙伴们作为参考, 以免都跟我一样卡半天。
上代码
filter {
mutate{
add_field => {"index_time"=>"error_timestamp"}
}
ruby{
code=>"event.set('index_time',Time.at((event.get('create_time').to_i)/1000).strftime('%Y.%m.%d'))"
}
}
output {
elasticsearch {
hosts => ["xxx:9092"]
index => "xxxx_%{index_time}"
document_type=>"log"
}
}
重要代码基本就这一行:event.set('index_time',Time.at((event.get('create_time').to_i)/1000).strftime('%Y.%m.%d'))
to_i的效果是将String转成int或long类型, 因为这边kafka传递的是字符串的, 所以进行转化
Time.at是秒级时间戳转时间格式的方法, /1000是除以1000, 这边时间戳是毫秒级的
最后的strftime是时间格式转换, 里面的是格式, 更详细的可以参考ruby文档
以上基本就是咋们处理这个小问题的方式, 希望对大家有用