需求:
由於我們用的阿里雲Hbase,按存儲收費,現在需要把kafka的數據直接同步到自己搭建的hadoop集群上,(kafka和hadoop集群在同一個局域網),然后對接到hive表中去,表按每天做分區
一、首先查看kafka最小偏移量(offset)
/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.210:9092 -topic test --time -2 (--time -1為查看kafka最大偏移量) test:2:0 test:1:0 test:0:0
顯示三個partition最小offset都為0,這樣的話我們可以直接從kafka把數據遷移到HDFS上面
還有另外一種情況:
如果最小offset不為0,需要先把hbase上面的數據導入到HDFS上,再把offset設置為之前存放在zookeeper上面的offset
方法:
1). 首先查看存放在zookeeper上面的offset /usr/local/zookeeper/bin/zkCli.sh -server 192.168.1.210:2181 #] get /consumers/test_group/offsets/test/0 16051516 cZxid = 0x10050df5c ctime = Wed Sep 19 16:15:50 CST 2018 mZxid = 0x100691806 mtime = Thu Nov 08 11:30:08 CST 2018 pZxid = 0x10050df5c cversion = 0 dataVersion = 6433 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 8 numChildren = 0 2). 把導入到Hbase的sparkstreaming任務kill掉,然后把hbase的數據全部導入到HDFS上 3). 然后編寫導入到HDFS上面的spark代碼,設置相同的group_id,則會讀取之前存放在zookeeper中的offset
二. 把數據導入到HDFS上面(實現代碼不在這里作展示)
我這邊保存的文件以日期為命名格式(每天凌晨執行一次,在sparkstreaming里面設置) hdfs dfs -ls /data/res/20181108 drwxr-xr-x - hadoop hadoop 0 2018-11-08 00:00 /data/res/20181108/part-00000 drwxr-xr-x - hadoop hadoop 0 2018-11-08 00:00 /data/res/20181108/part-00001 drwxr-xr-x - hadoop hadoop 0 2018-11-08 00:00 /data/res/20181108/part-00002
三. 把數據進行getmerge,獲取每天的數據,然后導入到hive表中去
實現shell腳本,每天凌晨1點跑,下面按天分割文件,主要是防止當天會摻雜着昨天的數據
]$ cat hdfs2hive.sh #! /bin/sh rm -f /data/hadoop/data/* hdfs="/usr/local/hadoop/bin/hdfs" hive="/usr/local/hive/bin/hive" yesterday=`date +"%Y%m%d" -d "-1 days"` today=`date +"%Y%m%d"` ## 合並hdfs文件夾到本地 hdfs_dir="/data/soda_yx/$yesterday" res=`$hdfs dfs -ls $hdfs_dir` if [ -n "$res" ];then $hdfs dfs -getmerge $hdfs_dir "/data/hadoop/data/data/res.data" if [ $? -eq 0 ];then echo "merge to local file is success." fi fi ## 按天過濾出文件 dir="/data/hadoop/data" `cat $dir/res.data |awk -F"\t" '{if($36=="'"$yesterday"'") print $0}' > $dir/$yesterday` `cat $dir/res.data |awk -F"\t" '{if($36=="'"$today"'") print $0}' > $dir/$today` if [ $? -eq 0 ];then echo "filter file is success." fi ## 插入數據到hive中去 if [ -n "$dir/$yesterday" ];then $hive -e "LOAD DATA LOCAL INPATH '$local_file' INTO TABLE xxx.soda_report partition(dt='$yesterday')" if [ $? -eq 0 ];then echo "Import local data to hive is success." fi fi if [ -n "$dir/$today" ];then $hive -e "LOAD DATA LOCAL INPATH '$dir/$today' INTO TABLE xxx.soda_report partition(dt='$today')" if [ $? -eq 0 ];then echo "Import local data to hive is success." fi fi