MR案例:MR和Hive中使用Lzo壓縮


在MapReduce中使用lzo壓縮

 1).首先將數據文件在本地使用lzop命令壓縮。具體配置過詳見配置hadoop集群的lzo壓縮

//壓縮lzop,解壓縮lzop -d
[root@ncst word]# lzop words.txt 
[root@ncst word]# ls
words.txt  words.txt.lzo

 2).將lzo文件上傳到hdfs

[root@ncst word]# hadoop fs -put words.txt.lzo /test/in/words/
[root@ncst word]# hadoop fs -ls /test/in/words
Found 1 items
-rw-r--r--   1 root supergroup        115 2015-08-28 21:13 /test/in/words/words.txt.lzo

 3).給Lzo文件建立索引Index(兩種方式)

//單機版
[root@ncst word]# hadoop jar \
> /usr/local/hadoop/hadoop-2.2.0/share/hadoop/common/lib/hadoop-lzo-0.4.20-SNAPSHOT.jar \ > com.hadoop.compression.lzo.LzoIndexer \ > /test/in/words
//集群版本
[root@ncst word]# hadoop jar \
> /usr/local/hadoop/hadoop-2.2.0/share/hadoop/common/lib/hadoop-lzo-0.4.20-SNAPSHOT.jar \
> com.hadoop.compression.lzo.DistributedLzoIndexer \
> /test/in/words
//索引文件以.index結尾
[root@ncst word]# hadoop fs -ls /test/in/words
Found 2 items
-rw-r--r--   1 root supergroup        115 2015-08-28 21:13 /test/in/words/words.txt.lzo
-rw-r--r--   1 root supergroup          8 2015-08-28 21:28 /test/in/words/words.txt.lzo.index

 4).編寫MapReduce程序(需要添加的額外包hadoop-lzo-0.4.13.jar)

package test0820;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.VLongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.hadoop.compression.lzo.LzopCodec;
import com.hadoop.mapreduce.LzoTextInputFormat;

public class WordCount0826 {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

//GenericOptionsParser類的getRemainingArgs()方法作用是從命令行獲取配置信息
String[] otherArgs
= new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = Job.getInstance(conf); job.setJarByClass(WordCount0826.class); job.setMapperClass(IIMapper.class); job.setReducerClass(IIReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(VLongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(VLongWritable.class);

//配置輸入類型為Lzo job.setInputFormatClass(LzoTextInputFormat.class);
//配置reduce結果壓縮以及壓縮格式 FileOutputFormat.setCompressOutput(job, true); FileOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
FileInputFormat.addInputPath(job,
new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true)? 0:1); } //map public static class IIMapper extends Mapper<LongWritable, Text, Text, VLongWritable>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String[] splited = value.toString().split(" "); for(String word : splited){ context.write(new Text(word),new VLongWritable(1L)); } } } //reduce public static class IIReducer extends Reducer<Text, VLongWritable, Text, VLongWritable>{ @Override protected void reduce(Text key, Iterable<VLongWritable> v2s, Context context) throws IOException, InterruptedException { long sum=0; for(VLongWritable vl : v2s){ sum += vl.get(); } context.write(key, new VLongWritable(sum)); } } }

 5).運行hadoop jar

[root@ncst test]# hadoop jar myjar.jar /test/in/words/ /test/out/0828/02

  如若未在程序中配置輸入和輸出都為Lzo格式,可以在命令行通過 -D 開頭的參數進行配置

hadoop jar myjar.jar \
 -D mapred.reduce.tasks=2 \
 -D mapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat \
 -D mapred.output.compress=true \
 -D mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec \
 /test/in/words /test/out/0828/001

 6).查看結果文件

[root@ncst test]# hadoop fs -ls /test/out/0828/001
Found 3 items
-rw-r--r--   1 root supergroup          0 2015-08-28 21:35 /test/out/0828/001/_SUCCESS
-rw-r--r--   1 root supergroup         61 2015-08-28 21:35 /test/out/0828/001/part-r-00000.lzo
-rw-r--r--   1 root supergroup         87 2015-08-28 21:35 /test/out/0828/001/part-r-00001.lzo

 7).查看結果Lzo文件的內容(兩種方式)

//以fs -get方式下載下來,再解壓
[root@ncst test]# hadoop fs -get /test/out/0828/001 ~/out
[root@ncst test]# cd ~/out

//lzop -d *.lzo 是解壓lzo文件的命令
[root@ncst out]# lzop
-d part-* [root@ncst out]# ls part-r-00000 part-r-00000.lzo part-r-00001 part-r-00001.lzo _SUCCESS
//利用fs -text 結合Linux的重定向 > 命令
[root@ncst test]# hadoop fs -text /test/out/0828/001/part-* > out.txt

//查看out.txt結果
[root@ncst test]# more out.txt hello
3 man 2 women 2 word 1 world 2

總結:

  • lzo文件需要建立索引才能支持分塊(split)。如果沒有索引,lzo文件也是可以處理的,MapReduce會根據后綴名 “.lzo” 來對lzo文件解壓,並且InputFormat也不需要特別指定,但是不支持分塊,整個lzo文件只用一個map來處理。
  • 對於輸入文件為添加了索引的Lzo壓縮文件,如若不在代碼中指定 job.setInputFormatClass(LzoTextInputFormat.class); 則Mapreduce程序將會把索引文件.index也當作是數據文件!LzoTextInputFormat類需要引入相應的(hadoop-lzo-0.4.13.jar)包,如果你是使用Maven管理依賴,則需要在pom.xml文件中添加以下屬性。
<dependency> 
    <groupId>com.hadoop.gplcompression</groupId> 
    <artifactId>hadoop-lzo</artifactId> 
    <version>0.4.19</version> 
</dependency>

在Streaming程序中使用lzo壓縮

把InputFormat設置為DeprecatedLzoTextInputFormat,還要設置參數 stream.map.input.ignoreKey=true。這樣map的key值(key值是行在文件中的偏移量,value值是每行的文本)就不會傳入reduce程序中,這個key值我們是不需要的。

hadoop jar 
/opt/mapr/hadoop/hadoop-0.20.2/contrib/streaming/hadoop-0.20.2-dev-streaming.jar 
-file /home/pyshell/map.py 
-file /home/pyshell/red.py  
-mapper /home/pyshell/map.py 
-reducer /home/pyshell/red.py 
-input /aojianlog/20120304/gold/gold_38_3.csv.lzo 
-output /aojianresult/gold38 
-inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat    //沒有此選項,map作業也不會分片
-jobconf mapred.output.compress=true    //指定reduce輸出壓縮
-jobconf mapred.output.compression.codec=com.hadoop.compression.lzo.LzopCodec    //沒有此選項,reduce作業輸出文件的格式為.lzo_deflate 

在hive中使用lzo壓縮

hadoop集群啟用了Lzo壓縮,就需要在Hive建表的時候指定壓縮時所使用的編解碼器,否則Hive無法正確讀取數據。

 1).Gzip和Bzip2由於是hadoop默認支持的,所以無需指定特殊的編解碼器,只要指定Text類型即可。

create table lzo( 
id int, 
name string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS TEXTFILE;

 2).LZO是外掛的第三方庫,所以要指定輸入和輸出的編解碼器。

create table lzo( 
id int, 
name string) 
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' 
STORED AS INPUTFORMAT 'com.hadoop.mapred.DeprecatedLzoTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'; 

 3).對於Hive表的數據文件,用lzop在本地壓縮好了,直接上傳到hdfs上就可以了。

[root@ncst test]# lzop -v lzo 
compressing lzo into lzo.lzo

hive
> load data local inpath '/root/test/lzo.lzo' > overwrite into table lzo;
hive
> select * from lzo; OK Tie Coat Hat Scarf Time taken: 0.218 seconds, Fetched: 4 row(s)

 4).對於已經存在的表修改為Lzo 

  alter table后對已經load進表中的數據,需要重新load和創建索引,要不還是不能分塊。

ALTER TABLE things
    SET FILEFORMAT
        INPUTFORMAT "com.hadoop.mapred.DeprecatedLzoTextInputFormat"
        OUTPUTFORMAT "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat";

 5).在做數據清洗的時候,假如源日志是lzo壓縮的,輸出的時候也希望使用lzo壓縮。則在數據清洗的腳本中對hadoop的jobconf做一個指定。這樣就可以做到,輸入是lzo,輸出也可以lzo。或者輸入是text,輸出是lzo。

-inputformat com.hadoop.mapred.DeprecatedLzoTextInputFormat 
-jobconf mapreduce.output.compress=true 
-jobconf mapreduce.output.compression.codec=com.hadoop.compression.lzo.LzopCodec

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM