把kafka數據從hbase遷移到hdfs,並按天加載到hive表(hbase與hadoop為不同集群)


需求:
由於我們用的阿里雲Hbase,按存儲收費,現在需要把kafka的數據直接同步到自己搭建的hadoop集群上,(kafka和hadoop集群在同一個局域網),然后對接到hive表中去,表按每天做分區

一、首先查看kafka最小偏移量(offset)

/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.210:9092 -topic test --time -2 (--time -1為查看kafka最大偏移量)
test:2:0
test:1:0
test:0:0

顯示三個partition最小offset都為0,這樣的話我們可以直接從kafka把數據遷移到HDFS上面

還有另外一種情況:

如果最小offset不為0,需要先把hbase上面的數據導入到HDFS上,再把offset設置為之前存放在zookeeper上面的offset

方法:

1). 首先查看存放在zookeeper上面的offset
/usr/local/zookeeper/bin/zkCli.sh -server 192.168.1.210:2181
#] get /consumers/test_group/offsets/test/0
16051516
cZxid = 0x10050df5c
ctime = Wed Sep 19 16:15:50 CST 2018
mZxid = 0x100691806
mtime = Thu Nov 08 11:30:08 CST 2018
pZxid = 0x10050df5c
cversion = 0
dataVersion = 6433
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 8
numChildren = 0
2). 把導入到Hbase的sparkstreaming任務kill掉,然后把hbase的數據全部導入到HDFS上
3). 然后編寫導入到HDFS上面的spark代碼,設置相同的group_id,則會讀取之前存放在zookeeper中的offset

 

二. 把數據導入到HDFS上面(實現代碼不在這里作展示)

我這邊保存的文件以日期為命名格式(每天凌晨執行一次,在sparkstreaming里面設置)
hdfs dfs -ls /data/res/20181108
drwxr-xr-x   - hadoop    hadoop          0 2018-11-08 00:00 /data/res/20181108/part-00000
drwxr-xr-x   - hadoop    hadoop          0 2018-11-08 00:00 /data/res/20181108/part-00001
drwxr-xr-x   - hadoop    hadoop          0 2018-11-08 00:00 /data/res/20181108/part-00002

 

三. 把數據進行getmerge,獲取每天的數據,然后導入到hive表中去

實現shell腳本,每天凌晨1點跑,下面按天分割文件,主要是防止當天會摻雜着昨天的數據

]$ cat hdfs2hive.sh 
#! /bin/sh

rm -f /data/hadoop/data/*

hdfs="/usr/local/hadoop/bin/hdfs"
hive="/usr/local/hive/bin/hive"

yesterday=`date +"%Y%m%d" -d "-1 days"`
today=`date +"%Y%m%d"`

## 合並hdfs文件夾到本地
hdfs_dir="/data/soda_yx/$yesterday"

res=`$hdfs dfs -ls $hdfs_dir`

if [ -n "$res" ];then
    $hdfs dfs -getmerge $hdfs_dir "/data/hadoop/data/data/res.data"
    if [ $? -eq 0 ];then
        echo "merge to local file is success."
    fi
fi

## 按天過濾出文件
dir="/data/hadoop/data"
`cat $dir/res.data |awk -F"\t" '{if($36=="'"$yesterday"'") print $0}' > $dir/$yesterday`
`cat $dir/res.data |awk -F"\t" '{if($36=="'"$today"'") print $0}' > $dir/$today`
if [ $? -eq 0 ];then
    echo "filter file is success."
fi

## 插入數據到hive中去
if [ -n "$dir/$yesterday" ];then
    $hive -e "LOAD DATA LOCAL INPATH '$local_file' INTO TABLE xxx.soda_report partition(dt='$yesterday')"
    if [ $? -eq 0 ];then
        echo "Import local data to hive is success."
    fi
fi
   
if [ -n "$dir/$today" ];then
    $hive -e "LOAD DATA LOCAL INPATH '$dir/$today' INTO TABLE xxx.soda_report partition(dt='$today')"
    if [ $? -eq 0 ];then
        echo "Import local data to hive is success."
    fi
fi


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM