參考網址:官方文檔、logstash示例
Logstash是一個開源數據收集引擎,具有實時管道功能。可以動態地將來自不同數據源的數據統一起來,並將數據標准化到你所選擇的目的地。Logstash 是一個接收、處理、轉發日志的工具,支持系統日志、webserver 日志、錯誤日志、應用日志,總之包括所有可以拋出來的日志類型。在一個典型的使用場景下(ELK):用 Elasticsearch 作為后台數據的存儲,kibana用來前端的報表展示。Logstash 在其過程中擔任搬運工的角色,它為數據存儲、報表查詢和日志解析創建了一個功能強大的管道鏈。Logstash 提供了多種多樣的 input,filters,codecs 和 output 組件,讓使用者輕松實現強大的功能。Logstash 收集數據對象就是日志文件,使用 Logstash 對日志文件進行收集和統一過濾,變成可讀性高的內容,方便開發者或運維人員觀察,從而有效的分析系統或項目運行的性能,做好監控和預警的准備工作等。
1.工作原理
Logstash 通過管道進行運作,管道有兩個必需的元素,輸入和輸出,還有一個可選的元素,過濾器。輸入插件從數據源獲取數據,過濾器插件根據用戶指定的數據格式修改數據,輸出插件則將數據寫入到目的地。

先了解一個概念:事件。Logstash 每讀取一次數據的行為叫做事件。
Logstash管道中的每個輸入階段都在自己的線程中運行。輸入將事件寫入位於內存(默認)或磁盤上的中心隊列。每個管道工作線程從隊列中取出一批事件,通過配置的過濾器運行這批事件,然后輸出經過過濾的事件。批處理的大小和管道工作線程的數量是可配置的。
默認情況下,Logstash使用內存有限隊列之間的管道(輸入→過濾器和過濾器→輸出)緩沖事件。如果Logstash不安全終止,那么存儲在內存中的任何事件都將丟失。為了防止數據丟失,您可以啟用Logstash將正在運行的事件持久化到磁盤。
Logstash 工作的三個階段:input => filter => output
input :數據輸入端,可以接收來自任何地方的源數據。
- file:從文件中讀取
- syslog:監聽在514端口的系統日志信息,並解析成RFC3164格式。
- kafka:從kakfka topic 中獲取數據
- beat:接收來自Filebeat的事件
Filter :數據中轉層,主要進行格式處理,數據類型轉換、數據過濾、字段添加,修改等,常用的過濾器如下。
-
grok: 通過正則解析和結構化任何文本。Grok 目前是logstash最好的方式對非結構化日志數據解析成結構化和可查詢化。logstash內置了120個匹配模式,https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns,滿足大部分需求。
-
mutate: 在事件字段執行一般的轉換。可以重命名、刪除、替換和修改事件字段。
-
drop: 完全丟棄事件,如debug事件。
-
clone: 復制事件,可能添加或者刪除字段。
-
geoip: 添加有關IP地址地理位置信息。
output :是logstash工作的最后一個階段,負責將數據輸出到指定位置,兼容大多數應用,常用的有:
-
elasticsearch: 發送事件數據到 Elasticsearch,便於查詢,分析,繪圖。
-
file: 將事件數據寫入到磁盤文件上。
-
kafka: 將事件數據發送至高性能kafka分布式系統,便於臨時存儲,查詢,分析,大數據消費場景。
-
redis:將數據發送至redis-server,常用於中間層暫時緩存。
-
influxdb: 發送指標數據到influxdb。
-
statsd: 發送事件數據到 statsd。
2.Logstash配置
2.1主配置
logstash.yml:
node.name:節點的描述性名稱。默認值為機器的主機名
path.data:Logstash及其插件用於滿足任何持久需求的目錄,默認值LOGSTASH_HOME/data。
pipeline.id: 管道的ID,默認值為main,可以更改;
pipeline.workers:
並行執行管道過濾器和輸出階段的工作器數量。如果發現事件正在備份,或者CPU未飽和,請考慮增加此數量以更好地利用機器處理能力,默認值為主機的CPU核心數
pipeline.batch.size:
在嘗試執行其過濾器和輸出之前,單個工作線程將從輸入收集的最大事件數。較大的批量通常更有效,但代價是增加了內存開銷,需要在jvm.options配置文件中增加JVM堆空間,默認值為125。
pipeline.batch.delay:
創建管道事件批處理時,在將小型批處理分派給管道工作者之前等待每個事件的時間以毫秒為單位,默認值為50
pipeline.unsafe_shutdown:
設置true為時,強制Logstash在關閉期間退出,即使內存中仍有事件。默認情況下,Logstash將拒絕退出,直到所有已接收的事件都被推送到輸出。啟用此選項可能會導致關閉期間數據丟失,默認值為false
path.config:
主管道的Logstash配置的路徑。如果指定目錄或通配符,則會按字母順序從目錄中讀取配置文件。
config.test_and_exit:
設置為時true,檢查配置是否有效,然后退出。使用此設置不會檢查grok模式的正確性。Logstash可以從目錄中讀取多個配置文件,默認值為false
config.reload.automatic:
設置true為時,定期檢查配置是否已更改,並在配置發生更改時重新加載配置。這也可以通過SIGHUP信號手動觸發,默認值為false
config.reload.interval:幾秒鍾內,Logstash會檢查配置文件中的更改,默認值為3s
config.debug:
設置為時true,將完全編譯的配置顯示為調試日志消息。你還必須設置log.level:
debug。警告:日志消息將包含作為純文本傳遞給插件配置的任何密碼選項,並可能導致明文密碼出現在您的日志中,默認值為false
config.support_escapes
設置true為時,帶引號的字符串將處理以下轉義序列:\n成為文字換行符(ASCII 10)。\r成為文字回車(ASCII13)。\t成為文字標簽(ASCII 9)。\\成為一個字面反斜杠\。\"成為字面雙引號。\'成為字面引號,默認值為false
modules
配置時,modules必須在此表中描述的嵌套YAML結構中。
queue.type:
用於事件緩沖的內部排隊模型。指定memory基於內存的傳統隊列,或persisted基於磁盤的ACKed隊列(持久隊列),默認值為memory,推薦使用持久隊列
path.queue:
啟用持久隊列時將存儲數據文件的目錄路徑(queue.type: persisted)。
queue.page_capacity:
啟用持久隊列時使用的頁面數據文件的大小(queue.type: persisted)。隊列數據由分成頁面的僅附加數據文件組成,默認值為64MB
queue.max_events:
啟用持久隊列時隊列中未讀事件的最大數量(queue.type: persisted),默認值為0(無限制)
queue.max_bytes:
隊列的總容量,以字節數表示。確保磁盤驅動器的容量大於此處指定的值。如果同時指定了兩者queue.max_events,queue.max_bytes則Logstash將使用先達到的標准,默認值為1024mb
queue.checkpoint.acks:
啟用持久隊列時強制檢查點之前的最大ACK事件數(queue.type: persisted)。指定queue.checkpoint.acks: 0將此值設置為無限制,默認值為1024
queue.checkpoint.writes:
啟用持久隊列時強制檢查點之前寫入事件的最大數量(queue.type: persisted)。指定queue.checkpoint.writes:
0將此值設置為無限制,默認值為1024
queue.checkpoint.retry:
啟用后,對於任何失敗的檢查點寫入,Logstash將針對每次嘗試檢查點寫入重試一次。不會重試任何后續錯誤。這是僅在具有非標准行為(如SAN)的文件系統上看到的失敗檢查點寫入的解決方法,除特殊情況外,不建議這樣做,默認值為false
dead_letter_queue.enable:
用於指示Logstash啟用插件支持的DLQ功能的標志,默認值為false
dead_letter_queue.max_bytes:
每個死信隊列的最大大小。如果條目超過此設置會增加死信隊列的大小,則會刪除條目。默認值為1024mb
path.dead_letter_queue:
將為死信隊列存儲數據文件的目錄路徑。
http.host:綁定地址,默認值為"127.0.0.1"
http.port:綁定端口,默認值為9600
log.level:日志級別。有效選項包括:fatal,error,warn,info,debug,trace,info
log.format:日志格式。設置為json或plain,默認值為plain
path.logs:Logstash將其日志寫入的目錄,默認值為LOGSTASH_HOME/logs
path.plugins:
哪里可以找到自定義插件。您可以多次指定此設置以包含多個路徑。插件預計將在一個特定的目錄層次結構: PATH/logstash/TYPE/NAME.rb其中TYPE是inputs,filters,outputs,或codecs,並且NAME是插件的名稱。
2.2Input配置
Beat:從beat采集
input {
beats {
port => 5044
}
}
File:從文件采集
input {
file {
path => "/var/log/*/*.log"
}
}
Elasticsearch:從es集群采集數據
input {
elasticsearch {
hosts => "localhost"
query => '{ "query": {"match": { "statuscode": 200 } }, "sort": ["_doc" ] }'
}
}
Kafka:從kafka獲取數據
input {
kafka {
bootstrap_servers =>"10.22.67.5:9992 "
topics => ["test"]
codec => "json"
security_protocol =>"SASL_PLAINTEXT"
sasl_mechanism => "PLAIN"
jaas_path =>"/etc/logstash/kafka-client-jaas.conf"
consumer_threads => 2
group_id => "logstash-bigdata"
auto_offset_reset => "latest"
max_poll_records => "500"
max_poll_interval_ms =>"30000"
session_timeout_ms => "30000"
request_timeout_ms => "60000"
auto_commit_interval_ms =>"5000"
check_crcs => "false"
heartbeat_interval_ms =>"9000"
partition_assignment_strategy =>"org.apache.kafka.clients.consumer.RoundRobinAssignor"
}
}
Jdbc:從數據庫采集數據
input {
jdbc {
jdbc_driver_library =>"mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class =>"com.mysql.jdbc.Driver"
jdbc_connection_string =>"jdbc:mysql://localhost:3306/mydb"
jdbc_user => "mysql"
parameters => {"favorite_artist" => "Beethoven" }
schedule => "* * * * *"
statement => "SELECT * from songswhere artist = :favorite_artist"
}
}
2.3Output配置
Elasticsearch:輸出至ES集群
output {
if [log_type] == "tomcat" and([department] == "zxbi" or [department] == "jfbi") {
elasticsearch {
hosts =>["https://rjjd-node01:9200"]
index =>"jfbi.prd-%{+YYYY.MM}"
user => "beatuser"
password =>"1iQ7w3LQlOhHR6Rg1iQ7w3L"
http_compression => "true"
sniffing => "false"
ssl => true
ssl_certificate_verification =>"true"
cacert => "/etc/ssl/xxx_ca.crt"
id => "zxbi"
}
}
}
Influxdb:輸出至influxdb數據庫
output {
influxdb {
host => "10.1.2.2:8086"
data_points => “{ }”
}
}
File:輸出至文件
output {
file {
path => “/tmp/stdout.log”
codec => line { format => "customformat: %{message}"}
}
}
Opentsdb:輸出至opentsdb
output {
opentsdb {
host => "localhost"
metrics => [“%{host}”, “%{hostname}”]
port => “4242”
}
}
Stdout:輸出至控制台
output {
stdout { codec => json }
}
3.Logstash解析
最常用的解析插件:grok、date、drop、geoip、json、kv、ruby、split、mutate
3.1Grok
grok匹配模式語法為:%{SYNTAX:SEMANTIC:TYPE}
- SYNTAX: 正則表達式、預定義的正則表達式名稱
- SEMANTIC: 標識符,標識匹配后的數據
- TYPE: 可選的類型,目前支持int、float
例如:NUMBER可以匹配3.44,IP可以匹配:192.168.21.2
一個簡單的日志格式如下:
192.168.21.2 GET /index.html 15823 0.023
grok匹配模式可以為:
${IP:client} %{WORD:method}%{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
假設該條日志可能來自一個文件:
input {
file {
path => "/var/log/http.log"
}
}
filter {
grok {
match => { "message" =>"%{IP:client} %{WORD:method} %{URIPATHPARAM:request}%{NUMBER:bytes} %{NUMBER:duration}" }
}
}
在grok過濾后,可以得到額外一下字段:
client: 192.168.21.2
method: GET
request: /index.html
bytes: 15823
duration: 0.023
grok表達式支持的可選參數:
- break_on_match:默認為true代表匹配成功后則不匹配后面的表達式,false相反
- keep_empty_captures:默認為false,字段沒有值時則不輸出字段,true則相反
- match:表達式模式
- overwrite:字段重新命名
- patterns_dir:表達式文件夾
- patterns_files_glob:表達式文件夾正則匹配
- tag_on_failure:匹配失敗后添加tag
- tag_on_timeout:匹配超時添加tag
- add_field:自定義添加字段
- add_tag: 自定義添加tag
- enable_metric:啟用監控
- id:表達式id
- remove_field:移除字段
- remove_tag:移除tag
3.2Date
日期過濾器用於解析字段中的日期,然后使用該日期或時間戳作為事件的logstash時間戳。
filter {
date {
match => [ "logdate", "MMM dd yyyyHH:mm:ss" ]
target => “localtime”
}
}
支持的參數:
Locale:日期語言環境
Match:表達式
Timezone:時間時區
Tag_on_failure:匹配失敗后添加tag
Target:匹配成功后目標字段
時間匹配參數:
字母 | 含義 |
---|---|
y | year |
yyyy full year number. Example: 2015. yy two-digit year. Example: 15 for the year 2015. |
|
M | month of the year |
M minimal-digit month. Example: 1 for January and 12 for December. MM two-digit month. zero-padded if needed. Example: 01 for January and 12 for December MMM abbreviated month text. Example: Jan for January. Note: The language used depends on your local. See the locale setting for how to change the language. MMMM full month text, Example: January. Note: The language used depends on your locale. |
|
d | day of the month |
d minimal-digit day. Example: 1 for the 1st of the month. dd two-digit day, zero-padded if needed. Example: 01 for the 1st of the month. |
|
H | hour of the day (24-hour clock) |
H minimal-digit hour. Example: 0 for midnight. HH two-digit hour, zero-padded if needed. Example: 00 for midnight. |
|
m | minutes of the hour (60 minutes per hour) |
m minimal-digit minutes. Example: 0. mm two-digit minutes, zero-padded if needed. Example: 00. |
|
s | seconds of the minute (60 seconds per minute) |
s minimal-digit seconds. Example: 0. ss two-digit seconds, zero-padded if needed. Example: 00. |
|
S | fraction of a second Maximum precision is milliseconds (SSS). Beyond that, zeroes are appended. |
S tenths of a second. Example: 0 for a subsecond value 012 SS hundredths of a second. Example: 01 for a subsecond value 01 SSS thousandths of a second. Example: 012 for a subsecond value 012 |
|
Z | time zone offset or identity |
Z Timezone offset structured as HHmm (hour and minutes offset from Zulu/UTC). Example: -0700. ZZ Timezone offset structured as HH:mm (colon in between hour and minute offsets). Example: -07:00. ZZZ Timezone identity. Example: America/Los_Angeles. Note:Valid IDs are listed on theJoda.orgavailable time zones page. |
|
z | time zone names.Time zone names (z) cannot be parsed. |
w | week of the year |
w minimal-digit week. Example: 1. ww two-digit week, zero-padded if needed. Example: 01. |
|
D | day of the year |
e | day of the week (number) |
E | day of the week (text) |
E, EE, EEE Abbreviated day of the week. Example: Mon, Tue, Wed, Thu, Fri, Sat, Sun. Note:The actual language of this will depend on your locale. EEEE The full text day of the week. Example: Monday, Tuesday, Note:The actual language of this will depend on your locale. |
時間匹配練習:
2019-08 12:22:22 239 ---yyyy-MM HH:mm:ss SSS
Jul 2 2019 22:30 ---MMM d yyyy HH:mm
12 Mar 2019 22:30:30 +08:00 ---dd MMM yyyy HH:mm:ss ZZ
3.3Drop
當不需要某些數據的時候,可以使用drop插件丟棄,例如:
filter {
if [loglevel] == "debug" {
drop { }
}
}
3.4Geoip
GeoIP過濾器根據Maxmind
GeoLite2數據庫中的數據添加有關IP地址的地理位置的信息。
配置表達式:
filter {
geoip {
database =>“/tmp/xx.db”
fields => [“ip”,”longitute”,”city_name”]
source => “ip”
target => “geoip”
}
}
3.5Json
默認情況下,它會將解析后的JSON放在Logstash事件的根(頂層)中,但可以使用配置將此過濾器配置為將JSON放入任意任意事件字段 target。
當在解析事件期間發生不良事件時,此插件有一些回退場景。如果JSON解析在數據上失敗,則事件將不受影響,並將標記為 _jsonparsefailure; 然后,您可以使用條件來清理數據。您可以使用該tag_on_failure選項配置此標記。
如果解析的數據包含@timestamp字段,則插件將嘗試將其用於事件@timestamp,如果解析失敗,則字段將重命名為,_@timestamp並且事件將使用a標記 _timestampparsefailure。
filter {
json {
source => "message"
}
}
3.6Kv
此過濾器有助於自動解析各種消息(或特定事件字段)類似foo=bar。
例如,如果您有一條包含的日志消息ip=1.2.3.4 error=REFUSED,則可以通過配置來自動解析這些消息。
filter {
kv {
default_keys => [“host”,”ip”]
exclude_keys => [“times”]
field_split => “&?”
}
}
支持的參數:
allow_duplicate_values
default_keys
exclude_keys
field_split
field_split_pattern
include_brackets
include_keys
prefix
recursive
remove_char_key
remove_char_value
source
target
tag_on_failure
tag_on_timeout
timeout_millis
transform_key
transform_value
trim_key
trim_value
value_split
value_split_pattern
whitespace
3.7Ruby
執行ruby代碼。此過濾器接受內聯ruby代碼或ruby文件。這兩個選項是互斥的,具有稍微不同的工作方式。
例如:
filter {
ruby {
code => "event.cancel if rand <=0.90"
}
}
3.8Split
拆分篩選器通過拆分其中一個字段並將拆分產生的每個值放入原始事件的克隆來克隆事件。要拆分的字段可以是字符串或數組。
此過濾器的一個示例用例是從exec輸入插件獲取輸出,該插件為命令的整個輸出發出一個事件,並按換行分割該輸出 - 使每一行成為事件。
拆分過濾器還可用於將事件中的數組字段拆分為單個事件。JSON和XML中一種非常常見的模式是利用列表將數據組合在一起。
filter {
split {
field => "results"
target=> “messages”
terminator => “\n”
}
}
3.9Mutate
強大的mutate過濾器,可以對數據進行增刪改查。支持的語法多,且效率高
按照執行順序排列:
coerce
rename:重命名字段
update:更新數據
replace:替換字段值
convert:轉換字段類型
gsub:替換字符
uppercase:轉為大寫的字符串
capitalize:轉換大寫字符串
lowercase:轉為小寫的字符串
strip:剝離字符空白
remove:移除字段
split:分離字段
join:合並數組
merge:合並多個數組
copy:復制字段
例如:
filter {
mutate {
split => ["hostname","."]
add_field => {"shortHostname" => "%{hostname[0]}" }
}
mutate {
rename =>["shortHostname", "hostname" ]
}
}
4.Logstash性能優化
Logstash本質是一個數據處理流程,因此從建模的角度來看,最好的架構就是流水線工作,而剛好logstash官方設計也是基於流水線工作設計。
Logstash單實例本質是一個處理單元,因此最好的架構就是網狀架構,每個實例加載多個管道,每個管道可以自由的被多個處理單元進行處理數據。
logstash先對單個實例進行性能最大化,然后將其加到處理流水線中,由集群管理中心對其進行處理事務分配,從logstash主配置文件中我們知道,單個logstash的性能最大化由3個元素組成,batch.size(批處理大小),workers(工作線程數),batch.delay(批處理頻率),單個實例每秒最大tps計算公式=批處理大小工作線程數/處理頻率,加入1台8核16G內存的實例,它的配置batchsize為1千,工作線程數為8,頻率為200ms,它所能接受的理論tps為=1千8/0.2=4萬,但實際上肯定達不到這個數值,因為這中間需要jvm內存的緩存,而這也是logstash常常需要優化的部分。
假設4萬條數據,每條數據為20KB,那么每秒鍾需要的數據容量=4萬20KB8=6.4gb,那么在你的jvm里面需要常駐6.4gb的堆空間來加載這些數據,同時處理過程中需要有3個階段,因此實際使用過程中需要的對空間需要大於6.4gb,那么我們如何去分析logstash的性能呢?


在第一個圖中,我們看到CPU沒有得到非常有效的使用。實際上,JVM經常需要停止VM以獲得“完整的GC”。完全垃圾收集是過度記憶壓力的常見症狀。這在CPU圖表上的峰值中可見。在更有效配置的示例中,GC圖形模式更平滑,並且CPU以更均勻的方式使用。您還可以看到在分配的堆大小和允許的最大大小之間有足夠的空間,這為JVM GC提供了很大的工作空間。與資源密集程度較高的舊Gen“Full”GC所花費的時間相比,過度分配的VM在高效的Eden GC中花費的時間非常少。在一般實踐中,我們需要讓jvm的堆內存實際使用大小盡可能接近最大大小,且CPU處於比較平衡的趨勢在處理事務。
問題:在項目實踐過程中,我們的logstash單個實例應該配置多大的資源?
完成了單個實例的性能優化之后,對於超大流量處理,我們如何使用多個實例進行並發處理呢?使用更多實例還是更多管道,誰的機制更好呢?這里又帶來了一個新的概念,logstash的管道機制。
logstash集群可以使用負載均衡模式從kafka集群中快速的獲取到最大數據量。例如同一台物理機,配置為8核16G,使用1個8核16G資源實例同時對5個topic的5個分區進行處理,跟4個2核4G資源實例對5個topic的5個分區進行處理,誰會更快?
Logstash提供了一種通過調用的配置文件來執行此操作的方法pipelines.yml。此文件必須放在path.settings文件夾中並遵循以下結構:
-
pipeline.id:my-pipeline_1
path.config:"/etc/path/to/p1.config"
pipeline.workers: 3
-
pipeline.id:my-other-pipeline
path.config:"/etc/different/path/p2.cfg"
queue.type: persisted
此文件以YAML格式化,並包含字典列表,其中每個字典描述一個管道,每個鍵/值對指定該管道的設置。該示例顯示了由ID和配置路徑描述的兩個不同管道。對於第一個管道,值pipeline.workers設置為3,而在另一個管道中,啟用持久隊列功能。未在pipelines.yml文件中顯式設置的設置值將回退到logstash.yml設置文件中指定的默認值。
使用管道注意方法:
當啟動沒有參數的Logstash時,它將讀取pipelines.yml文件並實例化文件中指定的所有管道。另一方面,當使用-e或時-f,Logstash會忽略該pipelines.yml文件並記錄有關它的警告。
如果當前配置的事件流不共享相同的輸入/過濾器和輸出,並且使用標簽和條件將彼此分開,則使用多個管道顯得特別有用。(問題:猜想一下有用的地方會體現在哪些方面)
在單個實例中具有多個管道還允許這些事件流具有不同的性能和持久性參數(例如,管道工作者和持久隊列的不同設置)。這種分離意味着一個管道中的阻塞輸出不會在另一個管道中施加背壓。
也就是說,考慮到為單個管道調整默認值,考慮管道之間的資源競爭非常重要。因此例如考慮減少每個管道使用的管道工作者的數量,因為默認情況下每個管道將使用每個CPU核心1個工作線程。
持久隊列和死信隊列是按管道隔離的,其位置由pipeline.id值命名。
5.啟動和停止
5.1開機自啟動
確認主機運行的logstash實例及其參數
把所有實例的啟動命令追加到/etc/rc.local
例如:在/etc/rc.local追加(注意絕對路徑)
nohup /hadoop/logstash-${version}/bin/logstash -f /hadoop/logstash-${version}/conf/test.conf &>/var/log/logstash/test.log &
5.2啟動logstash
Logstash均需要配置成開機自啟動模式。
cd LOGSTASH_HOME
./bin/logstash -f config/test.conf
后台啟動:
nohup./bin/logstash -f config/test.conf -w 16 1>/var/log/logstash/{系統名} 2>&1 &
啟動多個logstash配置,新建一個目錄(conf)存放多個logstash的配置文件,
./bin/logstash -f conf/*
5.3停止logstash
ps -ef |grep logstash
kill -9 $pid
6.實踐應用
百看不如一做,做幾個小題目,走了不少彎路才繞出來,悲催。。。一開始不熟悉會走很多迷途,且行且珍惜。我自己更才發現官方文檔原來是特別有用,要仔細看看的,grok里的匹配也很強大,回頭另外總結下grok匹配規則,那下面來幾個小題目練習下(此處非標准答案,因己解析哈):
Jul 13 09:01:01 localhost systemd: Stopping user-0.slice.
Jul 13 09:03:56 localhost systemd: Created slice user-0.slice.
Jul 13 09:03:56 localhost systemd: Starting user-0.slice.
解析說明:此處的難點在於systemd:后的內容,有空格就不能使用空格split;時間格式就要仔細研究下grok的規則了
input {
stdin {}
}
filter {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:logtime} %{IPORHOST:hostname} %{GREEDYDATA:mess}" }
}
mutate {
split => ["mess",": "]
add_field => { "%{[mess][0]}" => "%{[mess][1]}" }
remove_field => ["mess","@version"]
}
}
output {
stdout { codec => rubydebug }
}
解析結果如圖:

type=CRED_REFR msg=audit(1563023036.263:1242): pid=6977 uid=0 auid=0 ses=156 subj=unconfined_u:unconfined_r:unconfined_t:s0-s0:c0.c1023 msg='op=PAM:setcred grantors=pam_unix acct="root" exe="/usr/sbin/sshd" hostname=securenat-daf2680060a6 addr=192.168.2.63 terminal=ssh res=success'
解析說明:此處出現了多層嵌套,而且msg字段有兩處(我自己理解的哈),最初開始使用的不是gsub,用split有點麻煩,用gsub直接替換干掉,干脆利索(我也是最后才發現“寶藏”);還有一個就是那個add_field用得有點醉了,糾結就糾結在那個audit放在msg字段里面
input {
stdin {}
}
filter {
kv {}
mutate{
gsub => ["[msg][0]","\(","=" ]
gsub => ["[msg][0]","\):","" ]
}
kv{
source => "[msg][0]"
}
kv{
source => "[msg][1]"
trim_value => "\"'"
target => "msg"
}
mutate {
add_field => { "[msg][audit]" => "%{[audit]}" }
remove_field => ["audit","@version"]
}
}
output {
stdout { codec => rubydebug }
}
解析結果如圖:

2019-06-19 08:55:28,818 [0cfad026-4b3e-4bd5-90c3-d993d67426571560851187560] [d95ddac9-8525-463a-b63f-f31984d745d4] [http-nio-8083-exec-15] INFO c.e.c.filter.ParameterWrapperFilter - 請求業務參數:{
"deviceId" : "hedc76d5b9f6b8b374705b7684f2f5e13",
"appVersion" : "4.4.0",
"osType" : "iOS",
"osVersion" : "OS12.2",
"serviceId" : "https://sit-mobile.essence.com.cn/self-select-stock/public/userStockSign/queryStock",
"appId" : "mobile.essence.com.cn",
"reqdata" : {
"accessToken" : "eyJhbGciOiJIUzUxMiJ9.eyJqdGkiOiIwZDYxNjg1MC0zNDM5LTQ4MDgtOTliOS1hYmQyMzdkOTk1ZTAiLCJpc3MiOiJhZG1pbkBlc3NlbmNlLmNvbS5jbiIsImlhdCI6MTU2txUZA",
"md5Sign" : ""
},
"requestId" : "1560905683118145",
"requestTime" : "2019-06-19 08:54:43"
}
解析說明:此處我一上來就使用grok匹配,思路就錯了點,多行那個應該是首先考慮的的,“請求業務參數”那里是中文冒號,grok是可以配置的,然后后面的多行直接使用json,手起刀落,一次性搞定;另一種方法里的奧妙在於取字符串的“(?<info6>(?<=請求業務參數:)(.*)/?)”
input {
file {
path => ["/zork/logstash-7.2.0/conf/test3.log"]
start_position => "beginning"
sincedb_path => "/dev/null"
codec => multiline {
pattern => "^%{TIMESTAMP_ISO8601}"
negate => true
what => "previous"
}
}
}
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:info1}\] \[%{DATA:info2}\] \[%{DATA:info3}\] %{LOGLEVEL:loglevel} %{DATA:info4} - %{DATA:info5}:%{GREEDYDATA:info6}"
}
}
date {
match => [ "timestamp","yyyy-MM-dd HH:mm:ss,SSS" ]
}
json {
source => "info6"
target => "請求業務參數"
remove_field => [ "info5","info6","tags","@version" ]
}
}
output {
stdout { codec => rubydebug }
}
或者
input {
file {
path => ["/zork/logstash-7.2.0/conf/test3.log"]
start_position => "beginning"
sincedb_path => "/dev/null"
codec => multiline {
pattern => "^\s*"
negate => false
what => "previous"
}
}
}
filter {
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{DATA:info1}\] \[%{DATA:info2}\] \[%{DATA:info3}\] %{LOGLEVEL:loglevel} %{DATA:info4} - %{GREEDYDATA:info5}"
}
}
date {
match => [ "timestamp","yyyy-MM-dd HH:mm:ss,SSS" ]
}
grok {
match => { "info5" => "(?<info6>(?<=請求業務參數:)(.*)/?)" }
}
json {
source => "info6"
target => "請求業務參數"
remove_field => [ "info5","info6","tags","@version" ]
}
}
output {
stdout { codec => rubydebug }
}
解析結果如圖:

06-19 2019 09:09:03,365 [43371440-7c77-4f67-b27e-33d3347bea2e1560906497865] [7f03a62f-92e8-4e3c-bf8b-8af6e3460121] [http-nio-8083-exec-17] INFO c.e.o.stock.repository.UserCenterApi - UC103009,名稱=>自選股查詢 ,業務參數=>{USER_ID=1597929340065856}
解析說明:此處的點在於這個時間,反正我是沒找到grok里匹配這個時間的模式,也繞了一大圈圈
input {
stdin {}
}
filter {
grok {
match => {
"message" => "%{MONTHNUM:month}-%{MONTHDAY:day} %{YEAR:year} %{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second} \[%{DATA:info1}\] \[%{DATA:info2}\] \[%{DATA:info3}\] %{LOGLEVEL:loglevel} %{DATA:info4} - %{DATA:info5},%{GREEDYDATA:info6}"
}
}
ruby{
code => " timestamp = event.get('month')+'-'+event.get('day')+' '+event.get('year')+' '+event.get('hour')+':'+event.get('minute')+':'+event.get('second') event.set('timestamp',timestamp) "
remove_field => [ "month","day","year","hour","minute","second","@version"]
}
kv {
source => "info6"
field_split => " ,"
# value_split => "=>"
trim_value => ">"
}
kv {
source => "[業務參數]"
trim_key => "{}"
trim_value => "{}"
target => "業務參數"
}
}
output {
stdout { codec => rubydebug }
}
解析結果如圖:
