Because logstash does not update in real time but polls on a schedule to trigger updates, it clearly cannot satisfy scenarios with strict real-time requirements. That is how the canal-based approach for pushing changes into ES in real time came about.
Method
1. Install canal (Alibaba's open-source component; a reasonably recent version can be downloaded from https://github.com/alibaba/canal/releases). Download the two packages canal.deployer-1.1.4.tar.gz and canal.adapter-1.1.4.tar.gz.
2. The project uses ES 7.6.1, while the newer canal releases only support ES up to roughly 6.5. The workaround: download the canal source package, open the directory D:\camalSource\canal-canal-1.1.4\canal-canal-1.1.4\client-adapter\ in IDEA, adjust the jar dependency versions in the pom (method signatures differ slightly between ES versions, so the code needs minor adjustments until it compiles without errors), and build with mvn package -DskipTests. Once the build succeeds you get the output directory.
That directory contains the adapter built against ES 7.6.1; package it up and copy it to the server.
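In command-line terms the rebuild described above boils down to roughly the following (a sketch; which pom properties actually need bumping to 7.6.1 has to be checked against the poms in your source tree):

cd client-adapter
# edit the pom(s), raising the elasticsearch-related dependency versions to 7.6.1,
# then fix the handful of compile errors caused by API differences between ES versions
mvn package -DskipTests
# on success the rebuilt adapter ends up under the module's target/ directory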
Upload canal.deployer-1.1.4.tar.gz and the matching canal.adapter-1.1.4.tar.gz (the one rebuilt to support the newer ES) to the server, extract them and configure them. The configuration process follows.
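A sketch of the server-side unpack step (the target directories are my assumption; I use /opt/canal for the deployer and /opt/canal7 for the rebuilt adapter, which matches the /opt/canal7/... paths used later in this post):

mkdir -p /opt/canal /opt/canal7
tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/canal
tar -zxvf canal.adapter-1.1.4.tar.gz -C /opt/canal7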
1. Enable binlog on MySQL; canal depends on it (the architecture diagram explains why).
Enable binlog (in my.cnf):
server-id = 1
binlog_format = ROW
log_bin = mysql_bin
After connecting with a MySQL client:
show variables like 'log_bin';  -- check whether binlog is actually enabled
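It does no harm to confirm the related settings at the same time (an optional check, not part of the original steps):

show variables like 'binlog_format';  -- should be ROW
show variables like 'server_id';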
2. Create a new MySQL user 'canal' for canal-adapter (and the deployer) to use:
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' IDENTIFIED BY 'canal';
FLUSH PRIVILEGES;
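Note that MySQL 8.0 removed the GRANT ... IDENTIFIED BY syntax; if the source database is 8.x, create the user first and grant separately:

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;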
3. Extract canal.deployer and deploy the canal server.
conf/canal.properties is the main configuration file; for example, canal.port specifies the port the server listens on.
conf/example/instance.properties is the per-instance configuration file; the main items to modify:
canal.instance.mysql.slaveId=2 (must not conflict with the MySQL server-id set above)
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
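Besides those, make sure canal.instance.master.address points at the source MySQL; the address below is an assumption taken from the adapter's srcDataSources url further down:

canal.instance.master.address=192.168.1.20:3306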
Run startup.sh under bin; canal.deployer should now be up. Use netstat to confirm that port 11111 is actually being listened on and check that the logs look normal, watching logs/canal/canal.log and logs/example/example.log.
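Concretely, the checks look like this (plain Linux commands, run from the deployer install directory):

netstat -tunlp | grep 11111
tail -f logs/canal/canal.log
tail -f logs/example/example.log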
With canal.deployer running, the next step is to start canal.adapter.
1. Adjust the configuration file /opt/canal7/conf/application.yml (be sure to use spaces for indentation and real line breaks, and leave a space after each colon before the value; canal is extremely sensitive to how this file is parsed, and getting it right cost me quite a bit of time):
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 127.0.0.1:11111
#  zookeeperHosts: slave1:2181
#  mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.1.20:3306/dianping?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#        threads: 1
#        commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      - name: es
        hosts: slave1:9300 # 127.0.0.1:9200 for rest mode
        properties:
          # mode: transport # or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: dianping-app
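One note on the es block above: with hosts pointing at port 9300 the adapter talks to ES over the transport protocol, so cluster.name here must match the cluster.name of the ES cluster itself (dianping-app in this setup); to use rest mode instead, point hosts at port 9200 and set mode: rest, as the template comments suggest.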
Then go into the es folder under conf and add your own mapping file (for example shop.yml):
dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
  _index: shop
  _type: _doc
  _upsert: true
  _id: id
  sql: "select a.id,a.name,a.tags,concat(a.latitude,',',a.longitude) as location,a.remark_score,a.price_per_man,a.category_id,b.name as category_name,a.seller_id,c.remark_score as seller_remark_score,c.disabled_flag as seller_disabled_flag from shop a inner join category b on a.category_id = b.id inner join seller c on c.id=a.seller_id"
  commitBatch: 3000
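Note that this mapping only drives incremental syncing from the binlog. For the initial full load, canal-adapter exposes an ETL REST endpoint on its own port once it is running (the exact path below is my recollection of the adapter API, so treat it as an assumption and check the canal ClientAdapter docs):

curl -X POST http://127.0.0.1:8081/etl/es/shop.yml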
Start canal.adapter with startup.sh under its bin directory, then watch /opt/canal7/logs/adapter/adapter.log to confirm it comes up cleanly; if you see =====> Subscribe destination: example succeed <============= the adapter started successfully.
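A quick end-to-end check at this point (the table, the row id, and the ES REST address slave1:9200 are assumptions based on the shop mapping and the transport host configured above):

-- in MySQL, change a row that the mapping's SQL covers
update shop set name = 'canal-sync-test' where id = 1;

# then fetch the corresponding document from ES and confirm the change arrived
curl http://slave1:9200/shop/_doc/1?pretty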
From this point on, whenever you change a value in MySQL, canal pushes the change to ES almost immediately, syncing automatically based on the id and fields of the updated rows. (If one field of the index model is sourced from several places, though, this simple mapping clearly runs into trouble; in that case you have to write the canal-to-ES ingestion yourself in Java, which I will write up later.)