-
氣象數據集
我們要寫一個氣象數據挖掘的程序。氣象數據是通過分布在美國各地區的很多氣象傳感器每隔一小時進行收集,這些數據是半結構化數據且是按照記錄方式存儲的,因此非常適合使用 MapReduce 程序來統計分析。
-
數據格式
我們使用的數據來自美國國家氣候數據中心、美國國家海洋和大氣管理局(簡稱 NCDC NOAA),這些數據按行並以 ASCII 格式存儲,其中每一行是一條記錄。
-
下面我們展示一行采樣數據,其中重要的字段被突出顯示。該行數據被分割成很多行以突出每個字段,但在實際文件中,這些字段被整合成一行且沒有任何分隔符。
1998 # 年
03 # 月
09 # 日
17 # 時
11 # 氣溫
-100 # 濕度
10237 # 氣壓
60 # 風向
72 # 風速
0 # 天氣狀況
0 # 每一小時的降雨量
-9999 # 每一小時的降雨量
- 數據文件按照氣象站和日期進行組織,數據文件示例如下所示:
-
分析
MapReduce 任務過程分為兩個處理階段:map 階段和reduce階段。每個階段都以鍵值對作為輸入和輸出,其類型由我們自己選擇。 我們還需要寫兩個函數:map 函數和reduce 函數。
在這里,map階段的輸入是NCDC NOAA原始數據。我們選擇文本格式作為輸入格式,將數據集的每一行作為文本輸入。鍵是某一行起始位置相對於文件起始位置的偏移量,不過我們不需要這個信息,所以將其忽略。
我們的map函數很簡單。由於我們只對氣象站和氣溫感興趣,所以只需要取出這兩個字段數據。在本實戰中,map 函數只是一個數據准備階段,通過這種方式來准備數據,使 reducer 函數繼續對它進行處理:即統計出每個氣象站30年來的平均氣溫。map 函數還是一個比較合適去除已損記錄的地方,在 map 函數里面,我們可以篩掉缺失的或者錯誤的氣溫數據。
為了全面了解 map 的工作方式,輸入以下數據作為演示
1985 07 31 02 200 94 10137 220 26 1 0 -9999
1985 07 31 03 172 94 10142 240 0 0 0 -9999
1985 07 31 04 156 83 10148 260 10 0 0 -9999
1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999
1985 07 31 06 122 72 -9999 90 0 -9999 0 0
1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999
1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999
1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999
1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999
1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999
這些數據,以鍵/值對的方式作為 map函數的輸入,如下所示
(0, 1985 07 31 02 200 94 10137 220 26 1 0 -9999)
(62, 1985 07 31 03 172 94 10142 240 0 0 0 -9999)
(124,1985 07 31 04 156 83 10148 260 10 0 0 -9999)
(186,1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999)
(248,1985 07 31 06 122 72 -9999 90 0 -9999 0 0)
(310,1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999)
(371,1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999)
(434,1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999)
(497,1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999)
(560,1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999)
鍵(key)是文件中的偏移量,這里不需要這個信息,所以將其忽略。map 函數的功能僅限於提取氣象站和氣溫信息,並將它們輸出;map 函數的輸出經由 MapReduce 框架處理后,最后發送到reduce函數。這個處理過程基於鍵來對鍵值對進行排序和分組。因此在這個示例中,reduce 函數看到的是如下輸入:
(03103,[200,172,156,133,122,117,111,111,106,100])
每個氣象站后面緊跟着一系列氣溫數據,reduce 函數現在要做的是遍歷整個列表並統計出平均氣溫:
03103 132
上面就是最終輸出結果即每一個氣象站歷年的平均氣溫。
-
實現
上面已經分析完畢,下面我們就着手實現它。這里需要編寫三塊代碼內容:
-
map 函數、
-
reduce函數
-
一些用來運行作業的代碼。
-
map 函數
下面我們來編寫 Mapper 類,實現 map() 函數,提取氣象站和氣溫數據
public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {
/**
* 解析氣象站數據
*/
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 每行氣象數據
String line = value.toString();
// 每小時氣溫值
int temperature = Integer.parseInt(line.substring(14, 19).trim());
// 過濾無效數據
if (temperature != -9999) {
FileSplit fileSplit = (FileSplit) context.getInputSplit();
// 通過文件名稱提取氣象站id
String weatherStationId = fileSplit.getPath().getName().substring(5, 10);
context.write(new Text(weatherStationId), new IntWritable(temperature));
}
}
這個 Mapper 類是一個泛型類型,它有四個形參類型,分別指定 map 函數的輸入鍵、輸入值、輸出鍵和輸出值的類型。 就本示例來說,輸入鍵是一個長整數偏移量,輸入值是一行文本,輸出鍵是氣象站id,輸出值是氣溫(整數)。Hadoop 本身提供了一套可優化網絡序列化傳輸的基本類型,而不是使用 java 內嵌的類型。這些類型都在 org.apache.hadoop.io 包中。 這里使用 LongWritable 類型(相當於 Java 的 Long 類型)、Text 類型(相當於 Java 中的 String 類型)和 IntWritable 類型(相當於 Java 的 Integer 類型)。
map() 方法的輸入是一個鍵(key)和一個值(value),我們首先將 Text 類型的 value 轉換成 Java 的 String 類型, 之后使用 substring()方法截取我們業務需要的值。map() 方法還提供了 Context 實例用於輸出內容的寫入。 在這種情況下,我們將氣象站id按Text對象進行讀/寫(因為我們把氣象站id當作鍵),將氣溫值封裝在 IntWritale 類型中。只有氣溫數據不缺失,這些數據才會被寫入輸出記錄中。
-
reduce函數
下面我們來編寫 Reducer類,實現reduce函數,統計每個氣象站的平均氣溫。
public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> {
/**
* 統計美國各個氣象站的平均氣溫
*/
public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException {
IntWritable result = new IntWritable();
int sum = 0;
int count = 0;
// 統計每個氣象站的氣溫值總和
for (IntWritable val : values) {
sum += val.get();
count++;
}
// 求每個氣象站的氣溫平均值
result.set(sum / count);
context.write(key, result);
}
}
同樣,reduce 函數也有四個形式參數類型用於指定輸入和輸出類型。reduce 函數的輸入類型必須匹配 map 函數的輸出類型:即 Text 類型和 IntWritable 類型。 在這種情況下,reduce 函數的輸出類型也必須是 Text 和 IntWritable 類型,分別是氣象站id和平均氣溫。在 map 的輸出結果中,所有相同的氣象站(key)被分配到同一個reduce執行,這個平均氣溫就是針對同一個氣象站(key),通過循環所有氣溫值(values)求和並求平均數所得到的。
-
一些用來運行作業的代碼
/**
* 任務驅動方法
*
* @param arg0
* @throws Exception
*/
@Override
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
// 讀取配置文件
Configuration conf = new Configuration();
Path mypath = new Path(arg0[1]);
FileSystem hdfs = mypath.getFileSystem(conf);
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
// 新建一個任務
Job job = new Job(conf, "temperature");
// 設置主類
job.setJarByClass(Temperature.class);
// 輸入路徑
FileInputFormat.addInputPath(job, new Path(arg0[0]));
// 輸出路徑
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));
// Mapper
job.setMapperClass(TemperatureMapper.class);
// Reducer
job.setReducerClass(TemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 提交任務
return job.waitForCompletion(true)?0:1;
}
/**
* main 方法
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 數據輸入路徑和輸出路徑
String[] args0 = {
"hdfs://ljc:9000/buaa/weather/",
"hdfs://ljc:9000/buaa/weatherout/"
};
int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);
System.exit(ec);
}
Configuration 類讀取 Hadoop 的配置文件,如 site-core.xml、mapred-site.xml、hdfs-site.xml 等。
Job 對象指定作業執行規范,我們可以用它來控制整個作業的運行。我們在 Hadoop 集群上運行這個作業時,要把代碼打包成一個JAR文件(Hadoop在集群上發布這個文件)。 不必明確指定 JAR 文件的名稱,在 Job 對象的 setJarByClass 方法中傳遞一個類即可,Hadoop 利用這個類來查找包含它的 JAR 文件,進而找到相關的 JAR 文件。
構造 Job 對象之后,需要指定輸入和輸出數據的路徑。
- 調用 FileInputFormat 類的靜態方法 addInputPath() 來定義輸入數據的路徑,這個路徑可以是單個的文件、一個目錄(此時,將目錄下所有文件當作輸入)或符合特定文件模式的一系列文件。由函數名可知,可以多次調用 addInputPath() 來實現多路徑的輸入。
- 調用 FileOutputFormat 類中的靜態方法 setOutputPath() 來指定輸出路徑(只能有一個輸出路徑)。這個方法指定的是 reduce 函數輸出文件的寫入目錄。 在運行作業前該目錄是不應該存在的,否則 Hadoop 會報錯並拒絕運行作業。這種預防措施的目的是防止數據丟失(長時間運行的作業如果結果被意外覆蓋,肯定是件可怕的事情)。
- 通過 setMapperClass() 和 setReducerClass() 指定 map 類型和reduce 類型。
- 通過setOutputKeyClass() 和 setOutputValueClass() 控制 map 和 reduce 函數的輸出類型,正如本例所示,這兩個輸出類型一般都是相同的。如果不同,則通過 setMapOutputKeyClass()和setMapOutputValueClass()來設置 map 函數的輸出類型。
- 輸入的類型通過 InputFormat 類來控制,我們的例子中沒有設置,因為使用的是默認的 TextInputFormat(文本輸入格式)。
- Job 中的 waitForCompletion() 方法提交作業並等待執行完成。該方法中的布爾參數是個詳細標識,所以作業會把進度寫到控制台。 waitForCompletion() 方法返回一個布爾值,表示執行的成(true)敗(false),這個布爾值被轉換成程序的退出代碼 0 或者 1。
-
-
結果
-
其他問題
如果,您認為閱讀這篇博客讓您有些收獲,不妨點擊一下右下角的【推薦】。
如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
如果,您對我的博客所講述的內容有興趣,請繼續關注我的后續博客,我是【劉超★ljc】。
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。