第1章 電商實時數倉介紹
1.1 普通實時計算與實時數倉比較
普通的實時計算優先考慮時效性,所以從數據源采集經過實時計算直接得到結果。如此做時效性更好,但是弊端是由於計算過程中的中間結果沒有沉淀下來,所以當面對大量實時需求的時候,計算的復用性較差,開發成本隨着需求增加直線上升。
實時數倉基於一定的數據倉庫理念,對數據處理流程進行規划、分層,目的是提高數據和計算的復用性。
1.2 實時電商數倉項目分層
1)ODS層
原始數據: 日志數據和業務數據
2)DWD層
依據數據對象為單位進行分流,比如訂單、頁面訪問等等
3)DIM層(Hbase+phoenix)
維度數據
4)DWM層
對於部分數據對象進行進一步加工,比如獨立訪問、跳出行為,也可以和維度進行關聯,形成寬表,仍然是明細數據。
5)DWS層
根據某個主題將多個事實數據輕度聚合,形成主題寬表。數據存儲到clickhouse
6)ADS層
把Clickhouse中的數據根據可視化需要進行篩選聚合
第2章 實時需求概述
2.1 離線計算與實時計算的比較
1)離線需求
就是在計算開始前已知所有輸入數據,輸入數據不會產生變化,一般計算量級較大,計算時間也較長。例如今天早上一點,把昨天累積的日志,計算出所需結果。最經典的就是Hadoop的MapReduce方式;
一般是根據前一日的數據生成報表,雖然統計指標、報表繁多,但是對時效性不敏感。從技術操作的角度,這部分屬於批處理的操作。即根據確定范圍的數據一次性計算
2)實時需求
輸入數據是可以以序列化的方式一個個輸入並進行處理的,也就是說在開始的時候並不需要知道所有的輸入數據。與離線計算相比,運行時間短,計算量級相對較小。強調計算過程的時間要短,即所查當下給出結果。
主要側重於對當日數據的實時監控,通常業務邏輯相對離線需求簡單一下,統計指標也少一些,但是更注重數據的時效性,以及用戶的交互性。從技術操作的角度,這部分屬於流處理的操作。根據數據源源不斷地到達進行實時的運算。
2.2 通常實時指標
2.2.1 日常統計報表或分析圖中需要包含當日部分
對於日常企業、網站的運營管理如果僅僅依靠離線計算,數據的時效性往往無法滿足。通過實時計算獲得當日、分鍾級、秒級甚至亞秒的數據更加便於企業對業務進行快速反應與調整。所以實時計算結果往往要與離線數據進行合並或者對比展示在BI或者統計平台中。
2.2.2 實時數據大屏監控
數據大屏,相對於BI工具或者數據分析平台是更加直觀的數據可視化方式。尤其是一些大促活動,已經成為必備的一種營銷手段。另外還有一些特殊行業,比如交通、電信的行業,那么大屏監控幾乎是必備的監控手段。
2.2.3 數據預警或提示
經過大數據實時計算得到的一些風控預警、營銷信息提示,能夠快速讓風控或營銷部分得到信息,以便采取各種應對。比如,用戶在電商、金融平台中正在進行一些非法或欺詐類操作,那么大數據實時計算可以快速的將情況篩選出來發送風控部門進行處理,甚至自動屏蔽。 或者檢測到用戶的行為對於某些商品具有較強的購買意願,那么可以把這些“商機”推送給客服部門,讓客服進行主動的跟進。
2.2.4 實時推薦系統
實時推薦就是根據用戶的自身屬性結合當前的訪問行為,經過實時的推薦算法計算,從而將用戶可能喜歡的商品、新聞、視頻等推送給用戶。這種系統一般是由一個用戶畫像批處理加一個用戶行為分析的流處理組合而成。
第3章 計算架構
3.1 離線架構
3.2 實時架構
第4章 日志數據采集
整個模擬數據的生產過程與離線數倉中模擬數據的生產過程基本一致, 個別地方需要修改,這里提供了一個模擬生成數據的jar包,可以將日志發送給某一個指定的端口,需要大數據程序員了解如何從指定端口接收數據並數據進行處理的流程。
4.1 模擬數據需要用到的文件列表
cd /opt/software/mock/mock_log
4.2 修改application.yml文件
根據自己實際情況修改配置文件application.yml
說明:
1)mock.data是模擬的日志數據的日期
2)mock.type 如果模擬實時數據, 則該值必須設置為http
3)mock.url是日志服務器的地址, 表示把模擬出來的數據發送到這個地址. 寫這個地址的時候一定要明白你接收日志的服務器的地址是哪里!
4.3 生產模擬數據
java -jar gmall2020-mock-log-2020-12-18.jar
注意: 必須等到接收日志的服務器部署完成之后這里才可以正常工作
第5章 創建父工程
在整個實時數倉項目中, 會有比較多的模塊需要管理, 我們統一創建一個父工程來管理不同的模塊
1)新建父工程命名FlinkParent
2)新建SpringBoot子工程
第6章 搭建單機版數據采集服務器
6.1 了解springboot
Spring Boot 是由 Pivotal 團隊提供的全新框架,其設計目的是用來簡化新 Spring 應用的初始搭建以及開發過程。該框架使用了特定的方式來進行配置,從而使開發人員不再需要定義樣板化的配置。
使用 Spring boot 好處:
1)內嵌 Tomcat, 不再需要外部的 Tomcat
2)不再需要那些千篇一律,繁瑣的 xml 文件。
3)更方便的和各個第三方工具( mysql,redis,elasticsearch,dubbo 等等整合),而只要維護一個配置文件即可。
6.1.1 springboot與SSM的關系
springboot 整合了springmvc, spring 等核心功能。也就是說本質上實現功能的還是原有的spring ,springmvc 的包,但是 springboot單獨包裝了一層,這樣用戶就不必直接對 springmvc, spring 等,在 xml 中配置
6.1.2 springboot如何配置
springboot實際上就是把以前需要用戶手工配置的部分,全部作為默認項。除非用戶需要額外更改不然不用配置。這就是所謂的:“約定大於配置”
如果需要特別配置的時候,去修改application.properties
6.2 在idea創建日志采集服務器
步驟1: 在project中創建springboot子模塊
1)父工程的pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.yuange.flink</groupId> <artifactId>FlinkParent</artifactId> <packaging>pom</packaging> <version>1.0-SNAPSHOT</version> <modules> <module>flink-logger</module> </modules> </project>
2)flink-logger 子工程
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.3</version> <relativePath/> <!-- lookup parent from repository --> </parent> <artifactId>flink-logger</artifactId> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
步驟2: 創建controller,在controller中定義方法, 用來處理客戶端的http請求,如果不做額外配置, controller需要與主程序GmallLoggerApplication同包, 或它所在包的子包下
package com.yuange.flink.flinklogger.controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @作者:袁哥 * @時間:2021/7/27 19:54 */ @RestController public class LoggerController { @GetMapping("/applog") public String doLog(@RequestParam("param")String log){ System.out.println(log); return "success"; } }
步驟3: 修改日志服務器端口為8081,在配置文件 resources/application.properties 內添加如下代碼:
server.port=8081
步驟4: 啟動日志服務器, 發送模擬數據測試
1)啟動SpringBoot項目
2)在linux啟動模擬數據
(1)修改application.yml : mock.url: http://window地址:8081/applog
注意: window地址必須是在linux虛擬機可以訪問到ip地址.
(2)啟動程序,生成日志
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar
3)觀察idea是否收到數據,如果沒有收到數據, 請確認地址是否正確
步驟5: 把日志落盤
在本實時項目中,落盤的日志后面並沒有使用,主要考慮在企業應用中,采集到數據不僅僅應用到實時項目,也可以其他的一些需求也會可能會用到,比如離線需求。另外也可以起到數據備份的作用。落盤工具使用logback,類似於log4j
1)添加配置文件resources/logback.xml
<?xml version="1.0" encoding="UTF-8"?> <configuration> <!--日志的根目錄, 根據需要更改成日志要保存的目錄--> <property name="LOG_HOME" value="D:\Root\workSpace\IntelliJ IDEA 2019.2.4\workSpace\FlinkParent\output"/> <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/app.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern> </rollingPolicy> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <!-- 將某一個包下日志單獨打印日志 需要更換我們的 Controller 類 --> <logger name="com.yuange.flink.flinklogger.controller.LoggerController" level="INFO" additivity="true"> <appender-ref ref="rollingFile"/> <appender-ref ref="console"/> </logger> <root level="error" additivity="true"> <appender-ref ref="console"/> </root> </configuration>
2)在controller類上添加注解 @Slf4j
package com.yuange.flink.flinklogger.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @作者:袁哥 * @時間:2021/7/27 19:54 */ @RestController @Slf4j public class LoggerController { @GetMapping("/applog") public String doLog(@RequestParam("param")String log){ saveToDisk(log); return "success"; } private void saveToDisk(String strLog) { //log.info(logString) 這里的log對象默認情況如果idea不能識別, //寫代碼的時候會報錯(執行並不會報錯), 需要在idea安裝一個插件: lombok, 看着就舒服了 log.info(strLog); } }
3)確認是否可以正常落盤, 並能在console打印日志.
步驟6: 把日志直接寫入到kafka中
1)在application.properties中配置Kafka相關配置
#============== kafka =================== # 指定kafka 代理地址,可以多個 spring.kafka.bootstrap-servers=hadoop162:9092,hadoop163:9092,hadoop164:9092 # 指定消息key和消息體的編解碼方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
2)具體寫入kafka的代碼
package com.yuange.flink.flinklogger.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; /** * @作者:袁哥 * @時間:2021/7/27 19:54 */ @RestController @Slf4j public class LoggerController { @GetMapping("/applog") public String doLog(@RequestParam("param")String log){ //數據落盤 saveToDisk(log); //寫入kafka sendToKafka(log); return "success"; } @Autowired private KafkaTemplate<String,String> kafka; private void sendToKafka(String strLog) { kafka.send("ods_log",strLog); } private void saveToDisk(String strLog) { //log.info(logString) 這里的log對象默認情況如果idea不能識別, //寫代碼的時候會報錯(執行並不會報錯), 需要在idea安裝一個插件: lombok, 看着就舒服了 log.info(strLog); } }
步驟7: 測試是否可以寫入到kafka
1)啟動Zookeeper
zk start
2)啟動Kafka
kafka.sh start
3)啟動終端消費者
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic osd_log
4)啟動SpringBoot日志服務器
5)發送模擬數據
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar
6)確認kafka是否收到數據
6.3 部署到linux測試數據采集
在idea中如果日志服務器可以測試通過, 現在打包, 然后部署到linux測試.
步驟1: logback.xml中的落盤目錄修改為linux的目錄
#創建applog目錄
mkdir /opt/software/mock/mock_log/applog
<property name="LOG_HOME" value="/opt/software/mock/mock_log/applog"/>
步驟2: 打包flink-logger, 並發送到linux
步驟3: 啟動gmll-logger服務器
cd /opt/module/applog
java -jar flink-logger-1.0-SNAPSHOT.jar
步驟4: 啟動模擬數據
1)修改/opt/software/mock/mock_log/application.yml 文件
#http模式下,發送的地址
mock.url: "http://hadoop162:8081/applog"
2)啟動程序,生產數據
java -jar gmall2020-mock-log-2020-12-18.jar
3)查看Kafka是否有數據
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic ods_log
第7章 搭建集群版數據采集服務器
7.1 部署Nginx
7.1.1 Nginx簡介
Nginx (讀作“engine x”), 是一個高性能的 HTTP 和反向代理服務器 , 特點是占有內存少,並發能力強,事實上 nginx 的並發能力確實在同類型的網頁服務器中表現較好,中國大陸使用 nginx 網站用戶有:百度、京東、新浪、網易、騰訊、淘寶等。
7.1.2 Nginx和Tomcat的關系
除了 tomcat 以外, apache,nginx,jboss,jetty 等都是 http 服務器。nginx 和 apache 只支持靜態頁面和 CGI 協議的動態語言,比如 perl 、 php 等, 但是nginx不支持 java 。Java 程序只能通過與 tomcat 配合完成。 nginx 與 tomcat 配合,為 tomcat 集群提供反向代理服務、負載均衡等服務
7.1.3 Nginx功能
1)反向代理
2)負載均衡
(1)輪詢(默認) 每個請求按時間順序逐一分配到不同的后端服務器,如果后端某台服務器宕機,則自動剔除故障機器,使用戶訪問不受影響
(2)weight 指定輪詢權重,weight值越大,分配到的幾率就越高,主要用於后端每台服務器性能不均衡的情況。
(3)備機模式 平時不工作, 只有其他down 機的時候才會開始工作
(4)公平模式(第三方) 更智能的一個負載均衡算法,此算法可以根據頁面大小和加載時間長短智能地進行負載均衡,也就是根據后端服務器的響應時間來分配請求,響應時間短的優先分配。如果想要使用此調度算法,需要Nginx的upstream_fair模塊。
3)動靜分離
7.1.4 安裝部署
1)使用 yum 安裝依賴包
sudo yum -y install openssl openssl-devel pcre pcre-devel zlib zlib-devel gcc gcc-c++
2)下載 Nginx,如果已經有下載好的安裝包, 此步驟可以省略
wget http://nginx.org/download/nginx-1.12.2.tar.gz
3)解壓到當前目錄
tar -zxvf nginx-1.12.2.tar.gz
4)編譯和安裝,進入解壓后的根目錄
./configure --prefix=/opt/module/nginx
make && make install
5)啟動 Nginx
(1)進入安裝目錄:
cd /opt/module/nginx
(2)啟動 nginx:
sbin/nginx
(3)關閉 nginx:
sbin/nginx -s stop
(4)重新加載:
sbin/nginx -s reload
注意:
1)Nginx 默認使用的是 80 端口, 由於非root用戶不能使用 1024 以內的端口, 但是在生產環境下不建議使用root用戶啟動nginx, 主要從安全方面考慮
2)如果使用普通用戶啟動 Nginx, 需要先執行下面的命令來突破上面的限制:
sudo setcap cap_net_bind_service=+eip /opt/module/nginx/sbin/nginx
6)查看是否啟動成功:http://hadoop162
ps -ef | grep nginx
7.1.5 配置負載均衡
模擬數據以后應該發給nginx, 然后nginx再轉發給我們的日志服務器,日志服務器我們會分別配置在hadoop162,hadoop163,hadoop164三台設備上
1)打開nginx配置文件
cd /opt/module/nginx/conf
vim nginx.conf
2)修改如下配置
http {
# 啟動省略
upstream logcluster{
server hadoop162:8081 weight=1;
server hadoop163:8081 weight=1;
server hadoop164:8081 weight=1;
}
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
#root html;
#index index.html index.htm;
# 代理的服務器集群 命名隨意, 但是不能出現下划線
proxy_pass http://logcluster;
proxy_connect_timeout 10;
}
# 其他省略
}
7.2 分發日志服務器
日志服務器每個節點配置一個. (nginx只需要配置到hadoop162單台設備就行了)
xsync /opt/module/applog/
7.3 日志服務器群起腳本
1)新建腳本
vim /home/atguigu/bin/log-lg.sh
2)編寫腳本
#!/bin/bash log_app=flink-logger-1.0-SNAPSHOT.jar nginx_home=/opt/module/nginx log_home=/opt/module/applog case $1 in "start") # 在hadoop162啟動nginx if [[ -z "`ps -ef | awk '/nginx/ && !/awk/{print $n}'`" ]]; then echo "在hadoop162啟動nginx" $nginx_home/sbin/nginx fi # 分別在162-164啟動日志服務器$ for host in hadoop162 hadoop163 hadoop164 ; do echo "在 $host 上啟動日志服務器" ssh $host "nohup java -jar $log_home/$log_app 1>$log_home/logger.log 2>&1 &" done ;; "stop") echo "在hadoop162停止nginx" $nginx_home/sbin/nginx -s stop for host in hadoop162 hadoop163 hadoop164 ; do echo "在 $host 上停止日志服務器" ssh $host "jps | awk '/$log_app/ {print \$1}' | xargs kill -9" done ;; *) echo "你啟動的姿勢不對" echo " log.sh start 啟動日志采集" echo " log.sh stop 停止日志采集" ;; esac
3)賦予可執行權限
chmod +x /home/atguigu/bin/log-lg.sh
4)分發
xsync /home/atguigu/bin/log-lg.sh
7.4 測試負載均衡
發送模擬數據,注意把端口改為nginx的端口:80
1)啟動zookeeper
2)啟動kafka
3)啟動日志服務器
log-lg.sh start
4)修改配置文件,將訪問端口改為80
vim /opt/software/mock/mock_log/application.yml
#http模式下,發送的地址
mock.url: "http://hadoop162:80/applog"
5)啟動kafka消費者
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop163:9092 --topic ods_log
6)啟動程序,生產數據
cd /opt/software/mock/mock_log
java -jar gmall2020-mock-log-2020-12-18.jar
7)發現Kafka消費到了數據
第8章 業務數據采集
可以實時采集mysql數據的工具有:canal 和 maxwell,debzium
兩個工具是競品, 各有優缺點
8.1 使用canal實時采集mysql數據
8.1.1 什么是canal
阿里巴巴B2B公司,因為業務的特性,賣家主要集中在國內,買家主要集中在國外,所以衍生出了杭州和美國異地機房的需求,從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。
Canal是用java開發的基於數據庫增量日志解析,提供增量數據訂閱&消費的中間件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用來處理獲得的相關數據。(數據庫同步需要阿里的otter中間件,基於canal)。
8.1.2 canal使用場景
原始場景: otter中間件的一部分,阿里otter中間件的一部分,otter是阿里用於進行異地數據庫之間的同步框架,canal是其中一部分。
常見場景1: 更新緩存服務器
常用場景2: 制作拉鏈表
抓取業務數據新增變化表,用於制作拉鏈表,如果表中沒有更新時間, 制作拉鏈表就需要使用canal實時監控數據的變化
常用場景3:用於實時統計
抓取業務表的新增變化數據,用於制作實時統計,我們實時數倉就是這種應用場景!
8.1.3 canal工作原理
mysql的主從復制原理
1)MySQL master 將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)
2)MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
3)MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據
canal工作原理
1)canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議
2)MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
3)canal 解析 binary log 對象(原始為 byte 流)
8.1.4 mysql的binlog
1)什么是binlog
MySQL的二進制日志可以說是MySQL最重要的日志了,它記錄了所有的DDL和DML(除了數據查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進制日志是事務安全型的。一般來說開啟二進制日志大概會有1%的性能損耗。
二進制有兩個最重要的使用場景:
其一:MySQL Replication在Master端開啟binlog,Mster把它的二進制日志傳遞給slaves來達到master-slave數據一致的目的。
其二:自然就是數據恢復了,通過使用mysqlbinlog工具來使恢復數據。
二進制日志包括/兩類文件:
A: 二進制日志索引文件(文件名后綴為.index)用於記錄所有的二進制的文件
B:二進制日志文件(文件名后綴為.00000*)記錄數據庫所有的DDL和DML(除了數據查詢語句)語句事件。
2)開啟binlog
默認情況下, mysql是沒有開啟binlog的, 需要手動開啟,開啟步驟:
(1)找到mysql的配置文件:my.cnf. 大部分的mysql版本默認在: /etc/my.cnf.如果沒有找到, 則可以通過下面的命令查找:
sudo find / -name my.cnf
(2)修改my.cnf. 在my.cnf文件中增加如下內容:
sudo vim /etc/my.cnf
server-id= 1 #日志前綴 log-bin=mysql-bin #同步格式 binlog_format=row #同步的庫 binlog-do-db=flinkdb
3)配置說明
(1)server-id: mysql主從復制的時候, 主從之間每個實例必須有獨一無二的id
(2)log-bin:這個表示binlog日志的前綴是mysql-bin,以后生成的日志文件就是 mysql-bin.123456 的文件后面的數字按順序生成。 每次mysql重啟或者到達單個文件大小的閾值時,新生一個文件,按順序編號。
(3)Binlog_format:mysql binlog的格式,有三種值,分別是statement, row, mixed,三者區別如下
statement:
語句級,binlog會記錄每次一執行寫操作的語句。相對row模式節省空間,但是可能產生不一致性,比如 update tt set create_date=now(),如果用binlog日志進行恢復,由於執行時間不同可能產生的數據就不同。
優點: 節省空間
缺點: 有可能造成數據不一致。
row:
行級, binlog會記錄每次操作后每行記錄的變化。
優點:保持數據的絕對一致性。因為不管sql是什么,引用了什么函數,他只記錄執行后的效果。
缺點:占用較大空間
mixed:
statement的升級版,一定程度上解決了,因為一些情況而造成的statement,模式不一致問題,在某些情況下譬如:當函數中包含 UUID() 時, 包含 AUTO_INCREMENT 字段的表被更新時;執行 INSERT DELAYED 語句時;用 UDF 時;會按照ROW的方式進行處理
優點:節省空間,同時兼顧了一定的一致性。
缺點:還有些極個別情況依舊會造成不一致,另外statement和mixed對於需要對binlog的監控的情況都不方便。
由於canal不是數據庫, 是不能執行sql語句的, 所以, 只能設置為row格式
(3)binlog-do-db:設置把哪個database的變化寫入到binlog, 如果不配置, 則所有database的變化都會寫入到binlog,如果要設置多個數據庫需要, 需要寫多次這個參數的配置
binlog-do-db = a binlog-do-db = b
4)檢測配置是否成功
(1)重啟mysql服務器
sudo systemctl restart mysqld
(2)啟動msyql客戶端, 執行sql語句:
mysql -uroot -paaaaaa
show variables like'%log_bin%';
(3)也可以去對應的目錄下查看是否生成log_bin文件
cd /var/lib/mysql
sudo ls
8.1.5 在mysql准備業務數據
CREATE DATABASE `flinkdb` CHARACTER SET utf8 COLLATE utf8_general_ci;
USE flinkdb;
8.1.6 下載和解壓安裝canal
1)在mysql創建canal用戶,canal需要監控mysql數據, 在企業中一般拿不到root用戶, 需新創建只讀取權限的用戶
Mysql> set global validate_password_policy=0; mysql> set global validate_password_length=4; mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY'canal'; mysql> FLUSH PRIVILEGES;
2)下載canal
wget -P /opt/software/canal https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
3)解壓安裝canal
mkdir /opt/module/canal
tar -zxvf /opt/software/canal/canal.deployer-1.1.4.tar.gz -C /opt/module/canal
8.1.7 配置canal
canal有兩種配置:server級別和instance級別
1)server級別的配置是對整個canal進行配置, 是一些全局性的配置. 一個sever中可以配置多個instance
2)instance級別的配置, 是最小的訂閱mysql的隊列,比如example實例
3)canal server配置
vim /opt/module/canal/conf/canal.properties
#重點關注以下配置: canal.ip = hadoop162 # canal服務器綁定ip地址 canal.port = 11111 # canal端口號, 將來客戶端通過這個端口號可以讀到數據 canal.zkServers = hadoop162:2181,hadoop163:2181,hadoop164:2181 # zk地址, 用來管理canal的高可用 # tcp, kafka, RocketMQ # tcp:客戶端通過tcp方式從Canal服務端拉取增量數據 # kafka:Canal服務端將增量數據同步到kafka中,客戶端從kafka消費數據,此時客戶端感知不到Canal的存在,只需要跟kafka交互。 # RocketMQ:同kafka,增量數據同步到RocketMQ中。 canal.serverMode = kafka canal.destinations = yuange # 配置實例, 如果有多個實例, 用逗號隔開. 我們創建一個yaunge實例 canal.mq.servers = hadoop162:9092,hadoop163:9092,hadoop164:9092
4)canal instance配置
(1)把目錄名example改為yuange(其實就是和剛才的配置保存一致, 用來表示yuange實例)
mv /opt/module/canal/conf/example /opt/module/canal/conf/yuange
(2)打開實例配置文件:
vim /opt/module/canal/conf/yuange/instance.properties
(3)在其中配置要監控的mysql和監控到的數據發送到kafka
# canal實例(slave)的id, 不能和mysql的id重復. 可以自動生成, 無需手工配置 # canal.instance.mysql.slaveId=0 # 要監控的mysql地址 canal.instance.master.address = hadoop162:3306 # 連接mysql的用戶名 canal.instance.dbUsername=canal # 連接mysql的密碼 canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # 該實例監控的 庫.表 默認所有庫下所有表 canal.instance.filter.regex=flinkdb\\..* # 監控gmall數據庫下所有包 # kafka topic配置 canal.mq.topic=ods_db # 注釋掉此配置, 此配置是只發送到一個固定分區中 # canal.mq.partition=0 # 散列模式的分區數, 要和kafka的topic的分區數保持一致 canal.mq.partitionsNum=2 # 如何計算每條數據進入的分區 canal.mq.partitionHash= .*\\..*:$pk$ # 指定所有的表用主鍵hash得到分區索引
8.1.8 cana HA配置和啟動canal
canal只是支持HA, 不支持高負載, 沒有負載均衡的概念.
1)分發canal到hadoop163和hadoop164,注意: 修改canal.ip = hadoop162, 為hadoop163和hadoop164
xsync /opt/module/canal/
2)在hadoop162,hadoop163,hadoop164分別啟動canal,注意:需要先啟動zookeeper和kafka
/opt/module/canal/bin/startup.sh
3)制作canal統一啟停腳本
vim /home/atguigu/bin/canal.sh
#!/bin/bash canal_home=/opt/module/canal case $1 in start) for host in hadoop162 hadoop163 hadoop164 ; do echo "========== $host 啟動canal =========" ssh $host "source /etc/profile; ${canal_home}/bin/startup.sh" done ;; stop) for host in hadoop162 hadoop163 hadoop164 ; do echo "========== $host停止 canal =========" ssh $host "source /etc/profile; ${canal_home}/bin/stop.sh" done ;; *) echo "你啟動的姿勢不對" echo " start 啟動canal集群" echo " stop 停止canal集群" ;; esac
8.1.9 測試kafka是否收到實時數據
1)起一個終端消費者, 消費ods_db
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic ods_db
2)修改配置文件
vim /opt/software/mock/mock_db/application.properties
#配置要連接的mysql數據庫 spring.datasource.url=jdbc:mysql://hadoop162:3306/flinkdb?characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8 spring.datasource.username=root spring.datasource.password=aaaaaa #業務日期 mock.date=2021-07-28
3)生產數據至Mysql
cd /opt/software/mock/mock_db
java -jar gmall2020-mock-db-2020-12-23.jar
4)觀察消費者是否消費到數據, 如果沒有消費到數據, 則需要重新檢測canal配置
8.1.10 接收到的數據格式分析
發送到kafka的數據格式
{ "data":[ { "id":"350", "consignee":"蔣雄", "consignee_tel":"13325313235", "final_total_amount":"389.0", "order_status":"1005", "user_id":"62", "delivery_address":"第17大街第7號樓9單元324門", "order_comment":"描述353475", "out_trade_no":"822287931878949", "trade_body":"十月稻田 沁州黃小米 (黃小米 五谷雜糧 山西特產 真空裝 大米伴侶 粥米搭檔) 2.5kg等2件商品", "create_time":"2020-08-26 15:02:40", "operate_time":"2020-08-26 15:02:41", "expire_time":"2020-08-26 15:17:40", "tracking_no":null, "parent_order_id":null, "img_url":"http://img.gmall.com/933223.jpg", "province_id":"3", "benefit_reduce_amount":"108.0", "original_total_amount":"488.0", "feight_fee":"9.0" } ], "database":"gmall", "es":1598425361000, "id":73, "isDdl":false, "mysqlType":{ "id":"bigint(20)", "consignee":"varchar(100)", "consignee_tel":"varchar(20)", "final_total_amount":"decimal(16,2)", "order_status":"varchar(20)", "user_id":"bigint(20)", "delivery_address":"varchar(1000)", "order_comment":"varchar(200)", "out_trade_no":"varchar(50)", "trade_body":"varchar(200)", "create_time":"datetime", "operate_time":"datetime", "expire_time":"datetime", "tracking_no":"varchar(100)", "parent_order_id":"bigint(20)", "img_url":"varchar(200)", "province_id":"int(20)", "benefit_reduce_amount":"decimal(16,2)", "original_total_amount":"decimal(16,2)", "feight_fee":"decimal(16,2)" }, "old":[ { "order_status":"1002" } ], "pkNames":[ "id" ], "sql":"", "sqlType":{ "id":-5, "consignee":12, "consignee_tel":12, "final_total_amount":3, "order_status":12, "user_id":-5, "delivery_address":12, "order_comment":12, "out_trade_no":12, "trade_body":12, "create_time":93, "operate_time":93, "expire_time":93, "tracking_no":12, "parent_order_id":-5, "img_url":12, "province_id":4, "benefit_reduce_amount":3, "original_total_amount":3, "feight_fee":3 }, "table":"order_info", "ts":1598425365252, "type":"UPDATE" }
8.1.11 驗證canal高可用是否正常工作
1)當前啟動canal的時候, 只有一台設備會啟動 yuange實例
#啟動ZK客戶端
zkCli.sh
get /otter/canal/destinations/yuange/running
2)停止hadoop163的canal, 然后觀察
ssh hadoop163 /opt/module/canal/bin/stop.sh
8.2 使用maxwell實時采集mysql數據
8.2.1 什么是maxwell
maxwell 是由美國zendesk開源,用java編寫的Mysql實時抓取軟件。 其抓取的原理也是基於binlog。
8.2.2 Maxwell與canal的對比
1)Maxwell 沒有 Canal那種server+client模式,只有一個server把數據發送到消息隊列或redis。
2)Maxwell 有一個亮點功能,就是Canal只能抓取最新數據,對已存在的歷史數據沒有辦法處理。而Maxwell有一個bootstrap功能,可以直接引導出完整的歷史數據用於初始化,非常好用。
3)Maxwell不能直接支持HA,但是它支持斷點還原,即錯誤解決后重啟繼續上次點兒讀取數據。
4)Maxwell只支持json格式,而Canal如果用Server+client模式的話,可以自定義格式。
5)Maxwell比Canal更加輕量級。
8.2.3 使用maxwell前的准備工作
1)在mysql中創建一個數據庫, 用於存儲maxwell的元數據(可以省略, maxwell會自動創建)
CREATE DATABASE `maxwell` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
2)創建可以操作數據庫maxwell的用戶:maxwell
USE maxwell; SET GLOBAL validate_password_policy=0; SET GLOBAL validate_password_length=4; GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY 'aaaaaa';
3)給用戶maxwell分配操作其他數據庫的權限
GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'maxwell'@'%';
FLUSH PRIVILEGES;
8.2.4 安裝和配置maxwell
1)下載maxwell
mkdir /opt/software/maxwell
wget -P /opt/software/maxwell https://github.com/zendesk/maxwell/releases/download/v1.27.1/maxwell-1.27.1.tar.gz
2)解壓
tar -zxvf /opt/software/maxwell/maxwell-1.27.1.tar.gz -C /opt/module/
3)配置maxwell
cd /opt/module/maxwell-1.27.1
mv config.properties.example config.properties
vim config.properties
#添加如下配置: log_level=info producer=kafka kafka.bootstrap.servers=hadoop162:9092,hadoop163:9092,hadoop164:9092 kafka_topic=ods_db # 按照主鍵的hash進行分區, 如果不設置是按照數據庫分區 producer_partition_by=primary_key # mysql login info host=hadoop162 user=maxwell password=aaaaaa # 排除掉不想監控的數據庫 filter=exclude:gmall.* # 初始化維度表數據的時候使用 client_id=maxwell_1
8.2.5 啟動maxwell
1)啟動maxwell(先停止canal集群)
canal.sh stop
/opt/module/maxwell-1.27.1/bin/maxwell --config /opt/module/maxwell-1.27.1/config.properties --daemon
2)確認kafka是否收到數據,起一個終端消費者:
/opt/module/kafka-2.4.1/bin/kafka-console-consumer.sh --bootstrap-server hadoop162:9092 --topic ods_db
3)在mysql中生成數據至mysql, 確認kafka是否收到數據
java -jar gmall2020-mock-db-2020-12-23.jar
4)查看消費情況
8.2.6 Maxwell發送到kafka的數據格式
{ "database":"flinkdb", "table":"cart_info", "type":"update", "ts":1627449145, "xid":14229, "xoffset":3823, "data":{ "id":141065, "user_id":"1539", "sku_id":23, "cart_price":40, "sku_num":3, "img_url":"http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-0liuAJTluAAVP1d_tXYs725.jpg", "sku_name":"十月稻田 遼河長粒香 東北大米 5kg", "is_checked":null, "create_time":"2021-07-28 13:12:23", "operate_time":null, "is_ordered":1, "order_time":"2021-07-28 13:12:25", "source_type":"2401", "source_id":null }, "old":{ "is_ordered":0, "order_time":null } }
8.3 Canal和Maxwell發送到kafka的數據對比
1)為了方便做對比, 在gmall數據庫下創建一個表:test_user_info
create table test_user_info(id int primary key, name varchar(255), tel char(11));
2)插入數據
insert into test_user_info values(1,'lisi','13838389438');
Canal |
Maxwell |
{ |
{ |
3)刪除數據
delete from test_user_info where id=1;
Canal |
Maxwell |
{ |
{ |
4)更新數據
update test_user_info set name='zs' where id=1;
Canal |
Maxwell |
{ |
{ |
5)總結數據特點:
(1)日志結構:canal產生的數據會放在數組結構中,maxwell 以影響的數據為單位產生日志,即每影響一條數據就會產生一條日志。如果想知道這些日志是否是通過某一條sql產生的可以通過xid進行判斷,相同的xid的日志來自同一sql
(2)數字類型:當原始數據是數字類型時,maxwell會尊重原始數據的類型不增加雙引,不變為字符串。Canal一律轉換為字符串。
(3)帶原始數據字段定義:canal數據中會帶入表結構。Maxwell更簡潔。
8.4 Maxwell的初始化數據功能
對Mysql中的已有的舊數據, 如何導入到Kafka中? Canal無能為力, Maxwell提供了一個初始化功能, 可以滿足我們的需求
/opt/module/maxwell-1.27.1/bin/maxwell-bootstrap --user maxwell --password aaaaaa --host hadoop162 --database flinkdb --table user_info --client_id maxwell_1
maxwell-bootstrap不具備將數據直接導入kafka或者hbase的能力,通過--client_id指定將數據交給哪個maxwell進程處理,在maxwell的conf.properties中配置