1.導語
來源:https://www.toutiao.com/i6852129443306340871/
本節億級數據從 MySQL 到 Hbase 的三種同步方案與實踐將主要圍繞下面架構圖中的三種方法進行實踐與講解。

2.工欲善其事,必先利其器
2.1 環境需知
我的實驗環境為:Ubuntu16.04+hadoop偽分布式(所以重點會介紹偽分布式環境部署),本節實驗可以適用於大部分Linux。
實驗的環境有:
- MySQL
- Hadoop偽分布式/完全分布式
- HBase
- Phoenix
- Zookeeper
- Kafka
- Maxwell
- Flink
所以,本節內容先從以上環境部署講起,再來逐步分析億級數據從 MySQL 到 Hbase 的三種同步方案與實踐。
注意:本節不會非常深入的去講解HBase、Phoenix、Kafka、Maxwell、Flink等內容,因為涉及的面非常多,光一個就可以講很多天了,所以本節將具體的某一塊與我們的場景相結合進行闡述,談談他們的具體應用與使用,相信大家看完后,對這些會有更加深入的理解!
2.2 偽分布式環境部署
2.2.1.准備工作
【JAVA】
Hadoop環境需要JAVA環境,所以首先得安裝Java,而Ubuntu默認Java為OpenJdk,需要先卸載,再安裝Oracle。除此之外,也可以不用卸載OpenJDK,將Oracle JAVA設為默認的即可。
★
https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
”

關於java配置只要輸入java或者javac看到輸出,配置成功。
【用戶】
在Ubuntu或者類Unix系統中,用戶可以通過下列命令添加創建用戶:
sudo useradd -s /bin/bash -g hadoop -d /home/hadoop -m hadoop
如果提示hadoop不再sudoers文件中,執行下列命令:
vi /etc/sudoers
編輯上述文件:
# User privilege specification
root ALL=(ALL:ALL) ALL
hadoop ALL=(ALL:ALL) ALL # 添加此行
再執行上述命令:
light@city:~$ sudo useradd -s /bin/bash -g hadoop -d /home/hadoop -m hadoop
useradd:“hadoop”組不存在
添加用戶組:
light@city:/home$ sudo groupadd hadoop
再次執行即可:
light@city:~$ sudo useradd -s /bin/bash -g hadoop -d /home/hadoop -m hadoop
設置或修改密碼:
sudo passwd hadoop
【SSH】
安裝ssh
sudo apt-get install openssh-server
配置免密登陸
su - hadoop
ssh-keygen -t rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 0600 ~/.ssh/authorized_keys
然后輸入
ssh localhost
此時不需要輸入密碼,說明成功!
注意:關於ssh免秘登陸失敗問題,大家可以通過以下方法進行嘗試,大部分問題在於目錄及文件權限!
sudo chmod 755 $HOME
sudo chmod 600 id_rsa
sudo chmod 600 id_rsa.pub
sudo chmod 644 authorized_keys
2.2.2 偽分布式
【Hadoop】
- 下載及安裝
在下列鏡像中下載Hadoop版本,我下載的3.0.2。
★
https://mirrors.cnnic.cn/apache/hadoop/common/
”
wget https://mirrors.cnnic.cn/apache/hadoop/common/hadoop-3.0.2/hadoop-3.0.2.tar.gz
tar zxvf hadoop-3.0.2.tar.gz
sudo mv hadoop-3.0.2 /usr/local/hadoop
- 配置
編輯etc/hadoop/core-site.xml,configuration配置為
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>
注意:一定要看本機的9000端口是否被占用,如果被占用了,后面就啟動不出來NameNode!
關於查看本機的9000端口是否被占用:
sudo netstat -alnp | grep 9000

會發現9000端口被php-fpm給占用了,所以這里得修改為其他端口,比如我修改為9012,然后可以再次執行這個命令,會發現沒被占用,說明可行!
編輯etc/hadoop/hdfs-site.xml,configuration配置為
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
- 初始化
格式化HDFS
bin/hdfs namenode -format
注意:格式化執行一次即可!
啟動NameNode和DataNode
sbin/start-dfs.sh
這時在瀏覽器中訪問http://localhost:9870/,可以看到NameNode相關信息。
http://localhost:9864/查看DataNode相關信息。
由於hadoop3.x版本與2.x版本監聽端口不一樣,所以如果還是原先的50070便訪問不到相關信息,不知道上述9870或者9864,沒關系,可以通過下面命令查看!
輸入netstat命令即可查看tcp監聽端口:
sudo netstat -ntlp

jps_ntlp
上述兩個重要端口,9864后面可以看到進程ID為17270,通過JPS查看可以看到對應DataNode,9870類似方法。

hadoop
- 配置YARN
編輯etc/hadoop/mapred-site.xml,configuration配置為
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
編輯etc/hadoop/yarn-site.xml,configuration配置為
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
- 啟動YARN
sbin/start-yarn.sh
查看進程:
Jps
NodeManager
SecondaryNameNode
NameNode
ResourceManager
DataNode
YARN就是上述的資源管理:ResourceManager。
同理,可以通過上述方法查看ResourceManager的端口,默認為8088。
瀏覽器輸入:http://localhost:8088/cluster

hadoop
- 啟動與停止
啟動:
sbin/start-dfs.sh
sbin/start-yarn.sh
停止:
sbin/stop-dfs.sh
sbin/stop-yarn.sh
至此,偽分布式搭建完畢!后面開始HBase與Phoenix搭建!
【HBase】
- 下載安裝
★
https://mirrors.cnnic.cn/apache/hbase/
”
wget https://mirrors.cnnic.cn/apache/hbase/stable/hbase-1.4.9-bin.tar.gz
tar zxvf hbase-1.4.9-bin.tar.gz
sudo mv zxvf hbase-1.4.9-bin /usr/local/hbase
- 單機HBase配置
編輯conf/hbase-site.xml,configuration配置為
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://localhost:9012/hbase</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/hadoop/zookeeper</value>
</property>
</configuration>
- 啟動
bin/start-hbase.sh
jps查看進程:
HMaster
Jps
- 終端
bin/hbase shell

hbaseshell
如果想要關閉HBase,則輸入:
bin/stop-hbase.sh
- HBase偽分布式配置
編輯conf/hbase-site.xml,configuration中添加
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
然后修改root由本地文件系統到HDFS,編輯conf/hbase-site.xml,hbase.rootdir值由
file:///home/hadoop/hbase
修改為
hdfs://localhost:9012/hbase
★
注意后面的端口號9012,需要保證與Hadoop DFS配置中的fs.defaultFS相同!
”
這樣子修改后,會在hdfs文件系統中看到HBase目錄,當然你也可以不用配置此項!
上述配置完畢后,保存后,重啟HBase即可!
【封裝】
每次啟動這些輸入太多命令,太繁瑣,直接一個bash腳本搞定,首先進入/usr/local,然后再運行這個腳本!
啟動腳本:
#!/bin/bash
hadoop/sbin/start-dfs.shkuangjia
hadoop/sbin/start-yarn.sh
hbase/bin/start-hbase.sh
停止腳本:
#!/bin/bash
hadoop/sbin/stop-dfs.sh
hadoop/sbin/stop-yarn.sh
hbase/bin/stop-hbase.sh
【zookeeper】
由於Hbase自帶了zookeeper,一開始使用自帶的,后來發現出了很多問題,換成自己配置zookeeper,配置方法如下:
最近做的數據遷移,當上游數據流向下游過大的時候,HBase就會崩潰。HBase自帶的Zookeeper出了問題,就嘗試自己安裝獨立的Zookeeper。
(1)禁用HBase自帶的Zookeeper
修改 ./conf/hbase-env.sh
export HBASE_MANAGES_ZK=false(如果值為true,則使用自帶的Zookeeper,會隨着HBase一起啟動)
(2)安裝及配置獨立Zookeeper
Zookeeper最新的版本可以通過官網獲取
wget http://apache.fayea.com/zookeeper/zookeeper-xxx/zookeeper-xxx.tar.gz
tar xfz zookeeper-xxx.tar.gz
mv zookeeper-xxx /usr/local/zookeeper
★
拷貝配置文件
”
cd zookeeper-xxx/conf/
cp zoo_sample.cfg zoo.cfg
★
修改配置項
”
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
dataDir:Zookeeper保存節點數據的目錄。dataLogDir:Zookeeper保存節點數據的日志。
如果沒有這個目錄,就創建一下。
(3)HBase配置
★
拷貝 zoo.cfg 到 hbase/conf/ 目錄下
”
cp zoo.cfg /usr/local/hbase/conf/
這是官方文檔推薦的做法,如果不拷貝 zoo.cfg,在 hbase-site.xml 中也可以對Zookeeper進行相關配置,但HBase會優先使用 zoo.cfg(如果有的話)的配置
★
修改 hbase-site.xml
”
在原文件上加入:
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
如果不加的話,在啟動獨立安裝的Zookeeper后,HBase不能正常啟動。
(4)啟動Zookeeper
./bin/zkServer.sh start
(5)檢查服務是否啟動
ps -ef | grep zookeeper
(6)啟動HBase
在成功啟動Zookeeper后,就可以啟動HBase了:
./bin/start-hbase.sh
【Phoenix】
版本要與HBase相匹配!
下載apache-phoenix-4.14.2-HBase-1.4-bin.tar.gz
★
安裝
”
tar -xvf apache-phoenix-4.14.2-HBase-1.4-bin.tar.gz
mv apache-phoenix-4.14.2-HBase-1.4-bin.tar.gz /usr/local/phoenix
★
配置
”
將hbase-site.xml配置文件拷貝到phoenix的bin目錄下
★
啟動
”
首先啟動zookeeper與hbase。
hadoop@city: ./start_zk.sh
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
hadoop@city: ./start_hbase.sh
running master, logging to /usr/local/hbase/logs/hbase-hadoop-master-city.out
: running regionserver, logging to /usr/local/hbase/logs/hbase-hadoop-regionserver-city.out
啟動phoenix:
sqlline.py localhost

install_phoenix
【Kafka】
★
什么是Kafka?
”
Kafka 是一種分布式的,基於發布 / 訂閱的消息系統。在這里可以把Kafka理解為生產消費者模式。
Kafka是使用Java開發的應用程序,Kafka需要運行Zookeeper,兩者都需要Java,所以在需要安裝Zookeeper和Kafka之前,先安裝Java環境。
★
啟動Zookeeper
”
直接輸入zkServer.sh start即可!

zookeeper_start
★
Kafka安裝及配置
”
Kafka下載地址:
★
http://kafka.apache.org/downloads
”
同上述安裝,這里下載.tgz文件,也是解壓后移動到/usr/local即可!
關於配置文件可以直接采用默認的即可。
★
啟動Kafka
”
./bin/kafka-server-start.sh ./config/server.properties

enter image description here
★
Topic創建
”
當使用下面maxwell提取出來的binlog信息的時候,默認使用kafka進行消費。
./kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
★
發布與訂閱
”
向Topic上發布消息,按Ctrl+D結束:
./kafka-console-producer.sh --broker-list localhost:9092 --topic test

enter image description here
從Topic上接收消息,按Ctrl+C結束:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

recv_mesg
【Maxwell】
★
Maxwell是什么?
”
官網原語:Maxwell's daemon, a mysql-to-json kafka producer。
這里解釋一下Maxwell是將mysql binlog中的insert、update等操作提取出來,並以json數據返回作為kafka生產者。
當然自己也可以用編程實現binlog數據提取,並返回一個json數據。
★
下載地址:http://maxwells-daemon.io/
”
安裝方式同上。
★
mysql配置Maxwell
”
Maxwell配置文件中默認用戶名密碼均為maxwell,所以需要在mysql中做相應的授權。
mysql> GRANT ALL on maxwell.* to'maxwell'@'%' identified by 'maxwell';
mysql> GRANT SELECT, REPLICATION CLIENT,REPLICATION SLAVE on *.* to 'maxwell'@'%';
mysql> flush privileges;
★
配置Maxwell
”
cp config.properties.example config.properties
vi config.properties
maxwell配置:
log_level=info
# 默認生產者
producer=kafka
kafka.bootstrap.servers=localhost:9092
# mysql login info
host=localhost
user=maxwell
password=maxwell
# kafka配置
kafka_topic=test
kafka.compression.type=snappy
kafka.acks=all
kinesis_stream=test
★
啟動maxwell
”
./maxwell/bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=test
當然也可以把上述封裝成一個啟動腳本:
#!/bin/bash
./maxwell/bin/maxwell --user='maxwell' --password='maxwell' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=test
直接啟動:
./start_maxwell.sh

maxwell
【Flink】
★
什么是Flink?
”
干說Flink比較抽象,直接舉個例子吧,就拿本節的同步來說,本節使用的Flink就是做實時流計算的一個場景ETL,數據倉庫的實時同步,當上游下發數據到Kafka隊列中,然后通過Flink程序做window的收集,並將數據sink到Hbase中。
★
下載:https://flink.apache.org/
”
安裝的時候,直接進行解壓縮並配置path環境即可!
★
解壓縮
”
tar -zxf xxx.tgz
mv xxx /usr/local
★
配置環境變量
”
vim ~/.bashrc
export FLNK_HOME=/usr/local/flink
export PATH=$FLINK_HOME/bin:$PATH
使上述生效:
source ~/.bashrc
★
啟動與關閉flink
”
cd flink/bin
./start-cluster.sh # 啟動
stop-cluster.sh # 關閉
3.億級MySQL數據插入
本節題目為:億級數據從 MySQL 到 Hbase 的三種同步方案與實踐,首先需要了解如何快速插入MySQL。
那么MySQL數據插入將會從以下幾個方法入手:
- load data infile
- Python 單條插入
- Python 多線程插入
當然也可以使用其他語言進行實現!!!
下面來逐步談談數據插入!
數據插入之前,需要了解我們的數據,先來看一下數據字段描述:
數據以ASCII文本表示,以逗號為分隔符,以回車換行符(0x0D 0x0A)結尾。數據項及順序:車輛標識、觸發事件、運營狀態、GPS時間、GPS經度、GPS緯度,、GPS速度、GPS方向、GPS狀態
車輛標識:6個字符
觸發事件:0=變空車,1=變載客,2=設防,3=撤防,4=其它
運營狀態:0=空車,1=載客,2=駐車,3=停運,4=其它
GPS時間:格式yyyymmddhhnnss,北京時間
GPS經度:格式ddd.ddddddd,以度為單位。
GPS緯度:格式dd.ddddddd,以度為單位。
GPS速度:格式ddd,取值000-255內整數,以公里/小時為單位。
GPS方位:格式ddd,取值000-360內整數,以度為單位。
GPS狀態:0=無效,1=有效
結束串:回車符+換行符
數據舉例:
154747,4,2,20121130001607,116.6999512,39.9006233,0,128,1
078245,4,0,20121130001610,116.3590469,39.9909782,0,92,1
194086,4,1,20121130001610,116.5017776,40.0047951,25,220,1
那么只需要將上述的數據字段與數據對上就行了,一行為一條數據記錄。
首先編寫創建數據庫與表命令:
create database loaddb;
CREATE TABLE loadTable(id int primary key not null auto_increment,
carflag VARCHAR(6),touchevent CHAR(1),opstatus CHAR(1),gpstime DATETIME,
gpslongitude DECIMAL(10,7),gpslatitude DECIMAL(9,7),gpsspeed TINYINT,
gpsorientation SMALLINT,gpsstatus CHAR(1))engine=MyISAM;
注意:上述選擇了MyISAM引擎是因為load命令使用的時候,保證數據插入的效率!
3.1 load data infile
load data infile在導入大數據場景下非常的快!具體的說明后面會在比較的時候詳細說,這里說一下使用語法,如下:
load data local infile "/home/light/mysql/gps1.txt" into table loadTable fields terminated by ',' lines terminated by "\n" (carflag, touchevent, opstatus,gpstime,gpslongitude,gpslatitude,gpsspeed,gpsorientation,gpsstatus);
在使用這個命令的時候,是在MySQL的clinet端使用,登陸后敲這個命令即可!在數據字段描述中大家會看到幾個關鍵點:以逗號為分隔符,以回車換行符,對應於上述代碼是:
fields terminated by ',' lines terminated by "\n"
注意:更換自己的數據集路徑!
3.2 Python 批量插入
Python單條插入使用的是pymysql庫。下面是部分代碼,完整代碼見:
批量提交源碼
with open('/home/light/mysql/gps1.txt', 'r') as fp:
for line in fp:
...
...
...
count += 1
if count and count%70000==0:
# 執行多行插入,executemany(sql語句,數據(需一個元組類型))
self.cur.executemany(sql, data_list)
# 提交數據,必須提交,不然數據不會保存
self.conn.commit()
data_list = []
print("提交了:" + str(count) + "條數據")
if data_list:
# 執行多行插入,executemany(sql語句,數據(需一個元組類型))
self.cur.executemany(sql, data_list)
# 提交數據,必須提交,不然數據不會保存
self.conn.commit()
print("提交了:" + str(count) + "條數據")
self.cur.close() # 關閉游標
self.conn.close() # 關閉pymysql連接
上述有個關鍵點需要說明一下:
(1)使用executemany而非execute,這個提交速度要快!(2)使用批量插入,而非單條插入提交,這樣會提升效率!
3.3 Python 多線程插入
原始數據為一個gps1.txt文件,這個數據太大,如果直接使用多線程插入,不太方便,所以先使用文件切分方法,然后進行多線程的插入。
關於文件切分,可以點擊這里:文件切分源碼。
Python中使用多線程源碼
def multicore(self):
file_list = [1,2324,4648,6972,9298]
m1 = mp.Process(target=self.run, args=(file_list[0],file_list[1],'m1',))
m2 = mp.Process(target=self.run, args=(file_list[1]+1,file_list[2],'m2',))
m3 = mp.Process(target=self.run, args=(file_list[2]+1,file_list[3],'m3',))
m4 = mp.Process(target=self.run, args=(file_list[3]+1,file_list[4],'m4',))
m1.start()
m2.start()
m3.start()
m4.start()
m1.join()
m2.join()
m3.join()
m4.join()
具體插入思路是使用四個線程分別讀取每個區間段的數據,然后再對數據進行批量插入!如果這一塊不懂的伙伴,歡迎留言哈~
3.4 MySQL數據導入方法對比
★
load命令與普通的insert區別
”
相同點不同點兩者都是通過讀取本地txt文件,按照相同的分隔來讀取進行插入。程序插入法實質為insert語句間接執行。load data設計用於在單個操作中大量加載表格數據。
★
效率比較
”
兩者耗時如下:
第一種:load data (這里截取的是Innodb引擎表的插入結果,當使用MyISAM時,會比現在還快!)

enter image description here
用時1h11分。
第二種:程序插入法(這里只截取了批量插入的!)

用時:27322.45/36=7.58h
上述對比可知,load data效率非常高,原因在於使用的是load data infile方式,而第二種則為傳統的insert方式。
究其根源主要是MySQL內部對於load 和 insert的處理機制不同。
Load的處理機制是:在執行load之前,會關掉索引,當load全部執行完成后,再重新創建索引.
Insert的處理機制是:每插入一條則更新一次數據庫,更新一次索引.
另外,load與insert的不同還體現在load省去了sql語句解析,sql引擎處理,而是直接生成文件數據塊,所以會比Insert快很多.
4.同步利器
4.1 簡單粗暴Sqoop
首先來回顧一下Sqoop架構圖:

架構圖
這里大家記住一個規則:大數據需要切分!如果不切分,這個億級數據直接導入會崩潰!!!
★
什么是Sqoop?
”
Sqoop是一個用來將Hadoop和關系型數據庫中的數據相互轉移的開源工具,可以將一個關系型數據庫中的數據導進到Hadoop的HDFS或者HBase等。
sqoop核心參數與代碼解釋:
sqoop import --connect jdbc:mysql://localhost:3306/loaddb --username root --password xxxx --query "${sql}" --hbase-row-key id --hbase-create-table --column-family info --hbase-table mysql_data --split-by id -m 4
--connect 指定連接的數據庫,如果你的數據庫不是本地的,記得修改地址!--username 用戶名 --password 密碼 --query sql語句 --hbase-row-key 指定rowkey,如果存在則修改為該值 --hbase-create-table 創建Hbase表 --column-family 列簇 --hbase-table hbase表名
注意:當-m 設置的值大於1時,split-by必須設置字段!
由於數據太大,需要分片導入,具體導入代碼見倉庫:
up=185941000
for((i=1; i>0; i++))
do
start=$(((${i} - 1) * 40000 + 1))
end=$((${i} * 40000))
if [ $end -ge $up ]
then
end=185941000
fi
sql="select id,carflag, touchevent, opstatus,gpstime,gpslongitude,gpslatitude,gpsspeed,gpsorientation,gpsstatus from loaddb.loadTable1 where id>=${start} and id<=${end} and \$CONDITIONS";
sqoop import --connect jdbc:mysql://localhost:3306/loaddb --username root --password xxxx --query "${sql}" --hbase-row-key id --hbase-create-table --column-family info --hbase-table mysql_data --split-by id -m 4
echo Sqoop import from: ${start} to: ${end} success....................................
if [ $end -eq $up ]
then
break
fi
done
思路是每隔4萬導入一次,當然您也可以修改。
耗時:(使用linux的time統計bash腳本運行時間)

enter image description here
導入結果:

enter image description here
如果遇到問題,顯示虛擬內存溢出,不斷新開進程,殺死之前的進程,解決方案:關閉虛擬內存。

enter image description here
修改yarn-site.xml
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
以上就是MySQL同步Hbase方案1。
4.2 Kafka-thrift同步

enter image description here
4.2.1 binlog
binlog是sever層維護的一種二進制日志,與innodb引擎中的redo/undo log是完全不同的日志。
可以簡單的理解該log記錄了sql標中的更新刪除插入等操作記錄。通常應用在數據恢復、備份等場景。
★
開啟binlog
”
對於我的mysql的配置文件在下面這個文件夾,當然直接編輯my.cnf也是可以的。
vi /etc/mysql/mysql.conf.d/mysqld.cnf
對配置文件設置如下:

openbinlog
★
查看是否啟用
”
進入mysql客戶端輸入:
show variables like '%log_bin%';

binlog
★
binlog介紹
”
我的log存放在var下面的log的mysql下面:

loglook
在mysql-bin.index中包含了所有的log文件,比如上述圖就是包含了1與2文件,文件長度超過相應大小就會新開一個log文件,索引遞增,如上面的000001,000002。
★
binlog實戰
”
首先創建一個表:
create table house(id int not null primary key,house int,price int);
向表中插入數據:
insert into loaddb.house(id,house,price) values(1,2,3);
上面提到插入數據后,binlog會更新,那么我們去查看上面log文件,應該會看到插入操作。
Mysql binlog日志有ROW,Statement,MiXED三種格式;
set global binlog_format='ROW/STATEMENT/MIXED'
命令行:
show variables like 'binlog_format'

row
對於mysql5.7的,binlog格式默認為ROW,所以不用修改。
那么為何要了解binlog格式呢,原因很簡單,我要查看我的binlog日志,而該日志為二進制文件,打開后是亂碼的。對於不同的格式,查看方式不一樣!
對於ROW模式生成的sql編碼需要解碼,不能用常規的辦法去生成,需要加上相應的參數,如下代碼:
sudo /usr/bin/mysqlbinlog mysql-bin.000002 --base64-output=decode-rows -v
使用mysqlbinlog工具查看日志文件:

binlog
4.2.2 啟動thrift接口
thrift為其他語言與hbase操縱接口。啟動目的為后面數據插入做准備。

enter image description here
4.2.3 kafka-thrift流程小結
使用github倉庫代碼將原始數據進行每2w一個文件切分!
切分輸出:

split

上述切分速度非常快,2分鍾左右即可切完,可以自定義文件大小。
編寫Kafka數據入Hbase,完整代碼見github倉庫代碼:
def batchTokafka(self,start_time,table_name):
table = self.conn.table(table_name)
i = 1
with table.batch(batch_size=1024*1024) as bat:
for m in self.consumer:
t = time.time()
database = json.loads(m.value.decode('utf-8'))["database"]
name = json.loads(m.value.decode('utf-8'))["table"]
row_data = json.loads(m.value.decode('utf-8'))["data"]
if database=='loaddb'and name == 'sqlbase1':
row_id = row_data["id"]
row = str(row_id)
print(row_data)
del row_data["id"]
data = {}
for each in row_data:
neweach = 'info:' + each
data[neweach] = row_data[each]
data['info:gpslongitude'] = str(data['info:gpslongitude'])
data['info:gpslatitude'] = str(data['info:gpslatitude'])
data['info:gpsspeed'] = str(data['info:gpsspeed'])
data['info:gpsorientation'] = str(data['info:gpsorientation'])
# self.insertData(table_name, row, data)
print(data)
bat.put(row,data)
if i%1000==0:
print("===========插入了" + str(i) + "數據!============")
print("===========累計耗時:" + str(time.time() - start_time) + "s=============")
print("===========距離上次耗時"+ str(time.time() - t) +"=========")
i+=1
上述運行后,開始MySQL數據插入,這里插入采用4個多進程進行程序插入,速度非常快。
當MySQL數據在插入的同時,數據流向如下:
mysql插入->入庫mysql->記錄binlog->maxwell提取binlog->返回json給kafka->kafka消費端通過thrift接口->寫入hbase。
上述同步的結果如下,為了明確是否真正數據同步,只看了一條數據,作為驗證。
★
多個進程插入圖
”

mutprocess
★
kafka消費入hbase圖
”

enter image description here
★
MySQL數據圖
”

enter image description here
★
Hbase數據圖
”

enter image description here
以上就是從Mysql到Hbase的同步方案2。
4.3 Kafka-Flink

enter image description here
4.3.1 實時同步Flink
方案3為方案2的改進,上述是通過Python寫入Hbase,這里改成java,並使用最新的流處理技術:Flink。
Flink在ETL場景中使用頻繁,非常適合數據同步,於是在這個方案中采用Flink進行同步。
核心代碼實現,完整代碼見github倉庫地址:
SingleOutputStreamOperator<Student> student = env.addSource(
new FlinkKafkaConsumer011<>(
"test", //這個 kafka topic 需要和上面的工具類的 topic 一致
new SimpleStringSchema(),
props)).setParallelism(9)
.map(string -> JSON.parseObject(string, Student.class))
.setParallelism(9);
long start =System.currentTimeMillis();
student.timeWindowAll(Time.seconds(3)).apply(new AllWindowFunction<Student,
List<Student>, TimeWindow>() {
@Override
public void apply(TimeWindow window, Iterable<Student> values,
Collector<List<Student>> out) throws Exception {
ArrayList<Student> students = Lists.newArrayList(values);
if (students.size() > 0) {
System.out.println("1s內收集到 mysql表 的數據條數是:"
+ students.size());
long end =System.currentTimeMillis();
System.out.printf("已經用時time:%d\n",end-start);
out.collect(students);
}
}
}).addSink(new SinkToHBase()).setParallelism(9);
使用Flink進行批量入Hbase。
4.3.2 Flink小結
首先啟動maxwell與kafka,hbase也要啟動,接着在數據寫入端,可以采用load data infile或者python程序插入法進行數據插入,數據會通過maxwell到kafka再到Flink,然后sink到Hbase。
★
插入端為load data infile的同步
”

★
插入端為Python程序的同步
”

5.Phoenix組件和原生Hbase查詢的時間性能對比
- 原生Hbase查詢時間:
count 'mysql_data'

Hbase查詢時間為3856秒大約1小時7分鍾
- Hbase查詢優化
count 'mysql_data', INTERVAL => 10000000
每隔一千萬查詢一次:

enter image description here
間隔查詢3372.5740秒,大約耗時:56分鍾。
- 協處理器

enter image description here
耗時:1874188毫秒=1874.188s秒,大約31分鍾。
- Phoenix查詢時間:

可以看到Hbase查詢時間為3956秒大約1小時多一點。
而Phoenix查詢時間為2015.033秒,大約33分鍾左右。
綜上可得出,速度快慢**:協處理器>Phoenix查詢>間隔count>普通count>全表scan**。
6.同步效率對比
sqoop導入,大約50h左右。kafka-thrift單條插入約等於sqoop導入。kafka-thrift批量插入,大約7h。kafka-flink,大約3-7h。
不同的同步方式,大家可以看到效率有着明顯的差別,其中使用Flink效率最高,並且如果自己電腦是集群模式,那么效率就會更加的高!
優化點:Flink窗口收集設置,上游插入速度調整,下游接收調整等。
最后,幾點策略總結如下:
- 大數據需分割、批量插入
- 插入有序
- phoenix大數據查詢需設超時時間。
- Flink最穩定、效率最高、根據計算機性能影響。
- Python thrift,可以批量與單條插入結合。
- Sqoop需切分、虛擬內存需關閉。
- HBase若崩潰,趕緊查Zookeeper。