前言
在當今每日信息量巨大的社會中,源源不斷的數據需要被安全的存儲.等到數據的規模越來越大的時候,也許瓶頸就來了,沒有存儲空間了.這時候怎么辦,你也許會說,加機器解決,顯然這是一個很簡單直接但是又顯得有些欠缺思考的辦法.無謂的加機器只會帶來無限上升的成本消耗,更好的辦法應該是做到更加精細化的數據存儲與管理,比如說非常典型的冷熱數據的存儲.對於巨大的長期無用的冷數據而言,應該用性能偏弱,但是磁盤空間富余的機器存,熱數據則反之.數據的分類存儲一定會帶來數據的同步問題,假若我有2套集群,1個是線上的正在使用的集群,另外1個則是冷數據集群,我如何做定期的數據同步並且同時對業務方的使用影響完全透明呢?本文就給大家闡述一下本人的一個解決方案,供大家參考.
數據遷移使用場景
上小節中說到的冷熱數據的同步只是數據遷移的一個表現場景,那么數據遷移還有其他哪些使用場景呢,如下:
- 冷熱集群數據分類存儲,詳見上述描述.
- 集群數據整體搬遷.當公司的業務迅速的發展,導致當前的服務器數量資源出現臨時緊張的時候,為了更高效的利用資源,會將原A機房數據整體遷移到B機房的,原因可能是B機房機器多,而且B機房本身開銷較A機房成本低些等.
- 數據的准實時同步.數據的准實時同步與上一點的不同在於第二點可以一次性操作解決,而准實時同步需要定期同步,而且要做到周期內數據基本完全一致.數據准實時同步的目的在於數據的雙備份可用,比如某天A集群突然宣告不允許再使用了,此時可以將線上使用集群直接切向B的同步集群,因為B集群實時同步A集群數據,擁有完全一致的真實數據和元數據信息,所以對於業務方使用而言是不會受到任何影響的.
上述3個使用場景中,其中第一點相比較於第二,三點來說可能稍微容易一些,但是想要完全做好也不簡單.第三點數據的實時同步想比較第二點來說更加實際一些.因為如果公司要准備集群數據遷移了,一般都會提前通知,然后做逐步遷移,而且也肯定不會讓原集群停止服務,所以采用數據慢慢同步的方式,等到數據徹底同步完了,才最終實現切換,達到最終的遷移目標.
數據遷移要素考量
當要做大規模的數據遷移的時候,需要做很多的前期准備工作,而且需要對很多因素,指標進行考量.以下是幾個主要指標:
1.Bandwidth-帶寬
在做大規模數據量的同步過程中,如何控制同步數據過程中所占用的網絡帶寬就顯得非常的重要.帶寬用的多了,會影響到線上業務的任務運行,帶寬用的少了又會導致數據同步過慢的問題.所以這里會引發出另外一個問題,對於帶寬的限流.也就是說,我要保證我的數據同步程序能保證限定在指定的網絡傳輸速率下,如果你不做任何處理的話,那結果基本上就是網絡有多少帶寬我就用多少帶寬的局面.
2.Performance-性能
性能問題同樣也是一個很關鍵的問題,是采用簡單的單機程序?還是多線程的性能更佳的分布式程序?顯然后者是我們更想要的.
3.Data-Increment-增量同步
當TB,PB級別的數據需要同步的時候,如果每次以全量的方式去同步數據,結果一定是非常糟糕.增量方式的同步會是不錯的選擇,那么哪些情況下會導致數據發生增量變動呢
- 原始數據文件進行了Append追加寫
- 原始數據文件被delete刪除或rename重命名
可能會有人好奇這里為什么沒有對原始數據進行改動的情況,這種case也會造成數據的變動啊,因為一般在海量數據存儲系統中,例如HDFS,一般不會在原文件內容上做修改,要么繼續追加寫,要么刪除文件,不會有類似RandomAccessFile的隨機寫的功能,所以做增量數據同步,只要考慮上述2個條件即可.上述條件中的第二點是非常容易判斷出的,通過定期的快照文件或元信息文件一比就出來了,但是對於文件是否被進行了追加寫或是其他的外界主動的修改操作的時候,我們如何進行判斷呢,下面給出2個步驟:
- 第一步: 先比較文件大小,如果2個階段文件大小發生改變,截取對應原始長度部分進行checksum比較,如果此checksum不變,則此文件必定發生過改變.
- 第二步: 如果文件大小一致,則計算相應的checksum,然后比較2者的checksum.
這種方式算得上是最保險的.
4.Syncable-數據遷移的同步性
數據遷移的過程中需要保證周期內數據是一定能夠同步完的,不能差距太大.比如A集群7天內的增量數據,我只要花半天就可以完全同步到B集群,然后我又可以等到下周再次進行同步.最可怕的事情在於A集群的7天內的數據,我的程序花了7天還同步不完,然后下一個周期又來了,這樣就無法做到准實時的一致性.其實7天還是一個比較大的時間,最好是能達到按天同步.
HDFS數據遷移解決方案: DistCp的使用
上面分析了很多數據遷移中的很多使用場景和可能出現的問題.但是從這里開始,是一個分水嶺了,下部分的文章主要闡述HDFS中的數據遷移解決方案,面對上文中提到的諸多問題,HDFS中到底應該如何解決.如果你不是HDFS,Hadoop的專家,可能問題看起來有點棘手,但是沒有關系,Hadoop內部專門開發了相應的工具,DistCp.在DistCp工具在HDFS中的定位就是來干這件事情的,從source filesystem到target filesystem的數據拷貝.DistCp在hadoop-tools工程下,作為獨立子工程存在.在官方注釋中,對於DistCp的解釋如下:
DistCp is the main driver-class for DistCpV2. For command-line use, DistCp::main() orchestrates the parsing of command-line parameters and the launch of the DistCp job. For programmatic use, a DistCp object can be constructed by specifying options (in a DistCpOptions object), and DistCp::execute() may be used to launch the copy-job. DistCp may alternatively be sub-classed to fine-tune behaviour.
大意是通過命令行附帶參數的形式,構造出DistCp的job,然后執行此Job.所以從這里可以知道,拷貝任務本身是一個MR的Job,已經把Hadoop本身的分布式執行的特性用上了.
DistCp優勢特性
鑒於DistCp的特殊使用場景,程序設計者在此工具代碼中添加了很多的獨到的設計.下面針對上文提到的一些要素進行相應的闡述:
1.帶寬限流
DistCp是支持帶寬限流的,使用者可以通過命令參數bandwidth來為程序進行限流,原理類似於HDFS中數據Balance程序的限流.但是個人感覺做的比Balance稍微簡化了一些.DistCp中相關類是ThrottledInputStream,在每次讀操作的時候,做一些限流判斷:
/** {@inheritDoc} */ @Override public int read() throws IOException { throttle(); int data = rawStream.read(); if (data != -1) { bytesRead++; } return data; }
然后在throttle的方法中進行當前傳輸速率的判斷,如果過快會進行一段時間的睡眠來降低總平均速率
private void throttle() throws IOException { while (getBytesPerSec() > maxBytesPerSec) { try { Thread.sleep(SLEEP_DURATION_MS); totalSleepTime += SLEEP_DURATION_MS; } catch (InterruptedException e) { throw new IOException("Thread aborted", e); } } }
相關的帶寬限流,可以看我的另外一篇文章Hadoop內部的限流機制.
2.增量數據同步
對於增量數據同步的需求,在DistCp中也得到了很好的實現.通過update,append和*diff*2個參數能很好的解決.官方的參數使用說明:
- Update: Update target, copying only missing files or directories
- Append: Reuse existing data in target files and append new data to them if possible.
- Diff: Use snapshot diff report to identify the difference between source and target.
第一個參數,解決了新增文件目錄的同步,第二參數,解決已存在文件的增量更新同步,第三個參數解決刪除或重命名文件的同步.這里需要額外解釋一下diff的使用需要設置2個不同時間的snapshot進行對比,產生相應的DiffInfo.在獲取快照文件的變化時,只會選擇出DELETE和RENAME這2種類型的變化信息.
static DiffInfo[] getDiffs(SnapshotDiffReport report, Path targetDir) { List<DiffInfo> diffs = new ArrayList<>(); for (SnapshotDiffReport.DiffReportEntry entry : report.getDiffList()) { // 只判斷刪除和重命名的類型 if (entry.getType() == SnapshotDiffReport.DiffType.DELETE) { final Path source = new Path(targetDir, DFSUtil.bytes2String(entry.getSourcePath())); diffs.add(new DiffInfo(source, null)); } else if (entry.getType() == SnapshotDiffReport.DiffType.RENAME) { final Path source = new Path(targetDir, DFSUtil.bytes2String(entry.getSourcePath())); final Path target = new Path(targetDir, DFSUtil.bytes2String(entry.getTargetPath())); diffs.add(new DiffInfo(source, target)); } } return diffs.toArray(new DiffInfo[diffs.size()]); }
在文件數據追加寫的判斷邏輯上,DistCp中還是做了很精細的判斷的.首先是判斷是否可以跳過文件當大小不變的情況
private boolean canSkip(FileSystem sourceFS, FileStatus source, FileStatus target) throws IOException { if (!syncFolders) { return true; } boolean sameLength = target.getLen() == source.getLen(); boolean sameBlockSize = source.getBlockSize() == target.getBlockSize() || !preserve.contains(FileAttribute.BLOCKSIZE); // 如果是同大小並且blockSize的大小也一樣,則繼續進行checksum的判斷 if (sameLength && sameBlockSize) { return skipCrc || DistCpUtils.checksumsAreEqual(sourceFS, source.getPath(), null, targetFS, target.getPath()); } else { return false; } }
其次是判斷是否可以進行追加寫
... // 判斷是否可以跳過此文件 if (canSkip(sourceFS, source, targetFileStatus)) { return FileAction.SKIP; } else if (append) { // 如果是設置了追加寫的方式,首先獲取原目標文件的大小 long targetLen = targetFileStatus.getLen(); // 如果原目標文件大小小於現在的源文件大小,說明源文件進行了新的寫操作 if (targetLen < source.getLen()) { // 計算源文件對應目標文件大小的文件checksum FileChecksum sourceChecksum = sourceFS.getFileChecksum( source.getPath(), targetLen); // 如果源文件對應長度的數據的checksum與目標文件checksum完全一致, // 表明源文件多出的數據完全是新寫入的,前面的數據沒有變動,支持追加寫 if (sourceChecksum != null && sourceChecksum.equals(targetFS.getFileChecksum(target))) { // We require that the checksum is not null. Thus currently only // DistributedFileSystem is supported return FileAction.APPEND; } // 如果checksum發生了變化,說明源文件前面部分的數據發生了變動,則將會進行 // OVERWRITE覆蓋的動作 } } } return FileAction.OVERWRITE; ...
並沒有直接根據大小的變化作為根本依據,大小發生變化了,還要再對之前的對應長度的數據做checksum的驗證.
3.高效的性能
第三點關於DistCp的性能問題我想主要分析一下.因為前2點的特性通過普通的程序優化優化也能夠實現,但是在第三點的性能特性上,我想DistCp一定具有他獨到的優勢的.
(1).執行的分布式特性
之前在上文中已經提到過,DistCp本身會構造成一個MR的Job.他是一個純由Map Task構成的Job,注意是沒有Reduce過程的.所以他能夠把集群資源利用起來,集群閑下來的資源越多,他跑的越快.下面是Job的構造過程:
/** * Create Job object for submitting it, with all the configuration * * @return Reference to job object. * @throws IOException - Exception if any */ private Job createJob() throws IOException { String jobName = "distcp"; String userChosenName = getConf().get(JobContext.JOB_NAME); if (userChosenName != null) jobName += ": " + userChosenName; Job job = Job.getInstance(getConf()); job.setJobName(jobName); job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions)); job.setJarByClass(CopyMapper.class); configureOutputFormat(job); // 設置特殊定制的CopyMapper的map類型 job.setMapperClass(CopyMapper.class); // 無Reduce Task job.setNumReduceTasks(0); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputFormatClass(CopyOutputFormat.class); job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false"); job.getConfiguration().set(JobContext.NUM_MAPS, String.valueOf(inputOptions.getMaxMaps())); if (inputOptions.getSslConfigurationFile() != null) { setupSSLConfig(job); } inputOptions.appendToConf(job.getConfiguration()); return job; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
(2).高效的MR組件
高效的MR組件的意思DistCp在相應的Job時,提供了針對此類型任務的Map Class,InputFormat和OutputFormat,分別是CopyMapper, DynamicInputFormat, CopyOutputFormat.這三者MR設置類型與普通的MR類型有什么區別呢,答案在下面:
DynamicInputFormat implements the "Worker pattern" for DistCp. Rather than to split up the copy-list into a set of static splits, the DynamicInputFormat does the following: 1. Splits the copy-list into small chunks on the DFS. 2. Creates a set of empty "dynamic" splits, that each consume as many chunks as it can. This arrangement ensures that a single slow mapper won't slow down the entire job (since the slack will be picked up by other mappers, who consume more chunks.) By varying the split-ratio, one can vary chunk sizes to achieve different performance characteristics.
- 1
- 2
- 3
- 4
- 5
- 6
- 1
- 2
- 3
- 4
- 5
- 6
以上注釋強調了2點,DynamicInputFormat類會將input-file輸入文件分成很多的小的chunk,然后由這些chunk構成動態的”dynamic”的splits,然后盡可能的讓map task消費掉,而不是傳統的將輸入文件分割成固定的spilts.而且前者不會造成任何慢的map拖累整個Job的運行.保證了哪個map消費的塊,那就消費更多spilt的原則.其中具體的原理讀者可自行到org.apache.hadoop.tools.mapred.lib包下的代碼中進行分析.下面是本人做的一張DistCp Job結構圖:
Hadoop DistCp命令的使用
前面花了大量的篇幅闡述了DistCp工具的強大用處,最后給出使用幫助信息,輸入hadoop distcp命令即可獲取幫助信息:
$ hadoop distcp
usage: distcp OPTIONS [source_path...] <target_path>
OPTIONS
-append Reuse existing data in target files and append new data to them if possible -async Should distcp execution be blocking -atomic Commit all changes or none -bandwidth <arg> Specify bandwidth per map in MB -delete Delete from target, files missing in source -diff <arg> Use snapshot diff report to identify the difference between source and target -f <arg> List of files that need to be copied -filelimit <arg> (Deprecated!) Limit number of files copied to <= n -i Ignore failures during copy -log <arg> Folder on DFS where distcp execution logs are saved -m <arg> Max number of concurrent maps to use for copy -mapredSslConf <arg> Configuration for ssl config file, to use with hftps:// -overwrite Choose to overwrite target files unconditionally, even if they exist. -p <arg> preserve status (rbugpcaxt)(replication, block-size, user, group, permission, checksum-type, ACL, XATTR, timestamps). If -p is specified with no <arg>, then preserves replication, block size, user, group, permission, checksum type and timestamps. raw.* xattrs are preserved when both the source and destination paths are in the /.reserved/raw hierarchy (HDFS only). raw.* xattrpreservation is independent of the -p flag. Refer to the DistCp documentation for more details. -sizelimit <arg> (Deprecated!) Limit number of files copied to <= n bytes -skipcrccheck Whether to skip CRC checks between source and target paths. -strategy <arg> Copy strategy to use. Default is dividing work based on file sizes -tmp <arg> Intermediate work path to be used for atomic commit -update Update target, copying only missingfiles or directories
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
其中source_path,taget_path需要帶上地址前綴以區分不同的集群.例如
hadoop distcp hdfs://nn1:8020/foo/a hdfs://nn2:8020/bar/foo
- 1
- 1
這表示從nn1集群拷貝數據到nn2集群.總體而言,distCp的可選參數還是做到了相當細粒度的控制,比如skipcrccheck的選項,可以跳過crc checksum的校驗,checksum的跳過可能會影響到distCp數據完整性的判斷,但同時此配置的關閉會使拷貝過程更加高效一些.
當然說了這么多,跨機房數據遷移的工作所一定還會出現沒有預見到的問題,其中的難度和困難絕對是非常具有挑戰性的,可能我們還要利用DistCp的功能然后搭配上自己的解決方案才能做出更棒的方案.希望本文能夠大家帶來收獲.