ElasticSearch7.x系列三:Logstash的使用


前言

Logstash一般用來收集整理日志,但是也可以做數據的同步

我希望我的數據庫的數據全部存到ElasticSearch之后,我的數據庫做的增刪改查都可以增量的更新到ElasticSearch里面

Logstash配置文件

數據庫驅動

首先,下載JDBC,我使用的是SQLserver,所以直接搜索

Microsoft SQL Server JDBC

然后下載即可,我目前是放在了Logstash的bin文件目錄下

配置文件編寫:看看就行,重點在多表同步配置

還記得安裝篇,我寫的運行Logstash嗎?命令是這樣的,在bin目錄下執行

.\logstash -e 'input { stdin { } } output { stdout {} }'

注意到input和output了吧,一個是輸入,一個是輸出,也就是說Logstash的啟動,得需要一個配置文件,這個配置文件有輸入和輸出.這就好辦了,輸入源有很多,我這里使用SQLserver,輸出呢也有很多,我這里介紹兩個

配置文件,讀取SQLserver數據庫導出到txt和ElasticSearch

input {
    jdbc {
      jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      jdbc_user => "sa"
      jdbc_password => "666666"
       # clean_run => true  #開啟了sql_last_value就從0開始了
      schedule => "* * * * *"
      statement => "select Id,Name as name,CreateDate as createDate from Solution where SolutionId > :sql_last_value"
	  lowercase_column_names => false
      use_column_value => true
      tracking_column => "solutionid"   #注意,這里必須全部小寫
    }
}

output { 
    file{
      path => "D:\Desktop\output.txt"
    }
    
    # elasticsearch {
    #       hosts => ["192.168.3.8:9200"]
    #       index => "solution"
    # }
}

我必須講解一下這個配置文件,因為有坑,我踩了

首先,我希望讀取數據庫一張表的數據,導出到txt文本,我希望數據庫有新加入的數據的時候,能增量的給我導出到txt里面去

所以我們查數據庫的時候,導出的做一個記錄,比如Id,下次我再導出的時候,只導出Id比上一次大的即可,但是使用Id作為增量有一個問題,就是我數據庫更新了以前的數據,logstash不知道,所以增量的判斷字段必須得是時間,這個在多表更新的那里講

輸入源解釋

  1. jdbc_driver_library : 沒啥說的,數據庫驅動,也可以不寫死路徑,可以配置在環境變量里,不過我懶得弄了
  2. jdbc_driver_class : 沒啥說的,驅動類別
  3. jdbc_connection_string : 你的數據庫地址
  4. jdbc_user : 數據庫用戶名
  5. jdbc_password : 數據庫密碼
  6. clean_run : 值為true的話sql_last_value就從0開始了,sql_last_value下面講
  7. schedule : 更新頻率,五個星星代表 分 時 天 月 年,例如"* * * * *"就是每分鍾, " * /1 * * " 就是每小時, "/10 * * * *" /10 是每十分鍾 不加斜杠 10 是每小時的第10分,默認是最小單位是分,也就是全部5個就是每分鍾執行一次,其實也可以秒級執行的,這樣寫
 schedule => "*/5 * * * * *"

只要在前面再加一個* 單位就是秒,這里就是每5s執行一次

  1. use_column_value : 是否開啟增量字段
  2. tracking_column : 指定增量字段,一般情況下都是表的Id或者創建時間這倆字段,但是我推薦使用時間字段,而且必須要注意,Logstash默認把數據庫字段全部轉成小寫了,所以我們得關閉,使用lowercase_column_names => false
  3. lowercase_column_names => false 關閉默認的小寫
  4. statement : 數據庫查詢語句,也可以寫到sql文件里面,讀取文件,不過簡單的話直接寫語句挺好的,這里做了一個增量查詢,我每查詢一次,就把我的增量字段賦值給JDBC自帶的sql_last_value變量,所以我查詢的時候大於sql_last_value就實現了增量更新了

現在知道clean_run 的作用了吧,自己測試的時候,測了幾次,sql_last_value變的很大了,又想從頭測起,就把sql_last_value清0即可
在正式服務器上使用的時候,clean_run 關了

輸出解釋

file{
  path => "D:\Desktop\output.txt"
}

這個就是導出到txt

elasticsearch {
    hosts => ["192.168.100.100:9200"]
    index => "solution"
}

這個就是導出到ElasticSearch,Index是solution

啟動

還是來到bin目錄下,啟動命令框,輸入

logstash -f jdbcconfig/jdbc.conf

我在bin目錄下新建了一個文件夾叫jdbcconfig,所以換成你們自己的文件夾名.我的配置文件叫jdbc.conf,同樣也換成你們自己的

多表同步 : 重要,非常重要

這一塊是重點,我踩了很多的坑,找了很多的資料,嘗試了很多次,終於找到了我想要的解決方案

上面講的配置是單表的,但是實際應用的時候都是多表導入到ElasticSearch的,這個時候有兩種方法,我目前已知的2種

  1. 多寫幾個配置文件,多啟動幾次
  2. 一個配置文件里寫多個輸入源和多個輸出
input {
    jdbc {
      jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      jdbc_user => "sa"
      jdbc_password => "666666"
      schedule => "* * * * *"
      clean_run => true
      statement => "select ArticleID as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate from Article where ApproveState = 1 and UpdateDate > :sql_last_value"
      use_column_value => true
      tracking_column => "updateDate"
      tracking_column_type => "timestamp"
      type => "article"
    }
    jdbc {
      jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      jdbc_user => "sa"
      jdbc_password => "666666"
      schedule => "* * * * *"
      clean_run => true
      statement => "select SolutionId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate,Tags,ClickCount from Solution where UpdateDate > :sql_last_value"
      use_column_value => true
      tracking_column => "updatedate"
      tracking_column_type => "timestamp"
      type => "solution"
    }
    jdbc {
      jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      jdbc_user => "sa"
      jdbc_password => "666666"
      schedule => "* * * * *"
      # clean_run => true
      statement => "select FileId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from DownloadFile where ApproveState = 1 and UpdateDate > :sql_last_value"
      use_column_value => true
      tracking_column => "updateDate"
      tracking_column_type => "timestamp"
      type => "downloadfile"
    }
    jdbc {
      jdbc_driver_library => "D:\Vae\ElasticSearch\logstash-7.6.2\logstash-7.6.2\bin\jdbcconfig\mssql-jdbc-8.2.2.jre8.jar"
      jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
      jdbc_connection_string => "jdbc:sqlserver://192.168.100.100:1433;DatabaseName=VaeDB;"
      jdbc_user => "sa"
      jdbc_password => "666666"
      schedule => "* * * * *"
      clean_run => true
      statement => "select VideoId as Id,Title as title,CreateDate as createDate,Content as content,CONVERT (VARCHAR (30),UpdateDate,25) AS UpdateDate from Video where UpdateDate > :sql_last_value"
      use_column_value => true
      tracking_column => "updateDate"
      tracking_column_type => "timestamp"
      type => "video"
    }
}

filter {
    mutate {
            add_field => {
                    "[@metadata][articleid]" => "%{articleid}"
            }
            add_field => {
                    "[@metadata][solutionid]" => "%{solutionid}"
            }
            add_field => {
                    "[@metadata][fileid]" => "%{fileid}"
            }
            add_field => {
                    "[@metadata][videoid]" => "%{videoid}"
            }
    }  
}

output {
  # stdout {
  #     codec => json_lines
  # }
 if [type] == "article"{
      elasticsearch {
        hosts  => "192.168.100.100:9200"
        index => "article"
        action => "index"
        document_id => "%{[@metadata][articleid]}"
      }
  }
  if [type] == "solution"{
      elasticsearch {
        hosts  => "192.168.100.100:9200"
        index => "solution"
        action => "index"
        document_id => "%{[@metadata][solutionid]}"
      }
  }
    if [type] == "downloadfile"{
      elasticsearch {
        hosts  => "192.168.100.100:9200"
        index => "downloadfile"
        action => "index"
        document_id => "%{[@metadata][fileid]}"
      }
  }
  if [type] == "video"{
      elasticsearch {
        hosts  => "192.168.100.100:9200"
        index => "video"
        action => "index"
        document_id => "%{[@metadata][videoid]}"
      }
  }

}

配置講解

如果是Id就這樣寫

use_column_value => true
tracking_column => "videoid"

如果是時間就這樣寫,多了一個時間類型

use_column_value => true
tracking_column => "updatedate"
tracking_column_type => "timestamp"

想要做增量的更新,包括插入和修改,就最好使用一個updateDate字段

filter是干嘛的?

這一塊我現在還很模糊,不過我現在會使用的是定義變量,我定義一個變量,使用

filter {
    mutate {
      add_field => {
              "[@metadata][videoid]" => "%{videoid}"
      }
    }
}

這個意思就是,定義了一個[@metadata][videoid]的變量,值是上面的查詢的表里面的videoid

然后我下邊的output里面就可以直接"%{[@metadata][videoid]}"來調用了

所以目前我所知的filter作用,就是變量溝通input和output,當然還有其他的,我還沒看

sql語句

我input里面寫的sql語句,都是把列名都寫出來了,這樣好,可以重命名,一定要注意,數據庫里面的type字段一定要重命名,不然會沖突

而且時間我都轉了一下

CONVERT (VARCHAR (30),UpdateDate,25) AS updateDate

因為網上看到有很多時區的問題,所以我直接轉成字符串就解決時區的問題了吧

數據庫Type字段問題

數據庫字段不能有type,因為Logstash的配置文件使用了type,巨坑,所以數據庫里面的type字段自己重命名

數據庫字段必須駝峰命名

我數據查詢的時候,記得一定要重命名成駝峰的格式,不然ES不能映射給Model,例如

select VideoId as Id,Name as name,Title as title,CreateDate as createDate from ...

注意,單個單詞的全部小寫,CreateDate這種兩個大寫字母的,駝峰命名,前面小寫,后面大寫

如果你實在不想使用駝峰命名,每個字段去as一次,也有其他辦法,就是在你接收的Model上加上 [Text(Name = "CreateDate")]也行

類型錯誤

expected:'String Begin Token', actual:'10', at offset:597
expected:'String Begin Token', actual:'22', at offset:332

這種錯都一樣,都是類型的錯誤,比如ES里面是long,但是你的Model接收的是string,我遇到的情況是我的數據ID,有的是int類型的,有的是string類型的,我都用一個string類型的Id去接收,這就報錯了


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM