有些既存的項目把一部分日志信息寫入到數據庫中了,或者是由於其它的原因我們希望把關系型數據庫中的信息讀取到 elasticsearch 中。這種情況可以使用 logstash 的 jdbc input 插件從關系型數據庫中讀取日志數據,然后輸出到 elasticsearch 中。本文介紹如何在 windows 系統中配置 logstash 從 SQL Server 數據庫中讀取數據。
說明:演示的環境為 windows server 2016,logstash 的版本為 6.2.4。
關鍵步驟
本文將按照下面的順序介紹使用 logstash 從 SQL Server 數據庫導出數據的關鍵步驟:
- 安裝 Java Development Kit(JDK)
- 安裝 Logstash
- 安裝 SQL Server 的 JDBC 驅動
- 配置 Logstash
- 集成域認證
- 持續讀取數據
- 把時間戳設置為記錄產生的時間
安裝 Java Development Kit(JDK)
運行 logstash 6.2.4 需要先在環境中安裝 JDK,請不要安裝最新版本的 JDK,最好是安裝 JDK8,演示中筆者安裝的版本為 jdk-8u111-windows-x64,直接安裝到默認的目錄中。
在 logstash 的運行腳本中用到了 JAVA_HOME 環境變量,因此我們需要先添加這個環境變量(注意,環境變量的值為 JDK 的實際安裝目錄):
環境變量添加完成后,新啟動一個 PowerShell 窗口,執行下面的命令:
> echo $env:JAVA_HOME
通過輸出的結果驗證環境變量是否被正確添加。
安裝 Logstash
請從官方下載 logstash 的 windows 安裝包,其實就是一個 zip 文件,比如:logstash-6.2.4.zip。Logstash 的安裝非常簡單,直接解壓縮就可以了。示例中,我把它解壓到了 C 盤的根目錄下,並重命名為 logstash,因此 logstash 的安裝目錄為:C:\logstash。
如果你想把 logstash 配置為 windows service 運行在后台,請參考《Windows 下配置 Logstash 為后台服務》一文。
安裝 SQL Server 的 JDBC 驅動
Logstash 需要使用 JDBC 驅動從 SQL Server 數據庫中讀取數據,因此我們還需要安裝 JDBC 驅動。同樣不要去獲取最新版本的驅動程序,請選擇 Microsoft JDBC Driver 4.2 for SQL Server。下載安裝包 sqljdbc_4.2.8112.200_enu.exe,然后運行它。其實它只是個自解壓的壓縮包,選個目錄並解壓縮。筆者選擇的 C 盤的根目錄,所以驅動文件的絕對路徑為:
C:\sqljdbc_4.2\enu\jre8\sqljdbc42.jar
在 logstash jdbc 插件中,我們可以直接指定這個文件的絕對路徑,比如:
input { jdbc { jdbc_driver_library => "C:\sqljdbc_4.2\enu\jre8\sqljdbc42.jar" ... } }
除了這種方式,我們還可以通過添加環境變量的方法來指定 JDBC 驅動。在 windows 系統中再添加一個名為 CLASSPATH 環境變量,變量的值為:
.;C:\sqljdbc_4.2\enu\jre8\sqljdbc42.jar
. 表示在當前目錄下查找,接着是一個 ;(分號)和后面的絕對路徑。如果這個路徑中包含空格,需要使用雙引號包裹這個路徑。
在添加了環境變量 CLASSPATH 后,就可以刪除上面配置文件中的 jdbc_driver_library 信息了!本文的示例中將會使用 CLASSPATH 環境變量。
配置 Logstash
從 SQL Server 數據庫中讀取數據是由 logstash 的 JDBC 插件實現的,該插件作為 input 插件默認已隨 logstash 安裝,可以直接使用:
input { jdbc { jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "jdbc:sqlserver://DBSVR_NAME;databaseName=DB_NAME;user=****;password=****;" jdbc_user => "****" jdbc_password => "****" statement => "SELECT * FROM [DB].[SCHEMA].[TABLE]" } } output { file { path => "c:\output.txt" } }
jdbc_connection_string 描述了到 SQL Server 的連接字符串,你需要指定 SQL Server 服務器的地址、目標數據庫的名稱以及用戶名稱及其密碼。jdbc_user 和 jdbc_password 是對連接字符串中用戶名、密碼的重復。statement 則用來指定查詢語句,它返回的結果會被 logstash 獲取到。簡單起見,我把結果輸出到了本機的 c:\output.txt 文件中,這樣比較容易調試。
請根據你的實際情況更新上面的配置文件,並保存到 C:\logstash\sql.conf 文件中,然后以管理員權限啟動 PowerShell 並進入到 C:\logstash 目錄,執行下面的命令:
> .\logstash.bat -f .\sql.conf
如果配置信息正確, statement 指令指定的 SQL 語句的執行結果就會被保存到 C:\output.txt 文件中。
集成域認證
Windows 平台下很多場景中都會使用集成域認證的方式進行身份認證,比如在上例中采用集成域認證的方式代替連接字符串中的用戶名和密碼:
input { jdbc { jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver" jdbc_connection_string => "jdbc:sqlserver://DBSVR_NAME;databaseName=DB_NAME;integratedSecurity=true;" jdbc_user => "" statement => "SELECT * FROM [DB].[SCHEMA].[TABLE]" } }
在 jdbc_connection_string 字符串中我們用 integratedSecurity=true 替換了用戶名和密碼,並且刪除了配置項 jdbc_password。 配置項 jdbc_user 也被設置成了空字符串,因為此時 jdbc_user 的值可以隨便設置,但不能不設置。
配置 sqljdbc_auth.dll
如果此時啟動 logstash 會收到 "無法加載 sqljdbc_auth.dll" 的錯誤。原因是使用域集成認證時,需要加載 sqljdbc_auth.dll,默認的設置無法找到這個 dll。這個 dll 就在我們安裝的 JDBC 驅動目錄下,我們需要在 C:\logstash\config\jvm.options 文件中顯示指定它的路徑。比如添加下面的行:
-Djava.library.path=C:\sqljdbc_4.2\enu\auth\x64
因為我們的演示環境是 x64 架構的,所以這里指定 x64 目錄,對於 x86 架構的系統,請指定 x86 目錄。
現在就可以通過域認證的方式訪問 SQL Server 了,重新執行一遍前面的命令試試!
持續讀取數據
使用現在的配置,每執行一遍 .\logstash.bat -f .\sql.conf 命令就會把數據重復追加到 output.txt 文件中一遍。也就是說,每次執行 statement 語句返回的結果都基本一樣,不僅無法持續地從數據庫讀取數據,還會重復輸出已經獲取過的內容。我們可以使用 jdbc 插件的內置變量 sql_last_value 和配置項 schedule、use_column_value、tracking_column 解決這個問題:
schedule => "* * * * *" statement => "SELECT * FROM [DB].[SCHEMA].[TABLE] WHERE id > :sql_last_value" use_column_value => true tracking_column => "id"
schedule => "* * * * *" 表示每隔一分鍾重復執行一次數據讀取的操作,它支持 crontab 的語法,所以我們可以根據需要靈活設置讀取數據的間隔。內置變量 sql_last_value 會在本地保存一個值,它記錄了上次讀取的最后一條記錄中的一個值,如果 use_column_value 被設置為 true 且 tracking_column 被設置為 "id",則 sql_last_value 保存的就是 id 列的最后一個值(在關系型數據庫中,id 列是比較常見的設計)。
上面配置的含義為:
- sql_last_value 變量總是記錄上次讀取的最后一條記錄中的 id。
- 每隔一分鍾執行一次數據讀取操作。
- 每次只讀取上次讀取后新增的數據。
把時間戳設置為記錄產生的時間
在日志的查詢操作中,很多行為是基於默認的 @timestamp 字段的。@timestamp 字段可以簡單的理解為日志記錄產生的時刻。但是如果我們的日志記錄是從數據庫或其它地方導入過來的,@timestamp 字段默認記錄的是導入日志的時刻,這是不正確的。如果原有的日志記錄中保存有日志產生的時刻,我們就可以由它來獲得 @timestamp 字段的值:
filter { mutate { add_field => { "logtime" => "%{actiondatetime}" } } date { match => ["logtime", "ISO8601"] target => "@timestamp" remove_field => [ "logtime" ] } }
上面的配置假設數據庫中 actiondatetime 列保存了 datetime 類型的數據,通過一個臨時字段 logtime 把 actiondatetime 列保存的信息設置給 @timestamp。這樣導入后的日志記錄的 @timestamp 字段與 actiondatetime 字段保持一致。
參考:
Jdbc input plugin
How to copy SQL Server data to Elasticsearch using LogStash