logstash知識點


  1. Logstash是位於Data和Elasticsearch之間的一個中間件。Logstash是一個功能強大的工具,可與各種部署集成。 它提供了大量插件。 它從數據源實時地把數據進行采集,可幫助您解析,豐富,轉換和緩沖來自各種來源的數據,並最終把數據傳入到Elasticsearch之中。 如果您的數據需要Beats中沒有的其他處理,則需要將Logstash添加到部署中。Logstash部署於ingest node之中。
    0.1 默認情況下,Logstash在管道(pipeline)階段之間使用內存中有界隊列(輸入到過濾器和過濾器到輸出)來緩沖事件。 如果Logstash不安全地終止,則存儲在內存中的所有事件都將丟失。 為防止數據丟失,您可以使Logstash通過使用持久隊列將正在進行的事件持久化到磁盤上。可以通過在logstash.yml文件中設置queue.type:persisted屬性來啟用持久隊列,該文件位於LOGSTASH_HOME/config文件夾下。 logstash.yml是一個配置文件,其中包含與Logstash相關的設置。 默認情況下,文件存儲在LOGSTASH_HOME/data/queue中。 您可以通過在logstash.yml中設置path.queue屬性來覆蓋它。
  2. 在使用logstash之前,必須要先安裝JAVA
  3. 下載地址:https://artifacts.elastic.co/downloads/logstash/logstash-7.3.0.tar.gz (里面的版本號可以根據實際情況進行修改)
  4. 運行最基本的Logstash管道
cd logstash-7.3.0
bin/logstash -e 'input { stdin { } } output { stdout {} }'
  1. 創建logstash.conf文件來運行管道
# logstash.conf文件內容
input { 
    stdin{ }
}
 
output {
    stdout {
       codec => rubydebug
}

# 運行
./bin/logstash -f logstash.conf (path_to_logstash_conf_file)

提示:在運行Logstash時使用-r標志可讓您在更改和保存配置后自動重新加載配置。 在測試新配置時,這將很有用,因為您可以對其進行修改,這樣就不必在每次更改配置時都手動啟動Logstash。

  1. 獲得所有的plugins
bin/logstash-plugin list
  1. input讀取csv文件

input {
	file {
		path => "/Users/liuxg/data/cars.csv"
		start_position => "beginning"
		sincedb_path => "null"
	}

在input中,定義了一個文件,它的path指向csv文件的位置。start_position指向beginning。如果對於一個實時的數據源來說,它通常是ending,這樣表示它每次都是從最后拿到那個數據。sincedb_path通常指向一個文件。這個文件保存上次操作的位置。設置為/dev/null,表明不存儲這個數據
7. 在Logstash中,按照順序執行的處理方式被叫做一個pipeline。一個pipeline含有一個按照順序執行的邏輯數據流。pipeline從input里獲取數據,並傳送給一個隊列,並接着傳入到一些worker去處理

  1. 官方提供的lostash關於apache,nginx應用的日志處理樣本,網站: https://github.com/elastic/examples/tree/master/Common Data Formats
# apache_logstash.conf
input {  
  stdin { } 
}


filter {
  grok {
    match => {
      "message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
    }
  }

  date {
    match => [ "timestamp", "dd/MMM/YYYY:HH:mm:ss Z" ]
    locale => en
  }

  geoip {
    source => "clientip"
  }

  useragent {
    source => "agent"
    target => "useragent"
  }
}

output {
  stdout {
    codec => dots {}
  }

  elasticsearch {
    index => "apache_elastic_example"
    template => "./apache_template.json"
    template_name => "apache_elastic_example"
    template_overwrite => true
  }
}
# apache_template.json
{

  "template": "apache_elastic_example",
  "settings": {
     "index.refresh_interval": "5s"
  },
  "mappings": {
     "_default_": {
        "dynamic_templates": [
           {
              "message_field": {
                 "mapping": {
                    "norms": false,
                    "type": "text"
                 },
                 "match_mapping_type": "string",
                 "match": "message"
              }
           },
           {
              "string_fields": {
                 "mapping": {
                    "norms": false,
                    "type": "text",
                    "fields": {
                       "keyword": {
                          "ignore_above": 256,
                          "type": "keyword"
                       }
                    }
                 },
                 "match_mapping_type": "string",
                 "match": "*"
              }
           }
        ],
        "properties": {
           "geoip": {
              "dynamic": true,
              "properties": {
                 "location": {
                    "type": "geo_point"
                 },
                 "ip": {
                   "type": "ip"
                 },
                 "continent_code": {
                  "type": "keyword"
                 },
                 "country_name": {
                   "type": "keyword"
                 }
              },
              "type": "object"
           },
           "@version": {
              "type": "keyword"
           }
        }
     }
  }
}
# nginx_logstash.conf
input {
  stdin { }
}

filter {
  grok {
    match => {
      "message" => '%{IPORHOST:remote_ip} - %{DATA:user_name} \[%{HTTPDATE:time}\] "%{WORD:request_action} %{DATA:request} HTTP/%{NUMBER:http_version}" %{NUMBER:response} %{NUMBER:bytes} "%{DATA:referrer}" "%{DATA:agent}"'
    }
  }

  date {
    match => [ "time", "dd/MMM/YYYY:HH:mm:ss Z" ]
    locale => en
  }

  geoip {
    source => "remote_ip"
    target => "geoip"
  }

  useragent {
    source => "agent"
    target => "user_agent"
  }
}

output {
stdout {
 codec => dots {}
 }
  elasticsearch {
    index => "nginx_elastic_stack_example"
    document_type => "logs"
    template => "./nginx_template.json"
    template_name => "nginx_elastic_stack_example"
    template_overwrite => true
  }
}
# nginx_template.json
{

  "template": "nginx_elastic_stack_example",
  "settings": {
     "index.refresh_interval": "5s"
  },
  "mappings": {
     "_default_": {
        "dynamic_templates": [
           {
              "message_field": {
                 "mapping": {
                    "index": "analyzed",
                    "norms": false,
                    "type": "string"
                 },
                 "match_mapping_type": "string",
                 "match": "message"
              }
           },
           {
              "string_fields": {
                 "mapping": {
                    "norms": false,
                    "type": "text",
                    "fields": {
                       "raw": {
                          "type": "keyword"
                       }
                    }
                 },
                 "match_mapping_type": "string",
                 "match": "*"
              }
           }
        ],
        "properties": {
           "geoip": {
              "dynamic": true,
              "properties": {
                 "location": {
                    "type": "geo_point"
                 }
              },
              "type": "object"
           },
           "bytes": {
              "type": "float"
           },
           "request": {
              "type": "keyword"
           }
        },
        "_all": {
           "enabled": true
        }
     }
  }
}
# nginx_json_logstash.conf
input {
  stdin {
    codec => json
    }
}

filter {

  date {
    match => ["time", "dd/MMM/YYYY:HH:mm:ss Z" ]
    locale => en
  }

  geoip {
    source => "remote_ip"
    target => "geoip"
  }

  useragent {
    source => "agent"
    target => "user_agent"
  }

  grok {
    match => [ "request" , "%{WORD:request_action} %{DATA:request1} HTTP/%{NUMBER:http_version}" ]
  }
}

output {
  stdout  {
    codec => dots {}
  }

  elasticsearch {
    index => "nginx_json_elastic_stack_example"
    document_type => "logs"
    template => "./nginx_json_template.json"
    template_name => "nginx_json_elastic_stack_example"
    template_overwrite => true
  }

}
# nginx_json_template.json
{
  "index_patterns": "nginx_json_elastic",
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "doc": {
      "dynamic_templates": [
        {
          "message_field": {
            "mapping": {
              "norms": false,
              "type": "text"
            },
            "match_mapping_type": "string",
            "match": "message"
          }
        },
        {
          "string_fields": {
            "mapping": {
              "type": "text",
              "norms": false,
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 256
                }
              }
            },
            "match_mapping_type": "string",
            "match": "*"
          }
        }
      ],
      "properties": {
        "geoip": {
          "dynamic": true,
          "properties": {
            "location": {
              "type": "geo_point"
            }
          },
          "type": "object"
        },
        "request": {
          "type": "keyword"
        }
      }
    }
  }
}
  1. 處理多個input
# multi-input.conf
input {
  file {
    path => "/data/multi-input/apache.log"
  	start_position => "beginning"
    sincedb_path => "/dev/null"
    # ignore_older => 100000
    type => "apache"
  }
}
 
input {
  file {
    path => "/data/multi-input/apache-daily-access.log"
  	start_position => "beginning"
    sincedb_path => "/dev/null"
    type => "daily"
  }
}
 
filter {
  	grok {
    	match => {
      		"message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
    	}
  	}
 
	if [type] == "apache" {
		mutate {
	  		add_tag => ["apache"]
	  	}
	}
 
	if [type] == "daily" {
		mutate {
			add_tag => ["daily"]
		}
	} 
}
 
 
output {
	stdout {
		codec => rubydebug
	}
 
	if "apache" in [tags] {
	  	elasticsearch {
	    	index => "apache_log"
	    	template => "/data/apache_template.json"
	    	template_name => "apache_elastic_example"
	    	template_overwrite => true
	  }	
	}
 
	if "daily" in [tags] {
	  	elasticsearch {
	    	index => "apache_daily"
	    	template => "/data/apache_template.json"
	    	template_name => "apache_elastic_example"
	    	template_overwrite => true
	  }	
	}	
}

# 運行
./bin/logstash -f multi-input.conf

使用了兩個input。它們分別對應不同的log文件。對於這兩個input,使用了不同的type來表示:apache和daily。盡管它們的格式是一樣的,它們共同使用同樣的一個grok filter,但是還是想分別對它們進行處理。為此,添加了一個tag。也可以添加一個field來進行區別。在output的部分,根據在filter部分設置的tag來對它們輸出到不同的index里。
daily的事件最早被處理及輸出,接着apache的數據才開始處理.

  1. 處理多個配置文件(conf)
    一個pipeline含有一個邏輯的數據流,它從input接收數據,並把它們傳入到隊列里,經過worker的處理,最后輸出到output。這個output可以是Elasticsearch或其它
  • 多個pipeline

兩個不同的conf配置文件

# apache.conf
input {
    file {
        path => "/data/multi-input/apache.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        # ignore_older => 100000
        type => "apache"
    }
}
 
filter {
  	grok {
    	match => {
      		"message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
    	}
  	}
}
 
 
output {
	stdout {
		codec => rubydebug
	}
 
  	elasticsearch {
    	index => "apache_log"
    	template => "/data/apache_template.json"
    	template_name => "apache_elastic_example"
    	template_overwrite => true
  }	
}

# daily.conf
input {
    file {
        path => "/data/multi-pipeline/apache-daily-access.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        type => "daily"
    }
}
 
filter {
  	grok {
    	match => {
      		"message" => '%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}'
    	}
  	}
}
 
 
output {
	stdout {
		codec => rubydebug
	}
 
  	elasticsearch {
    	index => "apache_daily"
    	template => "/data/multi-pipeline/apache_template.json"
    	template_name => "apache_elastic_example"
    	template_overwrite => true
    }	
}

在logstash的安裝目錄下的config文件目錄下,修改pipelines.yml文件.

# pipelines.yml
- pipeline.id: daily
  pipeline.workers: 1
  pipeline.batch.size: 1
  path.config: "/data/multi-pipeline/daily.conf"
  
- pipeline.id: apache
  queue.type: persisted
  path.config: "/data/multi-pipeline/apache.conf"

啟動,注意:不使用-f參數指定配置文件

/bin/logstash

在終端中可以看到有兩個piple在同時運行。

  • 一個pipeline
    修改位於Logstash安裝目錄下的config子目錄里的pipleline.yml文件
# pipelines.yml
- pipeline.id: my_logs
  queue.type: persisted
  path.config: "/data/multi-pipeline/*.conf"

這里把所有位於/data/multi-pipeline/下的所有的conf文件都放於一個pipeline里。
啟動,注意:不使用-f參數指定配置文件

/bin/logstash

在終端中會看到兩個同樣的輸出,這是因為把兩個.conf文件放於一個pipleline里運行,那么有兩個stdout的輸出分別位於兩個.conf文件了。
apache_log里有20條數據,它包括兩個log文件里所有的事件,這是因為它們都是一個pipleline。同樣可以在apache_daily看到同樣的20條數據。

采用這種方式意味着會把兩個不同的配置文件獲取的日志輸出到同一個索引中。合並數據的話可以使用這種方式。

  1. 把MySQL數據導入到Elasticsearch中

官方文檔地址: https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#plugins-inputs-jdbc-parameters

  • MySQL安裝,准備一些測試數據
  • Logstash安裝
    根據mysql的版本信息下載相應的JDBC connector驅動,下載網站: https://dev.mysql.com/downloads/connector/j/
    下載完這個Connector后,把這個connector存入到Logstash安裝目錄下的logstash-core/lib/jars/子目錄中。
    最終地址是這樣的:logstash-7.3.0/logstash-core/lib/jars/mysql-connector-java-8.0.17.jar
  • Logstash 配置
# sales.conf
input {
	jdbc {
       jdbc_connection_string => "jdbc:mysql://localhost:3306/data"
       jdbc_user => "root"
       jdbc_password => "YourMyQLPassword"
       jdbc_validate_connection => true
       jdbc_driver_library => ""
       jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
       parameters => { "Product_id" => "Product1" }
       statement => "SELECT * FROM SalesJan2009 WHERE Product = :Product_id"
    }    
}
 

output {
	stdout {
	}
 
   	elasticsearch {
     	index => "sales"
     	hosts => "localhost:9200"
     	document_type => "_doc"
	} 
}

替換jdbc_user和jdbc_password為自己的MySQL賬號的用戶名及密碼。特別值得指出的是jdbc_driver_library按elastic的文檔是可以放入JDBC驅動的路徑及驅動名稱。實踐證明如果這個驅動不在JAVA的classpath里,是不能被正確地加載。
正因為這樣的原因,在上一步里把驅動mysql-connector-java-8.0.17.jar放入到Logstash的jar目錄里,所以這里就直接填入空字符串。

  • 運行Logstash加載數據
./bin/logstash --debug -f sales.conf 

注意:在MySQL中刪除數據的話則不會自動同步刪除es中的數據,需要另作處理


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM