Hadoop| MapperReduce02 框架原理


 

 MapReduce框架原理

 

 

MapReduce核心思想

1)分布式的運算程序往往需要分成至少2個階段。

2)第一個階段的MapTask並發實例,完全並行運行,互不相干。

3)第二個階段的ReduceTask並發實例互不相干,但是他們的數據依賴於上一個階段的所有MapTask並發實例的輸出。

4)MapReduce編程模型只能包含一個Map階段和一個Reduce階段,如果用戶的業務邏輯非常復雜,那就只能多個MapReduce程序,串行運行。

1、InputFormat(切片| 把切片變成k,v值)數據輸入

一. 默認的FileInputFormat--TextInputFormat

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> public class TextInputFormat extends FileInputFormat<LongWritable, Text> Rich leaning form --每條記錄對應的鍵值對---> (0,Rich leaning form )
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,TaskAttemptContext context) { String delimiter = context.getConfiguration().get("textinputformat.record.delimiter"); byte[] recordDelimiterBytes = null; if (null != delimiter) recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8); return new LineRecordReader(recordDelimiterBytes); }

 

默認切片規則的實現類FileInputFormat;默認把切片變成key,value值的實現類為TextInputFormat(按行讀取每條記錄,鍵是存儲該行在整個文件中的起始偏移量,LongWritable類型;值是這行內容 Text類型),它返回的RecordReader類型為LineRecordReader。

 

MapTask的數量是由InputFormat來指定的,InputFormat生成多少個InputSpilt切片數就會有多少個task。

 切片與MapTask並行度決定機制

MapTask的並行度決定Map階段的任務處理並發度,進而影響到整個Job的處理速度。

MapTask並行度決定機制

數據塊:Block是HDFS物理上把數據分成一塊一塊。

數據切片:數據切片只是在邏輯上對輸入進行分片,並不會在磁盤上將其切分成片進行存儲。

yarn優化策略,本地啟動任務啟動MapTask,盡量不產生網絡IO;按照一個個文件來切,判斷是否大於128M;切片數量默認>=文件數量。

切片的原理

FileInputFormat切片源碼解析

 

 FileInputFormat切片大小參數設置

  Job提交流程

源碼

waitForCompletion()

submit();

// 1建立連接
    connect();    
        // 1)創建提交Job的代理
        new Cluster(getConfiguration());
            // (1)判斷是本地yarn還是遠程
            initialize(jobTrackAddr, conf); 

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
    // 1)創建給集群提交數據的Stag路徑
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

    // 2)獲取jobid ,並創建Job路徑
    JobID jobId = submitClient.getNewJobID();

    // 3)拷貝jar包到集群
copyAndConfigureFiles(job, submitJobDir);    
    rUploader.uploadFiles(job, jobSubmitDir);

// 4)計算切片,生成切片規划文件
writeSplits(job, submitJobDir);
        maps = writeNewSplits(job, jobSubmitDir);
        input.getSplits(job);

// 5)向Stag路徑寫XML配置文件
writeConf(conf, submitJobFile);
    conf.writeXml(out);

// 6)提交Job,返回提交狀態
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
View Code

 

確認文件夾是否存在; 獲取jobId;要往jobId中提交一些文件:

copyAndConfigureFiles(Job,submitJobDir)即 xml文件切片文件信息 writeConf(conf,SubmitJobFile)

切片的方法:

①遍歷所有文件 |
②判斷是否可切 |

③計算塊大小,切片大小 | minsize  blocksize  maxsize -->取中間值;splitSize要比blockSize大,則minsize設置 > blocksize;splitsize< blicksize,則maxsize < blocksize

return Math.max(minSize, Math.min(maxSize, blockSize));

本地模式沒有塊,默認32M;

④ Long bytesRemaining / splitsize > 1.1(本地剩余/切片數量)

  但切時按1倍切,保證切片體積不至於太小;bytesRemaining -=splitSize

 默認切片規則的實現類FileInputFormat;默認把切片變成key,value值的實現類為TextInputFormat(按行讀取每條記錄,鍵是存儲該行在整個文件中的起始偏移量,LongWritable類型;值是這行內容 Text類型),它返回的RecordReader類型為LineRecordReader。

 

 

 

CombineTextInputFormat 改變了切片規則;雖重寫了從切片到k,v但返回的還是<LongWritable, Text>;適用於小文件過多的場景;
CombineTextInputFormat用於小文件過多的場景,它可以將多個小文件從邏輯上規划到一個切片中,這樣,多個小文件就可以交給一個MapTask處理。

public abstract class CombineFileInputFormat<K, V> extends FileInputFormat<K, V>
public class CombineTextInputFormat extends CombineFileInputFormat<LongWritable, Text>

CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
注意:虛擬存儲切片最大值設置最好根據實際的小文件大小情況來設置具體的值。
生成切片過程包括:虛擬存儲過程和切片過程二部分。
(1)虛擬存儲過程:
將輸入目錄下所有文件大小,依次和設置的setMaxInputSplitSize值比較,如果不大於設置的最大值,邏輯上划分一個塊。如果輸入文件大於設置的最大值且大於兩倍,那么以最大值切割一塊;當剩余數據大小超過設置的最大值且不大於最大值2倍,此時將文件均分成2個虛擬存儲塊(防止出現太小切片)。
例如setMaxInputSplitSize值為4M,輸入文件大小為8.02M,則先邏輯上分成一個4M。剩余的大小為4.02M,如果按照4M邏輯划分,就會出現0.02M的小的虛擬存儲文件,所以將剩余的4.02M文件切分成(2.01M和2.01M)兩個文件。
(2)切片過程:
(a)判斷虛擬存儲的文件大小是否大於setMaxInputSplitSize值,大於等於則單獨形成一個切片。
(b)如果不大於則跟下一個虛擬存儲文件進行合並,共同形成一個切片。
(c)測試舉例:有4個小文件大小分別為1.7M、5.1M、3.4M以及6.8M這四個小文件,則虛擬存儲之后形成6個文件塊,大小分別為:
1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M)
最終會形成3個切片,大小分別為:
(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
View Code

以后都是FileInputFormat的子類:

KeyValueTextInputFormat 改變了切片到k,v值;每一行均為一條記錄,被分隔符分割為key,value;
NLineInputFormat 按行切片;切片-->k,v用默認方式;(不再按塊去划分,而是按NLineInputFormat指定的行數N來划分即:輸入文件的總行數 / N = 切片數)
一個文件一個切片,RecordReader把切片變成k,v值
默認輸入輸出類型:InputFormat從數據源頭的類型就改變了;

二. 自定義InputFormat

//自定義InputFormat,繼承FileInputFormat
public class WholeInputFormat extends FileInputFormat<Text, BytesWritable> {

    /**
     * 1.設置為不切分,一次讀取一個完整文件封裝為k,v
     * @param context
     * @param filename
     * @return
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     *2. 重寫RecordReader --> return new WholeRecordReader()
     * @param split
     * @param context
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        return new WholeRecordReader();
    }
}




public class WholeRecordReader extends RecordReader<Text, BytesWritable> {
    private Text key = new Text();
    private BytesWritable value = new BytesWritable();
    private boolean isRead = false;
    private FileSystem fileSystem;
    private FileSplit fs;
    private Path path;
    private FSDataInputStream fis;

    /**
     * 1. 
     * @param split
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        fileSystem = FileSystem.get(context.getConfiguration());//new Configuration(),這里已經開了,只需調用
        fs = (FileSplit) split;  //從輸入InputSplit中解析出一個個key/value
        path = fs.getPath();
        fis = fileSystem.open(path);

    }

    /**
     * 2.true讀取key, value
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!isRead){
            //ture, 讀取key
            String path = fs.getPath().toString();
            key.set(path);

            //讀取value
            long length = fs.getLength();
            byte[] bytes = new byte[(int) length];
            fis.read(bytes);
            value.set(bytes, 0, bytes.length);

            //設為true
            isRead = true;
            return true;
        }else {
            return false;
        }
    }

    /**
     * 3. 獲取key
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return key;
    }

    /**
     * 4.獲取value
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return value;
    }

    /**
     * 5. 獲取讀取進度;
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return isRead ? 1 : 0;
    }

    /**
     * 6. 關流
     * @throws IOException
     */
    @Override
    public void close() throws IOException {
        fileSystem.close();
        IOUtils.closeStream(fis);
    }
}



public class WholeDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        //1.實例化
        Job job = Job.getInstance(new Configuration());
        //2.設置jar類路徑
        job.setJarByClass(WholeDriver.class);
        //3.設置自定義的InputFormat
        job.setInputFormatClass(WholeInputFormat.class);
        //輸出文件類型;輸出時用SequenceFileOutputFormat輸出合並文件
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        //4.設置Map輸出端的k v類型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);
        //5.設置輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path("F://input"));
        FileOutputFormat.setOutputPath(job, new Path("F://output"));
        //6.提交任務
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }

}
View Code

 

2、 MapReduce工作流程

 

在有限的內存實現全排序;
Job.split、wc.jar、Job.xml等文件通過客戶端提交給yarn,yarn根據切片信息啟動相應數量的MapTask
MapTask1處理切片128M的數據塊,它調用默認的TextInputFormat的RecoderReader類型的方法讀取數據,它一行行讀到的 k,v值(行號,行內容)給Mapper();
Mapper()調用map(k,v)方法 通過context.write(k, v)寫出去(給框架) ----通過outputCollector收集(MapTask的處理結果)<K,V,P>環形緩沖區中;
排序的時候要先完成分區(就是把分區號一樣的在一塊);再在分區內部排序-->分區排序--局部的快排序(內存保證輸出的每個文件內部是有序的,按照文件key做全排序,相同的key挨一起
緩沖區滿就溢出到文件磁盤(文件特點:①一個文件;②分區且區內有序;溢寫完之后文件分兩個部分spill.index索引(文件有多少個分區,分區從哪結束)和spill.out輸出文件)
---->>多次溢出(取決於map方法)形成的多個文件,不同文件之間是無序的--->多個溢寫的文件合並成一個文件,按照分區歸並排序(A、B文件之間做比較,若A第一個值 < B第一個值,就寫到C中;接着再比較下面的依次...)
首先分區1進行歸並寫到文件的前半部分,分區2再進行歸並寫到文件的后半部分...(歸並是數據流)
到此為止,Map階段結束(生成一個分區且區內有序的完整的輸出文件,shuffle的前半部分);

MapTask輸出文件分區且區內有序多個MapTask形成多個分區且區內有序的文件;多個map輸出文件交給Reduce來處理,全都下載到本地;
--->>根據分區數啟動相應數量的ReduceTask,ReduceTask1下載所有分區patition0即分區1的數據拷貝到內存緩沖區buffer,內存不夠溢出到磁盤;ReduceTask2下載所有分區2的數據;
---->>各自合並文件,歸並排序(多個文件歸並得到一個文件,文件特點是:按照key有序);
至此完成了從map輸出到reduce輸入的3次排序--全排序;有了這個全排序文件才能進行分組-->輸出到reduce()方法里邊;這就是shuffle的全過程(Map方法之后,Reduce方法之后;3次全排序+分組);

--->>一次讀取一組,按照相同key分組 --> GroupingComparator(k, knext) 分組 --->>Reduce(k,v)方法 context.write(k,v)-->> 
經過outputformat(默認TextOutputFormat)RecordWriter--- write(k, v) --->>Part-r-00000文件;


如果啟動了combiner:(前提是求和操作而不是求平均值)
第一次排序結束后在落盤(溢出寫入磁盤)之前會經過combiner的合並(沒有重復的單詞,但兩個ReduceTask之間有重復的單詞);
第二次排序即歸並排序完之后還沒落盤之前(沒有重復的的單詞)數據流會流入combiner(之后可進行壓縮),寫到磁盤上;
第三次排序,兩個相同分區的數據下載(有可能有重復單詞)合並---> 啟動Reduce,沒有combine

3、shuffle機制

目的:分組 --->方法是全排序;

Map方法之后,Reduce方法之前的數據處理過程稱之為Shuffle。

 

環形緩沖區:在邏輯上就是環形的,特點沒頭沒尾;<k,v,p>進入環形緩沖區,傳遞過來的是序列化的數據,右環是數據,左環是索引;

寫之前要排序,數據要反序列化成對象才能比較,看1、2數據是否需要交換順序,如果需要交換交換的是索引;按照索引一組一組把數組寫到磁盤上;

分區也是在排序;首先按P排序,再按k排序--->寫到磁盤上; 分區和排序全都發生在緩沖區當中,且分區和排序是同時完成的;在數據沒填滿(達到80%)時就會溢寫(溢出之前就已經排好序了)到磁盤;

 

shuffle是一部分MapTask,一部分ReduceTask;MapTask會有一個單獨的輸出文件;ReduceTask要從MapTask中下載數據到本地,要等到所有的MapTask運行完之后才能運行;
按key分組,全排序---->按key排好,兩個數據就可以決定分組,(默認調用compareTo進行分組,否則就用自定義的,自定義是繼承類WritableComparator;)這個數據跟下個數據比較,一樣就分同一組,不一樣就新啟動一組;
內存里邊只有兩個數據,這一個和下一個;
從map到reduce進行的全排序;快排,單軸排序,java用的雙軸排序;
分區只是決定這條數據去哪個reduceTask,不能決定reduceTask的數量;
怎么比較key是否相等;


① 分區

 
         
public class HashPartitioner<K, V> extends Partitioner<K, V> { public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }   默認分區是根據 Key的hashCode對ReduceTask個數取模得到,沒法控制哪個key存儲到哪個分區;
}
 return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; numReduceTasks是設置的ReduceTask的數量;取余 Integer.MAX_VALUE換算成位是一串011111...;假設key.hashCode是 0100100,它們做位運算,沒變化,但如果是以1開頭的,就是負數,取余還是負數; --->目的就是防止出現負數;

把不同的數據分到不同的ReduceTask分別處理叫分區; 

public class MyPartitioner extends Partitioner<Text, FlowBean> { //自定義分區 public int getPartition(Text text, FlowBean flowBean, int i) { String substring = text.toString().substring(0, 3); switch (substring){ case "136": return 0; case "137": return 1; case "138": return 2; case "139": return 3; default: return 4; } } } public class PatitionDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1.實例化 Job job = Job.getInstance(new Configuration()); //2.設置類jar路徑 job.setJarByClass(PatitionDriver.class); //3.設置Mapper和Reduce路徑 job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReduce.class); job.setPartitionerClass(MyPartitioner.class); //job.setNumReduceTasks(5); //job.setNumReduceTasks(6); //ReduceTask數量6 > 分區數量5; 產生空的輸出文件part-r-00005 //job.setNumReduceTasks(4); //ReduceTask數量4 < 分區數量5; 有部分數據無處放Illegal partition for 18271575951 (4) //如果分區數量跳過3,不按順序;Illegal partition for 13956435636 (4);分區號不能跳,會浪費;分區號必須從0開始,逐一累加; job.setNumReduceTasks(1); //ReduceTask數量1,不管MapTask輸出多少個分區文件,最終結果都交給這一個ReduceTask,最終也只會產生一個part-r-0000文件 //4.輸出類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //5.路徑 FileInputFormat.setInputPaths(job, new Path("F://input")); FileOutputFormat.setOutputPath(job, new Path("F://output")); //6.提交 boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }

 

 
        

② WritableComparable排序

MapTask和ReduceTask均會對數據按照key進行排序,屬Hadoop默認行為,任何應用程序中的數據均會被排序,不管邏輯上是否需要;

默認排序是按字典順序排序,且實現該排序方法的是快速排序;

分類:

部分排序(MapReduce根據輸入記錄的鍵對數據集排序,保證輸出的每個文件內部有序);

全排序(最終輸出結果只有一個文件,且文件內部有序;實現方法是只設置一個ReduceTask,在處理大型文件時效率極低,一台機器處理所有文件,喪失了MapReduce所提供的行為架構);輔助排序(GroupingComparator分組,在Reduce端對key分組,應用於:在接受的key為bean對象時,想讓一個或幾個字段相同(全部字段比較不相同)的key進入到同一個reduce方法時可采用分組排序);

二次排序(自定義排序中,如果comparaTo中的判斷條件為兩個即二次排序;)

排序接口 WritableComparable,利用了Shuffle強制排序的過程
分區-->排序;把需要排序的放到key的位置就會自動排序(強制性)

自定義排序

自定義排序WritableComparable

  原理分析

bean對象做為key傳輸,需要實現WritableComparable接口重寫compareTo方法,就可以實現排序。

/OrderBeam是WritableComparable的實現類 //bean對象做為key傳輸,需要實現WritableComparable接口重寫compareTo方法,就可以實現排序。   public class OrderBean implements WritableComparable<OrderBean> { } @Override public int compareTo(OrderBean o) { //先按orderById分; 如果相同則按price int compare = this.getOrderById().compareTo(o.getOrderById()); if (compare == 0){ return -Double.compare(this.price, o.price); //默認是升序; 降序需加- }else { return compare; } } @Override //序列化把它寫進去 public void write(DataOutput dataOutput) throws IOException { dataOutput.writeUTF(orderById); dataOutput.writeUTF(productId); dataOutput.writeDouble(price); } @Override //反序列化要再把它讀出來 public void readFields(DataInput dataInput) throws IOException { orderById = dataInput.readUTF(); productId = dataInput.readUTF(); price = dataInput.readDouble(); }
排序的同時可自定義分區 public class SortPatitioner extends Partitioner<FlowBean, Text>{ @Override public int getPartition(FlowBean flowBean, Text text, int i) { String start = text.toString().substring(0, 3); switch (start) { case "136": return 0; case "137": return 1; case "138": return 2; case "139": return 3; default: return 4; } } }

 

③ Combiner合並

combiner應用的前提是求和操作,求平均值不行

 Reduce(K, V)它在ReduceTask中;    Combine在MapTask節點中運行;

Combine是對Mapper的局部匯總,不能改變Mapper的輸出類型,Combine輸入輸出泛型應該是一樣的;

如:

aaa  aaa  aaa bbb  bbb  bbb ccc  ccc  ccc 在map階段是(aaa,1)3;(bbb, 1)3;(ccc, 1)3     用了combine在map階段(aaa, 3),(bbb, 3),(ccc, 3)

reduce可以對所有的mapper進行全局匯總;combine是局部合並;

reduce(分組輸入,combine肯定也是分組輸入,)輸出文件至少是有序的;整個shuffle階段就是給reduce准備輸入文件,排好序;

job.setCombinerClass(WcReduce.class);  //不設置CombinerClass

 

 

④分組(輔助排序)

對Reduce階段的數據根據某一個或幾個字段進行分組。

分組排序步驟:

(1)自定義類繼承WritableComparator

(2)重寫compare()方法

@Override

public int compare(WritableComparable a, WritableComparable b) {

      // 比較的業務邏輯

      return result;

}

(3)創建一個構造將比較對象的類傳給父類

protected OrderGroupingComparator() {

     super(OrderBean.class, true);

}

自定義分組

分組規則和排序規則一致就不需要自定義分組;排序規則和分組規則不一樣,就用到自定義分組;
分組粒度 > 排序的粒度;分組的粒度更粗一點,排序的粒度更細點;
public class MyComparator extends WritableComparator{
    public MyComparator(){
        super(OrderBean.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean oa = (OrderBean) a;
        OrderBean ob = (OrderBean) b;      //構造兩個空對象拿到序列化的數據,把具有相同id的key放一個組;需要反序列化去比較
    return oa.getOrderById().compareTo(ob.getOrderById()); //分組,相同的key即id就分一個組  } }
 
public class OrderReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{ @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { // context.write(key, NullWritable.get()); //key之前是相同的,不相同的key進入同一個組;它取出來的是每個組第一個值;

        for (NullWritable value : values) { //遍歷所有的 context.write(key, value); } 對於reduce,key, value都只有一個對象; 它拿到的數據是已經排好序的k,v對象;往下遍歷一個,就反序列化一個, 再遍歷一個,繼續往下移動(只能往下移動,不能返回),反序列化; 對象始終只有兩個,對象的值卻是在不斷變化(通過反序列化的方法);
執行hasNaxt,調用WritableComparable跟下面比較下:若相等返回true就繼續往下迭代;把當前的迭代值與下一個比較,0返回true繼續迭代,非0返回false迭代結束;
結束后第二個reduceTask接着迭代的數據往下...分組和讀數據是同時進行的; Iterator
<NullWritable> iterator = values.iterator(); //單例模式,來回遍歷還是他自己 for (int i = 0; i < 2; i++){ if (!iterator.hasNext()) break; iterator.next(); context.write(key, NullWritable.get()); //查看組內前兩個值 //取前兩個值; }

NullWritalbe單例模式;私有化構造器,公有化一個get方法;餓漢式模式;
public class NullWritable implements WritableComparable<NullWritable> {
    private static final NullWritable THIS = new NullWritable();

    private NullWritable() {
    }
    public static NullWritable get() {
        return THIS;
    }

在大數據背景下,不允許有大量對象存在內存,在整個MapReduce框架運行過程中,所有的數據都是用序列化來傳遞的。需要用到對象時,現場序列化現場反序列化;
對於自定義的comparator比較的時候並沒有現成的對象用,需要先構造兩個空對象,拿到序列化的數據,反序列化進行比較;核心:減小IO;

分區發生在mapTask,分組發生在ReduceTask中;

combine輸入的數據肯定是要分組,默認用bean的排序規則,combine的分組規則不會用到GroupingComparator; 

GroupingComparator 起效只在reduce之前分組起效;

 4、OutputFormat數據輸出

 

 SequenceFileOutputFormat它是處在兩個MapReduce之間的臨時文件; SequenceFileOutputFormat   SequenceFileInputFormat發,接;什么類型出來,什么類型接,中間用它連接;

自定義OutputFormat

public class MyOutputFormat extends FileOutputFormat<LongWritable, Text> { @Override public RecordWriter<LongWritable, Text> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { MyRecordWriter recordWriter = new MyRecordWriter(); recordWriter.initialize(job); return recordWriter; } } public class MyRecordWriter extends RecordWriter<LongWritable, Text> { private FSDataOutputStream atguigu; private FSDataOutputStream others; public void initialize(TaskAttemptContext job) throws IOException { //job來獲取configuration //獲取文件系統 Configuration configuration = job.getConfiguration(); FileSystem fileSystem = FileSystem.get(configuration); //獲取文件目錄 String path = configuration.get(FileOutputFormat.OUTDIR); atguigu = fileSystem.create(new Path(path + "/atguigu.log")); others = fileSystem.create(new Path(path + "/others.log")); } @Override public void write(LongWritable key, Text value) throws IOException, InterruptedException { String line = value.toString() + "\n"; if (line.contains("atguigu")){ //包含就寫出去; FSDataOutputStream要以bytes的格式寫出去; 把k, v鍵值對變成text;  atguigu.write(line.getBytes()); }else { others.write(line.getBytes()); } } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { IOUtils.closeStream(atguigu); IOUtils.closeStream(others); } } public class OutDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); job.setJarByClass(OutDriver.class); job.setOutputFormatClass(MyOutputFormat.class); FileInputFormat.setInputPaths(job, new Path("F://input")); FileOutputFormat.setOutputPath(job, new Path("F://output")); boolean b = job.waitForCompletion(true); System.exit(b ? 0 :1); } }

 

 5、Join的多種應用

 ① Reduce Join

原理:

Map端:為來自不同表或文件的key/value對,打標簽以區別不同來源的記錄。然后用連接字段作為key,其余部分和新加的標志作為value,最后輸出;

Reduce端:在Reduce端以連接字段作為key的分組已完成,只需在每一個分組中將來源不同文件的記錄(在map階段已打標志)分開,最后合並就ok

缺點:

這種方式合並的操作在Reduce階段完成,Reduce端的處理壓力太大,Map節點的運算負載則很低,資源利用率不高,且在Reduce階段極易產生數據傾斜;

解決方案:Map端實現數據合並。

 

ArrayList數組的底層是,數組中每個值指向內存地址,每個數組對象bean對象;o.add添加對象,這個對象不會改變;

 首先要獲得pname,要找到它,排序把它放到第一個,進來的時候就直接獲取到了;第二個就直接往外寫就可以了;  分組排序

 

① pid 訂單id amount 01 1001 1 02 1002 2 03 1003 3 01 1004 4 02 1005 5 03 1006 6 01 小米 0 02 華為 03 格力 ②先按pid排,再按pname排,二次排序 01 小米 0 01 x 1 1001 01 x 4 1004 02 華為 0 02 x 2 1002 02 x 5 1005 03 格力 0 03 x 3 1003 03 x 6 1006 ③把pid一樣的分到一個組; return ta.getPid().compareTo(tb.getPid()); 再經過Reduce方法處理: 01 小米 0 01 小米 1 1001 01 小米 4 1004 02 華為 0 02 華為 2 1002 02 華為 5 1005 03 格力 0 03 格力 3 1003 03 格力 6 1006 最后結果④ 訂單id pname amount 1004 小米 4 1001 小米 1 1005 華為 5 1002 華為 2 1006 格力 6 1003 格力 3

通過將關聯條件作為Map輸出的key,將兩表滿足Join條件的數據並攜帶數據所來源的文件信息,發往同一個ReduceTask,在Reduce中進行數據的串聯,如圖

 

 @Override public int compareTo(TableBean o) { //先按pid排序,再按名字排 int compare = this.pid.compareTo(o.pid); if (compare == 0){ return -this.pname.compareTo(o.pname); //默認是升序,沒名字的在前;降序加- ;把有pname的靠前;; }else { return compare; } } 在map階段完成封裝,首先按pid排序,然后再按pname排;
01  小米 排完序之后(排序和分組)應該這樣子; 把排好序的一塊輸到reduce里;
1001  01  1
1004  02  4
public class TableReducer extends Reducer<TableBean, NullWritable, TableBean, NullWritable> { @Override protected void reduce(TableBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { Iterator<NullWritable> iterator = values.iterator(); iterator.next(); //先遍歷第一個,指針首先是指向0的,next完之后它就指向了1, String pname = key.getPname(); //取出pname;把剩下的數據寫出去,因為它們都要進行替換 System.out.println(pname); while
(iterator.hasNext()){ iterator.next(); //先執行下; key.setPname(pname); 執行到這一步時key已經到第二行了即 1001  01  1 context.write(key, NullWritable.get()); setpanme,再寫出去 } } }
遍歷value時,key會變; key像一個杯子,先取出水pname放一個string里,再next下,里邊就是裝的其他內容;再把pname裝進去;最后把這個杯子整體倒出來到一個框架。


 

 ② Map Join

使用場景

Map Join適用於一張表十分小(大約25M,最好不超過10M)、另一張表大小無所謂。

優點

思考:在Reduce端處理過多的表,非常容易產生數據傾斜。怎么辦?

  在Map端緩存多張表,提前處理業務邏輯,這樣增加Map端業務,減少Reduce端數據的壓力,盡可能的減少數據傾斜

  不需要Reduce,Map出的key就不需專門排序了,也不需分組了;從Map---->outputFormat結束;

 沒有shuffle,不會產生數據傾斜;

並行數由切片數決定的;一個reduce處理多少數據看分區的均勻度;默認按hashcode值分區,hashcode要盡量平均,這是其一;key的重復過多(如全校前10排序,因為有尖子班,它們班占據前八) --->>數據傾斜(由分區引起);

如25M小文件,join 10G文件(分80份,80個MapTask),每個MapTask都需要拉來25M數據; 25*80=2000MB

 

具體辦法:采用DistributedCache

       (1)在Mapper的setup階段,將文件讀取到緩存集合中。

       (2)在驅動函數中加載緩存。

// 緩存普通文件到Task運行節點。

job.addCacheFile(new URI("file://e:/cache/pd.txt"));

public class DistributedMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    //存放緩存數據的map
    private Map<String, String> pMap = new HashMap<>();
    private Text text = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        //①利用context獲取緩存文件,把pd.txt文件放到緩存文件中;用java的IO流讀取
        URI[] cacheFiles = context.getCacheFiles();  //可以添加多個
        String path = cacheFiles[0].getPath(); //獲取字符串路徑,/F:/input/pd.txt
        System.out.println("文件路徑為:" + path);
        BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

        //②一行行的讀取,放到緩存文件map集合中
        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())){  //循環讀取緩存文件
            String[] split = line.split("\t");  //切割
            pMap.put(split[0], split[1]); //把pid和pname存放集合中
            //System.out.println(split[0] + "\t" + split[1]);
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().split("\t");
        String pname = pMap.get(split[1]);
        String out = split[0] + "\t" + pname + "\t" + split[2]; //拼接寫出去;訂單id  pname   amount

        text.set(out);
        context.write(text, NullWritable.get()); //強制按照id進行排序
    }

}


public class DistributedCacheDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(DistributedCacheDriver.class);

        job.setMapperClass(DistributedMapper.class);
        job.setNumReduceTasks(0); //不需要Reduce階段

        job.addCacheFile(URI.create("file:///F:/input/pd.txt"));  //file://協議,表本地文件; 加載緩存文件
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("F://input/order.txt"));
        FileOutputFormat.setOutputPath(job, new Path("F://output"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);

    }
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM