主要內容:
- mapreduce編程模型再解釋;
- ob提交方式:
- windows->yarn
- windows->local ;
- linux->local
- linux->yarn;
- 本地運行debug調試觀察
mapreduce體系很龐大,我們需要一條合適的線,來慢慢的去理解和學習。
1、mapreduce編程模型和mapreduce模型實現程序之間的關系
1.1、mapreduce的編程模型
對mapreduce的總結:
如果只考慮數據處理的邏輯,撇開分布式的概念,其實mapreduce就只是一個編程模型了,而不是一個框架了。在這個編程模型里數據處理分為兩個節點,一個map階段一個reduce階段。
map階段要做的事情:就是吧原始的輸入數據轉換成大量的key-value數據,結合wordcont實例,key相同的數據會作為一組,形成若干組數據,接着就是這些組數據,一組一組的進行reduce階段處理,每組reduce一次。
reduce階段要做的事情:一組(key相同的數據)聚合運算一次。
一wordcount為例:數據被一行一行的讀進來,按照空格進行拆分,將句子形成一個個(word,1)形式的鍵值對,map階段就結束了;reduce階段就是把單詞相同的數據作為一組進行聚合,聚合邏輯就是把該組內的全部value累加在一起。
1.2、關系梳理
以上就是mapreduce的編程模型,編程模型並不能代表hadoop中的mapreduce框架,mapreduce編程模型其實就是一種典型的數據運算的邏輯模型,無論是hadoop-mapreduce運算框架也好,還是spark運算框架也好,都是具體的程序,都是對mapreduce編程模型的一種實現。而且hadoop中實現該模型時,在map階段寫了一個程序叫做map Task,在reduce 階段寫了一個程序叫做reduce Task;子spark里面,只不過時換了另外的名字,思想都一樣。
以后在寫mapreduce程序的時候,在寫業務邏輯的時候只需要考慮編程模型就可以了,框架已經將實現上的一些東西都封裝起來了,也就是說,要編寫一個業務邏輯我們需要考慮的是,map將來產生什么樣的key-vlue,將來相同的key就會作為一組沒reduce聚合一次。
2、job提交方式
2.1、windows-to-yarn / local
local:用於本地測試,無需打包成jar也無需提交。
Configuration conf = new Configuration(); //conf.set("fs.defaultFS", "file:///"); //默認指就是這樣 //conf.set("mapreduce.framework.name", "local"); //默認就是這樣
若出現如下,錯誤,需要將hadoop配入window的環境變量中,同時將hadoop的bin目錄配置到path中。
yarn:【比較繁瑣】
目前為止我們需要寫一個mapper實現類實現map階段的邏輯,和寫一個reduce實現類實現reduce階段的 邏輯,和一個job提交器,提交job。
提交方式有多中,在上個筆記中,介紹了windows跨平台提交到yarn集群中,比較麻煩需要指定文件系統,需要知名job提交到哪里運行,還需要提供有權限的hdfs用戶,還需要兼容跨平台。如下:
// 在代碼中設置JVM系統參數,用於給job對象來獲取訪問HDFS的用戶身份 // 或者通過eclipse圖形化界面來設置 -DHADOOP_USER_NAME=root System.setProperty("HADOOP_USER_NAME", "root") ; Configuration conf = new Configuration(); // 1、設置job運行時要訪問的默認文件系統, map階段要去讀數據,reduce階段要寫數據 conf.set("fs.defaultFS", "hdfs://hdp-01:9000"); // 2、設置job提交到哪去運行:有本地模擬的方式local conf.set("mapreduce.framework.name", "yar n"); conf.set("yarn.resourcemanager.hostname", "hdp-01"); // 3、如果要從windows系統上運行這個job提交客戶端程序,則需要加這個跨平台提交的參數 conf.set("mapreduce.app-submission.cross-platform","true");
2.2、linux-to-yarn / local
若不配是上述參數直接將jar包上傳到hadoop集群中的任何一台機器上,在linxu機器中運行jar包中的job提交器(自己寫的jobsubmit),工具類會將jar包提交給local or(yarn,要看linux機器的配置參數是yarn還是local)無需在配置上述提到的參數,為什么呢?
Configuration conf = new Configuration(); //沒有指定默認文件系統
//沒有指定mapreduce-job提交到哪里運行
job.getInstance(conf)
使用hadoop jar命令而不是java -cp path1:path2... xxx.xx.xx.jobsubmiter
hadoop jar會把這台機器上的hadoop安裝包中的所有jar包,以及所有配置文件都加載到本次運行java程序的classpath中。
這就是不用配置上述提到的參數,的原因,job提交工具程序中有一行代碼如下,會將類路徑下的配置信息全部加載進去,會將mapred-defalut.xml讀入。
Configuration conf = new Configuration();
/* * 如果要在hadoop集群的某台機器上啟動這個job提交客戶端的話 * conf里面就不需要指定 fs.defaultFS mapreduce.framework.name * * 因為在集群機器上用 hadoop jar xx.jar cn.edu360.mr.wc.JobSubmitter2 命令來啟動客戶端main方法時, * hadoop jar這個命令會將所在機器上的hadoop安裝目錄中的jar包和配置文件加入到運行時的classpath中 * * 那么,我們的客戶端main方法中的new Configuration()語句就會加載classpath中的配置文件,自然就有了 * fs.defaultFS 和 mapreduce.framework.name 和 yarn.resourcemanager.hostname 這些參數配置 */
如下圖,在window中提交job是,從日志信息可以看出,首先連接ResourceManager,連接成功之后ResourceManager為其指定本次的jobID。對比在linux中提交,發現linux中運行速度很快,而且沒有日志顯示連接ResourceManager,而且jobID也命名中有local字樣,因為沒有指定job提交到yarn集群,默認提交到了本地模擬器(LocalJobRunner)。因為參數mapreduce.framework.name默認locl。我們可以在代碼中添加配置,無論提交到集群中的哪一台機子,都會去找yarn中的ResourceManager(配置文件中配置了地址),或者修改服務器的mapred-site.xml的參數值為yarn來覆蓋jar包中mapred-default.xml中的local。
jar包中的mapred-defalut.xml中的默認值。
3、案例一
3.1、流量統計
現在有一批用戶上網行為日志,需要統計日志記錄中的用戶上行流量和下行流量,以及流量總和;
需要統計多個value值時,可以考慮將多個value封裝成一個valueBean對象,當然Bean對象需要實現hadoop的序列化接口(必須提供無參構造)
分析:Mapper<LongWritable, Text, Text, FlowBean>
Reducer<Text, FlowBean, Text, FlowBean>
1363157993044 182******61 94-**-**-**-**-18:XXXX-YYYY xxx.xxx.xxx.xx iface.qiyi.com 視頻網站 15 12 1527 2106 200
3.1.1、自定義數據類型value
需要實現hadoop網絡序列化接口,需要實現序列化和反序列化方法
本案例的功能:演示自定義數據類型如何實現hadoop的序列化接口
1、該類一定要保留空參構造函數
2、write方法中輸出字段二進制數據的順序 要與 readFields方法讀取數據的順序一致
/** * hadoop系統在序列化該類的對象時要調用的方法 */ @Override public void write(DataOutput out) throws IOException { } /** * hadoop系統在反序列化該類的對象時要調用的方法 */ @Override public void readFields(DataInput in) throws IOException { }
3.1.2、自定義類型Key-Comparable
mapReduce的reduce在收集key-value的時候會按照key進行排序(內部排序機制),因此提供自定義得數據類型,作為key,必須實現比較接口和序列化接口,hadoop提供了一個合二為一的接口WritableComparable extend writable,Comparable
3.2、topK統計
現有一批url訪問日志,統計出訪問量最高的前5個網站。
分析:當存在不止1個reduceTask的時候,每個reduceTask拿到的數據都是局部信息,統計得到的結果也都是局部結果。
方案1:只提供一個reduce Task,使用數據量很小的時候
方案2:多階段mapreduce當數據量很大的時候,上述方法就失去了分布式的優勢,此時可以提供多階段的mapReduce任務,下一次任務利用上一次產生的數據。
3.2.1、cleanup(Context context)
要點1:每一個reduce worker程序,會在處理完自己的所有數據后,調用一次cleanup方法
cleanup()函數的執行時機:假如該 reduceTask 接收到3組聚合數據,待3組數據的聚合工作都完成時候,會調用 一次cleanup()函數。
因此可以在cleanup()函數中進行結果排序,找出前幾名。(TreeMap是有序的)
3.2.2、通過conf傳參topK
要點2:如何向map和reduce傳自定義參數
從JobSubmitter的main方法中,可以向map worker和reduce worker傳遞自定義參數(通過configuration對象來寫入自定義參數);然后,我們的map方法和reduce方法中,可以通過context.getConfiguration()來取自定義參數
Configuration conf = new Configuration() //
這一句代碼,會加載mr工程jar包中的hadoop依賴jar中的各默認配置文件*-default.xml
然后,會加載mr工程中自己的放置的*-site.xml
然后,還可以在代碼中conf.set("參數名","參數值")
另外,mr工程打成jar包后,在hadoop集群的機器上,用hadoop jar mr.jar xx.yy.MainClass
運行時,hadoop jar命令會將這台機器上的hadoop安裝目錄中的所有jar包和配置文件通通加入運行時的classpath,
配置參數的優先級:
1、依賴jar中的默認配置
2、環境中的*-site.xml
3、工程中的*-site.xml
4、代碼中set的參數
優先級一次增大,高優先級的參數值會覆蓋低優先級的參數值
可以通過conf將參數傳遞到reducer中。
reducer方法有個參數Context context;context.getConfiguration()可以拿到job提交器中設置的參數。
傳遞方式有多多種
/**
* 通過代碼設置參數 */ conf.setInt("top.n", 3); conf.setInt("top.n", Integer.parseInt(args[0]));
/** * 通過屬性配置文件獲取參數 */ Properties props = new Properties(); props.load(JobSubmitter.class.getClassLoader().getResourceAsStream("topn.properties")); conf.setInt("top.n", Integer.parseInt(props.getProperty("top.n")));
通過main函數傳遞參數
通過.xml配置文件傳參
new Configration()默認加載core-default.xml core-site.xml 不會加載jar包里的hdfs-site.xml hdfs-default.xml,mapred-site.xml
可以加載自定義的xml文件
<configuration> <property> <name>top.n</name> <value>6</value> </property> </configuration>
public static void main(String[] args) { Configuration conf = new Configuration(); conf.addResource("xx-oo.xml"); System.out.println(conf.get("top.n")); System.out.println(conf.get("mygirlfriend")); }
3.3、全局排序
方案1:一個reduceTask,添加一個緩存和:treeMap(內存:數據量不可太大),在cleanup(Context context)處理treeMap中的數據
方案2:多階段mapreduce,上一個mapreduce產生的結果(eg:url 總次數)作為下一側mapreduce的輸入。同時利用mapreduce對key的排序機制。二階段只是用一個reduceTask即可,當一階段產生的數據也更十分巨大時候,二級同樣可以設置多個reduceTask,但要對聚合數據的分發機制進行控制(控制數據分發:比如:大於1000w的都發給reduceTask A, 500w-1000w的發給 B)。
需求:統計request.dat中每個頁面被訪問的總次數,同時,要求輸出結果文件中的數據按照次數大小倒序排序
關鍵技術點:
mapreduce程序內置了一個排序機制:
map worker 和reduce worker ,都會對數據按照key的大小來排序
所以最終的輸出結果中,一定是按照key有順序的結果
思路:
本案例中,就可以利用這個機制來實現需求:
1、先寫一個mr程序,將每個頁面的訪問總次數統計出來
2、再寫第二個mr程序:
map階段: 讀取第一個mr產生的結果文件,將每一條數據解析成一個java對象UrlCountBean(封裝着一個url和它的總次數),然后將這個對象作為key,null作為value返回
要點:這個java對象要實現WritableComparable接口,以讓worker可以調用對象的compareTo方法來進行排序
reduce階段:由於worker已經對收到的數據按照UrlCountBean的compareTo方法排了序,所以,在reduce方法中,只要將數據輸出即可,最后的結果自然是按總次數大小的有序結果
3.4、手機歸屬地分區
統計每一個用戶的總流量信息,並且按照其歸屬地,將統計結果輸出在不同的文件中
1、思路:
想辦法讓map端worker在將數據分區時,按照我們需要的按歸屬地划分
實現方式:自定義一個Partitioner
2、實現
先寫一個自定義Paritioner
3.4.1、數據分發機制 Partitioner
決定mapTask產生的數據發給哪一個reduceTask,分發數據的動作有mapTask來完成,數據的分發邏輯有Partitioner指定。
分發數據的動作有mapTask來完成,數據的分發邏輯有Partitioner指定。
默認按照 key 的 hashcode % reduceTask個數
如果手機號作為key,但是要求同一個省的手機號要發給同一個reduceTask,這是就需要重新設計數據的分發機制。
一個規則在程序的世界里就是一個算法,一個算法在程序的世界里就是一段代碼,一段代碼在程序的世界里一定是封裝在對象里的,一個對象在java的世界里一定是繼承某個父類,或者是實現一個結構。
框架的靈活性就在於,我們一定可以自定義一個類來實現這個結構或者繼承這個父類,提交給框架,改變原有的規則。
/** * 本類是提供給MapTask用的 * MapTask通過這個類的getPartition方法,來計算它所產生的每一對kv數據該分發給哪一個reduce task * @author ThinkPad * */ public class ProvincePartitioner extends Partitioner<Text, FlowBean>{ static HashMap<String,Integer> codeMap = new HashMap<>(); static{ codeMap.put("135", 0); codeMap.put("136", 1); codeMap.put("137", 2); codeMap.put("138", 3); codeMap.put("139", 4); } @Override public int getPartition(Text key, FlowBean value, int numPartitions) { Integer code = codeMap.get(key.toString().substring(0, 3)); return code==null?5:code; } }
在job提交器中,指定數據分區邏輯
// 設置參數:maptask在做數據分區時,用哪個分區邏輯類 (如果不指定,它會用默認的HashPartitioner) job.setPartitionerClass(ProvincePartitioner.class); // 由於我們的ProvincePartitioner可能會產生6種分區號,所以,需要有6個reduce task來接收 job.setNumReduceTasks(6);
3.5、倒排索引
1、先寫一個mr程序:統計出每個單詞在每個文件中的總次數,
2、然后在寫一個mr程序,讀取上述結果數據:
map: 根據“-“”切,以單詞做key,后面一段作為value
reduce: 拼接values里面的每一段,以單詞做key,拼接結果做value,輸出即可
a.txt | hello tom hello jim hello kitty hello rose |
hello-a.txt 4 hello-b.txt 4 hello-c.txt 4
java-c.txt 1 jerry-b.txt 1 jerry-c.txt 1 |
->
|
hello a.txt-->4 b.txt-->4 c.txt-->4 |
|
b.txt | hello jerry hello jim hello kitty hello jack |
-> |
java c.txt-->1 | ||
c.txt | hello jerry hello java hello c++ hello c++ |
jerry b.txt-->1 c.txt-->1 |
要點1:map方法中,如何獲取所處理的這一行數據所在的文件名?
worker在調map方法時,會傳入一個context,而context中包含了這個worker所讀取的數據切片信息,而切片信息又包含這個切片所在的文件信息
那么,就可以在map中:
FileSplit split = FileSplit) context.getInputSplit();
String fileName = split.getpath().getName();
要點2:setup方法
worker在正式處理數據之前,會先調用一次setup方法,所以,常利用這個機制來做一些初始化操作;
3.5.1、數據切片
在mapTask創建之初就已經明確了要處理的切片,而且切片信息會被當作信息傳遞放在context(上下文,啥信息都有)中傳遞給map和reduce。
maptask和輸入切片關系示意圖:
inputsplit是一個抽象類,mr框架在具體讀數據的時候會調用不同的數據組件,比如文本組件,數據庫組件,而不同的組件產生的數據切片split的描述信息是不同的。
// 從輸入切片信息中獲取當前正在處理的一行數據所屬的文件 FileSplit inputSplit = (FileSplit) context.getInputSplit();
3.6、分組topn
(排序控制,分區控制,分組控制)
order001,u001,小米6,1999.9,2 order001,u001,雀巢咖啡,99.0,2 order001,u001,安慕希,250.0,2 order001,u001,經典紅雙喜,200.0,4 order001,u001,防水電腦包,400.0,2 order002,u002,小米手環,199.0,3 order002,u002,榴蓮,15.0,10 order002,u002,蘋果,4.5,20 order002,u002,肥皂,10.0,40 |
需要求出每一個訂單中成交金額最大的三筆
本質:求分組TOPN
思路1:
map階段:order作為key,orderBean作為value
// 從這里交給maptask的kv對象,會被maptask序列化后存儲,所以不用擔心覆蓋的問題 context.write(k, orderBean);
reduce階段:
收集同一個key(orderID為key)的所有orderBean(實現接口WritableComparable<>),將其放入集合中,對集合進行排序,輸出前n個。
public class OrderBean implements WritableComparable<OrderBean> { @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.orderId); out.writeUTF(this.userId); out.writeUTF(this.pdtName); out.writeFloat(this.price); out.writeInt(this.number); } @Override public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.userId = in.readUTF(); this.pdtName = in.readUTF(); this.price = in.readFloat(); this.number = in.readInt(); this.amountFee = this.price * this.number; } // 比較規則:先比總金額,如果相同,再比商品名稱 @Override public int compareTo(OrderBean o) { return Float.compare(o.getAmountFee(), this.getAmountFee())==0?this.pdtName.compareTo(o.getPdtName()):Float.compare(o.getAmountFee(), this.getAmountFee()); } }
map中context.write(objectkey, objectvalue),,可以將objectkey提到成員變量的位置,每次在context.wirte之前,重新是指新的值,然后輸出。context.wirte這里底層會將對象序列化並追加到臨時的文件中去,而不會像在hashMap中反復add同一個不同修改值的對象。
mr框架是一定會執行,分區,排序,分組的,因此沒有必要在思路1的reduce中排序,可以考慮利用框架的排序功能,如下
思路2:(見GroupingComparator)
實現思路:
map: 讀取數據切分字段,封裝數據到一個bean中作為key傳輸,key要按照成交金額比大小
reduce:利用自定義GroupingComparator將數據按訂單id進行分組,然后在reduce方法中輸出每組數據的前N條即可
3.6.1、序列化
public static class OrderTopnMapper extends Mapper<LongWritable, Text, Text, OrderBean>{ OrderBean orderBean = new OrderBean(); Text k = new Text(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, OrderBean>.Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); orderBean.set(fields[0], fields[1], fields[2], Float.parseFloat(fields[3]), Integer.parseInt(fields[4])); k.set(fields[0]); // 從這里交給maptask的kv對象,會被maptask序列化后存儲,所以不用擔心覆蓋的問題 context.write(k, orderBean); } }
// reduce task提供的values迭代器,每次迭代返回給我們的都是同一個對象,只是set了不同的值 for (OrderBean orderBean : values) { // 構造一個新的對象,來存儲本次迭代出來的值 OrderBean newBean = new OrderBean(); newBean.set(orderBean.getOrderId(), orderBean.getUserId(), orderBean.getPdtName(), orderBean.getPrice(), orderBean.getNumber()); beanList.add(newBean); }
如下:hashmap中會保留三個一樣的引用
public static void main(String[] args) throws FileNotFoundException, IOException { ArrayList<OrderBean> beans = new ArrayList<>(); OrderBean bean = new OrderBean(); bean.set("1", "u", "a", 1.0f, 2); bean.set("2", "t", "b", 2.0f, 3); bean.set("3", "r", "c", 2.0f, 3); System.out.println(beans); }
3.6.2、GroupingComparator-如何控制分組
在數據按照特定的分發規則發給reduceTask之前,數據會傳遞給mr框架,框架對收到的數據按照key自帶的排序規則進行排序,接下來將數據發給對應的reduceTask,對數據統一組的數據進行一次聚合,這里就涉及一個分組機制GroupingComparator(內部有一個compare(obj1,obj2)方法),因為reduceTask需要知道哪些數據是同一組。
還以分組topn為例
mapreduce機制總結 數據分發Partitioner、key值排序Comparable、GroupingComparator
GroupingComparator應用示例--求分組topn
1、reduce中values迭代器,沒迭代一次,key的值也會跟新一次
2、reduce會把mapTask傳遞過來的數據保存到硬盤文件中(數據量很大的時候內存中是放不下的),既然放在文件中,就會涉及序列化和反序列化。
3、GroupingComparator中必須要需要明確反序列化的類型
分組topn
orderBean

import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; import org.apache.hadoop.io.WritableComparable; public class OrderBean implements WritableComparable<OrderBean>{ private String orderId; private String userId; private String pdtName; private float price; private int number; private float amountFee; public void set(String orderId, String userId, String pdtName, float price, int number) { this.orderId = orderId; this.userId = userId; this.pdtName = pdtName; this.price = price; this.number = number; this.amountFee = price * number; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getUserId() { return userId; } public void setUserId(String userId) { this.userId = userId; } public String getPdtName() { return pdtName; } public void setPdtName(String pdtName) { this.pdtName = pdtName; } public float getPrice() { return price; } public void setPrice(float price) { this.price = price; } public int getNumber() { return number; } public void setNumber(int number) { this.number = number; } public float getAmountFee() { return amountFee; } public void setAmountFee(float amountFee) { this.amountFee = amountFee; } @Override public String toString() { return this.orderId + "," + this.userId + "," + this.pdtName + "," + this.price + "," + this.number + "," + this.amountFee; } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.orderId); out.writeUTF(this.userId); out.writeUTF(this.pdtName); out.writeFloat(this.price); out.writeInt(this.number); } @Override public void readFields(DataInput in) throws IOException { this.orderId = in.readUTF(); this.userId = in.readUTF(); this.pdtName = in.readUTF(); this.price = in.readFloat(); this.number = in.readInt(); this.amountFee = this.price * this.number; } // 比較規則:先比總金額,如果相同,再比商品名稱 @Override public int compareTo(OrderBean o) { return this.orderId.compareTo(o.getOrderId())==0?Float.compare(o.getAmountFee(), this.getAmountFee()):this.orderId.compareTo(o.getOrderId()); } }
partitioner

import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Partitioner; public class OrderIdPartitioner extends Partitioner<OrderBean, NullWritable>{ @Override public int getPartition(OrderBean key, NullWritable value, int numPartitions) { // 按照訂單中的orderid來分發數據 return (key.getOrderId().hashCode() & Integer.MAX_VALUE) % numPartitions; } }
groupcomparator

import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class OrderIdGroupingComparator extends WritableComparator{ public OrderIdGroupingComparator() { super(OrderBean.class,true); } @Override public int compare(WritableComparable a, WritableComparable b) { OrderBean o1 = (OrderBean) a; OrderBean o2 = (OrderBean) b; return o1.getOrderId().compareTo(o2.getOrderId()); } }
mr、job

import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class OrderTopn { public static class OrderTopnMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{ OrderBean orderBean = new OrderBean(); NullWritable v = NullWritable.get(); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException { String[] fields = value.toString().split(","); orderBean.set(fields[0], fields[1], fields[2], Float.parseFloat(fields[3]), Integer.parseInt(fields[4])); context.write(orderBean,v); } } public static class OrderTopnReducer extends Reducer< OrderBean, NullWritable, OrderBean, NullWritable>{ /** * 雖然reduce方法中的參數key只有一個,但是只要迭代器迭代一次,key中的值就會變 */ @Override protected void reduce(OrderBean key, Iterable<NullWritable> values, Reducer<OrderBean, NullWritable, OrderBean, NullWritable>.Context context) throws IOException, InterruptedException { int i=0; for (NullWritable v : values) { context.write(key, v); if(++i==3) return; } } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); // 默認只加載core-default.xml core-site.xml conf.setInt("order.top.n", 2); Job job = Job.getInstance(conf); job.setJarByClass(OrderTopn.class); job.setMapperClass(OrderTopnMapper.class); job.setReducerClass(OrderTopnReducer.class); job.setPartitionerClass(OrderIdPartitioner.class);//控制分區 job.setGroupingComparatorClass(OrderIdGroupingComparator.class);//控制分組 job.setNumReduceTasks(2); job.setMapOutputKeyClass(OrderBean.class); job.setMapOutputValueClass(NullWritable.class); job.setOutputKeyClass(OrderBean.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\order\\input")); FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\order\\out-3")); job.waitForCompletion(true); } }
3.7、共同好友
A:B,C,D,F,E,O |
-> | one map: B是A的好友 B是E的好友 B是J的好友 reduce: (B:A E J) A-E:B A-J:B E-j:B |
-> |
two
map: wirte(A-E,B)
recude: A-E:B,?,? |
3.8、控制輸入,輸出
不僅僅局限於讀取hdfs文件,可以替換數據輸入組件和數據輸出組件,對象可以是數據庫等。
FileInputFormat
|--TextInputFormat
|--SequenceFileInputFormat
|--DBInputFormat
FileOutputFormat
|--TextOutputFormat
|--SequenceFileOutputFormat
SequenceFile文件是hadoop定義的一種文件,里面存放的是大量key-value的對象序列化字節(文件頭部還存放了key和value所屬的類型名);
3.9、 數據傾斜
將key特別多的那組數據分散個不同的reduce。這樣一來recude聚合的數據就會是局部的,有可能需要在做一步mapreduce,得到全局的結果。
通用解決方案:將相同的key打散
具體做法:任何一個key都追加一個隨機字符串/數字
3.10、combiner
mapTask段可以利用combiner(直接使用reduce接口)進行局部聚合,reduceTask做的是全局聚合;
combiner主要用來避免mapTask產生大量數據,占用網絡帶寬,形成性能瓶頸;
當然也可以用來解決數據傾斜
// 設置maptask端的局部聚合邏輯類 job.setCombinerClass(WordcountReducer.class);
3.11、join場景
訂單信息在一張表,用戶信息在一張表;現要將用戶信息追加到點單表中。
4、mapreduce內部核心機制原理
mr框架如何控制分區
mr框架如何控制排序
mr框架如何扣控制分組
mr框架如何輸入輸出組件
map邏輯
reduce邏輯
4.1、mapreduce框架內部核心工作機制詳解
4.2、mapreduce程序在YARN上啟動-運行-注銷的全流程
mrappmaster
4.2.1、yarn的資源參數配置
yarn.scheduler.minimum-allocation-mb 默認值:1024 // yarn分配一個容器時最低內存 yarn.scheduler.maximum-allocation-mb 默認值:8192 // yarn分配一個容器時最大內存 yarn.scheduler.minimum-allocation-vcores 默認值:1 // yarn分配一個容器時最少cpu核數 yarn.scheduler.maximum-allocation-vcores 默認值:32 // yarn分配一個容器時最多cpu核數 // 1個nodemanager擁有的總內存資源 yarn.nodemanager.resource.memory-mb 默認值:8192 // 1個nodemanager擁有的總cpu資源(邏輯的,表示比例而已) yarn.nodemanager.resource.cpu-vcores 默認值:8
4.3、Hadoop-HA機制整體解析
mapreduce要點復習