使用logstash同步mysql數據到elasticsearch


說明:以下文檔說到elasticsearch都簡稱es。

           logstash同步數據不僅僅是數據庫到es,它只是一個同步中間件,數據來源和數據的目標存儲都是可以在配置里面指定的,根據數據來源和目標存儲的不同配合logstash提供的不同插件。

           本文講解的是從mysql 同步到es實現方式。使用的版本logstash版本是6.6.1,es版本是6.5.4,logstash版本和es版本一定要配套,如果2者版本差距過大,同步過程中會報版本錯誤。

          寫這篇文章的時候es和Logstash最新版本是7.2。

 

 

1、logstash是一款開源的數據同步工具,還是比較優秀的。其他幾款同步es數據的工具,比如elasticseach-jdbc等都試過,好久沒人維護了。

     官方文檔訪問地址:https://www.elastic.co/guide/en/logstash/current/index.html

2、es是一個基於Lucene的搜索服務器。它提供了一個分布式多用戶能力的全文搜索引擎,基於RESTful web接口。

     官方說明文檔訪問地址:https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html

3、logstash和es的安裝

     參考官方文檔。

4、mysql同步到es的logstash配置文件,命名一個logstash-mysql-es.conf配置文件,其中ip是mysql數據庫對應的ip,es_ip是elasticsearch對應的ip內容如下:

 

input {
  jdbc {
    type1 => "kl_carousel_info"
    jdbc_connection_string2 => "jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    jdbc_user3 => "root"
    jdbc_password4 => "123456"
    jdbc_driver_library5 => "./config/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class6 => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled7 => true
    jdbc_fetch_size8 => 100
    jdbc_page_size9 => 100000
    jdbc_default_timezone10 =>"Asia/Shanghai"
    statement11 => "select * from kl_carousel_info where createtime >= :sql_last_value order by createtime asc"
    schedule12 => "*/3 * * * *"
    record_last_run13 => true
    use_column_value14 => true
    tracking_column15 => "createtime"
    tracking_column_type16 => "numeric"
    last_run_metadata_path17 => "./id/logstash_kl_carousel_info_last_id"
    clean_run18 => false
    lowercase_column_names19 => false
  }
  jdbc {
    type => "kl_knowledge_article"
    jdbc_connection_string => "jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_library => "./config/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
 jdbc_fetch_size => 100
    jdbc_page_size => 100000
    jdbc_default_timezone =>"Asia/Shanghai"
    statement => "SELECT 'kl_infomation_article' table_name,a.`title` m_title,a.pinyin,a.`type` m_type,a.`knowledge_type_id`,a.`knowledge_type_name`,a.`tags`,a.status,a.`click_rate`, a.id m_id,c.`organization_id`,c.`organization_name`,b.`id`,b.`masterid`,b.`content`,b.`accessory_id`,b.`accessory_path`,b.`state`,b.`sort`,b.`updatetime`,b.`createtime` FROM `kl_infomation_article` b INNER JOIN kl_infomation_master a on a.id=b.masterid INNER JOIN `kl_master_to_organization` c ON a.`id`=c.`master_id` WHERE b.`state`='1' AND a.`state`='1' AND c.`state`='1' AND b.updatetime >= :sql_last_value order by b.updatetime asc"
    schedule => "*/3 * * * *"
    record_last_run => true
    use_column_value => true
    tracking_column => "updatetime"
    tracking_column_type => "numeric"
    last_run_metadata_path => "./id/logstash_kl_knowledge_article_last_id"
    clean_run => false
    lowercase_column_names => false
  }
  jdbc {
    type => "kl_knowledge_document"
    jdbc_connection_string => "jdbc:mysql://ip:3306/db_name?useUnicode=true&characterEncoding=utf-8&useSSL=false"
    jdbc_user => "root"
    jdbc_password => "123456"
    jdbc_driver_library => "./config/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_paging_enabled => true
 jdbc_fetch_size => 100
    jdbc_page_size => 100000
    jdbc_default_timezone =>"Asia/Shanghai"
    statement => "SELECT 'kl_infomation_document' table_name,a.`title` m_title,a.pinyin,a.`type` m_type,a.`knowledge_type_id`,a.`knowledge_type_name`,a.`tags`,a.status,a.`click_rate`,a.id m_id,c.`organization_id`,c.`organization_name`, b.* from `kl_infomation_document` b INNER JOIN kl_infomation_master a  on a.id=b.master_id INNER JOIN `kl_master_to_organization` c ON a.`id`=c.`master_id`  WHERE b.`state`='1' AND a.`state`='1' AND c.`state`='1' AND b.updatetime >= :sql_last_value order by b.updatetime asc"
    schedule => "*/3 * * * *"
    record_last_run => true
    use_column_value => true
    tracking_column => "updatetime"
    tracking_column_type => "numeric"
    last_run_metadata_path => "./id/logstash_kl_knowledge_document_last_id"
    clean_run => false
    lowercase_column_names => false
  }
}
output {
  if[type]=="kl_carousel_info"20{
   elasticsearch {
  hosts21 => "es_ip:9200"
  index22 => "kl_carousel_info"
  document_type23 => "doc"
  document_id24 => "%{id}"
  manage_template25 => true
  template_overwrite26 => true
  template_name27 => "kl_carousel_info"
  template28 => "/opt/elasticsearch/logstash-6.6.1/template/kl_carousel_info_logstash.json"
   }
  }
  if[type]=="kl_knowledge_article"{
      elasticsearch {
  hosts => "es_ip:9200"
  index => "kl_knowledge"
  document_type => "doc"
  document_id => "%{id}"
  manage_template => true
  template_overwrite => true
  template_name => "kl_knowledge"
  template => "/opt/elasticsearch/logstash-6.6.1/template/kl_knowledge_logstash.json"
   }
  }
  if[type]=="kl_knowledge_document"{
      elasticsearch {
  hosts => "es_ip:9200"
  index => "kl_knowledge"
  document_type => "doc"
  document_id => "%{id}"
  manage_template => true
  template_overwrite => true
  template_name => "kl_knowledge"
  template => "/opt/elasticsearch/logstash-6.6.1/template/kl_knowledge_logstash.json"
   }
  }
  stdout {
      codec => json_lines
  }
}
 
 
logstash配置文件說明:
    【1】——定義的類型名稱,與output中上標【20】對應,說明那個輸入到哪個輸出類型。
    【2】——mysql的jdbc連接串。
    【3】,【4】——mysql的用戶名和密碼。
    【5】,【6】——mysql的驅動包與驅動名名稱配置。
    【7】,【8】,【9】——mysql查詢時的分頁設置。
    【10】——時區設置。
    【11】——同步時的查詢sql語句,如果是數值型一定要順序排序。
    【12】——定時同步頻率設置,以上例子設置的是3分鍾同步一次,默認是1分鍾。
    【13】,【14】,【15】,【16】——【14】使用列的字段做增量判斷,否則默認是當前時間【15】增量同步時用於判斷的字段名稱,【16】字段類類型,以上例子是時間字段的長整型類型,所以是numeric
    【17】——用於記錄上一次同步時標記字段最后同步一行的值的文件路徑,比如同步最后一行createtime值是1559198274577,那么這個值會記錄到文件logstash_kl_carousel_info_last_id中,下次同步時僅僅同步createtime大於這個值的數據。
    【18】——是否每次同步都清理,最好為false,否則每次同步都是全量同步
    【19】——列字段是否都轉為小寫名稱
    【20】——與【1】一一對應
    【21】——同步插入或者更新到es的目標地址
    【22】——es的索引名稱
    【23】——es的文檔類型名稱,6.x版本都可以是一個索引對應多個文檔類型的,但是不建議這么做,后續es版本只支持一個索引對應一個文檔類型。
    【24】——es中文檔的id,例子中使用表記錄的id作為文檔的id
    【25】——這個如果使用自己配置的模板,必須配置為true
    【27】——模板名稱,與定義的模板名稱對應
    【28】—— 使用自定義模板的文件路徑,模板用於創建es的索引,決定了索引創建的方式
5、模板的定義
     模板是在第一次同步數據時用於創建es索引用的,由es來加載模板。
     {  
    "template": "kl_*"1
    "order" : 12
    "settings": {      
        "index.number_of_shards": 5,      
        "number_of_replicas": 1,      
        "index.refresh_interval": "60s",
        "analysis":{
            "analyzer":{
                "pinyin_smart"3:{
                    "type":"custom",
                    "tokenizer":"ik_smart"4,
                    "char_filter": ["html_strip"]5,
                    "filter":[
                        "my_pinyin"6
                    ]
                }
            },
            "filter":{
                "my_pinyin"7:{
                    "type":"pinyin"8,
                    "keep_separate_first_letter" : false,
                    "keep_full_pinyin" : true,
                    "keep_original" : true,
                    "limit_first_letter_length" : 16,
                    "lowercase" : false,
                    "remove_duplicated_term" : true
                }
            }
        }  
    },   
    "mappings":
    {        
        "doc": {
      "dynamic_templates"9: [ {
                    "string_fields" : {
                    "match" : "*"10,
                    "match_mapping_type" : "string"11,
                    "mapping" : {
                        "analyzer":"pinyin_smart"12,
                        "type" : "text"13,
                       "fields": {
                           "raw": {
                               "type":  "keyword",
                               "ignore_above": 256
                            }
                        }
                    }
                    }
               }
       ],
      "properties"14: {               
                "@timestamp":{                   
                    "type": "date"               
                },
    "id": {
       "type": "keyword"
       },
    "m_id": {
       "type": "keyword"
       },
    "masterid": {
       "type": "keyword"
       },
    "master_id": {
       "type": "keyword"
       },
    "accessory_id": {
       "type": "keyword"
       },
    "parent_id": {
       "type": "keyword"
       },
    "knowledge_type_id": {
       "type": "keyword"
       },
    "organization_id": {
       "type": "keyword"
       },
    "table_name": {
       "type": "keyword"
       },
    "m_type": {
       "type": "keyword"
       },
    "m_title"15: {
       "type": "text",
       "norms": true16,
       "analyzer":"ik_smart"17,
       "search_analyzer":"ik_smart"18,
       "fields": {
           "pinyin"19: {
               "type":     "text",
               "analyzer": "pinyin_smart"
            },
           "raw"20: {
               "ignore_above":25621,
               "type":  "keyword"22
          }
       }
       },
    "title": {
       "type": "text",
       "norms": true,
       "analyzer":"ik_smart",
       "search_analyzer":"ik_smart",
       "fields": {
      "pinyin": {
        "type":     "text",
        "analyzer": "pinyin_smart"
      },
      "raw": {
          "ignore_above":256,
       "type":  "keyword"
      }
       }
       },
                "img_title": {
       "type": "text",
       "norms": true,
       "analyzer":"ik_smart",
       "search_analyzer":"ik_smart",
       "fields": {
      "pinyin": {
        "type":     "text",
        "analyzer": "pinyin_smart"
      },
      "raw": {
          "ignore_above":256,
       "type":  "keyword"
      }
       }
       },
       "content": {
       "type": "text",
       "norms": true,
       "analyzer":"ik_smart",
       "search_analyzer":"ik_smart",
       "fields": {
      "pinyin": {
        "type":     "text",
        "analyzer": "pinyin_smart"
      },
      "raw": {
          "ignore_above":256,
       "type":  "keyword"
       
      }
       }
       }        
             }   
          }
       }
}
es模板的配置文件說明:
    【1】——模板名稱,這里"kl_*"通配以“kl_”開頭的模板名稱,與logstash配置的【27】對應。
    【2】——模板的優先順序,如果應用了多個模板則決定合並的優先順序,值越大優先級越高,es自帶的默認模板是0,所以自定義模板最好設置為>0的值。
    【3】——分詞分析器名稱。
    【4】——使用了ik分詞器的ik_smart方式分詞,ik分詞器插件需要在es里面安裝並且重啟才能生效。
    【5】——分詞時跳過HTML標簽,HTML不要去分詞。
    【6】——拼音分詞過濾,支持拼音分詞+ik中文分詞混合,拼音分詞與ik。
    【7】——過濾器定義。
    【8】——拼音分詞器。
    【9】——動態模板,沒有在。
    【10】——匹配字段,*代表匹配任何沒有在【14】里面定義字段。
    【11】——匹配字段的類型,這里是字符串類型。
    【12】——使用自定義的分析器名稱。
    【13】——對應es的字段類型。
   ,【14】——屬性字段索引的方式定義。
    【15】——字段的名稱。
    【16】——是否支持打分,如果要打分,一定要設置為true
    【17】——使用ik分詞器的ik_smart方式分詞。
    【18】——指定es查詢是使用ik分詞器的ik_smart方式分詞。最好與插入是分詞器保持一致,否則搜索結果可能不符合預期。
    【19】——定義拼音分詞嵌套字段,這里自定義名稱為"pinyin"
    【20】——定義keyword的嵌套子字段,方便like查詢,注意:如果需要支持like查詢一定需要定義keyword類型字段,否則可能查詢不到,因為被分詞了,但是keyword類型最大支持32766個字符或者10922個漢字。
    【21】——定義keyword類型在字段長度超過多少個字符時忽略,這里是超過256忽略,這個值定義過大對性能會有顯著影響,查詢速度會變慢。
    【22】——keyword類型。
 
    6、手動刪除es索引
         比如我要刪除索引以"kl_"開頭的索引, 在linux下可以執行命令:curl -XDELETE -u elastic:changeme http://localhost:9200/kl_*
    7、刪除模板
         這里踏過一個大坑,在第一次啟動logstash,執行模板加載,新建es索引后,后面修改了模板,但是索引並沒有生效,原因就是需要刪除es里面默認的模板,估計是模板合並導致的,
        刪除模板語句命令:curl -XDELETE 'http://127.0.0.1:9200/_template/*'
 8、啟動(開始)同步(Linux)
   cd  {lostash_home},
   然后執行命令:nohup bin/logstash -f config/logstash-mysql-es.conf > logs/logstash.out 2>&1 &
 
 
 
        
 
      
      
      
                           

     


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM