Basic ELK 7.3.2 configuration with authentication


Pull the 7.3.2 elasticsearch image and start a throwaway container:

docker run -d --name elasticsearch --net ELS -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.3.2

After it starts, use

docker container cp -a <containerID>:<path> <host path>

to copy the config directory out of the container to the host, so it can be bind-mounted later.

Delete this container, then add the following to the copied elasticsearch.yml:

http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type

The three http.cors.* settings allow cross-origin requests, and

xpack.security.enabled: true

turns on authentication.
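With security enabled, every request must carry credentials, which is also why Authorization appears in http.cors.allow-headers: browser-based clients must be allowed to send that header cross-origin. As a quick illustration (using the example password from the run command below), HTTP Basic auth is just a base64-encoded "user:password" pair:

```python
import base64

# HTTP Basic auth: base64-encode "user:password" and send it in the
# Authorization header (this is what curl -u and browser clients do)
user, password = "elastic", "Tsl@2018"
token = base64.b64encode(f"{user}:{password}".encode()).decode()
auth_header = f"Basic {token}"
print(auth_header)
```

Any client hitting port 9200 from a browser will need both this header and the CORS allowance above.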

Full version:

cluster.name: "docker-cluster"
network.host: 0.0.0.0
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
xpack.security.enabled: true

docker run -d --name elasticsearch --net ELS -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" -e "ELASTIC_PASSWORD=Tsl@2018" -e "KIBANA_PASSWORD=Tsl@2018" -v /home/aa/elastic/config:/usr/share/elasticsearch/config elasticsearch:7.3.2

This sets the elastic and kibana passwords and mounts the config directory.

Pull the 7.3.2 kibana image. In the same way, start an empty container with

docker run -d --name kibana --net ELS -p 5601:5601 kibana:7.3.2

then copy the config directory out to the host and edit kibana.yml.
Add the following settings:

elasticsearch.hosts: [ "http://192.168.66.34:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
elasticsearch.username: "kibana"
elasticsearch.password: "1111"
xpack.security.enabled: true

Full version:

server.name: kibana
server.host: "0"
elasticsearch.hosts: [ "http://192.168.66.34:9200" ]
xpack.monitoring.ui.container.elasticsearch.enabled: true
elasticsearch.username: "kibana"
elasticsearch.password: "1111"
xpack.security.enabled: true

Note that the kibana user configured here is only the account Kibana uses to connect to es; it is not a login account for the Kibana UI. Logging in still requires the superuser elastic account.

docker run -d --name kibana --net ELS -p 5601:5601 -v /home/aa/kibana/config:/usr/share/kibana/config kibana:7.3.2

Pull the logstash image.

For pulling data from a database into es, version 7 of logstash no longer needs the config file specified at startup the way version 5 did. Instead there are dedicated directories:

the pipeline directory holds the input/output .conf files;
the config directory holds logstash's own startup configuration.

logstash.yml, full version after editing:

http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://192.168.66.34:9200" ]
xpack.management.elasticsearch.username: elastic
xpack.management.elasticsearch.password: 1111
xpack.monitoring.enabled: true
xpack.monitoring.elasticsearch.username: elastic
xpack.monitoring.elasticsearch.password: 1111

The JDBC driver jar needs to be copied or mounted into the container.

docker run -d -p 5044:5044 -p 9600:9600 -it --name logstash -v /home/aa/logstash/config/:/usr/share/logstash/config/ -v /home/aa/logstash/pipeline:/usr/share/logstash/pipeline -v /home/aa/logstash/mysql/:/some/config-dir/ --network ELS logstash:7.3.2

Started this way, this version keeps erroring that it cannot find the driver.

It is not a problem with the mount location. After a lot of searching, it turns out the JVM classpath inside the container is /usr/share/logstash/logstash-core/lib/jars. If the jar is mounted anywhere else, logstash will not pick it up, even when the path in the .conf file is correct!

Put the driver jar in that directory, leave the driver path in the .conf empty, and it loads on startup.

Eventually the three ELK components ought to be combined into a single docker-compose file; I had not worked that out yet at this point (it is covered later in this post). Here is the pipeline .conf, with five jdbc inputs, one per target index:
input {
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before use
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#max retry attempts after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
# sync schedule (cron fields: minute hour day month weekday); default is every minute
schedule => "* * * * *"
statement => " select sal.alarmID, vi.districtID, di.name districtName, de.streetID, st.name streetName,
de.committeeID, comm.name committeeName, sal.villageID, vi.name villageName, de.buildingID,
vi.name, sal.alarmCount, sal.address,
sal.deviceType,sal.alarmTypeName,sal.modelID,
sal.alarmLevel,sal.alarmState,sal.alarmTime,
sal.alarmContent,de.installAddr,sal.updateTime
from e_sense_alarm_log sal
left join e_device de on de.deviceID = sal.deviceID
left join b_village vi on vi.villageID = de.villageID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
WHERE sal.updateTime >= :sql_last_value"
#if true, record the last value of tracking_column and save it to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_alarm_last_time"
type => "alarm"
# whether to lowercase column names
lowercase_column_names => false
}

jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before use
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#max retry attempts after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
schedule => "* * * * *"
statement => " select de.deviceID, de.isDelete, vi.districtID, di.name districtName, de.streetID, st.name streetName,
de.committeeID, comm.name committeeName, de.villageID, vi.name villageName, de.buildingID,
de.installAddr as installadd,
de.type as devicetype, bu.buildingNo as buildingno, bu.name as buildingName,
de.productModel as productmodel, de.name, de.code as code, de.installTime as installtime,
de.state, de.updateTime as updatetime
from e_device de
left join b_building bu on de.buildingID = bu.buildingID
left join b_village vi on vi.villageID = de.villageID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
WHERE de.updateTime >= :sql_last_value"
#if true, record the last value of tracking_column and save it to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_device_last_time"
type => "device"
# whether to lowercase column names
lowercase_column_names => false
}

jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before use
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#max retry attempts after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
schedule => "* * * * *"
statement => " select al.accessLogID, vi.districtID, di.name districtName,vi.streetID, st.name streetName, vi.committeeID,
comm.name committeeName, al.villageID, vi.name villageName, al.buildingID as buildingid,bui.name buildName,
peo.peopleID, al.peopleName as peoplename,
peo.gender, peo.phoneNo as phoneno, al.credentialNo as credentialno,
lab.name as peoplelabel, bu.buildingNo as buildingno,
al.cardNo as cardno, al.updateTime as opentime, peo.headPic as headpic,
(case al.openType when '100101' then '刷門禁卡開門' when '100201' then '人臉識別開門' when '100301' then '手機藍牙開門'
when '100302' then '手機遠程開門' when '100303' then '電話按鍵開門' when '100401' then '出門按鈕開門'
when '100402' then '鍵盤密碼開門' when '100501' then '身份證開門' when '100601' then '訪客呼叫開門' end) opentype,
peo.livePic as livepic, peo.idPic as idpic , al.faceLogID faceLogID, io.name ioName, al.deviceID
from e_access_log al
left join p_people peo on peo.credentialNo =al.credentialNo
left join p_people_label pl on pl.peopleID = peo.peopleID
left join s_label lab on lab.labelID = pl.labelID
left join b_building bu on bu.buildingID = al.buildingID
left join b_village vi on vi.villageID = al.villageID
left join b_in_out io on io.ioID = al.ioID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
left join b_building bui on bui.buildingID = al.buildingID
WHERE al.updateTime >= :sql_last_value"
#if true, record the last value of tracking_column and save it to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_accessLog_last_time"
type => "accessLog"
# whether to lowercase column names
lowercase_column_names => false
}

jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before use
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#max retry attempts after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
schedule => "* * * * *"
statement => "select fl.faceLogID,io.type as faceinouttype, vi.districtID, di.name districtName, vi.streetID, st.name streetName,
vi.committeeID, comm.name committeeName, io.villageID, vi.name villageName, io.ioID as ioid, io.name, bid.deviceID,
fl.personType as persontype,
peo.peopleName as peoplename, peo.gender, peo.nation, peo.birthDate,
peo.phoneNo as phoneno, peo.credentialNo as credentialno,
peo.domiclleDetailAddress, peo.residenceDetailAddress,
sl.name as peoplelabel, fl.updateTime as facecapturetime, fl.bkgUrl as bkgurl,
fl.faceUrl as faceurl, peo.headPic as headpic, peo.livePic as livepic, peo.idPic as idpic ,
peo.political, peo.education, peo.maritialStatus, peo.origin, fl.faceSimilarity*100 faceSimilarity, peo.peopleType
from e_face_log fl
left join b_in_out io on io.ioID = fl.ioID
left join p_people peo on peo.credentialNo = fl.credentialNo
left join p_people_label pl on pl.peopleID = peo.peopleID
left join s_label sl on sl.labelID = pl.labelID
left join b_village vi on vi.villageID = io.villageID
left join b_inout_device bid on bid.ioID = io.ioID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
where fl.updateTime >= :sql_last_value
#and fl.faceSource = 0
"
#if true, record the last value of tracking_column and save it to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_wkface_last_time"
type => "wkface"
# whether to lowercase column names
lowercase_column_names => false
}
jdbc {
jdbc_driver_library => ""
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://192.168.66.34:3309/111?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8"
#&useSSL=true&useUnicode=true&characterEncoding=UTF-8"
jdbc_user => "root"
jdbc_password => "111"
#validate the connection before use
jdbc_validate_connection => true
#how often to revalidate the connection (4 hours)
jdbc_validation_timeout => 14400
#max retry attempts after a connection failure
connection_retry_attempts => 50
#seconds to wait between retries
connection_retry_attempts_wait_time => 1
jdbc_page_size => "2000"
schedule => "* * * * *"
statement => "select pr.parkingReserveID, vi.districtID, di.name districtName, vi.streetID, st.name streetName, vi.committeeID,
comm.name committeeName, pr.villageID, vi.name villageName, io.ioID as inioid,
io.ioID as outioid, pr.inParkingLogID as inparkinglogid,
pr.outParkingLogID as outparkinglogid, pr.carBrand as cartype,
pr.plateNo as plateno, peo.peopleName as peoplename, peo.phoneNo as phoneno,
peo.credentialNo as credentialno, pr.insertTime as intime, pr.updateTime as outtime,
peo.headPic as headpic,
peo.livePic as livepic, peo.idPic as idpic, inlog.platePic as inplatepic,
outlog.platePic as outplatepic, inlog.minPlatePic as inplatepic,
outlog.minPlatePic as outplatepic, pr.isRegister
from e_parking_reserve pr
left join e_parking_channel pc on pc.parkingID = pr.parkingID
left join b_in_out io on io.ioID = pc.ioID
left join e_parking_car ec on ec.plateNo = pr.plateNo
left join p_people peo on peo.peopleID = ec.peopleID
left join e_parking_log inlog on inlog.parkingLogID = pr.inParkingLogID
left join e_parking_log outlog on outlog.parkingLogID = pr.outParkingLogID
left join b_village vi on vi.villageID = io.villageID
left join b_district di on di.districtID = vi.districtID
left join b_street st on st.streetID = vi.streetID
left join b_committee comm on comm.committeeID = vi.committeeID
where pr.updateTime >= :sql_last_value"
#if true, record the last value of tracking_column and save it to the file given by last_run_metadata_path
record_last_run => true
use_column_value => true
tracking_column => "updateTime"
tracking_column_type => "timestamp"
last_run_metadata_path => "/usr/share/logstash/last_record/logstash_wkcar_last_time"
type => "wkcar"
# whether to lowercase column names
lowercase_column_names => false
}

 

}
output {
if [type] == "alarm"{
elasticsearch {
# ES host and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "alarmlogindex"
# the source table's id column is used as the document id
document_id => "%{alarmID}"
document_type => "alarm"
}
}
if [type] == "device"{
elasticsearch {
# ES host and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "deviceindex"
# the source table's id column is used as the document id
document_id => "%{deviceID}"
document_type => "device"
}
}
if [type] == "accessLog"{
elasticsearch {
# ES host and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "accesslogindex"
# the source table's id column is used as the document id
document_id => "%{accessLogID}"
document_type => "accessLog"
}
}
if [type] == "wkface"{
elasticsearch {
# ES host and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "facelogindex"
# the source table's id column is used as the document id
document_id => "%{faceLogID}"
document_type => "wkface"
}
}
if [type] == "wkcar"{
elasticsearch {
# ES host and port
hosts => ["192.168.66.34:9200"]
user => "elastic"
password => "111"
# index name (customizable)
index => "parkingreservelogindex"
# the source table's id column is used as the document id
document_id => "%{parkingReserveID}"
document_type => "wkcar"
}
}
stdout {
# output as JSON lines
codec => json_lines
}
}
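The incremental-sync machinery used in each jdbc block above (record_last_run, tracking_column, :sql_last_value) boils down to: pull only rows whose tracking column is at or past the last recorded value, then persist the new high-water mark, which is what logstash keeps in last_run_metadata_path. A toy sketch of that idea (illustrative names only, not the plugin's actual implementation):

```python
from datetime import datetime

# toy "table" with an updateTime tracking column
rows = [
    {"alarmID": 1, "updateTime": datetime(2019, 1, 1, 10, 0)},
    {"alarmID": 2, "updateTime": datetime(2019, 1, 1, 11, 0)},
    {"alarmID": 3, "updateTime": datetime(2019, 1, 1, 12, 0)},
]

def sync_once(last_value):
    """Pull rows with updateTime >= :sql_last_value; return them plus the new marker."""
    picked = [r for r in rows if r["updateTime"] >= last_value]
    new_marker = max((r["updateTime"] for r in picked), default=last_value)
    return picked, new_marker

# first run: an epoch marker pulls everything
batch, marker = sync_once(datetime(1970, 1, 1))
print(len(batch))  # 3
# second run: only rows updated since the saved marker
batch2, _ = sync_once(marker)
print(len(batch2))  # 1 (the >= comparison re-reads the boundary row)
```

Note the boundary row is re-read on the next run because the WHERE clause uses >=; since the outputs index by document_id, the re-read just overwrites the same document instead of duplicating it.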



 

 

The ELK docker-compose.yml setup:

Copy the three ELK config directories into the corresponding host directories beforehand.

Because logstash needs the driver jar copied into the container, it gets a custom image.

Dockerfile:

FROM logstash:7.2.0

MAINTAINER kf

# copy the driver jar into the image
ADD ./mysql/*****.jar /usr/share/logstash/logstash-core/lib/jars

# create the last-run metadata directory inside the image
RUN mkdir last_record

The source path in ADD must be relative to the build context; an absolute path will not work. Build the image with: docker build -t logstash:7 .

docker-compose.yml contents:

version: "3"
services:

  elasticsearch:
    image: elasticsearch:7.2.0
    container_name: elastic
    ports:
      - 9200:9200
      - 9300:9300
    environment:
      ELASTIC_PASSWORD: Root@2018
      KIBANA_PASSWORD: Kibana@2018
      LOGSTASH_PASSWORD: Logstash@2018
      discovery_type: single-node
    volumes:
      - /root/data/elastic/config:/usr/share/elasticsearch/config
    restart: always

  kibana:
    image: kibana:7.2.0
    container_name: kibana
    ports:
      - 5601:5601
    volumes:
      - /root/data/kibana/config:/usr/share/kibana/config
    restart: always

  logstash:
    image: logstash:7   # the custom image built above
    container_name: logstash
    ports:
      - 5044:5044
      - 9600:9600
    volumes:
      - /root/data/logstash/config:/usr/share/logstash/config
      - /root/data/logstash/pipeline:/usr/share/logstash/pipeline
    restart: always

networks:
  default:
    external:
      name: ELS


Running docker-compose up -d errors saying the network must be created first; create it as prompted and run again.

The es container started via docker-compose kept erroring, and at first I fell back to starting it with docker run, without finding the cause. If anyone got it working, please leave a comment, thanks.

Follow-up: after asking around, the cause was found. When started via docker-compose, es comes up in cluster mode, and the single-node environment variable configured above (discovery_type: single-node) never takes effect. (Note also that the variable name elasticsearch actually recognizes is discovery.type with a dot, as in the docker run command earlier, which is likely why the underscore form was ignored.)

The fix is to put the setting in the mounted elasticsearch.yml instead. Full version:

cluster.name: "docker-cluster"
discovery.type: "single-node"
network.host: 0.0.0.0
http.port: 9200
http.cors.enabled: true
http.cors.allow-origin: "*"
http.cors.allow-headers: Authorization,X-Requested-With,Content-Length,Content-Type
xpack.security.enabled: true
#xpack.security.transport.ssl.enabled: true

The full docker-compose.yml is now:

version: "3.7"
services:

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.3.2
    container_name: elastic
    ports:
      - 9200:9200
      - 9300:9300
    environment:
      #- discovery_type=single-node
      - ELASTIC_PASSWORD=Root@
      - KIBANA_PASSWORD=Kibana@
      - LOGSTASH_PASSWORD=Logstash@
    volumes:
      - ./elastic/config:/usr/share/elasticsearch/config
    restart: always

  kibana:
    image: kibana:7.3.2
    container_name: kibana
    ports:
      - 5601:5601
    volumes:
      - /data/elk/kibana/config:/usr/share/kibana/config
    depends_on:
      - elasticsearch
    restart: always

  logstash:
    image: logstash:7
    container_name: logstash
    ports:
      - 5044:5044
      - 9600:9600
    volumes:
      - /data/elk/logstash/config:/usr/share/logstash/config
      - /data/elk/logstash/pipeline:/usr/share/logstash/pipeline
    depends_on:
      - elasticsearch
    restart: always

networks:
  default:
    external:
      name: ELS

 

Now the whole ELK stack can be started with docker-compose up -d --build.
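One refinement worth considering (my own addition, not part of the original setup): depends_on only waits for the container to start, not for elasticsearch to actually answer requests, so kibana and logstash may flap on first boot. A healthcheck on the elasticsearch service at least makes readiness visible in docker ps. A sketch, using the ELASTIC_PASSWORD already set in that container's environment (the $$ is compose's escape for a literal $):

```yaml
# hypothetical addition under the elasticsearch service above
healthcheck:
  test: ["CMD-SHELL", "curl -fsu elastic:$$ELASTIC_PASSWORD http://localhost:9200/ || exit 1"]
  interval: 30s
  timeout: 10s
  retries: 5
```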

At this point, data stored in es shows an 8-hour offset caused by the timezone. You can add - TZ=Asia/Shanghai to the es environment variables in docker-compose, which makes the inserted time fields match the database. But the time is still in UTC format, and once the frontend converts the format it adds 8 hours again by default. So it is best to convert time fields to a plain YYYY-MM-DD HH:mm:ss string before storing them in es.

One function that does this: DATE_FORMAT(sal.alarmTime,'%Y-%m-%d %T'), stored with type text.
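For reference, MySQL's DATE_FORMAT pattern above maps directly onto strftime (MySQL's %T is shorthand for %H:%M:%S), so the same plain string can be produced on the application side:

```python
from datetime import datetime

alarm_time = datetime(2019, 9, 30, 14, 5, 9)
# equivalent of DATE_FORMAT(sal.alarmTime, '%Y-%m-%d %T') in MySQL
plain = alarm_time.strftime("%Y-%m-%d %H:%M:%S")
print(plain)  # 2019-09-30 14:05:09
```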

Another approach I saw elsewhere, untested: add 8 hours to the mytime value inside logstash with a ruby filter:

filter {
  ruby {
    code => "event.set('mytime', event.get('mytime').time.localtime + 8*60*60)"
  }
}
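The 8-hour shift that the ruby filter performs is plain UTC-to-UTC+8 arithmetic; Asia/Shanghai has a fixed +8 offset with no daylight saving, so a sketch of the same conversion looks like:

```python
from datetime import datetime, timedelta, timezone

# a UTC timestamp as es would store it
utc_value = datetime(2019, 9, 30, 6, 0, 0, tzinfo=timezone.utc)
# Asia/Shanghai is a fixed UTC+8 offset
shanghai = timezone(timedelta(hours=8))
local_value = utc_value.astimezone(shanghai)
print(local_value.strftime("%Y-%m-%d %H:%M:%S"))  # 2019-09-30 14:00:00
```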

 

After converting the time format to text, queries started failing with:

Fielddata is disabled on text fields by default. Set fielddata=true on [gender] in order to load fielddata in memory by uninverting the inverted index. Note that this can however use significant memory.

To fix it, run this in kibana:

PUT facelogindex/_mapping
{
  "properties": {
    "facecapturetime": {
      "type": "text",
      "fielddata": true
    }
  }
}

Here facelogindex is the index and facecapturetime is the field name.
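A side note, my own suggestion rather than part of the original write-up: fielddata=true loads the whole field into heap, and for exact-value fields like a formatted timestamp string a keyword mapping is usually the cheaper option. A sketch of both mapping bodies (index and field names taken from the example above):

```python
import json

field = "facecapturetime"

# the fix used above: keep the field as text and enable fielddata (heap-hungry)
fielddata_mapping = {"properties": {field: {"type": "text", "fielddata": True}}}

# usually cheaper: map the field as keyword, which is doc-values backed
keyword_mapping = {"properties": {field: {"type": "keyword"}}}

# either body would be the JSON payload of PUT facelogindex/_mapping
print(json.dumps(fielddata_mapping))
```

Note that an existing field's type cannot be changed in place: switching to keyword requires reindexing, whereas the fielddata toggle shown above can be applied to a live index.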






