Spark 读取压缩文件：对同一个压缩文件内的各个文件进行分布式处理，处理粒度为文件级。
-| .tar.gz
-| .gz
-| .zip
-| .zip
-| .gz
-| .zip
使用 sc.binaryFiles() 得到 JavaPairRDD<String, PortableDataStream>。
key 是每个压缩文件的路径，value（PortableDataStream）是该压缩文件内容的二进制流。
并行化处理：按压缩文件内部包含的文件，将每个压缩文件拆分为多条记录（每条记录由整个压缩文件的流和其中一个内部文件的属性组成），从而实现 1:n 的并行度。

1 // 一个压缩包流,对应多个流,每个流对应一个文件名称 2 public static JavaPairRDD<PortableDataStream, FilePropertyPojo> getFileListRdd( 3 JavaPairRDD<String, PortableDataStream> zipRdd) { 4 return zipRdd.flatMapToPair(tuple2 -> { 5 List<Tuple2<PortableDataStream, FilePropertyPojo>> targetList = new ArrayList<>(); 6 List<FilePropertyPojo> fileNameList = getFileNameList(tuple2._2); 7 8 for (FilePropertyPojo filePropertyPojo : fileNameList) { 9 10 targetList.add(new Tuple2<>(tuple2._2, filePropertyPojo)); 11 } 12 return targetList.iterator(); 13 14 }); 15 16 } 17 18 private static List<FilePropertyPojo> getFileNameList(PortableDataStream portableDataStream) { 19 List<FilePropertyPojo> fileNameList = new ArrayList<>(); 20 try { 21 List<FilePropertyPojo> mrPropertyPojoList = new ArrayList<>(); 22 String path = portableDataStream.getPath(); 23 24 String fileCompressMode = path.substring(path.lastIndexOf('.')).toLowerCase(); 25 switch (fileCompressMode) { 26 case ".gz": 27 getFileNameFromGz(portableDataStream, mrPropertyPojoList); 28 break; 29 case ".zip": 30 getFileNameFromZip(portableDataStream, mrPropertyPojoList); 31 break; 32 33 default: 34 } 35 return mrPropertyPojoList; 36 37 } catch (Exception e) { 38 // 39 } 40 return fileNameList; 41 } 42 43 private static void getFileNameFromGz(PortableDataStream portableDataStream, 44 List<FilePropertyPojo> mrPropertyPojoList) { 45 try (TarArchiveInputStream inputStream = new TarArchiveInputStream( 46 new GZIPInputStream(portableDataStream.open()))) { 47 TarArchiveEntry tarArchiveEntry; 48 while ((tarArchiveEntry = inputStream.getNextTarEntry()) != null) { 49 try { 50 getEachFileName(mrPropertyPojoList, tarArchiveEntry.getName(), tarArchiveEntry.getSize()); 51 52 } catch (Exception e) { 53 // 54 } 55 } 56 } catch (Exception e) { 57 // 58 } 59 } 60 61 private static void getFileNameFromZip(PortableDataStream portableDataStream, 62 List<FilePropertyPojo> mrPropertyPojoList) throws IOException { 63 64 try (ZipArchiveInputStream zipArchiveInputStream = 
new ZipArchiveInputStream(portableDataStream.open())) { 65 ZipArchiveEntry nextZipEntry; 66 while ((nextZipEntry = zipArchiveInputStream.getNextZipEntry()) != null) { 67 try { 68 getEachFileName(mrPropertyPojoList, nextZipEntry.getName(), nextZipEntry.getSize()); 69 70 } catch (Exception e) { 71 // 72 } 73 } 74 } 75 }