Problem: Given a text file, clean the data it contains, load the cleaned data into a Hive database, and then run some statistics over it.
The assignment asks us to use MapReduce for both the cleaning and the statistics. As a MapReduce beginner, I am still not clear on how MapReduce works under the hood, so here I clean the data with plain Java first and then load it into the database.
Code below.
Java data-cleaning code:
package Data;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;

public class Data {

    public static void main(String[] args) {
        // try-with-resources closes (and therefore flushes) both streams,
        // so no buffered rows are lost at the end of the output file
        try (BufferedReader br = new BufferedReader(new FileReader("result.txt"));
             Writer writer = new OutputStreamWriter(new FileOutputStream("result2.txt"), "utf-8");
             BufferedWriter bw = new BufferedWriter(writer)) {
            String row;
            String[] data;
            while ((row = br.readLine()) != null) {
                data = change(row); // split the raw line on commas
                data = chage(data); // normalize the timestamp field
                for (int i = 0; i < data.length; i++) {
                    System.out.print(data[i] + "\t");
                }
                System.out.println();
                row = data[0] + "," + data[1] + "," + data[2] + "," + data[3] + "," + data[4] + "," + data[5];
                bw.write(row + "\r\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // Rewrite the timestamp in data[1] from "dd/Mon/yyyy:HH:mm:ss ..." form to
    // "yyyy-MM-dd HH:mm:ss", and strip the trailing character from data[3]
    private static String[] chage(String[] data) {
        char[] str = data[1].toCharArray();
        String[] time = new String[7];
        int j = 0;
        int k = 0;
        for (int i = 0; i < str.length; i++) {
            if (str[i] == '/' || str[i] == ':' || str[i] == ' ') {
                time[k] = data[1].substring(j, i);
                j = i + 1;
                k++;
            }
        }
        time[k] = data[1].substring(j);
        // map English month abbreviations to two-digit month numbers
        switch (time[1]) {
            case "Jan": time[1] = "01"; break;
            case "Feb": time[1] = "02"; break;
            case "Mar": time[1] = "03"; break;
            case "Apr": time[1] = "04"; break;
            case "May": time[1] = "05"; break;
            case "Jun": time[1] = "06"; break;
            case "Jul": time[1] = "07"; break;
            case "Aug": time[1] = "08"; break;
            case "Sep": time[1] = "09"; break;
            case "Oct": time[1] = "10"; break;
            case "Nov": time[1] = "11"; break;
            case "Dec": time[1] = "12"; break;
        }
        data[1] = time[2] + "-" + time[1] + "-" + time[0] + " " + time[3] + ":" + time[4] + ":" + time[5];
        data[3] = data[3].substring(0, data[3].length() - 1);
        return data;
    }

    // Split one comma-separated line into six fields
    private static String[] change(String row) {
        String[] data = new String[6];
        int j = 0;
        int k = 0;
        for (int i = 0; i < row.length(); i++) {
            if (row.charAt(i) == ',') {
                data[k] = row.substring(j, i);
                j = i + 1;
                k++;
            }
        }
        data[k] = row.substring(j);
        return data;
    }
}
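Incidentally, the hand-rolled month switch can be replaced by java.text.SimpleDateFormat. Below is a minimal sketch, assuming the raw timestamps look like 24/Apr/2019:08:30:15 (which is what the splitting in chage() implies); the class and method names are hypothetical:

package Data;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

public class TimeFix {
    // Sketch: parse "dd/MMM/yyyy:HH:mm:ss" (English month names, hence
    // Locale.ENGLISH) and reformat as "yyyy-MM-dd HH:mm:ss"; the input
    // pattern is an assumption based on how chage() splits the field
    static String normalize(String raw) throws ParseException {
        SimpleDateFormat in = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        SimpleDateFormat out = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return out.format(in.parse(raw));
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(normalize("24/Apr/2019:08:30:15")); // prints 2019-04-24 08:30:15
    }
}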
Code for loading into the database:
package Hive;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.log4j.Logger;

public class Data {
    private static String driverName = "org.apache.hive.jdbc.HiveDriver";
    private static String url = "jdbc:hive2://192.168.43.18:10000/text";
    private static String user = "hive";
    private static String password = "hive";
    private static String sql;
    private static final Logger log = Logger.getLogger(Data.class);

    public static void main(String[] args) {
        try {
            Class.forName(driverName);
            Connection conn = DriverManager.getConnection(url, user, password);
            Statement stmt = conn.createStatement();
            // load the cleaned local file into the Hive table, replacing its current contents
            sql = "load data local inpath '/home/hadoop/下載/result2.txt' overwrite into table data";
            System.out.println("Running: " + sql);
            boolean f = stmt.execute(sql);
            System.out.println("result: " + f);
            stmt.close();
            conn.close();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            log.error(driverName + " not found!", e);
            System.exit(1);
        } catch (SQLException e) {
            e.printStackTrace();
            log.error("Connection error!", e);
            System.exit(1);
        }
    }
}
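Note that the target table data must already exist and be declared with ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' so the comma-separated file maps onto its columns. Once the load succeeds, the statistics can run over the same JDBC connection. Here is a minimal sketch of one such query; the column name ip and the choice of statistic are assumptions, so substitute the real schema:

package Hive;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class Stats {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection conn = DriverManager.getConnection(
                 "jdbc:hive2://192.168.43.18:10000/text", "hive", "hive");
             Statement stmt = conn.createStatement();
             // hypothetical statistic: top 10 hit counts per ip;
             // "ip" is an assumed column name in the data table
             ResultSet rs = stmt.executeQuery(
                 "select ip, count(*) as cnt from data group by ip order by cnt desc limit 10")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
            }
        }
    }
}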
Screenshot:

Problems encountered so far:
One: with BufferedWriter, part of the data at the end of the file never made it into the output. The cause is that BufferedWriter holds output in an in-memory buffer, and the original code never flushed or closed the writer, so whatever was still buffered when the program exited was lost; closing the writer (as the try-with-resources version above now does) flushes the remaining rows.
Two: I am still not very familiar with MapReduce or the principles behind it; a first attempt at it is sketched below.
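For reference, here is a minimal sketch of how the same cleaning step could run as a map-only MapReduce job, using Hadoop's org.apache.hadoop.mapreduce API. It assumes the chage() and change() helpers in the cleaning class above are made public static so the mapper can call them; CleanJob and CleanMapper are hypothetical names:

package Data;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CleanJob {

    // Map-only job: each input line is cleaned and written straight out,
    // so no reducer is needed
    public static class CleanMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // reuse the splitting/normalizing logic from the plain-Java version
            // (assumes change/chage were changed to public static)
            String[] fields = Data.change(value.toString());
            fields = Data.chage(fields);
            context.write(NullWritable.get(), new Text(String.join(",", fields)));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "data clean");
        job.setJarByClass(CleanJob.class);
        job.setMapperClass(CleanMapper.class);
        job.setNumReduceTasks(0); // map-only: mapper output is the final output
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}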
