es根據mysql實時更新數據到組件中


由於logstash更新不是實時而是采用了輪訓的方式,去觸法更新,對於實時性要求比較高的場景顯然不能滿足需求,於是衍生出用canal方案解決實時更新入庫es的方案

方法

1.安裝canal(阿里的開源組件,可以從https://github.com/alibaba/canal/releases找到比較新的版本下載),下載canal.deployer-1.1.4.tar.gz   canal.adapter-1.1.4.tar.gz兩個包

2.由於項目用了7.6.1的es,而阿里canal的比較新版本只支持到了6.5左右,這里解決方案可以,下載阿里canal的source包,D:\camalSource\canal-canal-1.1.4\canal-canal-1.1.4\client-adapter\到這個目錄下引入idea工具進行對pom進行調整jar依賴的版本(不同版本代碼方法上會有點區別需要進行稍微調整到無錯誤),進行編譯打包mvn package -DskipTests,一旦成功就會得到目錄

 

 

 這個目錄就是支持7.6.1的es的adapter。打包到服務器。

將相同版本的canal.deployer-1.1.4.tar.gz   canal.adapter-1.1.4.tar.gz(支持新的版本es)上傳到服務器解壓進行配置,以下是配置過程

1.mysql開啟binlog模式,canal需要用到(具體為什么可以看下架構設計圖)

開啟binlog
server-id = 1
binlog_format = ROW
log_bin = mysql_bin
mysql客戶端連接后
show variables like ‘log_bin’ 查看是否真正開啟binlog

2.創建一個新的cannl給cannal-adapter使用

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' IDENTIFIED BY 'canal'

FLUSH PRIVILEGES;

3.解壓canal.deployer用於部署canal
conf/canal.properties是主配置文件,如其中canal.port用以指定服務端監聽的端口
conf/example/instance.properties是實例的配置文件,主要配置項
修改instance.properties
canal.instance.mysql.slaveId=2 不能和serverid沖突
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

啟動bin下的startup.sh 到此canal.deployer啟動成功(可以netstat看看11111端口是否成功被監聽和log是否正常,觀察logs/canal/canal.log以及logs/example/example.log日志

到此canal.deployer啟動完成,接下來啟動canal.adapter

1.調整/opt/canal7/conf/配置文件application.yml(記得縮進和換行要使用空格,變量前面要空格,cannal對這些的讀取是非常敏感的,用的時候這個調了不少時間)

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.1.20:3306/dianping?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es
        hosts: slave1:9300 # 127.0.0.1:9200 for rest mode
        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: dianping-app

  

接着進入es的文件夾中進行配置(新建自己的文件比如 shop.yml):

dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
         _index: shop
         _type: _doc
         _upsert: true
         _id: id
         sql: "select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag from shop a inner join category b on a.category_id = b.id inner join seller c on c.id=a.seller_id"
         commitBatch: 3000

  啟動canal.adapter bin下的startup.sh 到/opt/canal7/logs/adapter/adapter.log觀察log是否正常完成,如果看到=====> Subscribe destination: example succeed <=============就是成功了

到此只要你修改了mysql的值,cannal就會馬上更新到es中,會自動根據更新條目的id和字段同步到es中(但是索引模型一個字段多處使用顯然會有問題,這時候就需要自己用java代碼寫cannl入庫es的過程,這個后面再記錄了)

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM