Hadoop之MapReduce學習筆記(二)


主要內容:

  • mapreduce編程模型再解釋;
  • ob提交方式:
    • windows->yarn 
    • windows->local ;
    • linux->local
    • linux->yarn;
  • 本地運行debug調試觀察

 

mapreduce體系很龐大,我們需要一條合適的線,來慢慢的去理解和學習。

1、mapreduce編程模型和mapreduce模型實現程序之間的關系

1.1、mapreduce的編程模型

對mapreduce的總結:

   如果只考慮數據處理的邏輯,撇開分布式的概念,其實mapreduce就只是一個編程模型了,而不是一個框架了。在這個編程模型里數據處理分為兩個節點,一個map階段一個reduce階段。

   map階段要做的事情:就是吧原始的輸入數據轉換成大量的key-value數據,結合wordcont實例,key相同的數據會作為一組,形成若干組數據,接着就是這些組數據,一組一組的進行reduce階段處理,每組reduce一次。

   reduce階段要做的事情:一組(key相同的數據)聚合運算一次。

一wordcount為例:數據被一行一行的讀進來,按照空格進行拆分,將句子形成一個個(word,1)形式的鍵值對,map階段就結束了;reduce階段就是把單詞相同的數據作為一組進行聚合,聚合邏輯就是把該組內的全部value累加在一起。

1.2、關系梳理

  以上就是mapreduce的編程模型,編程模型並不能代表hadoop中的mapreduce框架,mapreduce編程模型其實就是一種典型的數據運算的邏輯模型,無論是hadoop-mapreduce運算框架也好,還是spark運算框架也好,都是具體的程序,都是對mapreduce編程模型的一種實現。而且hadoop中實現該模型時,在map階段寫了一個程序叫做map Task,在reduce 階段寫了一個程序叫做reduce Task;子spark里面,只不過時換了另外的名字,思想都一樣。

  以后在寫mapreduce程序的時候,在寫業務邏輯的時候只需要考慮編程模型就可以了,框架已經將實現上的一些東西都封裝起來了,也就是說,要編寫一個業務邏輯我們需要考慮的是,map將來產生什么樣的key-vlue,將來相同的key就會作為一組沒reduce聚合一次。

2、job提交方式

2.1、windows-to-yarn / local

local:用於本地測試,無需打包成jar也無需提交。

Configuration conf = new Configuration();

//conf.set("fs.defaultFS", "file:///"); //默認指就是這樣
//conf.set("mapreduce.framework.name", "local"); //默認就是這樣

 若出現如下,錯誤,需要將hadoop配入window的環境變量中,同時將hadoop的bin目錄配置到path中。

 

yarn:【比較繁瑣】

  目前為止我們需要寫一個mapper實現類實現map階段的邏輯,和寫一個reduce實現類實現reduce階段的 邏輯,和一個job提交器,提交job。

  提交方式有多中,在上個筆記中,介紹了windows跨平台提交到yarn集群中,比較麻煩需要指定文件系統,需要知名job提交到哪里運行,還需要提供有權限的hdfs用戶,還需要兼容跨平台。如下:

// 在代碼中設置JVM系統參數,用於給job對象來獲取訪問HDFS的用戶身份
        // 或者通過eclipse圖形化界面來設置 -DHADOOP_USER_NAME=root
        System.setProperty("HADOOP_USER_NAME", "root") ;
        
        
        Configuration conf = new Configuration();
        // 1、設置job運行時要訪問的默認文件系統, map階段要去讀數據,reduce階段要寫數據
        conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
        // 2、設置job提交到哪去運行:有本地模擬的方式local
        conf.set("mapreduce.framework.name", "yar n");
        conf.set("yarn.resourcemanager.hostname", "hdp-01");
        // 3、如果要從windows系統上運行這個job提交客戶端程序,則需要加這個跨平台提交的參數
        conf.set("mapreduce.app-submission.cross-platform","true");

 

2.2、linux-to-yarn / local

若不配是上述參數直接將jar包上傳到hadoop集群中的任何一台機器上,在linxu機器中運行jar包中的job提交器(自己寫的jobsubmit),工具類會將jar包提交給local or(yarn,要看linux機器的配置參數是yarn還是local)無需在配置上述提到的參數,為什么呢?

        Configuration conf = new Configuration();
        //沒有指定默認文件系統
     //沒有指定mapreduce-job提交到哪里運行
     job.getInstance(conf)

 

使用hadoop jar命令而不是java -cp path1:path2... xxx.xx.xx.jobsubmiter

hadoop jar會把這台機器上的hadoop安裝包中的所有jar包,以及所有配置文件都加載到本次運行java程序的classpath中。

這就是不用配置上述提到的參數,的原因,job提交工具程序中有一行代碼如下,會將類路徑下的配置信息全部加載進去,會將mapred-defalut.xml讀入。

 

Configuration conf = new Configuration();
/* 
* 如果要在hadoop集群的某台機器上啟動這個job提交客戶端的話
 * conf里面就不需要指定 fs.defaultFS   mapreduce.framework.name
 * 
 * 因為在集群機器上用 hadoop jar xx.jar cn.edu360.mr.wc.JobSubmitter2 命令來啟動客戶端main方法時,
 *   hadoop jar這個命令會將所在機器上的hadoop安裝目錄中的jar包和配置文件加入到運行時的classpath中
 *   
 *   那么,我們的客戶端main方法中的new Configuration()語句就會加載classpath中的配置文件,自然就有了 
 *   fs.defaultFS 和 mapreduce.framework.name 和 yarn.resourcemanager.hostname 這些參數配置
*/

 

如下圖,在window中提交job是,從日志信息可以看出,首先連接ResourceManager,連接成功之后ResourceManager為其指定本次的jobID。對比在linux中提交,發現linux中運行速度很快,而且沒有日志顯示連接ResourceManager,而且jobID也命名中有local字樣,因為沒有指定job提交到yarn集群,默認提交到了本地模擬器(LocalJobRunner)。因為參數mapreduce.framework.name默認locl。我們可以在代碼中添加配置,無論提交到集群中的哪一台機子,都會去找yarn中的ResourceManager(配置文件中配置了地址),或者修改服務器的mapred-site.xml的參數值為yarn來覆蓋jar包中mapred-default.xml中的local。

jar包中的mapred-defalut.xml中的默認值。

 

 3、案例一

3.1、流量統計

現在有一批用戶上網行為日志,需要統計日志記錄中的用戶上行流量和下行流量,以及流量總和;

需要統計多個value值時,可以考慮將多個value封裝成一個valueBean對象,當然Bean對象需要實現hadoop的序列化接口(必須提供無參構造)

分析:Mapper<LongWritable, Text, Text, FlowBean>

      Reducer<Text, FlowBean, Text, FlowBean>

1363157993044     182******61    94-**-**-**-**-18:XXXX-YYYY    xxx.xxx.xxx.xx    iface.qiyi.com    視頻網站    15    12    1527    2106    200

 

3.1.1、自定義數據類型value

需要實現hadoop網絡序列化接口,需要實現序列化和反序列化方法

本案例的功能:演示自定義數據類型如何實現hadoop的序列化接口
  1、該類一定要保留空參構造函數
  2、write方法中輸出字段二進制數據的順序 要與 readFields方法讀取數據的順序一致

/**
     * hadoop系統在序列化該類的對象時要調用的方法
     */
    @Override
    public void write(DataOutput out) throws IOException {
    }

    /**
     * hadoop系統在反序列化該類的對象時要調用的方法
     */
    @Override
    public void readFields(DataInput in) throws IOException {
    }

3.1.2、自定義類型Key-Comparable

mapReduce的reduce在收集key-value的時候會按照key進行排序(內部排序機制),因此提供自定義得數據類型,作為key,必須實現比較接口和序列化接口,hadoop提供了一個合二為一的接口WritableComparable extend writable,Comparable

 3.2、topK統計

現有一批url訪問日志,統計出訪問量最高的前5個網站。

分析:當存在不止1個reduceTask的時候,每個reduceTask拿到的數據都是局部信息,統計得到的結果也都是局部結果。

方案1:只提供一個reduce Task,使用數據量很小的時候

方案2:多階段mapreduce當數據量很大的時候,上述方法就失去了分布式的優勢,此時可以提供多階段的mapReduce任務,下一次任務利用上一次產生的數據。

3.2.1、cleanup(Context context)

要點1:每一個reduce worker程序,會在處理完自己的所有數據后,調用一次cleanup方法

cleanup()函數的執行時機:假如該 reduceTask 接收到3組聚合數據,待3組數據的聚合工作都完成時候,會調用 一次cleanup()函數。

因此可以在cleanup()函數中進行結果排序,找出前幾名。(TreeMap是有序的)

 3.2.2、通過conf傳參topK

要點2:如何向map和reduce傳自定義參數

從JobSubmitter的main方法中,可以向map worker和reduce worker傳遞自定義參數(通過configuration對象來寫入自定義參數);然后,我們的map方法和reduce方法中,可以通過context.getConfiguration()來取自定義參數

 

Configuration conf = new Configuration() //

這一句代碼,會加載mr工程jar包中的hadoop依賴jar中的各默認配置文件*-default.xml

然后,會加載mr工程中自己的放置的*-site.xml

然后,還可以在代碼中conf.set("參數名","參數值")

 

另外,mr工程打成jar包后,在hadoop集群的機器上,用hadoop jar mr.jar xx.yy.MainClass

運行時,hadoop jar命令會將這台機器上的hadoop安裝目錄中的所有jar包和配置文件通通加入運行時的classpath,

 

配置參數的優先級:

1、依賴jar中的默認配置

2、環境中的*-site.xml

3、工程中的*-site.xml

4、代碼中set的參數

優先級一次增大,高優先級的參數值會覆蓋低優先級的參數值

 

可以通過conf將參數傳遞到reducer中。

reducer方法有個參數Context context;context.getConfiguration()可以拿到job提交器中設置的參數。

傳遞方式有多多種

/** 
* 通過代碼設置參數
*/ conf.setInt("top.n", 3); conf.setInt("top.n", Integer.parseInt(args[0]));

 

/**
* 通過屬性配置文件獲取參數 */
Properties props = new Properties();
props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties"));
conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));

 

通過main函數傳遞參數

通過.xml配置文件傳參

new Configration()默認加載core-default.xml core-site.xml  不會加載jar包里的hdfs-site.xml hdfs-default.xml,mapred-site.xml

可以加載自定義的xml文件

<configuration>
    <property>
        <name>top.n</name>
        <value>6</value>
    </property>
</configuration>

 

public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("xx-oo.xml");
        
        System.out.println(conf.get("top.n"));
        System.out.println(conf.get("mygirlfriend"));
    }

 3.3、全局排序

方案1:一個reduceTask,添加一個緩存和:treeMap(內存:數據量不可太大),在cleanup(Context context)處理treeMap中的數據

 

方案2:多階段mapreduce,上一個mapreduce產生的結果(eg:url 總次數)作為下一側mapreduce的輸入。同時利用mapreduce對key的排序機制。二階段只是用一個reduceTask即可,當一階段產生的數據也更十分巨大時候,二級同樣可以設置多個reduceTask,但要對聚合數據的分發機制進行控制(控制數據分發:比如:大於1000w的都發給reduceTask A, 500w-1000w的發給 B)。

需求:統計request.dat中每個頁面被訪問的總次數,同時,要求輸出結果文件中的數據按照次數大小倒序排序

 

關鍵技術點:

mapreduce程序內置了一個排序機制:

map worker 和reduce worker ,都會對數據按照key的大小來排序

所以最終的輸出結果中,一定是按照key有順序的結果

 

思路:

本案例中,就可以利用這個機制來實現需求:

1、先寫一個mr程序,將每個頁面的訪問總次數統計出來

2、再寫第二個mr程序:

map階段: 讀取第一個mr產生的結果文件,將每一條數據解析成一個java對象UrlCountBean(封裝着一個url和它的總次數),然后將這個對象作為key,null作為value返回

要點:這個java對象要實現WritableComparable接口,以讓worker可以調用對象的compareTo方法來進行排序

 

reduce階段:由於worker已經對收到的數據按照UrlCountBean的compareTo方法排了序,所以,在reduce方法中,只要將數據輸出即可,最后的結果自然是按總次數大小的有序結果

3.4、手機歸屬地分區

統計每一個用戶的總流量信息,並且按照其歸屬地,將統計結果輸出在不同的文件中

1、思路:

想辦法讓map端worker在將數據分區時,按照我們需要的按歸屬地划分

實現方式:自定義一個Partitioner

2、實現

先寫一個自定義Paritioner

 

 3.4.1、數據分發機制 Partitioner

決定mapTask產生的數據發給哪一個reduceTask,分發數據的動作有mapTask來完成,數據的分發邏輯有Partitioner指定。

分發數據的動作有mapTask來完成,數據的分發邏輯有Partitioner指定。

默認按照 key 的 hashcode % reduceTask個數

 

 

如果手機號作為key,但是要求同一個省的手機號要發給同一個reduceTask,這是就需要重新設計數據的分發機制。

一個規則在程序的世界里就是一個算法,一個算法在程序的世界里就是一段代碼,一段代碼在程序的世界里一定是封裝在對象里的,一個對象在java的世界里一定是繼承某個父類,或者是實現一個結構。

框架的靈活性就在於,我們一定可以自定義一個類來實現這個結構或者繼承這個父類,提交給框架,改變原有的規則。

/**
 * 本類是提供給MapTask用的
 * MapTask通過這個類的getPartition方法,來計算它所產生的每一對kv數據該分發給哪一個reduce task
 * @author ThinkPad
 *
 */
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{
    static HashMap<String,Integer> codeMap = new HashMap<>();
    static{
        
        codeMap.put("135", 0);
        codeMap.put("136", 1);
        codeMap.put("137", 2);
        codeMap.put("138", 3);
        codeMap.put("139", 4);
        
    }
    
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        
        Integer code = codeMap.get(key.toString().substring(0, 3));
        return code==null?5:code;
    }

}

 

 在job提交器中,指定數據分區邏輯

        // 設置參數:maptask在做數據分區時,用哪個分區邏輯類  (如果不指定,它會用默認的HashPartitioner)
        job.setPartitionerClass(ProvincePartitioner.class);
        // 由於我們的ProvincePartitioner可能會產生6種分區號,所以,需要有6個reduce task來接收
        job.setNumReduceTasks(6);

3.5、倒排索引

1、先寫一個mr程序:統計出每個單詞在每個文件中的總次數,

2、然后在寫一個mr程序,讀取上述結果數據:

    map: 根據“-“”切,以單詞做key,后面一段作為value

    reduce: 拼接values里面的每一段,以單詞做key,拼接結果做value,輸出即可

a.txt

hello tom

hello jim

hello kitty

hello rose
 

hello-a.txt 4

hello-b.txt 4

hello-c.txt 4

 

java-c.txt 1

jerry-b.txt 1

jerry-c.txt 1

->

 

 

hello  a.txt-->4  b.txt-->4  c.txt-->4

b.txt

hello jerry

hello jim

hello kitty

hello jack

->

 java   c.txt-->1
c.txt

hello jerry

hello java

hello c++

hello c++
   

jerry  b.txt-->1  c.txt-->1


要點1:map方法中,如何獲取所處理的這一行數據所在的文件名?

worker在調map方法時,會傳入一個context,而context中包含了這個worker所讀取的數據切片信息,而切片信息又包含這個切片所在的文件信息

那么,就可以在map中:

FileSplit split = FileSplit) context.getInputSplit();

String fileName = split.getpath().getName();

 

要點2:setup方法                                                                
worker在正式處理數據之前,會先調用一次setup方法,所以,常利用這個機制來做一些初始化操作;

3.5.1、數據切片

在mapTask創建之初就已經明確了要處理的切片,而且切片信息會被當作信息傳遞放在context上下文,啥信息都有)中傳遞給map和reduce。

maptask和輸入切片關系示意圖:

inputsplit是一個抽象類,mr框架在具體讀數據的時候會調用不同的數據組件,比如文本組件,數據庫組件,而不同的組件產生的數據切片split的描述信息是不同的。

// 從輸入切片信息中獲取當前正在處理的一行數據所屬的文件
            FileSplit inputSplit = (FileSplit) context.getInputSplit();

 

 

3.6、分組topn

(排序控制,分區控制,分組控制)

order001,u001,小米6,1999.9,2

order001,u001,雀巢咖啡,99.0,2

order001,u001,安慕希,250.0,2

order001,u001,經典紅雙喜,200.0,4

order001,u001,防水電腦包,400.0,2

order002,u002,小米手環,199.0,3

order002,u002,榴蓮,15.0,10

order002,u002,蘋果,4.5,20

order002,u002,肥皂,10.0,40

需要求出每一個訂單中成交金額最大的三筆

本質:求分組TOPN

 

思路1:

map階段:order作為key,orderBean作為value

// 從這里交給maptask的kv對象,會被maptask序列化后存儲,所以不用擔心覆蓋的問題
            context.write(k, orderBean);

 

reduce階段:

收集同一個key(orderID為key)的所有orderBean(實現接口WritableComparable<>),將其放入集合中,對集合進行排序,輸出前n個。

public class OrderBean implements WritableComparable<OrderBean> {
@Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.orderId);
        out.writeUTF(this.userId);
        out.writeUTF(this.pdtName);
        out.writeFloat(this.price);
        out.writeInt(this.number);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.userId = in.readUTF();
        this.pdtName = in.readUTF();
        this.price = in.readFloat();
        this.number = in.readInt();
        this.amountFee = this.price * this.number;
    }

    // 比較規則:先比總金額,如果相同,再比商品名稱
    @Override
    public int compareTo(OrderBean o) {
        
        return Float.compare(o.getAmountFee(), this.getAmountFee())==0?this.pdtName.compareTo(o.getPdtName()):Float.compare(o.getAmountFee(), this.getAmountFee());
        
    }
}

 

 

map中context.write(objectkey, objectvalue),,可以將objectkey提到成員變量的位置,每次在context.wirte之前,重新是指新的值,然后輸出。context.wirte這里底層會將對象序列化並追加到臨時的文件中去,而不會像在hashMap中反復add同一個不同修改值的對象。

mr框架是一定會執行,分區,排序,分組的,因此沒有必要在思路1的reduce中排序,可以考慮利用框架的排序功能,如下

思路2:(見GroupingComparator)

實現思路:

map: 讀取數據切分字段,封裝數據到一個bean中作為key傳輸,key要按照成交金額比大小

 

reduce:利用自定義GroupingComparator將數據按訂單id進行分組,然后在reduce方法中輸出每組數據的前N條即可

3.6.1、序列化

public static class OrderTopnMapper extends Mapper<LongWritable, Text, Text, OrderBean>{
        OrderBean orderBean = new OrderBean();
        Text k = new Text();
        
        
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, OrderBean>.Context context)
                throws IOException, InterruptedException {
            String[] fields = value.toString().split(",");
            
            orderBean.set(fields[0], fields[1], fields[2], Float.parseFloat(fields[3]), Integer.parseInt(fields[4]));
            k.set(fields[0]);
            
            // 從這里交給maptask的kv對象,會被maptask序列化后存儲,所以不用擔心覆蓋的問題
            context.write(k, orderBean);
            
        }
        
    }

 

// reduce task提供的values迭代器,每次迭代返回給我們的都是同一個對象,只是set了不同的值
            for (OrderBean orderBean : values) {
                
                // 構造一個新的對象,來存儲本次迭代出來的值
                OrderBean newBean = new OrderBean();
                newBean.set(orderBean.getOrderId(), orderBean.getUserId(), orderBean.getPdtName(), orderBean.getPrice(), orderBean.getNumber());
                
                beanList.add(newBean);
            }

 

如下:hashmap中會保留三個一樣的引用

public static void main(String[] args) throws FileNotFoundException, IOException {
        
        ArrayList<OrderBean> beans = new ArrayList<>();
        
        OrderBean bean = new OrderBean();
        
        bean.set("1", "u", "a", 1.0f, 2);

        bean.set("2", "t", "b", 2.0f, 3);

        bean.set("3", "r", "c", 2.0f, 3);
     System.out.println(beans);    
    }

 

 3.6.2、GroupingComparator-如何控制分組

在數據按照特定的分發規則發給reduceTask之前,數據會傳遞給mr框架,框架對收到的數據按照key自帶的排序規則進行排序,接下來將數據發給對應的reduceTask,對數據統一組的數據進行一次聚合,這里就涉及一個分組機制GroupingComparator(內部有一個compare(obj1,obj2)方法),因為reduceTask需要知道哪些數據是同一組。

還以分組topn為例

mapreduce機制總結 數據分發Partitioner、key值排序ComparableGroupingComparator

GroupingComparator應用示例--求分組topn

1、reduce中values迭代器,沒迭代一次,key的值也會跟新一次

2、reduce會把mapTask傳遞過來的數據保存到硬盤文件中(數據量很大的時候內存中是放不下的),既然放在文件中,就會涉及序列化和反序列化。

3、GroupingComparator中必須要需要明確反序列化的類型

 

分組topn

orderBean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

import org.apache.hadoop.io.WritableComparable;

public class OrderBean implements WritableComparable<OrderBean>{

    private String orderId;
    private String userId;
    private String pdtName;
    private float price;
    private int number;
    private float amountFee;

    public void set(String orderId, String userId, String pdtName, float price, int number) {
        this.orderId = orderId;
        this.userId = userId;
        this.pdtName = pdtName;
        this.price = price;
        this.number = number;
        this.amountFee = price * number;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getPdtName() {
        return pdtName;
    }

    public void setPdtName(String pdtName) {
        this.pdtName = pdtName;
    }

    public float getPrice() {
        return price;
    }

    public void setPrice(float price) {
        this.price = price;
    }

    public int getNumber() {
        return number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public float getAmountFee() {
        return amountFee;
    }

    public void setAmountFee(float amountFee) {
        this.amountFee = amountFee;
    }

    @Override
    public String toString() {

        return this.orderId + "," + this.userId + "," + this.pdtName + "," + this.price + "," + this.number + ","
                + this.amountFee;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.orderId);
        out.writeUTF(this.userId);
        out.writeUTF(this.pdtName);
        out.writeFloat(this.price);
        out.writeInt(this.number);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.userId = in.readUTF();
        this.pdtName = in.readUTF();
        this.price = in.readFloat();
        this.number = in.readInt();
        this.amountFee = this.price * this.number;
    }

    // 比較規則:先比總金額,如果相同,再比商品名稱
    @Override
    public int compareTo(OrderBean o) {
        
        return this.orderId.compareTo(o.getOrderId())==0?Float.compare(o.getAmountFee(), this.getAmountFee()):this.orderId.compareTo(o.getOrderId());
        
    }

}
View Code

 

partitioner

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderIdPartitioner extends Partitioner<OrderBean, NullWritable>{

    @Override
    public int getPartition(OrderBean key, NullWritable value, int numPartitions) {
        // 按照訂單中的orderid來分發數據
        return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

}
View Code

 

groupcomparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class OrderIdGroupingComparator extends WritableComparator{
    
    public OrderIdGroupingComparator() {
        super(OrderBean.class,true);
    }
    
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        
        OrderBean o1 = (OrderBean) a;
        OrderBean o2 = (OrderBean) b;
        
        return o1.getOrderId().compareTo(o2.getOrderId());
    }
    
    

}
View Code

 

mr、job

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class OrderTopn {

    public static class OrderTopnMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
        OrderBean orderBean = new OrderBean();
        NullWritable v = NullWritable.get();
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            
            String[] fields = value.toString().split(",");
            
            orderBean.set(fields[0], fields[1], fields[2], Float.parseFloat(fields[3]), Integer.parseInt(fields[4]));
            
            context.write(orderBean,v);
        }
        
        
    }
    
    
    public static class OrderTopnReducer extends Reducer< OrderBean, NullWritable,  OrderBean, NullWritable>{
        
        /**
         * 雖然reduce方法中的參數key只有一個,但是只要迭代器迭代一次,key中的值就會變
         */
        @Override
        protected void reduce(OrderBean key, Iterable<NullWritable> values,
                Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context)
                throws IOException, InterruptedException {
            int i=0;
            for (NullWritable v : values) {
                context.write(key, v);
                if(++i==3) return;
            }
            
        }
        
        
    }
    
    public static void main(String[] args) throws Exception {

        
        Configuration conf = new Configuration(); // 默認只加載core-default.xml core-site.xml
        conf.setInt("order.top.n", 2);
        
        Job job = Job.getInstance(conf);

        job.setJarByClass(OrderTopn.class);

        job.setMapperClass(OrderTopnMapper.class);
        job.setReducerClass(OrderTopnReducer.class);
        
        job.setPartitionerClass(OrderIdPartitioner.class);//控制分區
        job.setGroupingComparatorClass(OrderIdGroupingComparator.class);//控制分組
        
        job.setNumReduceTasks(2);

        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\order\\input"));
        FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\order\\out-3"));

        job.waitForCompletion(true);
    }
    
}
View Code

 

3.7、共同好友

 

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

->

one

map:

B是A的好友

B是E的好友

B是J的好友

reduce:

(B:A E J)

A-E:B

A-J:B

E-j:B

->

two

 

 

map:

wirte(A-E,B)

 

 

 

recude:

A-E:B,?,?

3.8、控制輸入,輸出

不僅僅局限於讀取hdfs文件,可以替換數據輸入組件數據輸出組件,對象可以是數據庫等。

FileInputFormat

  |--TextInputFormat

  |--SequenceFileInputFormat

  |--DBInputFormat

FileOutputFormat

  |--TextOutputFormat

  |--SequenceFileOutputFormat

 

SequenceFile文件是hadoop定義的一種文件,里面存放的是大量key-value的對象序列化字節(文件頭部還存放了key和value所屬的類型名);

3.9、 數據傾斜

 將key特別多的那組數據分散個不同的reduce。這樣一來recude聚合的數據就會是局部的,有可能需要在做一步mapreduce,得到全局的結果。

通用解決方案:將相同的key打散

具體做法:任何一個key都追加一個隨機字符串/數字

 

3.10、combiner

mapTask段可以利用combiner(直接使用reduce接口)進行局部聚合,reduceTask做的是全局聚合;

combiner主要用來避免mapTask產生大量數據,占用網絡帶寬,形成性能瓶頸;

當然也可以用來解決數據傾斜

 

// 設置maptask端的局部聚合邏輯類
        job.setCombinerClass(WordcountReducer.class);

 

 

 

 3.11、join場景

訂單信息在一張表,用戶信息在一張表;現要將用戶信息追加到點單表中。

4、mapreduce內部核心機制原理

mr框架如何控制分區

mr框架如何控制排序

mr框架如何扣控制分組

mr框架如何輸入輸出組件

map邏輯

reduce邏輯

4.1、mapreduce框架內部核心工作機制詳解

 

4.2、mapreduce程序在YARN上啟動-運行-注銷的全流程

mrappmaster

 

4.2.1、yarn的資源參數配置

yarn.scheduler.minimum-allocation-mb  默認值:1024  // yarn分配一個容器時最低內存

yarn.scheduler.maximum-allocation-mb  默認值:8192  // yarn分配一個容器時最大內存

yarn.scheduler.minimum-allocation-vcores  默認值:1  // yarn分配一個容器時最少cpu核數

yarn.scheduler.maximum-allocation-vcores  默認值:32 // yarn分配一個容器時最多cpu核數



// 1個nodemanager擁有的總內存資源

yarn.nodemanager.resource.memory-mb  默認值:8192 

 

// 1個nodemanager擁有的總cpu資源(邏輯的,表示比例而已)

yarn.nodemanager.resource.cpu-vcores   默認值:8 

 

4.3、Hadoop-HA機制整體解析

 

 

 mapreduce要點復習

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM