數據同步解決方案-canal與rabbitmq


 

學習目標

  • 能夠完成canal環境的搭建與數據監控微服務的開發
  • 能夠完成首頁廣告緩存更新的功能,掌握OkHttpClient的基本使用方法
  • 能夠完成商品上架索引庫導入數據功能,能夠畫出流程圖和說出實現思路
  • 能夠完成商品下架索引庫刪除數據功能,能夠畫出流程圖和說出實現思路

該工程使用lua+nginx+rabbitmq+redis等技術的第一個主要目的是實現輪播圖的讀取,具體圖解:
在這里插入圖片描述
其中nginx的作用包括了從數據庫中查詢數據,也包括了將數據庫中的數據更新到redis緩存當中去。

1. canal

1.1 canal簡介

canal可以用來監控數據庫數據的變化,從而獲得新增數據,或者修改的數據。

canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業務需求而提出的。

阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。

在這里插入圖片描述

原理相對比較簡單:

  1. canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
  2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  3. canal解析binary log對象(原始為byte流)

1.2 環境部署

1.2.1 mysql開啟binlog模式

(1)查看當前mysql是否開啟binlog模式。

SHOW VARIABLES LIKE '%log_bin%' 
  • 1

如果log_bin的值為OFF是未開啟,為ON是已開啟。

(2)修改/etc/my.cnf 需要開啟binlog模式。

[mysqld] log-bin=mysql-bin binlog-format=ROW server_id=1

修改完成之后,重啟mysqld的服務。

(3) 進入mysql

mysql -h localhost -u root -p

(4)創建賬號 用於測試使用

使用root賬號創建用戶並授予權限

create user canal@'%' IDENTIFIED by 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;

1.2.2 canal服務端安裝配置

(1)下載地址canal

https://github.com/alibaba/canal/releases/tag/canal-1.0.24

在這里插入圖片描述

(2)下載之后 上傳到linux系統中,解壓縮到指定的目錄/usr/local/canal

解壓縮之后的目錄結構如下:

在這里插入圖片描述
(3)修改 exmaple下的實例配置

vi conf/example/instance.properties

在這里插入圖片描述

修改如圖所示的幾個參數。

一定要注釋掉下面這個參數,這樣就會掃描全庫

#canal.instance.defaultDatabaseName =

(3)啟動服務:

[root@localhost canal]# ./bin/startup.sh

(4)查看日志:

cat /usr/local/canal/logs/canal/canal.log

在這里插入圖片描述

這樣就表示啟動成功了。
在暢購項目實行的過程中使用的是docker容器化部署了canal。
在這里插入圖片描述

1.2.3 canal常見錯誤處理

錯誤信息如下:

2019-06-17 19:35:20.918 [New I/O server worker #1-2] ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x7f2e9be3, /192.168.200.56:52225 => /192.168.200.128:11111], exception=java.io.IOException: Connection reset by peer

解決辦法:

進入mysql中執行下面語句查看binlog所在位置

mysql> show master status; 顯示如下: +------------------+----------+--------------+------------------+-------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | +------------------+----------+--------------+------------------+-------------------+ | mysql-bin.000001 | 120 | | | | +------------------+----------+--------------+------------------+-------------------+ 1 row in set (0.00 sec)

如果file中binlog文件不為 mysql-bin.000001 可以重置mysql

mysql> reset master;

查看canal配置文件

vim usr/local/canal/conf/example/meta.dat

找到對應的binlog信息更改一致即可, 或者刪除這個meta.dat也可以.

"journalName":"mysql-bin.000001","position":43581207,"

1.3 數據監控微服務

當用戶執行數據庫的操作的時候,binlog 日志會被canal捕獲到,並解析出數據。我們就可以將解析出來的數據進行相應的邏輯處理。

我們這里使用的一個開源的項目,它實現了springboot與canal的集成。比原生的canal更加優雅。

https://github.com/chenqian56131/spring-boot-starter-canal

使用前需要將starter-canal安裝到本地倉庫。

我們可以參照它提供的canal-test,進行代碼實現。

(1)創建工程模塊changgou_canal,pom引入依賴

<dependency> <groupId>com.xpand</groupId> <artifactId>starter-canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency>

(2)創建包com.changgou.canal ,包下創建啟動類

@SpringBootApplication @EnableCanalClient public class CanalApplication { public static void main(String[] args) { SpringApplication.run(CanalApplication.class, args); } }

(3)添加配置文件application.properties

canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000

(4)創建com.changgou.canal.listener包,包下創建類

@CanalEventListener public class BusinessListener { @ListenPoint(schema = "changgou_business", table = {"tb_ad"}) public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { System.err.println("廣告數據發生變化"); rowData.getBeforeColumnsList().forEach((c) -> System.err.println("更改前數據: " + c.getName() + " :: " + c.getValue())); rowData.getAfterColumnsList().forEach((c) -> System.err.println("更改后數據: " + c.getName() + " :: " + c.getValue())); } }

測試:啟動數據監控微服務,修改changgou_business的tb_ad表,觀察控制台輸出。

2. 首頁廣告緩存更新

2.1 需求分析

當tb_ad(廣告)表的數據發生變化時,更新redis中的廣告數據。

2.2 實現思路

(1)修改數據監控微服務,監控tb_ad表,當發生增刪改操作時,提取position值(廣告位置key),發送到rabbitmq

(2)從rabbitmq中提取消息,通過OkHttpClient調用ad_update來實現對廣告緩存數據的更新。

在這里插入圖片描述

2.3 代碼實現

2.3.1 發送消息到mq

(1)在rabbitmq管理后台創建隊列 ad_update_queue ,用於接收廣告更新通知

(2)引入rabbitmq起步依賴

<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency>

(3)配置文件application.properties 添加內容

spring.rabbitmq.host=192.168.200.128

(4)修改BusinessListener類

@CanalEventListener public class BusinessListener { @Autowired private RabbitTemplate rabbitTemplate; @ListenPoint(schema = "changgou_business", table = {"tb_ad"}) public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { System.err.println("廣告數據發生變化"); //修改前數據 for(CanalEntry.Column column: rowData.getBeforeColumnsList()) { if(column.getName().equals("position")){ System.out.println("發送消息到mq ad_update_queue:"+column.getValue()); rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //發送消息到mq break; } } //修改后數據 for(CanalEntry.Column column: rowData.getAfterColumnsList()) { if(column.getName().equals("position")){ System.out.println("發送消息到mq ad_update_queue:"+column.getValue()); rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //發送消息到mq break; } } } }

(5)測試,運行數據監控微服務canal,新增、修改或刪除tb_ad表數據,修改后觀察控制台輸出和rabbitmq管理界面中ad_update_queue是否接收到消息

在這里插入圖片描述

2.3.2 從mq中提取消息執行更新

(1)changgou_service_business工程pom.xml引入依賴

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>3.9.0</version> </dependency>

(2)在spring節點下添加rabbitmq配置

spring: rabbitmq: host: 192.168.200.128

(3)com.changgou.business包下創建listener包,包下創建類

@Component @RabbitListener(queues = "ad_update_queue") public class AdListener { /** * 獲取更新廣告通知 * @param message */ @RabbitHandler public void updateAd(String message){ System.out.println("接收到消息:"+message); String url = "http://192.168.200.128/ad_update?position="+message; OkHttpClient okHttpClient = new OkHttpClient(); final Request request = new Request.Builder() .url(url) .build(); Call call = okHttpClient.newCall(request); call.enqueue(new Callback() { @Override public void onFailure(Call call, IOException e) { e.printStackTrace();//顯示錯誤信息 } @Override public void onResponse(Call call, Response response) throws IOException { System.out.println("調用成功"+response.message()); } }); } }

(4)測試,啟動eureka和business微服務,觀察控制台輸出和數據同步效果。

3. 商品上架索引庫導入數據

3.1 需求分析

商品上架將商品的sku列表導入或更新索引庫。

3.2 實現思路

(1)當調用商品微服務中商品上架方法時候, 則商品微服務連接mysql數據庫根據SPU的主鍵ID更改SPU表中is_marketable狀態字段的值為1(已上架).

(2)在rabbitmq管理后台創建商品上架交換器(Exchanges)。使用分列模式(fanout)的交換器是考慮商品上架會有很多種邏輯需要處理,導入索引庫只是其中一項,另外還有商品詳細頁靜態化等操作。這樣我們可以創建導入索引庫的隊列和商品詳細頁靜態化隊列並與商品上架交換器進行綁定。

(3)商品微服務將需要上架的SPU的主鍵ID發送給rabbitMq的商品上架交換器, 交換器則將數據根據路由規則發給對應的索引庫上架隊列和靜態頁上架隊列.

(4) 搜索微服務從rabbitmq的索引庫上架隊列中提取spu的id,通過feign調用商品微服務得到sku的列表,並且通過調用elasticsearch的高級restAPI 將sku列表導入到索引庫。

(5) 靜態頁微服務從rabbitmq的靜態頁上架隊列中提取SPU的id, 通過feign調用商品微服務得到sku, 分類, 品牌, spu等各種數據信息, 然后根據模板生成靜態化頁面.
在這里插入圖片描述

3.3 代碼實現

3.3.1 配置rabbitMQ

(1)在rabbitmq后台創建交換器goods_up_exchange(類型為fanout),創建隊列search_add_queue綁定交換器goods_up_exchange

a. 添加交換器

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-qJX0abSk-1584274503577)(images/6-1.png)]

b. 添加搜索上架隊列

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-p2ak2lFW-1584274503577)(images/6-2.png)]

c. 添加靜態頁上架隊列

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ig0H5I3B-1584274503577)(images/6-111.png)]

d. 交換器中綁定搜索上架隊列和靜態頁上架隊列

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-5BjYB56q-1584274503577)(images/6-3.png)]

3.3.2 發送上架商品id到Mq

(1) changgou_service_goods工程引入依賴:

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

(2) changgou_service_goods工程的application.yml中添加rabbitmq配置

spring: rabbitmq: host: 192.168.200.128

(3) changgou_service_goods工程的SpuServiceImpl中更改方法

@Autowired private RabbitMessagingTemplate rabbitTemplate; @Override public void put(String id) { /** * 1. 更改數據庫中的上架狀態 */ Spu spu = spuMapper.selectByPrimaryKey(id); if(!spu.getStatus().equals("1")){ throw new RuntimeException("未通過審核的商品不能上架!"); } spu.setIsMarketable("1");//上架狀態 spuMapper.updateByPrimaryKeySelective(spu); /** * 2. 將數據發送到rabbitmq中 */ rabbitTemplate.convertAndSend("goods_up_exchange","",id); }

3.3.3 索引庫環境准備

(1)elasticsearch 6.5.2安裝

(2)ik中文分詞器安裝

(3)kibana-6.5.2 安裝

3.3.4 搜索微服務搭建

(1)創建changgou_service_search模塊,pom.xml引入依賴

<dependency> <groupId>com.changgou</groupId> <artifactId>changgou_common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency> <dependency> <groupId>com.changgou</groupId> <artifactId>changgou_service_goods_api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.changgou</groupId> <artifactId>changgou_service_search_api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

(2)changgou_service_search的application.yml

server: port: 9009 spring: application: name: search rabbitmq: host: 192.168.200.128 redis: host: 192.168.200.128 main: allow-bean-definition-overriding: true #當遇到同樣名字的時候,是否允許覆蓋注冊 data: elasticsearch: cluster-name: elasticsearch cluster-nodes: 192.168.200.128:9300 eureka: client: service-url: defaultZone: http://127.0.0.1:6868/eureka instance: prefer-ip-address: true feign: hystrix: enabled: true client: config: default: #配置全局的feign的調用超時時間 如果 有指定的服務配置 默認的配置不會生效 connectTimeout: 600000 # 指定的是 消費者 連接服務提供者的連接超時時間 是否能連接 單位是毫秒 readTimeout: 600000 # 指定的是調用服務提供者的 服務 的超時時間() 單位是毫秒 #hystrix 配置 hystrix: command: default: execution: timeout: #如果enabled設置為false,則請求超時交給ribbon控制 enabled: false isolation: strategy: SEMAPHORE

elasticSearch的配置是我們自己定義的,后邊的連接工廠類會用到

(3)創建com.changgou包,包下創建SearchApplication

@SpringBootApplication @EnableEurekaClient @EnableDiscoveryClient @EnableFeignClients(basePackages = "com.itheima.feign") public class SearchApplication { public static void main(String[] args) { SpringApplication.run(SearchApplication.class); } }

 

3.3.5 創建索引庫結構

  1. 在changgou_service_api項目下創建changgou_service_search_api項目
  2. pom.xml文件引入依賴
<dependency> <groupId>com.changgou</groupId> <artifactId>changgou_common</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>
  1. 創建包com.changgou.pojo
  2. 創建和ES索引庫映射實體類SkuInfo.java
@Document(indexName = "skuinfo", type = "docs") public class SkuInfo implements Serializable { //商品id,同時也是商品編號 @Id @Field(index = true, store = true, type = FieldType.Keyword) private Long id; //SKU名稱 @Field(index = true, store = true, type = FieldType.Text, analyzer = "ik_smart") private String name; //商品價格,單位為:元 @Field(index = true, store = true, type = FieldType.Double) private Long price; //庫存數量 @Field(index = true, store = true, type = FieldType.Integer) private Integer num; //商品圖片 @Field(index = false, store = true, type = FieldType.Text) private String image; //商品狀態,1-正常,2-下架,3-刪除 @Field(index = true, store = true, type = FieldType.Keyword) private String status; //創建時間 private Date createTime; //更新時間 private Date updateTime; //是否默認 @Field(index = true, store = true, type = FieldType.Keyword) private String isDefault; //SPUID @Field(index = true, store = true, type = FieldType.Long) private Long spuId; //類目ID @Field(index = true, store = true, type = FieldType.Long) private Long categoryId; //類目名稱 @Field(index = true, store = true,type = FieldType.Keyword) private String categoryName; //品牌名稱 @Field(index = true, store = true,type = FieldType.Keyword) private String brandName; //規格 private String spec; //規格參數 private Map<String, Object> specMap; ......get和set方法......

3.3.6 創建ES操作的Dao接口

public interface SearchMapper extends ElasticsearchRepository<SkuInfo,Long> { } 
  • 1
  • 2
  • 3

3.3.7 搜索微服務批量導入數據邏輯

(1) changgou_service_goods_api創建com.changgou.feign 包,包下創建接口

@FeignClient(name="goods") @RequestMapping("/sku") public interface SkuFeign { /*** * 多條件搜索品牌數據 * @param searchMap * @return */ @GetMapping(value = "/search" ) public Result findList(@RequestParam Map searchMap); }

(2)changgou_service_search項目下創建 com.changgou.search.service包包下創建接口EsManagerService

public interface EsManagerService { /** * 創建索引庫結構 */ public void createIndexAndMapping(); /** * 根據spuid導入數據到ES索引庫 * @param spuId 商品id */ public void importDataToESBySpuId(String spuId); /** * 導入全部數據到ES索引庫 */ public void importAll(); }

(2)創建com.changgou.search.service包,包下創建服務實現類

@Service public class EsManagerServiceImpl implements EsManagerService { @Autowired private SearchMapper searchMapper; @Autowired private SkuFeign skuFeign; @Autowired private ElasticsearchTemplate esTemplate; /** * 創建索引庫結構 */ @Override public void createIndexAndMapping() { //創建索引 esTemplate.createIndex(SkuInfo.class); //創建映射 esTemplate.putMapping(SkuInfo.class); } /** * 根據spuid導入數據到ES索引庫 * @param spuId 商品id */ @Override public void importDataToESBySpuId(String spuId) { List<Sku> skuList = skuFeign.findSkuListBySpuId(spuId); List<SkuInfo> skuInfos = JSON.parseArray(JSON.toJSONString(skuList), SkuInfo.class); for (SkuInfo skuInfo : skuInfos) { skuInfo.setSpecMap(JSON.parseObject(skuInfo.getSpec(), Map.class)); } searchMapper.saveAll(skuInfos); } /** * 導入全部數據到ES索引庫 */ @Override public void importAll() { Map paramMap = new HashMap(); paramMap.put("status", "1"); Result result = skuFeign.findList(paramMap); List<SkuInfo> skuInfos = JSON.parseArray(JSON.toJSONString(result.getData()), SkuInfo.class); for (SkuInfo skuInfo : skuInfos) { skuInfo.setPrice(skuInfo.getPrice()); skuInfo.setSpecMap(JSON.parseObject(skuInfo.getSpec(), Map.class)); } searchMapper.saveAll(skuInfos); } }

3.3.8 根據spuId導入索引庫數據

(1)changgou_service_search項目下, 創建com.changgou.controller包, 包下創建SearchController

@RestController @RequestMapping("/sku_search") public class SearchController { @Autowired private EsManagerService esManagerService; @Autowired private SearchService searchService; @GetMapping("/createIndexAndMapping") public Result createIndexAndMapping() { esManagerService.createIndexAndMapping(); return new Result(true, StatusCode.OK, "創建成功"); } /** * 導入所有審核通過的庫存數據到ES索引庫 * @return */ @GetMapping("/importAll") public Result importAllDataToES() { esManagerService.importAll(); return new Result(true, StatusCode.OK, "導入數據成功!"); } /** * 全文檢索 * @return */ @GetMapping public Map search(@RequestParam Map<String, String> paramMap) throws Exception { Map resultMap = searchService.search(paramMap); return resultMap; } }

3.3.9 接收mq消息執行導入

(1)changgou_service_search工程的pom.xml文件中引入依賴包

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

(2)changgou_service_search工程創建com.changgou.listener包,包下創建類

@Component @RabbitListener(queues = "search_add_queue") public class SpuAddListener { @Autowired private EsManagerService esManagerService; @RabbitHandler public void addDataToES(String spuId) { System.out.println("===接收到需要商品上架的spuId為======" + spuId); esManagerService.importDataToESBySpuId(spuId); } }

測試:

注意: 測試前將ES中現有的SkuInfo索引庫刪除干凈

(1)啟動環境 eureka 、elasticsearch 、canal服務端、canal數據監控微服務、rabbitmq

(2)啟動商品微服務、搜索微服務

(3)先訪問 http://localhost:9009/sku_search/createIndexAndMapping 創建索引庫結構

( 4 ) 修改tb_spu某記錄的is_marketable值為1,觀察控制台輸出,啟動kibana查詢記錄是否導入成功

4. 商品下架索引庫刪除數據

4.1 需求分析

商品下架后將商品從索引庫中移除。

4.2 實現思路

與商品上架的實現思路非常類似。

(1)當管理員操作商品微服務, 調用下架操作時, 首先更新mysql的SPU表中的is_marketable狀態為0(下架)。

(2)在rabbitmq管理后台創建商品下架交換器(Exchanges)。使用分列模式(Fanout)的交換器是考慮商品下架會有很多種邏輯需要處理,索引庫刪除數據只是其中一項,另外還有刪除商品詳細頁等操作。

(3)搜索微服務從rabbitmq的的隊列中提取spu的id,通過調用elasticsearch的高級restAPI 將相關的sku列表從索引庫刪除。

https://blog.csdn.net/qq_36079912/article/details/104885048

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM