Logstash導入數據


開發中, 更多的是從現有數據庫中導入數據
Django中 python manage.py rebuild_index 就是在導入數據

  • 方式
    • 自己寫一個程序, 按照之前的語法從數據庫中讀取數據並添加到es中
    • 也可以使用Logstash工具導入數據
  • 安裝
  • 從mysql中導入數據
    • 創建配置文件

Logstach安裝

sudo rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch

在 /etc/yum.repos.d/ 中創建logstash.repo文件

[logstash-6.x]
name=Elastic repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md

執行

sudo yum install logstash
cd /usr/share/logstash/bin/
sudo ./logstash-plugin install logstash-input-jdbc
sudo ./logstash-plugin install logstash-output-elasticsearch
scp mysql-connector-java-8.0.13.tar.gz python@10.211.55.7:~/
tar -zxvf mysql-connector-java-8.0.13.tar.gz

創建配置文件logstash_mysql.conf

input{
     jdbc {  # java數據庫訪問的API接口
         jdbc_driver_library => "/home/python/mysql-connector-java-8.0.13/mysql-connector-java-8.0.13.jar"
         jdbc_driver_class => "com.mysql.jdbc.Driver"
         jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/toutiao?tinyInt1isBit=false"
         jdbc_user => "root"
         jdbc_password => "mysql"
         jdbc_paging_enabled => "true"  # 數據分頁, 一共14W數據
         jdbc_page_size => "1000"  # 每頁1000條數據
         jdbc_default_timezone =>"Asia/Shanghai"
         statement => "select a.article_id as article_id,a.user_id as user_id, a.title as title, a.status as status, a.create_time as create_time,  b.content as content from news_article_basic as a inner join news_article_content as b on a.article_id=b.article_id"  # 聯表查詢, 盡量起別名,否則ES的字段名稱會變為a.xx, 這樣和mysql的字段名稱會出現差異
     }
}

output{
      elasticsearch {
         hosts => "127.0.0.1:9200"
         index => "articles"
         document_id => "%{article_id}"  # 讓文檔id記錄文章id, 方便進行數據庫查詢
         document_type => "article"
      }
      stdout {  # 導入過程中以json形式顯式的輸出導入的內容
         codec => json_lines  
     }
}
  • 增量更新的配置
input{
  record_last_run => "true"  	# 記錄最后一次運行時的數據點, 默認為最后一次更新的時間
	use_column_value => "true"  # 不再記錄最后一次更新的時間, 而是記錄最后一次更新時, 數據庫某個字段的值(字段的值要求是遞增的)
  tracking_column => "article_id"  # 設置記錄的字段
last_run_metadata_path => "/xx/data"    # 數據點的存儲位置
  clean_run => "false"    # 從存儲位置開始繼續讀取, 如果設置為true, 則清除數據點, 從頭開始讀取
}

解壓縮java類庫

tar -zxvf mysqlxxx.tar.gz

執行導入命令

sudo /usr/share/logstash/bin/logstash -f ./logstash_mysql.conf


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM