windows配置啟動es,logstash同步mysql數據庫


windows配置部署es+logstash同步mysql數據庫

一、版本:

jdk--11.0.9			  
elasticsearch--7.10.0   https://www.elastic.co/cn/downloads/past-releases#elasticsearch
logstash--7.10.0	https://www.elastic.co/cn/downloads/past-releases#logstash
ik分詞器--7.10.0		https://github.com/medcl/elasticsearch-analysis-ik/releases
jdbc--8.0		https://repo1.maven.org/maven2/mysql/mysql-connector-java/

二、啟動es:

1、elasticsearch、logstash、ik下載好之后放在一個沒有空格的目錄內,解壓即可。

2、在elasticsearch/plugins目錄下解壓elasticsearch-analysis-ik,並把目錄名字改為ik

3、雙擊elasticsearch\bin\elasticsearch.bat即可啟動es,默認端口localhost:9200

三、啟動logstash:

logstash\bin目錄下創建logstash.conf文件

input {
    jdbc {
    	#jdbc連接數據庫的格式
        jdbc_connection_string => "jdbc:mysql:url\db_name?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai"
        jdbc_user => "數據庫用戶"
        jdbc_password => "密碼"
    	#指定jdbc的路徑
        jdbc_driver_library => "mysql-connector-java-8.0.15.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
    
    	#是否分頁
        jdbc_paging_enabled => "true"     
        jdbc_page_size => "5000"
    
    	#數據庫的查詢語句,兩種方法:1、statement_filepath指定sql路徑;2、sql語句
    	
        # statement_filepath => "filename.sql"
        statement => "SELECT * FROM tablename WHERE keyId >= :sql_last_value"
    	#多久執行一次同步。
        schedule => "* * * * *"
        type => "_doc"

        # 是否需要記錄某列的值,用於實時同步更新。
        use_column_value => true
        # 需要記錄的字段,一般用於記錄主鍵id,或者更新時間,用於sql查詢最新。
        tracking_column => "keyid"
        # 是否清除記錄的字段。
        clean_run => false
	#記錄字段保存的位置。
        last_run_metadata_path => "./logstash_capital_bill_last_id"
	#寫入es數據的key,默認會被轉成小寫,該字段用於控制是否小寫。
        #lowercase_column_names => True

        jdbc_default_timezone => "Asia/Shanghai"
        plugin_timezone => "local"
    }
}


#ElasticSearch中默認使用UTC時間,和中國時間相差8小時,加入以下配置
filter {
    ruby {
        code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
    }
    ruby {
        code => "event.set('@timestamp',event.get('timestamp'))"
    }
    mutate {
        remove_field => ["timestamp"]
    }
    ruby {
        code => "event.set('myTimeField', event.get('myTimeField').time.localtime + 8*60*60)"
    }
}

output {
    elasticsearch {
        hosts => ["localhost:9200"]
        document_id => "%{keyid}"#對應主鍵id,默認全部小寫
        index => "test" #index名字
        document_type => "_doc"
    }
    stdout {
     	# JSON格式輸出
        codec => json_lines
    }
}

進入logstash\bin目錄,輸入logstash -f logstash.conf啟動logstash


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM