題記:
近期在做某個大型銀行的大數據項目,當在處理非結構化數據時,卻發現他們給的數據並不符合hive和pig的處理要求,數據每行必須需要多個分割符才能完美處理,一下午也沒有想到完美的辦法解決,今天重新審視了一下整個過程。看來hive的命令行沒法搞定了。於是乎,只能通過代碼來搞定。
1、重新實現hive的InputFormat了,別急放碼過來
package hiveStream;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
public class MyHiveInputFormat extends TextInputFormat implements JobConfigurable{
@Override
public RecordReader<LongWritable, Text> getRecordReader(
InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new MyRecordReader((FileSplit) genericSplit, job);
}
}
2、仔細看看下面的方法,不解釋,自己領悟。
package hiveStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;
public class MyRecordReader implements RecordReader<LongWritable, Text>{
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader lineReader;
int maxLineLength;
public MyRecordReader(InputStream in, long offset, long endOffset,
int maxLineLength) {
this.maxLineLength = maxLineLength;
this.start = offset;
this.lineReader = new LineReader(in);
this.pos = offset;
this.end = endOffset;
}
public MyRecordReader(InputStream in, long offset, long endOffset,
Configuration job) throws IOException {
this.maxLineLength = job.getInt(
"mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
this.lineReader = new LineReader(in, job);
this.start = offset;
this.end = endOffset;
}
// 構造方法
public MyRecordReader(FileSplit inputSplit, Configuration job)
throws IOException {
maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
Integer.MAX_VALUE);
start = inputSplit.getStart();
end = start + inputSplit.getLength();
final Path file = inputSplit.getPath();
// 創建壓縮器
compressionCodecs = new CompressionCodecFactory(job);
final CompressionCodec codec = compressionCodecs.getCodec(file);
// 打開文件系統
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(file);
boolean skipFirstLine = false;
if (codec != null) {
lineReader = new LineReader(codec.createInputStream(fileIn), job);
end = Long.MAX_VALUE;
} else {
if (start != 0) {
skipFirstLine = true;
--start;
fileIn.seek(start);
}
lineReader = new LineReader(fileIn, job);
}
if (skipFirstLine) {
start += lineReader.readLine(new Text(), 0,
(int) Math.min((long) Integer.MAX_VALUE, end - start));
}
this.pos = start;
}
@Override
public void close() throws IOException {
if (lineReader != null)
lineReader.close();
}
@Override
public LongWritable createKey() {
return new LongWritable();
}
@Override
public Text createValue() {
return new Text();
}
@Override
public long getPos() throws IOException {
return pos;
}
@Override
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (pos - start) / (float) (end - start));
}
}
@Override
public boolean next(LongWritable key, Text value) throws IOException {
while (pos < end) {
key.set(pos);
int newSize = lineReader.readLine(value, maxLineLength,
Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
maxLineLength));
// 把字符串中的"##"轉變為"#"
String strReplace = value.toString().replaceAll("\\s+", "\001");
Text txtReplace = new Text();
txtReplace.set(strReplace);
value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
if (newSize == 0)
return false;
pos += newSize;
if (newSize < maxLineLength)
return true;
}
return false;
}
}
3、處理實例:如下
數據處理要求:
12 afd fewf fewfe we
76 vee ppt wfew wefw
83 tyutr ppt wfew wefw
45 vbe ppt wfew wefw
565 wee ppt wfew wefw
12 sde ppt wfew wefw
注意:字段之間的空格不一致
1、建表:
create table micmiu_blog(author int, category string, url string,town string,oop string) stored as inputformat 'hiveStream.MyHiveInputFormat' outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
注意:輸出咱可沒有重寫哦
2、加載數據:
LOAD DATA LOCAL INPATH'/mnt/test' OVERWRITE INTO TABLE micmiu_blog;
3、看看的成果:
select * from micmiu_blog;
自己去試試,不解釋
