前言
前面一篇介紹了Java怎么去查看數據塊的相關信息和怎么去查看文件系統。我們只要知道怎么去查看就行了!接下來我分享的是Hadoop的I/O操作。
在Hadoop中為什么要去使用壓縮(Compression)呢?接下來我們就知道了。
一、壓縮(Compression)概述
1.1、壓縮的好處
減少儲存文件所需要的磁盤空間,並加速數據在網絡和磁盤上的傳輸。這兩個在大數據處理大齡數據時相當重要!
1.2、壓縮格式總結
Hadoop對前面三種有默認集成,有就是說Hadoop支持DEFLATE、Gzip、bzip2三種壓縮格式。而后面三種Hadoop沒有支持,要用的話要自己去官網
下載相應的源碼去編譯加入到Hadoop才能用。
注意:
1)這里我要說的是“是否分割”,當我們一個文件去壓縮即使有非常好的壓縮算法,但是它的大小還是超過了一個數據塊的大小,這時就涉及到分割了。
所以說在以后的壓縮我們大多數情況下會使用bzip2。
2)Gzip和bzip2比較時,bzip2的壓縮率(壓縮之后的大小除以源文件的大小)要小,所以說bzip2的壓縮效果好。而這里就會壓縮和解壓縮的時候浪費更多的時間。
就是我們常說的“用時間換取空間”。
二、編解碼器(Codec)概述
codec實現了一種壓縮-加壓縮算法(意思就是codec使用相關的算法對數據進行編解碼)。在Hadoop中,一個對CompressionCodec接口的實現代表一個codec。
對於不同的壓縮算法有不同的編解碼器
我們要對一個文件進行壓縮需要編碼器,對一個壓縮文件進行解壓需要解碼器。那我們怎么樣去獲取編解碼器呢?
有兩種方式:
一是:根據擴展名讓程序自己去選擇相應的編解碼器。比如說:我在本地有一個文件是 user.txt我們通過-Dinput=user.txt去上傳這個文件到集群,
在集群中我們把它指定到-Doutput=/user.txt.gz.。這是我們程序的相關的類會根據你的擴展名(這里是.gz)獲取相應的壓縮編解碼器。
在Hadoop中有一個CompressionCodecFactory會根據擴展名獲取相應的編解碼器對象 。
二是:我們自己去指定編解碼器。為什么要去指定呢?比如說,我在本地有一個文件是user.txt.gz,其實這個壓縮文件是使用的是bzip2的壓縮算法壓縮的。
(因為我自己去更改了它的擴展名),所以這時候就要自己去指定編解碼器。
三、Java編程實現文件的壓縮與解壓縮
3.1、原理分析
在我們把本地的文件上傳的集群的時候,到底是哪里需要壓縮,哪里需要解壓縮,在哪里壓縮?這都是需要明白,下面畫一張圖給大家理解:
3.2、相關類和方法
在Hadoop中關於壓縮和解壓縮的包、接口和類:
1)CompressionCodec接口中
2)CompressionCodecFactory類
第一個是:根據文件的文件名后綴找到相應的壓縮編解碼器
第二個是:為編解碼器的標准類名找到相關的壓縮編解碼器。
第三個是:為編解碼器的標准類名或通過編解碼器別名找到相關的壓縮編解碼器。
3.3、Java將本地文件壓縮上傳到集群當中
1)核心代碼
import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class WriteDemo_0010 extends Configured implements Tool{ @Override public int run(String[] args) throws Exception{ Configuration conf=getConf(); String input=conf.get("input"); String output=conf.get("output"); LocalFileSystem lfs= FileSystem.getLocal(conf); FileSystem rfs= FileSystem.get( URI.create(output),conf); FSDataInputStream is= lfs.open(new Path(input)); FSDataOutputStream os= rfs.create(new Path(output)); CompressionCodecFactory ccf= new CompressionCodecFactory(conf);
//把路徑傳進去,根據指定的后綴名獲取編解碼器 CompressionCodec codec= ccf.getCodec(new Path(output)); CompressionOutputStream cos= codec.createOutputStream(os); System.out.println( codec.getClass().getName()); IOUtils.copyBytes(is,cos,1024,true); //close return 0; } public static void main(String[] args) throws Exception{ System.exit( ToolRunner.run( new WriteDemo_0010(),args)); } }
2)測試
將IEDA中打好的jar包上傳到Linux中(安裝了HDFS集群的客戶端的服務器中)執行:
結果:
我們可以從前面的那種表中可以看的出來,獲取到了相應的編解碼器。
再次測試:
結果:
3.4、Java將集群文件解壓縮到本地
1)核心代碼
import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class ReadDemo_0010 extends Configured implements Tool{ @Override public int run(String[] args) throws Exception{ Configuration conf=getConf(); String input=conf.get("input"); String output=conf.get("output"); FileSystem rfs= FileSystem.get( URI.create(input),conf); LocalFileSystem lfs= FileSystem.getLocal(conf); FSDataInputStream is= rfs.open(new Path(input)); FSDataOutputStream os= lfs.create(new Path(output)); CompressionCodecFactory factory= new CompressionCodecFactory(conf); CompressionCodec codec= factory.getCodec(new Path(input)); CompressionInputStream cis= codec.createInputStream(is); IOUtils.copyBytes(cis,os,1024,true); return 0; } public static void main(String[] args) throws Exception{ System.exit( ToolRunner.run( new ReadDemo_0010(),args)); } }
2)測試
結果:
查看結果:
喜歡就點個“推薦”哦!