我是如何利用Hadoop做大規模日志壓縮的


背景

剛畢業那幾年有幸進入了當時非常熱門的某社交網站,在數據平台部從事大數據開發相關的工作。從日志收集、存儲、數據倉庫建設、數據統計、數據展示都接觸了一遍,比較早的趕上了大數據熱這波浪潮。雖然今天的人工智能的熱度要遠高於大數據,但是大家還是不能否定大數據在人工智能中不可取代的地位。

話回正題,當時遇到了一個需要解決的問題就是如何快速對日志進行壓縮。那時一天的日志量大概是3TB左右,共100+種日志,最大的一個日志一天要1TB,最小的日志只有幾十M。統計需求大部分是用HIVE完成,HIVE中的表每天建立一個分區,每個分區對應一種日志的壓縮文件(有天級和小時級)。

當時日志壓縮方式是一個日志一個日志進行壓縮,利用crontab進行任務並行,效率非常低。經常出現的情況是到了第二天中午12點鍾,前一天的日志還沒有壓縮完,統計需求就沒法用hive去做,報表數據就出不來,給我們的壓力很大。

也許有小伙伴說,hive可以利用前一天不經過壓縮的日志進行統計,后台慢慢進行日志壓縮,壓縮完成后在重新load一下分區不就ok了嗎?這個方案確實可行。但是當時的實際情況是,有好多的表load的都是壓縮后的數據,修改成本比較高(幾百張表)。不得已還是得盡量縮短壓縮時間,這個問題經過我的一番折騰,終於把日志壓縮完成時間提前到凌晨1點鍾之前,各種報表數據的統計可以在早晨八點鍾之前完成。接下來我就把詳細做法介紹給大家。

Hadoop並行壓縮

壓縮格式

首先我們就要討論壓縮格式,我們選擇的壓縮格式是bz2,原因是bz2算法支持分片壓縮合並:即每個小bz2文件頭尾相連拼接到一起就是一個大的bz2文件。map/reduce也支持對bz2文件的分塊:即利用多個map同時對壓縮文件的不同部分進行處理。當時也試過gzip算法,但是gzip沒法分片,hive生成的任務只會有一個map,統計效率低下。

壓縮方案

如圖所示,有兩個日志文件A.log和B.log需要壓縮,利用map/reduce並行處理這兩個日志。假設map/reduce自動為A.log和B.log分別生成3個map任務同時進行壓縮,每個map任務讀取日志文件的一部分並用bz2算法進行壓縮后寫入到集群的HDFS中。A.log通過map端壓縮生成了3個壓縮文件:A.log.1.bz2,A.log.2.bz2,A.log.3.bz2,之后map通過k-v對把<源文件名稱,壓縮文件名稱>發送給reduce,這樣相同日志就會分配到同一個reduce上。reduce做的事情很簡單,首先根據壓縮文件編號從小到大排序,然后從hadoop上讀取壓縮文件並merge到一起,最后在HDFS上生成一個新的壓縮文件。

注:這里每個日志分成1,2,3三個塊是為了描述方便,實際上使用的是map處理文件塊時文件的偏移量。

存在的問題

  • reduce性能瓶頸  這么做之后reduce就成為性能瓶頸了,因為一個日志最終都交給一個reduce進行合並,還是比較慢。解決方案是壓縮前的日志不能按天存放,需要按小時存放,這樣大日志可以分批次壓縮合並到天級別的壓縮文件中。由於我們只是保證在第二天及時產生前一天的壓縮文件,我們在前一天就可以對已存在的部分日志進行分批壓縮,而只在每天零點對前一天最后面幾個小時的日志進行壓縮合並,縮短延遲。當時我采用每6個小時壓縮一次,這樣一天的日志分四次壓縮完成,每天凌晨只對前一天最后6小時日志壓縮,延遲保證在一小時之內。
  • 集群流量風暴 這個方案會大量的從HDFS上讀寫數據,非常容易造成集群流量風暴,導致集群上其它計算任務失敗。解決方案是每次讀寫一定大小的數據后sleep幾秒。
  • map端讀數據優化 我們知道map/reduce默認是按行讀取數據並處理,這對於我們來說效率很低。比如一個大的日志可能有幾億條日志,那么就要調用map幾億次,而我們的map只對數據進行壓縮,不要求按行傳遞,最好的方式是按塊。解決方案是重寫RecordReader類,實現自己的讀數據方案。
  • 如何讓一個Reduce只merge一種日志 如果只按文件名進行reduce路由,就會出現有兩種日志都分配到一個reduce上merge的情況。因為選擇reduce的時候,默認行為是根據key計算哈希值后對reduce數取模得到編號,這樣就有可能兩個不同的key的哈希值是相同的。如果兩個日志都分配到同一個reduce上,那么排在后面的日志必須等前面的日志merge完之后才能merge,效率不高。解決方案是:設置reduce數為日志種類數,覆寫Partitioner類,把日志種類與reduce編號一一對應,這樣就能達到所有日志不用排隊同時merge的效果。

具體實現

在這里把需要實現的代碼簡要的列出來,這里面以java版本為例。

  • FileInputFormat類:必須自己寫一個類繼承該類,覆寫其createRecordReader方法。這個方法是一個工廠方法,告訴map/reduce需要一個什么樣的RecordReader,RecordReader就是map讀取數據所用到的類。
  • public class CompressMergeInputFormat extends
            FileInputFormat<FileAndPos, ByteBuffer> {
    
        @Override
        public RecordReader<FileAndPos, ByteBuffer> createRecordReader(
                InputSplit split, TaskAttemptContext context) throws IOException,
                InterruptedException {
            return new CompressMergeRecordReader();
        }
        
        /**
         * 由於數據量較大,默認以8個blockSize作為一個Split分配給一個map。hadoop默認一個blockSize是64M,當日志量太大時會產生很多小壓縮文件。
         * */
        @Override
        protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
            return 8 * blockSize;
        }
    }
  • RecordReader<KEY, VALUE>類:必須自己寫一個類繼承該類,該類是一個模板類,模板參數分別由Key和Value類型指定,實際上是kv對。map/reduce默認key是當前讀取數據在文件中的偏移量,value是內容。我們必須覆寫其initialize,nextKeyValue,getCurrentKey,getCurrentValue方法,並且實現KEY和VALUE。
  • public class CompressMergeRecordReader extends
            RecordReader<FileAndPos, ByteBuffer> {
        private Path file;
        private long pos;
        private long readed = 0;
        private long length;
    
        private FileSystem fs;
        private FSDataInputStream in;
        private Configuration config = null;
    
        private FileAndPos currentKey = new FileAndPos();
        private ByteBuffer currentValue = new ByteBuffer();
    
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context){
            
            FileSplit filesplit = (FileSplit) split;
            file = filesplit.getPath();       //獲取文件路徑
            pos = filesplit.getStart();       //獲取split塊偏移,每個split都會被map壓縮成1個單獨的文件
            length = filesplit.getLength();
            currentKey.setFile(file.toString());
            currentKey.setPos(pos);
            config = context.getConfiguration();
            fs = file.getFileSystem(context.getConfiguration());
            in = fs.open(file);
            in.seek(pos);
        }
    
        @Override
        public boolean nextKeyValue(){   //讀取下一個k-v
            if (readed >= length) {
                return false;
            }
            int once = in.read(currentValue.buffer);
            currentValue.length = once;
            if (once == -1) {
                return false;
            }
            if(readed + once > length){   //如果大於本文件塊則要少讀一些
               currentValue.length = (int)(length - readed);
               readed = length;
            } else {
               readed += once;
            }
    return true; } @Override public FileAndPos getCurrentKey() throws IOException, InterruptedException { return currentKey; } @Override public ByteBuffer getCurrentValue() throws IOException, InterruptedException { return currentValue; } //... 其它省略
    }
  • 實現KEY:必須是一個Writable對象,實現readFields和write和compareTo方法。我們的key中記錄文件路徑以及當前數據偏移量。
  • public class FileAndPos implements WritableComparable<FileAndPos> {
        private String file;
        private long pos;
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(file);
            out.writeLong(pos);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            file = in.readUTF();
            pos = in.readLong();
        }
    
        @Override
        public int compareTo(FileAndPos o) { //需要自己實現比較函數,map會把讀到的一批key-value安裝這個順序排序
            int filecompare = file.compareTo(o.file);
            if (filecompare == 0) {
                if (pos < o.pos) {
                    return -1;
                } else if (pos > o.pos) {
                    return 1;
                } else {
                    return 0;
                }
            } else {
                return filecompare;
            }
        }
        //... 省略
    }
  • 實現VALUE:可以實現成一個固定大小的數組(1M),代表調用一次map函數傳遞多少數據。
  • public class ByteBuffer {
        public byte[] buffer = new byte[1024 * 1024];
        public int length = 0;
    }
  • 實現Partitioner:當map完成時,在map的cleanup函數中向reduce發送一條kv。Partioner的默認行為是對key計算hash值,根據hash值對reduce數取模得到reduce編號。但是由於我們的key帶有文件路徑以及偏移信息,直接使用hadoop默認行為會把本應分到同一個reduce上的kv對分配到多個reduce上,造成多個reduce同時寫一個文件的問題,所以我們必須重寫Partitioner類,對同一個日志的kv對產生固定的reduce編號,這樣達到所有種類日志同時merge,並且每個reduce只merge一種日志,不存在一個日志分配到多個reduce上的效果。
  • public class CompressMergePartitioner extends Partitioner<Text, Text> {
    
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            
            String str = key.toString();
            
            //查表得到日志對應的reduce編號,需要提前做好這張表
            
            return reduceIndex;
        }
    }
  • 實現OutputFormat 由於我們的reduce是自己寫文件,必須阻止reduce自己的默認行為(把value寫入一個part_xxx文件),需要覆寫OutputFormat類,使其不產生任何輸出文件
  • public class CompressMergeOutputFormat<K, V> extends OutputFormat<K, V> {
    
        @Override
        public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) {
            return new RecordWriter<K, V>() {
                public void write(K key, V value) {
                   //啥都不做
                }
    
                public void close(TaskAttemptContext context) {
                }
            };
        }
        //...其它代碼忽略
  • map實現
  • public class CompressMergeMap extends
            Mapper<FileAndPos, ByteBuffer, Text, Text> {
        //...省略
        private CompressionOutputStream out;
    
        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            
            String buf = null;
            Configuration conf = context.getConfiguration();
            long pos = ((FileSplit) context.getInputSplit()).getStart();
            String path = ((FileSplit) context.getInputSplit()).getPath().toString();
            //....省略
        }
    
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {        //根據path和pos生成你的key-value,key中包含日志名以及偏移量,value是這個Split的壓縮文件路徑
            context.write(key,value);
        }
    
        @Override
        protected void map(
                FileAndPos key,
                ByteBuffer value,
                org.apache.hadoop.mapreduce.Mapper<FileAndPos, ByteBuffer, Text, Text>.Context context)
                throws IOException, InterruptedException {
            out.write(value.buffer, 0, value.length);
            delta += value.length;
            if(delta >= 5 * 1024 * 1024){ //寫5M休息一會
                Thread.sleep(10);
                delta = 0;
            }
        }
    }
  • reduce實現 
  • public class CompressMergeReduce extends
            Reducer<Text, Text, NullWritable, NullWritable> {
        private FileSystem fs;
    
        //...省略
    
        @Override
        protected void reduce(
                Text key,
                java.lang.Iterable<Text> values,
                org.apache.hadoop.mapreduce.Reducer<Text, Text, NullWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
    
            //根據key中的offset排序,然后merge到最終的bz2文件中
        }
    }

     


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM