There are no usage examples for installing Logstash with Docker on the official Docker Hub page. Most of the articles I found were copied and pasted from each other and vague, so although I got a container running by copying their commands, I didn't really understand what was going on. Recalling how Logstash is installed without Docker and comparing the two helped a little, but when I wrote my own docker run command, the container kept stopping shortly after starting, which was frustrating.
After a careful comparison I found the apparent cause: Logstash seems to need to be started in interactive mode, i.e. the docker run command needs -it; without it, the container exits shortly after starting.
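Just to illustrate the flag (the image tag matches the version I used, everything else here is a placeholder; the full command with config mounts appears further down):

docker run -d -it --name logstash logstash:5.6.12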
I also read in the official documentation that two files need to be configured, one of them called pipelines.yml.
I configured it several times following those instructions and it never worked; the startup log showed that the pipelines file I wrote was never read. It turned out to be a version issue: the 5.6.12 release I was using does not have this file at all, even though the docs shown for version 5 also mention it, which had me confused. After downloading a version 6 release I found the file under its config folder, and then it was clear: on 5.6.12 I only need to write a single .conf file, and only version 6 additionally needs the pipeline file pipelines.yml to specify which conf files to load.
Here is the contents of the config folder in version 5:
And here is the contents of the config folder in version 6:
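For reference, a minimal pipelines.yml entry in 6.x that points at a conf file looks roughly like this (the pipeline id and path are placeholders, not taken from my setup):

- pipeline.id: mysql-to-es
  path.config: "/usr/share/logstash/config/mysql01.conf"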
This time my goal was to read data from a MySQL database into Elasticsearch. I uploaded a MySQL driver jar and wrote a custom mysql01.conf:
input {
  jdbc {
    jdbc_driver_library => "/home/kf/soft/logstash-5.6.12/config/mysql/mysql-connector-java-5.1.25.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.0.4:3306/test?serverTimezone=Asia/Shanghai&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "root"
    schedule => "* * * * *"
    statement => "SELECT id,username,password,password_salt,status,insertTime,updateTime FROM users WHERE updateTime >= :sql_last_value"
    # Whether to record the last run; if true, the last value of tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_users_last_time"
    type => "users"
    # Whether to lowercase column names
    lowercase_column_names => false
  }
}
output {
  elasticsearch {
    # Elasticsearch host and port
    hosts => ["192.168.88.130:9200"]
    # Index name (can be customized)
    index => "users"
    # The id column from the database maps to the document id
    document_id => "%{id}"
    document_type => "users"
  }
  stdout {
    # Output in JSON format
    codec => json_lines
  }
}
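Before wiring this into Docker, the file can be syntax-checked with a plain Logstash run; this is just a sanity check I'm adding here, assuming Logstash is started from its install directory and mysql01.conf sits in the config directory:

cd /home/kf/soft/logstash-5.6.12
./bin/logstash -f config/mysql01.conf --config.test_and_exit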
Because the ELK stack runs in a virtual machine while MySQL runs on my Windows host, the first connection attempt failed with "connection refused". By default MySQL only allows local connections, so the privileges have to be changed on the MySQL side:
For example, suppose you want myuser to be able to connect to the MySQL server from any host with the password mypassword (% means connections from any host are allowed):
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
If you then cannot connect to MySQL locally, also grant localhost access, e.g. grant all privileges on luffy.* to 'luffy'@'localhost' identified by 'luffy'; After setting up an account with restricted privileges, be sure to flush privileges; without flushing, the change is not picked up (restarting the session from cmd also works):
FLUSH PRIVILEGES;
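To verify from the VM side before going back to Logstash, a quick connection test with the mysql client works (host, port and user are taken from the jdbc settings above; the client has to be installed in the VM):

mysql -h 192.168.0.4 -P 3306 -u root -p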
After that I could connect to MySQL, but Logstash kept reporting an error:
The insertTime and updateTime columns in my database are both camelCase like this. Some posts said that updateTime in tracking_column => "updateTime" has to be wrapped in double quotes, but mine already was. Others said the tracked column must appear in the SELECT statement, and I had done that too. Still no luck. Then I looked at the statement printed just above the error line: when reading the camelCase name, Logstash had forced the uppercase T to a lowercase t. More searching turned up one line to add to mysql01.conf:
# Whether to lowercase column names
lowercase_column_names => false
With that in place the column was read correctly: the jdbc input lowercases all query column names by default, so the tracked camelCase column could not be matched.
last_run_metadata_path => "./last_record/logstash_users_last_time"
This is the file that stores the last update time. I had not created the last_record folder at first, so startup failed with a file-not-found error; creating the folder by hand fixes it.
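For example (assuming Logstash is started from its install directory, so the relative path resolves there):

cd /home/kf/soft/logstash-5.6.12
mkdir -p last_record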
With that, reading database data into Elasticsearch with Logstash was working.
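A quick spot check that documents actually arrived (host and index come from the output section above):

curl 'http://192.168.88.130:9200/users/_search?pretty'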
In the test environment, the MySQL instance that Logstash connects to also runs in a Docker container.
When starting the Logstash container you need to point it at the config file; the command is:
docker run -d -p 5044:5044 -p 9600:9600 -it --name logstash --network ELS \
  -v /home/smartcity/logstash/config/:/usr/share/logstash/config/ \
  logstash:5.6.13 \
  -f /usr/share/logstash/config/union_blueplus.conf
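To watch the pipeline come up (or fail), follow the container logs:

docker logs -f logstash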
After starting, the Logstash log reported an error:
Unable to connect to database. Trying again {:error_message=>"Java::ComMysqlJdbcExceptionsJdbc4::CommunicationsException: Communications link failure\n\nThe last packet successfully received from the server was 5,867 milliseconds ago. The last packet sent successfully to the server was 5,867 milliseconds ago."}
I searched for a long time; many posts said the MySQL configuration had to be changed, which clearly made no sense here. Posts outside the GFW talked about the jdbc plugin version, so I checked mine, and it was already the latest, 4.3.13.
Finally, by comparing against working examples online, I traced the failure to an extra SSL parameter in the connection URL.
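Concretely, the working jdbc_connection_string simply drops the useSSL parameter (the removed part is left commented out in the config below):

jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"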
Importing multiple tables:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8" #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Interval between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    schedule => "* * * * *"
    statement => "select sal.alarmID, sal.villageID, sal.deviceType, sal.alarmTypeName, sal.modelID, sal.alarmLevel, sal.alarmState, sal.alarmTime, sal.alarmContent, de.installAddr, sal.updateTime from e_sense_alarm_log sal left join e_device de on de.deviceID = sal.deviceID WHERE sal.updateTime >= :sql_last_value"
    # Whether to record the last run; if true, the last value of tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_alarm_last_time"
    type => "alarm"
    # Whether to lowercase column names
    lowercase_column_names => false
  }
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8" #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Interval between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    schedule => "* * * * *"
    statement => "select de.deviceID, de.isDelete, de.villageID as villageid, de.installAddr as installadd, de.type as devicetype, de.buildingID, bu.buildingNo as buildingno, bu.name as buildingName, de.productModel as productmodel, de.name as deviceName, de.code as code, de.state, de.updateTime as updatetime from e_device de left join b_building bu on de.buildingID = bu.buildingID WHERE de.updateTime >= :sql_last_value"
    # Whether to record the last run; if true, the last value of tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_device_last_time"
    type => "device"
    # Whether to lowercase column names
    lowercase_column_names => false
  }
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8" #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Interval between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    schedule => "* * * * *"
    statement => "select al.accessLogID, al.villageID as villageid, al.peopleName as peoplename, peo.gender, peo.phoneNo as phoneno, al.credentialNo as credentialno, lab.name as peoplelabel, bu.buildingNo as buildingno, al.buildingID as buildingid, al.cardNo as cardno, al.openType as opentype, al.updateTime as opentime from e_access_log al left join p_people peo on peo.credentialNo = al.credentialNo left join p_people_label pl on pl.peopleID = peo.peopleID left join s_label lab on lab.labelID = pl.labelID left join b_building bu on bu.buildingID = al.buildingID WHERE al.updateTime >= :sql_last_value"
    # Whether to record the last run; if true, the last value of tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_accessLog_last_time"
    type => "accessLog"
    # Whether to lowercase column names
    lowercase_column_names => false
  }
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8" #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Interval between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    schedule => "* * * * *"
    statement => "select fl.faceLogID, io.type as faceinouttype, io.villageID as villageid, io.ioID as ioid, fl.personType as persontype, peo.peopleName as peoplename, peo.phoneNo as phoneno, peo.credentialNo as credentialno, sl.name as peoplelabel, fl.updateTime as facecapturetime from e_face_log fl left join b_in_out io on io.ioID = fl.ioID left join p_people peo on peo.credentialNo = fl.credentialNo left join p_people_label pl on pl.peopleID = peo.peopleID left join s_label sl on sl.labelID = pl.labelID where fl.updateTime >= :sql_last_value and fl.faceSource = 0"
    # Whether to record the last run; if true, the last value of tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_wkface_last_time"
    type => "wkface"
    # Whether to lowercase column names
    lowercase_column_names => false
  }
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/config/mysql/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/union_blueplus?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8" #&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
    jdbc_user => "root"
    jdbc_password => "Tsl@2018"
    # Validate the connection before use
    jdbc_validate_connection => true
    # How often to re-validate the connection (4 hours)
    jdbc_validation_timeout => 14400
    # Maximum number of retries after a connection failure
    connection_retry_attempts => 50
    # Interval between retries after a connection failure
    connection_retry_attempts_wait_time => 1
    jdbc_page_size => "2000"
    schedule => "* * * * *"
    statement => "select pr.parkingReserveID, pr.villageID as villageid, io.ioID as inioid, io.ioID as outioid, pr.inParkingLogID, pr.outParkingLogID, pr.carBrand as cartype, pr.plateNo as plateno, peo.peopleName as peoplename, peo.phoneNo as phoneno, peo.credentialNo as credentialno, pr.updateTime as intime from e_parking_reserve pr left join e_parking_channel pc on pc.parkingID = pr.parkingID left join b_in_out io on io.ioID = pc.ioID left join e_parking_car ec on ec.plateNo = pr.plateNo left join p_people peo on peo.peopleID = ec.peopleID where pr.updateTime >= :sql_last_value"
    # Whether to record the last run; if true, the last value of tracking_column is saved to the file given by last_run_metadata_path
    record_last_run => true
    use_column_value => true
    tracking_column => "updateTime"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "./last_record/logstash_wkcar_last_time"
    type => "wkcar"
    # Whether to lowercase column names
    lowercase_column_names => false
  }
}
output {
  if [type] == "alarm" {
    elasticsearch {
      # Elasticsearch host and port
      hosts => ["192.168.66.34:9200"]
      # Index name (can be customized)
      index => "alarmlogindex"
      # The id column from the database maps to the document id
      document_id => "%{alarmID}"
      document_type => "alarm"
    }
  }
  if [type] == "device" {
    elasticsearch {
      hosts => ["192.168.66.34:9200"]
      index => "deviceindex"
      document_id => "%{deviceID}"
      document_type => "device"
    }
  }
  if [type] == "accessLog" {
    elasticsearch {
      hosts => ["192.168.66.34:9200"]
      index => "accesslogindex"
      document_id => "%{accessLogID}"
      document_type => "accessLog"
    }
  }
  if [type] == "wkface" {
    elasticsearch {
      hosts => ["192.168.66.34:9200"]
      index => "facelogindex"
      document_id => "%{faceLogID}"
      document_type => "wkface"
    }
  }
  if [type] == "wkcar" {
    elasticsearch {
      hosts => ["192.168.66.34:9200"]
      index => "parkingreservelogindex"
      document_id => "%{parkingReserveID}"
      document_type => "wkcar"
    }
  }
  stdout {
    # Output in JSON format
    codec => json_lines
  }
}
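Once the pipelines have run for a bit, a quick check that the indices exist and are filling up (host taken from the output blocks above):

curl 'http://192.168.66.34:9200/_cat/indices?v'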
Reference: https://blog.csdn.net/u010887744/article/details/86708490