Preface: this post grew out of the daily smog headlines. I wanted to use Kibana to draw a smog map of China's first-tier cities, and I hope it helps anyone who wants to plot latitude/longitude data in Kibana, or to plot against the timestamp carried in the log itself. If you have questions or spot mistakes, feel free to email me.
I. Data preparation

For convenience, I mocked up some data in JSON format:
{"timestamp":"2017-01-13T13:13:32.2516955+08:00","deviceId":"myFirstDevice","windSpeed":17,"haze":284,"city":"Beijing","lat":33.9402,"lon":116.40739}
I generated the mock data in C#, roughly as follows:
// Required at the top of the file: using System; using System.Threading;
// and using Newtonsoft.Json; (the commented-out Event Hub lines additionally
// need the Azure Event Hubs client library).
static void SendingRandomMessages()
{
    //var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
    int len = 4;
    string[] citys = { "Beijing", "Shanghai", "Guangzhou", "Shenzhen" };
    // Base values (the "...1" arrays) plus working copies that get a random offset each round.
    int[] avgWindSpeed = { 10, 16, 5, 7 };
    int[] avgWindSpeed1 = { 10, 16, 5, 7 };
    int[] avgHaze1 = { 200, 100, 50, 49 };
    int[] avgHaze = { 200, 100, 50, 49 };
    double[] latitude = { 39.3402, 31.23042, 23.13369, 22.54310 };
    double[] longitude = { 116.40739, 121.47370, 113.28880, 114.057860 };
    Random rand = new Random();
    while (true)
    {
        try
        {
            for (int i = 0; i < len; i++)
            {
                avgWindSpeed[i] = avgWindSpeed1[i] + rand.Next(1, 11);
                avgHaze[i] = avgHaze1[i] + rand.Next(10, 100);
                // Anonymous object matching the JSON shape shown above.
                var telemetryDataPoint = new
                {
                    timestamp = DateTime.Now,
                    deviceId = "myFirstDevice",
                    windSpeed = avgWindSpeed[i],
                    haze = avgHaze[i],
                    city = citys[i],
                    lat = latitude[i],
                    lon = longitude[i]
                };
                var message = JsonConvert.SerializeObject(telemetryDataPoint);
                //eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(message)));
                Console.WriteLine("{0} > Get message: {1}", "eventHubName", message);
            }
        }
        catch (Exception exception)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
            Console.ResetColor();
        }
        Thread.Sleep(200);
    }
}
Here I send the data as messages to an Event Hub; for this post's purposes you can simply write the JSON to a text file and have Logstash read it from there, as sketched below.
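A minimal sketch of the text-file route follows, assuming the file path /opt/logstash/1.log used in the Logstash config later in this post; the class name and sample values are just placeholders. Inside the generator loop above you would replace the commented-out eventHubClient.Send call with a similar File.AppendAllText, writing one JSON object per line.

```csharp
using System;
using System.IO;
using Newtonsoft.Json;

class MockToFile
{
    static void Main()
    {
        // One arbitrary sample point; in practice this sits inside the generator loop above.
        var telemetryDataPoint = new
        {
            timestamp = DateTime.Now,
            deviceId = "myFirstDevice",
            windSpeed = 12,
            haze = 250,
            city = "Beijing",
            lat = 39.3402,
            lon = 116.40739
        };

        // One JSON object per line, so Logstash's file input treats each line as one event.
        // The path matches the Logstash config shown later in this post.
        File.AppendAllText("/opt/logstash/1.log",
            JsonConvert.SerializeObject(telemetryDataPoint) + Environment.NewLine);
    }
}
```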
I have two goals:
- extract the lat and lon coordinates from the data and plot them on a Kibana map
- use the timestamp carried in the data as the time field I search on in Kibana (by default this is @timestamp)
II. Overall approach

- lat and lon are essentially float values, so we need to design a mapping for them (a geo_point)
- the time inside the log is essentially a string; we first have to extract that field and then convert it with the date filter's match option
III. Worked examples

1. For the mapping design, here is a template I use:
{
  "template": "geo-*",
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "_all": { "enabled": true, "omit_norms": true },
      "dynamic_templates": [ {
        "message_field": {
          "match": "message",
          "match_mapping_type": "string",
          "mapping": {
            "type": "string", "index": "analyzed", "omit_norms": true
          }
        }
      }, {
        "string_fields": {
          "match": "*",
          "match_mapping_type": "string",
          "mapping": {
            "type": "string", "index": "analyzed", "omit_norms": true,
            "fields": {
              "raw": { "type": "string", "index": "not_analyzed", "ignore_above": 256 }
            }
          }
        }
      } ],
      "properties": {
        "@version": { "type": "string", "index": "not_analyzed" },
        "lonlat": { "type": "geo_point" }
      }
    }
  }
}
A rough explanation:

- "template": "geo-*" means that every index whose name begins with geo- will have this template applied.
- "lonlat": { "type": "geo_point" } defines lonlat as a geo_point type, which lays the groundwork for drawing the map later; this is the key part.

NOTE: it seems this lonlat field cannot be given certain reserved names? When I named it location I kept getting errors.

You can find a more detailed description in the official documentation.
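I register this template with a direct PUT rather than letting Logstash load it (see the commented-out lines in the output section below). Here is a minimal sketch of that step, assuming the template JSON above has been saved as /opt/logstash/monster.json, that Elasticsearch is reachable at http://wb-elk:9200 (the host from the Logstash output with the default port), and an arbitrary template name of geo.

```csharp
using System;
using System.IO;
using System.Net.Http;
using System.Text;

class PutTemplate
{
    static void Main()
    {
        // Read the template body saved from the JSON above.
        var json = File.ReadAllText("/opt/logstash/monster.json");

        using (var client = new HttpClient())
        {
            var content = new StringContent(json, Encoding.UTF8, "application/json");
            // PUT it under an arbitrary template name via the _template API.
            var response = client.PutAsync("http://wb-elk:9200/_template/geo", content).Result;
            Console.WriteLine("{0}: {1}", response.StatusCode,
                response.Content.ReadAsStringAsync().Result);
        }
    }
}
```

Any HTTP client, curl included, can perform the same PUT against the _template endpoint.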
2. The Logstash configuration file:
input {
  file {
    path => "/opt/logstash/1.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  json {
    source => "message"
  }
  mutate {
    add_field => [ "[lonlat]", "%{lon}" ]
    add_field => [ "[lonlat]", "%{lat}" ]
  }
  date {
    match => [ "timestamp", "ISO8601" ]
    timezone => "Asia/Shanghai"
    target => "logdate"
  }
}
output {
  stdout { codec => rubydebug }
  elasticsearch {
    hosts => "wb-elk"
    index => "geo-%{+YYYY.MM.dd}"
    # template => "/opt/logstash/monster.json"
    # template_overwrite => true
  }
}
A rough explanation:

- json { source => "message" } splits the JSON data into individual fields.
- mutate appends the longitude and then the latitude to the lonlat geo_point field; the order matters, since the array form of a geo_point in Elasticsearch is [lon, lat].
- date { match => ... } matches the timestamp field extracted from the JSON and assigns the parsed date to logdate.
- The commented-out lines in output point to a template file. I used a direct PUT of the template instead (as sketched above), which is why they are commented out here; both approaches work.

For a more thorough understanding you will need to read the documentation and keep studying.
3. Running it

In the screenshot above, location should read lonlat; the stdout (rubydebug) output should look roughly like that.
4. Results