一、結構化數據直接上傳
如果我們拿到要上傳的數據是結構化的,那么就不需要在對數據做處理, 直接從本地上傳到HDFS上即可。
代碼層面也比較簡單:
public class UploadFileToHDFS { public static void main(String[] args) throws Exception { Configuration cfg = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.145.200:9000"),cfg); // 自己修改登錄hdfs的用戶 也可用本地系統用戶 去hdfs上授權 System.setProperty("HADOOP_USER_NAME","root"); /* 文件上傳 */ // 獲得要上傳文件的本地路徑 Path src = new Path("f:/mydata/logs/Log_20200101.log");
// 上傳的路徑 Path dst = new Path("/tmp");
// 上傳命令 fs.copyFromLocalFile(src,dst);
// 釋放資源 fs.close(); } }
二、半結構化 / 非結構化數據轉化后再上傳
有時我們拿到要上傳的數據不一定是結構化的,可能是半結構化(JSON等)或者非結構化,這樣即便上傳也沒什么意義。因此,我們先用Java語言,將其結構化了再上傳。
例如:
我們拿到的數據:
我們上傳后的數據;
package com.zyp.myhdfs.services; import com.alibaba.fastjson.JSON; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /* * * todo:多線程傳遞多個文件到HDFS上 * * */ public class UploadFileToHDFS2 { /* * 傳進來路徑 path 和文件名 filename, 將所有文件全部上傳 * * */ public void writeFileToHDFS(String path,String filename) { FileSystem fs = null; FileReader fis = null; // 輸入流 讀取本地文件 BufferedReader bis = null; // 對輸入流整行整行讀入 借用BufferedReader FSDataOutputStream fos = null; // 輸出流 往HDFS上輸出數據 try { fs = FileSystem.get(new URI("hdfs://192.168.145.200:9000"),new Configuration()); fis = new FileReader(path+"/"+filename); bis = new BufferedReader(fis); // 自己修改登錄hdfs的用戶 也可用本地系統用戶 去hdfs上授權即可 System.setProperty("HADOOP_USER_NAME","root"); /* 用上面創建好的對象,往上寫數據 */ // 先去創建一個文件,然后往里面寫 fos = fs.create(new Path("/logs/"+filename)); String line =""; while ((line = bis.readLine()) !=null ){ Info info = JSON.parseObject(line, Info.class); // System.out.println(info.getGoodid()+","+info.getMachine().getMemory()); // 利用Sting模板 String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n", //設定好格式,通過映射去數據源中獲取對應的數據數據信息 info.getMachine().getCpuType(), info.getMachine().getMemory(), info.getMachine().getCpuSeed(), info.getActTime(), info.getActType(), info.getGoodId(), info.getPage(), info.getUseId(), info.getBrowse().getBrowseType(), info.getBrowse().getBrowseVersion()); fos.write(ctx.getBytes()); } fos.flush(); } catch (IOException e) { e.printStackTrace(); } catch (URISyntaxException e) { e.printStackTrace(); } finally { try { fos.close(); bis.close(); fis.close(); // fs.close(); 這里關閉通道會影響其他傳輸 } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) { ExecutorService es = Executors.newFixedThreadPool(30); // 固定線程池 final UploadFileToHDFS2 ufh = new UploadFileToHDFS2(); final String filePath = "f:/mydata/logs"; // 循環獲取所有的子文件 File file = new File(filePath); String[] fs = file.list(); for (String fileName:fs){ es.execute(new Runnable() { @Override public void run() { ufh.writeFileToHDFS(filePath,fileName); } }); } es.shutdown(); } }
三、分組整合文件后在上傳至一個文件里
通過對上面在HDFS上的執行效果觀察發現,本地一個文件可能很小,遠遠不夠一個塊大小(128M),但是他也單獨占據一個block塊。這樣就造成了很大的資源浪費,這里考慮,將文件整合后再上傳,盡可能的節省資源。
例如:
將一年內的日志信息,按月進行分類,HDFS上,每個月一個文件夾,然后往里面添加對應該月的數據信息
import com.alibaba.fastjson.JSON; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Collectors; /* * path: 源文件路徑 * newFileName: hdfs上文件和文件夾的名字 * files: [log_20200101.log,log_20200102.log] * todo: 考慮到每個文件占據一個block塊,太浪費空間 這里將文件整合在一起存到HDFS上 * 按月份 將每個月的文件都寫到一個文件夾下 * */ public class MyMergeFile { public void batchWriteToHDFS(String path, String hdfsFileName, List<String> files) { FSDataOutputStream fos = null; try { FileSystem fs = FileSystem.get(new URI("hdfs://192.168.145.200:9000"), new Configuration()); // 分割hdfs文件 獲得文件夾名字 String folderName = hdfsFileName.split("_")[1]; // 自己修改登錄hdfs的用戶 也可用本地系統用戶 去hdfs上授權即可 System.setProperty("HADOOP_USER_NAME", "root"); // 在hdfs下先創建文件夾 fs.mkdirs(new Path("/logs" + folderName)); // 在文件夾下創建一個文件 fos = fs.create(new Path("/logs" + folderName + "/" + hdfsFileName)); // 循環讀取文件 並向hdfs文件中寫入數據 for (String localFile : files) { BufferedReader br = null; try { br = new BufferedReader(new FileReader(path + "/" + localFile)); String line = ""; while ((line = br.readLine()) != null) { try { Info info = JSON.parseObject(line, Info.class); // System.out.println(info.getGoodid()+","+info.getMachine().getMemory()); // 利用Sting模板 String ctx = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s\n", info.getMachine().getCpuType(), info.getMachine().getMemory(), info.getMachine().getCpuSeed(), info.getActTime(), info.getActType(), info.getGoodId(), info.getPage(), info.getUseId(), info.getBrowse().getBrowseType(), info.getBrowse().getBrowseVersion()); fos.write(ctx.getBytes()); } catch (Exception e) { continue; // 防止源文件里JSON 格式錯誤 ,繼續往下執行 } } fos.flush(); } catch (IOException e) { e.printStackTrace(); } finally { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } } } catch (IOException e) { e.printStackTrace(); } catch (URISyntaxException e) { e.printStackTrace(); } finally { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } } /* * 將一年內的所有文件整合 分組 上傳到hdfs * * */ public static void main(String[] args) { /* * 正常的文件分組 * */ // File file = new File("f:/mydata/logs"); //// 獲取所有文件 // String[] files = file.list(); //// 按照月份分組 log_202001,log_202002 // Map<String, List<String>> map = new HashMap<>(); // for (String fn : files) { // String keyName = fn.substring(0,10); // 截取出來所有文件的月份部分 // if (map.containsKey(keyName)){ // 判斷該月份是否存在 // map.get(keyName).add(fn); // 存在該月份 就直接將文件存到該 key的list中 // }else { // List<String> lst = new ArrayList<>(); // 不存在 就創建一個list 存放文件名 // lst.add(fn); // map.put(keyName,lst); // } // } // System.out.println(map); /* * 使用java工具 jdk必須是1.8以上 * */ File file = new File("f:/mydata/logs"); // 獲取所有文件 String[] files = file.list(); List<String> lst = Arrays.asList(files); // java 流式編程 Map<String, List<String>> mp = lst.stream().collect( Collectors.groupingBy(line -> line.substring(0, 10)) ); // System.out.println(mp); // 遍歷hashmap ExecutorService es = Executors.newFixedThreadPool(12); final MyMergeFile mmf = new MyMergeFile(); for (String key : mp.keySet()) { final String keyName = key; final List<String> fs = mp.get(key); es.execute(new Runnable() { @Override public void run() { mmf.batchWriteToHDFS("f:/mydata/logs",keyName,fs); } }); } es.shutdown(); } }