HDFS集群PB級數據遷移方案-DistCp生產環境實操篇
作者:尹正傑
版權聲明:原創作品,謝絕轉載!否則將追究法律責任。
用了接近2個星期的時間,終於把公司的需要的大數據組建部署完畢了,當然,在部署的過程中踩了不少坑,自己也對系統,網絡,各個大數據生態圈常用軟件進行了調優操作,后期等我整理好筆記后會分享給大家參考的。集群是部署好了,但是沒有數據的話也沒有人會去用。因此我們需要把舊集群的數據遷移至新集群中(舊集群的數據都是存放在雲平台上的,而新集群),在遷移的過程中,參考網上的很多解決方案,最終選擇了distcp,官網文檔也是相當的友好啊,大家一看就懂(我在下文已經給出了相應的鏈接)。
溫馨提示:我的hadoop采用的是CDH方式部署的,對於不同Hadoop版本間的拷貝,用戶應該使用HftpFileSystem。 這是一個只讀文件系統,所以DistCp必須運行在目標端集群上(更確切的說是在能夠寫入目標集群的TaskTracker上)。 源的格式是 hftp://<dfs.http.address>/<path> (默認情況dfs.http.address是 <namenode>:50070)。
一.DistCp注意事項
1>.什么是DistCp
用官方的話解釋就是:DistCp(分布式拷貝)是用於大規模集群內部和集群之間拷貝的工具。 它使用Map/Reduce實現文件分發,錯誤處理和恢復,以及報告生成。 它把文件和目錄的列表作為map任務的輸入,每個任務會完成源列表中部分文件的拷貝。 由於使用了Map/Reduce方法,這個工具在語義和執行上都會有特殊的地方。 這篇文檔會為常用DistCp操作提供指南並闡述它的工作模型。
2>.使用方法
請參考官網:http://hadoop.apache.org/docs/r1.0.4/cn/distcp.html。
3>.DistCp的注意事項(摘自官網)
1>.DistCp會嘗試着均分需要拷貝的內容,這樣每個map拷貝差不多相等大小的內容。 但因為文件是最小的拷貝粒度,所以配置增加同時拷貝(如map)的數目不一定會增加實際同時拷貝的數目以及總吞吐量。 2>.如果沒使用-m選項,DistCp會嘗試在調度工作時指定map的數目 為 min (total_bytes / bytes.per.map, 20 * num_task_trackers), 其中bytes.per.map默認是256MB。 3>.建議對於長時間運行或定期運行的作業,根據源和目標集群大小、拷貝數量大小以及帶寬調整map的數目。 4>.對於不同Hadoop版本間的拷貝,用戶應該使用HftpFileSystem。 這是一個只讀文件系統,所以DistCp必須運行在目標端集群上(更確切的說是在能夠寫入目標集群的TaskTracker上)。 源的格式是 hftp://<dfs.http.address>/<path> (默認情況dfs.http.address是 <namenode>:50070)。 5>.Map/Reduce和副效應 像前面提到的(參考官網),map拷貝輸入文件失敗時,會帶來一些副效應。 5.1>.除非使用了-i,任務產生的日志會被新的嘗試替換掉。 5.2>.除非使用了-overwrite,文件被之前的map成功拷貝后當又一次執行拷貝時會被標記為 "被忽略"。 5.3>.如果map失敗了mapred.map.max.attempts次,剩下的map任務會被終止(除非使用了-i)。 5.4>.如果mapred.speculative.execution被設置為 final和true,則拷貝的結果是未定義的。
二.DistCp使用案例
[root@node105 ~]# hostname node105.yinzhengjie.org.cn [root@node105 ~]# [root@node105 ~]# free -h total used free shared buff/cache available Mem: 17G 2.2G 8.3G 11M 6.9G 14G Swap: 8.9G 0B 8.9G [root@node105 ~]# [root@node105 ~]# [root@node105 ~]# hadoop distcp usage: distcp OPTIONS [source_path...] <target_path> OPTIONS -append Reuse existing data in target files and append new data to them if possible -async Should distcp execution be blocking -atomic Commit all changes or none -bandwidth <arg> Specify bandwidth per map in MB -blocksperchunk <arg> If set to a positive value, fileswith more blocks than this value will be split into chunks of <blocksperchunk> blocks to be transferred in parallel, and reassembled on the destination. By default, <blocksperchunk> is 0 and the files will be transmitted in their entirety without splitting. This switch is only applicable when the source file system implements getBlockLocations method and the target file system implements concat method -copybuffersize <arg> Size of the copy buffer to use. By default <copybuffersize> is 8192B. -delete Delete from target, files missing in source -diff <arg> Use snapshot diff report to identify the difference between source and target -f <arg> List of files that need to be copied -filelimit <arg> (Deprecated!) Limit number of files copied to <= n -filters <arg> The path to a file containing a list of strings for paths to be excluded from the copy. -i Ignore failures during copy -log <arg> Folder on DFS where distcp execution logs are saved -m <arg> Max number of concurrent maps to use for copy -mapredSslConf <arg> Configuration for ssl config file, to use with hftps://. Must be in the classpath. -numListstatusThreads <arg> Number of threads to use for building file listing (max 40). -overwrite Choose to overwrite target files unconditionally, even if they exist. -p <arg> preserve status (rbugpcaxt)(replication, block-size, user, group, permission, checksum-type, ACL, XATTR, timestamps). If -p is specified with no <arg>, then preserves replication, block size, user, group, permission, checksum type and timestamps. raw.* xattrs are preserved when both the source and destination paths are in the /.reserved/raw hierarchy (HDFS only). raw.* xattrpreservation is independent of the -p flag. Refer to the DistCp documentation for more details. -rdiff <arg> Use target snapshot diff report to identify changes made on target -sizelimit <arg> (Deprecated!) Limit number of files copied to <= n bytes -skipcrccheck Whether to skip CRC checks between source and target paths. -strategy <arg> Copy strategy to use. Default is dividing work based on file sizes -tmp <arg> Intermediate work path to be used for atomic commit -update Update target, copying only missingfiles or directories [root@node105 ~]#
[root@calculation101 ~]# hdfs dfs -du -h /user/kuaikan/report_new/2018/10/
[root@node105 ~]# hostname node105.yinzhengjie.org.cn [root@node105 ~]# [root@node105 ~]# su hdfs [hdfs@node105 root]$ [hdfs@node105 root]$ hdfs dfs -mkdir -p /yinzhengjie/data/ [hdfs@node105 root]$ [hdfs@node105 root]$ hdfs dfs -chown -R root:root /yinzhengjie/data/ [hdfs@node105 root]$ [hdfs@node105 root]$ exit exit [root@node105 ~]# [root@node105 ~]# hdfs dfs -ls /yinzhengjie Found 1 items drwxr-xr-x - root root 0 2018-10-29 03:29 /yinzhengjie/data [root@node105 ~]#
[root@node105 ~]# jps 17617 NameNode 17985 DFSZKFailoverController 18406 Bootstrap 11047 Jps [root@node105 ~]# [root@node105 ~]# mkdir /yinzhengjie [root@node105 ~]# [root@node105 ~]# [root@node105 ~]# nohup time hadoop distcp hdfs://10.1.1.111:8020/user/kuaikan/report_new/2018/10/23 hdfs://node105.yinzhengjie.org.cn:8020//yinzhengjie/data >> /yinzhengjie/distcp.log 2>&1 & [1] 11125 [root@node105 ~]# [root@node105 ~]# jobs [1]+ Running nohup time hadoop distcp hdfs://10.1.1.111:8020/user/kuaikan/report_new/2018/10/23 hdfs://node105.yinzhengjie.org.cn:8020//yinzhengjie/data >> /yinzhengjie/distcp.log 2>&1 & [root@node105 ~]# [root@node105 ~]# jps 17617 NameNode 17985 DFSZKFailoverController 11126 DistCp ------>注意,開始拷貝的同時,distcp也啟動了相應的進程。 18406 Bootstrap 11357 Jps [root@node105 ~]# [root@node105 ~]# hostname node105.yinzhengjie.org.cn [root@node105 ~]# [root@node105 ~]# free -h total used free shared buff/cache available Mem: 17G 2.4G 8.1G 11M 7.0G 14G Swap: 8.9G 0B 8.9G [root@node105 ~]#
[root@node105 ~]# tail -100f /yinzhengjie/distcp.log
[root@node105 ~]# jobs [root@node105 ~]# [root@node105 ~]# jps 17617 NameNode 17985 DFSZKFailoverController 18406 Bootstrap 15097 Jps [root@node105 ~]# [root@node105 ~]# [root@node105 ~]# tail -35f /yinzhengjie/distcp.log
[root@node105 ~]# hdfs dfs -ls /yinzhengjie/data Found 1 items drwxr-xr-x - root root 0 2018-10-29 03:55 /yinzhengjie/data/23 [root@node105 ~]# [root@node105 ~]# hdfs dfs -du -h /yinzhengjie/data 22.0 G 66.1 G /yinzhengjie/data/23 [root@node105 ~]# [root@node105 ~]# hostname node105.yinzhengjie.org.cn [root@node105 ~]#
三.自定義腳本實現自動拷貝(結合我的生產環境)
我上面只是對distcp對簡單使用,生產環境我們需要編寫腳本,讓他自己遷移數據。下面是我在生產環境中使用的一個簡單的shell腳本:可供參考
1 #!/bin/bash 2 #@author :yinzhengjie 3 #blog:http://www.cnblogs.com/yinzhengjie 4 #EMAIL:y1053419035@qq.com 5 6 7 #判斷用戶輸入的參數是否合法,我這個腳本要求傳入3個參數,第一個參數是數據庫名稱,第二個參數是表面,第三個參數是具體的日期。 8 if [ $# -eq 3 ];then 9 vl=`expr length $3` 10 if [ $vl -ne '8' ];then 11 echo "`date` ERROR:Input '$dateValue' should be 'YYYYMMDD'" 12 exit 1 13 fi 14 dateValue=`date -d "0 day ago $3" +"%Y%m%d"` 15 else 16 dateValue=`date -d"-1 days" +"%Y%m%d"` 17 fi 18 19 20 #用來提示任務即將啟動,標志着任務的開始 21 echo "`date` copy work of table $2 on database $1 start, dateValue is $dateValue " 22 23 24 #將舊機群的數據拷貝至新集群,並將日志保存在:"/yinzhengjie/distcp.log" 25 #nohup time hadoop distcp hdfs://uhadoop-2mqmxu-master2:8020/user/hive/warehouse/$1.db/$2/dt=${dateValue} hdfs://node101.yinzhengjie.org.cn:8020/user/hive/warehouse/$1.db/$2 >> /yinzhengjie/distcp.log 2>&1 & 26 time hadoop distcp hdfs://uhadoop-2mqmxu-master2:8020/user/hive/warehouse/$1.db/$2/dt=${dateValue} hdfs://node101.yinzhengjie.org.cn:8020/user/hive/warehouse/$1.db/$2 27 28 #用來提示任務執行完畢,標志着拷貝完成 29 echo "`date` copy work of table $2 on database $1 done, dateValue is $dateValue "
注意,執行distcp命令是支持帶寬限速的,遷移數據我們走的是專線,但是distcp會將帶寬流量吃滿,導致其他的業務在高峰期使用這條專線的時候存在丟包的情況。感興趣的小伙伴可以調試一下他的相關參數。當然,測試起來會很麻煩,你得調試參數,還得使用監控軟件實時監控到專線帶寬的使用量。我索性在高峰期就不啟動distcp進程,寫了一個python腳本僅供大家參考。
1 #!/usr/bin/python 2 # -*- coding: utf-8 -*- 3 #@author :yinzhengjie 4 #blog:http://www.cnblogs.com/yinzhengjie/tag/python%E8%87%AA%E5%8A%A8%E5%8C%96%E8%BF%90%E7%BB%B4%E4%B9%8B%E8%B7%AF/ 5 #EMAIL:y1053419035@qq.com 6 7 8 import json 9 import subprocess 10 import threading 11 import time 12 from datetime import datetime, timedelta #我們將獲取時間的包倒入。 13 14 #獲取當前操作系統的時間 15 today = datetime.today() 16 17 18 19 #這個函數是用來操作系統運行命令的腳本,需要傳入2個參數,第一個參數是執行的命令,第二個參數是shell的布爾直,調用復雜的linux命令(比如:"df -h | grep /dev/sda1")的方法,需要加“shell=True” 20 def runCommand(cmdstring, isTrue=False): 21 cmdstring_list = cmdstring.split() 22 #調用復雜的linux命令的方法,需要加“shell=True”,表示將前面引號的內容放在一個終端(terminal)去執行, 23 p = subprocess.Popen(cmdstring_list, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,shell=isTrue) 24 #communicate() 等待任務結束,並將結果輸入的結果復制給變量。使用兩個out變量接受輸出結果,使用err變量接受error信息 25 out,err = p.communicate() 26 #需要注意的是這個不能保存命令輸出的結果,而是保存命令執行的結果喲!一般非“0”就表示命令沒有執行成功,而結果是“0”表示執行命令實成功的,但是命令的輸出結果是無法保存的!切記! 27 ret = p.returncode 28 return ret,out,err 29 30 31 32 #咱們這里的定義了一個線程類,主要是用來調用我們需要指定的Linux腳本。 33 class myThread(threading.Thread): 34 cmd='' 35 36 def __init__(self,cmd): 37 threading.Thread.__init__(self) 38 self.cmd = cmd 39 40 def run(self): 41 print datetime.today().__str__().split('.')[0] 42 print "Thread: '%s' started." % (self.cmd) 43 ret,out,err = runCommand(self.cmd) 44 print out 45 print err 46 print "Thread: '%s' finished." % (self.cmd) 47 48 49 50 # 顧名思義,這里定義獲取日期的函數,里面需要傳入一個參數,參數的默認值是1,也就是說默認你會獲取到一天以前的時間。比如當前的時間是"2018-10-30 15:19:23.620339",則返回:"20181029" 51 def getDate(delta=1): 52 #注意,timedelta(delta)函數可以指定一個周期,需要傳入一個int類型的參數,基本單位是day, 53 mydate = today - timedelta(delta) 54 #返回值是將mydate轉換成字符串,並用字符串的split方法將其變成一個數組並取得第一個元素,並將取出來的元素中含有"-"給去除掉。 55 return (mydate.__str__().split())[0].replace('-','') 56 57 58 # 該函數的功能和上面的getDate函數功能基本一樣,里面也需要傳入一個參數,參數的默認值是1,也就是說默認你會獲取到一天以前的時間。比如當前的時間是"2018-10-30 15:19:23.620339",則返回:"2018_10_29" 59 def getDate_ex(delta=1): 60 today = datetime.today() 61 mydate = today - timedelta(delta) 62 return (mydate.__str__().split())[0].replace('-','_') 63 64 65 #定義獲取當前小時的方法 66 def getDate_hour(): 67 return (datetime.today().__str__().split())[1].split(":")[0] 68 69 print getDate_hour() 70 71 date_list=[] 72 73 date_list_ex=[] 74 75 threads=[] 76 77 78 #設置總線程個數,根據你的服務器性能以及貸款的大小,選擇合適的線程數,由於我們專線帶寬就2G,還要保證kafka的傳輸帶寬,因此我這里給出3個。 79 total_thread_num=3 80 81 82 #業務邏輯如下:第一個參數是當前時間距離需要遷移的天數的時間,比如我從距離225天(溫馨提示:可以使用"date -d "225 days ago"進行運算。)開始拷貝數據,距離400天前時結束。 83 for delta in range(225,400): 84 while(True): 85 hour=getDate_hour() 86 current_thread_num = 0 87 for thread in threads: 88 if thread.isAlive(): 89 current_thread_num += 1 90 #由於遷移數據時,我們需要避免掉高峰期,我們的專線只有2G的帶寬,在高峰期傳輸的話,會導致kafka集群出現數據丟失的情況! 91 if (current_thread_num >= total_thread_num or hour in ('19','20','21','22')): 92 print "%s\tThreading pool is full. Waiting for 60s..." % (datetime.today().__str__().split('.')[0]) 93 time.sleep(30) 94 continue 95 else: 96 break 97 date=getDate(delta) 98 time.sleep(10) 99 #注意,下面的"/data/extract/tablecp/tablecp_storage.sh" 就是我上面關於distcp的shell腳本。 100 current_thread = myThread("/data/extract/tablecp/tablecp_storage.sh yingshi ad_sarrs_hour " + date) 101 current_thread.start() 102 threads.append(current_thread) 103 104 print datetime.today().__str__().split('.')[0], 105 print "\tAll task finished."
后記:由於我司帶寬專線僅有10G帶寬,在沒有其他業務時,遷移數據時速度還湊合,但是隨着業務不斷往我們的新建數據中心遷移時,發現帶寬是實時打滿的,導致日志收集出現丟包的情況,被迫研究了一下帶寬的限速,重新修改了一下腳本:

#!/bin/bash #@author :yinzhengjie #blog:http://www.cnblogs.com/yinzhengjie #EMAIL:y1053419035@qq.com if [ $# -eq 3 ];then vl=`expr length $3` if [ $vl -ne '8' ];then echo "`date` ERROR:Input '$dateValue' should be 'YYYYMMDD'" exit 1 fi dateValue=`date -d "0 day ago $3" +"%Y%m%d"` else dateValue=`date -d"-1 days" +"%Y%m%d"` fi echo "`date` copy work of table $2 on database $1 start, dateValue is $dateValue " hadoop distcp -Dmapred.jobtracker.maxtasks.per.job=1 -Dmapred.job.max.map.running=1 -Ddistcp.bandwidth=8192 hdfs://uhadoop-2mqmxu-master2:8020/user/hive/warehouse/$1.db/$2/dt=${dateValue} hdfs://10.1.3.101:8020/user/hive/warehouse/$1.db/$2 echo "`date` copy work of table $2 on database $1 done, dateValue is $dateValue "

Hadoop 1版本 distcp [OPTIONS] <srcurl> * <desturl> 選項: -p [rbugp] 狀態 r:復制數 b:塊大小 u:用戶 g:組 p:權限 t:修改和訪問時間 -p單獨相當於-prbugpt -i 忽略失敗 -basedir <basedir> 從<srcurl>復制文件時,使用<basedir>作為基本目錄 -log <logdir> 將日志寫入<logdir> -m <num_maps> 最大並發副本數 -overwrite 覆蓋目的地 -update 如果src大小與dst大小不同,則覆蓋 -skipcrccheck 不要使用CRC檢查來確定src是否是 不同於dest。 -copybychunk 剁碎和復制的文件 -f <urilist_uri> 將<urilist_uri>中的列表用作src列表 -filelimit <n> 將文件的總數限制為<= n -filelimitpermap <n> 每個地圖要復制的最大文件數 -sizelimit <n> 將總大小限制為<= n個字節 -sizelimitpermap <n> 每個映射要復制的最大字節數 -delete 刪除dst中存在的文件,但不在src中 -mapredSslConf <f> 映射器任務的SSL配置文件名 -usefastcopy 使用FastCopy(僅適用於DFS) 注1:如果設置了-overwrite或-update,則每個源URI和目標URI保持同級一致。 例如: hadoop distcp -p -update hdfs://A:9000//home/aa hdfs://B:9000//home/bb 支持的通用選項是 -conf <configuration file>指定應用程序配置文件 -D <property = value>給定屬性的使用值 -fs <local | namenode:port>指定一個namenode -jt <local | jobtracker:port>指定jobtracker在corona上 -jtold <local | jobtracker:port>指定jobtracker在mapreduce上 -files <逗號分隔的文件列表>指定要復制到map reduce cluster的逗號分隔文件 -libjars <逗號分隔的jars列表> 指定要包含在類路徑中的逗號分隔的jar文件。 -archives <逗號分隔的歸檔列表> 指定要在計算機上取消歸檔的逗號分隔的歸檔。
Hadoop 2版本 用法:distcp OPTIONS [source_path ...] <target_path> OPTIONS -append 重新使用目標文件中的現有數據並追加新的如果可能,給他們的數據 -async 應該是distcp執行阻塞 -atomic 提交所有更改或無 -bandwidth <arg> 以MB為單位指定每個map的帶寬,注意由於在互聯網數據傳輸都是以二進制形式傳輸,因此,我們將MB的文件需要轉換稱大Mb需要乘以八個比特位,因此1Gb = 1024MB = 1024MB * 8bits = 8192Mb -delete 從目標中刪除,源文件丟失 -diff <arg> 使用snapshot diff報告來標識源和目標之間的差異 -f <arg> 需要復制的文件列表 -filelimit <arg> (已棄用!)限制復制到<= n的文件數 -i 在復制期間忽略故障 -log <arg> DFS上的distcp執行日志文件夾保存 -m <arg> 要用於副本的最大並發map數 -mapredSslConf <arg> 配置ssl配置文件,用於hftps:// -overwrite 選擇無條件覆蓋目標文件,即使它們存在。 -p <arg> 保留源文件狀態(rbugpcaxt) (復制,塊大小,用戶,組,權限,校驗和類型,ACL,XATTR,時間戳) 如果-p是指定為no <arg>,然后保留復制,塊大小,用戶,組,權限,校驗和類型和時間戳。 原始的* xattrs是源和目的地都保留路徑位於/.reserved/raw層次結構中(HDF只要)。原始* xattrpreservation是獨立的-p標志。請參閱DistCp文檔更多細節。 -sizelimit <arg> (已棄用!)限制復制到<= n的文件數字節 -skipcrccheck 是否跳過源和源之間的CRC檢查目標路徑。 -strategy <arg> 復制策略使用。默認是分工基於文件大小 -tmp <arg> 要用於原子的中間工作路徑承諾 -update 更新目標,僅復制missingfiles或目錄