hive SerDe的簡介
https://www.jianshu.com/p/afee9acba686
問題
數據文件為文本文件,每一行為固定格式,每一列的長度都是定長或是有限制範圍,因此考慮採用hive提供的RegexSerDe來實現記錄解析,使用後發現hive查詢出的數據中,中文字段出現亂碼
解決過程
serialization.encoding=GBK
Hadoop中文件默認utf8編碼,hive序列化操作時,默認按照utf8來解析,而數據文件是GBK編碼,所以肯定會亂碼。從網上查了下,解決方案是建表時指定serde的"serialization.encoding"="GBK",然而並沒有解決我的問題
源碼
Hive建表格式為ROW FORMAT,不指定SerDe時,默認用的是org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe,繼承了org.apache.hadoop.hive.serde2.AbstractEncodingAwareSerDe,而該類確實可以通過設置"serialization.encoding"="GBK"來解決hive讀取gbk文件亂碼的問題,代碼如下:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.hadoop.hive.serde2;
import com.google.common.base.Charsets;
import java.nio.charset.Charset;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractEncodingAwareSerDe extends AbstractSerDe {
private static final Logger LOG = LoggerFactory.getLogger(AbstractEncodingAwareSerDe.class);
protected Charset charset;
public AbstractEncodingAwareSerDe() {
}
/** @deprecated */
@Deprecated
public void initialize(Configuration conf, Properties tbl) throws SerDeException {
this.charset = Charset.forName(tbl.getProperty("serialization.encoding", "UTF-8"));
if (this.charset.equals(Charsets.ISO_8859_1) || this.charset.equals(Charsets.US_ASCII)) {
LOG.warn("The data may not be properly converted to target charset " + this.charset);
}
}
public final Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {
Writable result = this.doSerialize(obj, objInspector);
if (!this.charset.equals(Charsets.UTF_8)) {
result = this.transformFromUTF8(result);
}
return result;
}
protected abstract Writable transformFromUTF8(Writable var1);
protected abstract Writable doSerialize(Object var1, ObjectInspector var2) throws SerDeException;
public final Object deserialize(Writable blob) throws SerDeException {
if (!this.charset.equals(Charsets.UTF_8)) {
blob = this.transformToUTF8(blob);
}
return this.doDeserialize(blob);
}
protected abstract Writable transformToUTF8(Writable var1);
protected abstract Object doDeserialize(Writable var1) throws SerDeException;
}
繼續查看org.apache.hadoop.hive.serde2.RegexSerDe,發現並沒有用到serialization.encoding,難怪設置了也沒有用,源碼就不貼了
解決
解決方法也很簡單,自定義類EncodingAwareRegexSerDe繼承RegexSerDe,實現轉UTF8的功能,代碼如下:
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.RegexSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A {@link RegexSerDe} that honours the {@code serialization.encoding} table
 * property when reading rows.
 *
 * <p>Each incoming row is decoded from the configured character set and
 * re-wrapped as a new {@link Text} before being handed to
 * {@link RegexSerDe#deserialize}. When the property is absent the encoding
 * defaults to {@code UTF-8} and rows are passed through untouched.
 *
 * <p>Only deserialization (reading) is encoding-aware; serialization is
 * inherited unchanged from {@link RegexSerDe}.
 */
@SerDeSpec(
    schemaProps = {"columns", "columns.types", "input.regex", "input.regex.case.insensitive","serialization.encoding"}
)
public class EncodingAwareRegexSerDe extends RegexSerDe {
    public static final Logger LOG = LoggerFactory.getLogger(EncodingAwareRegexSerDe.class.getName());

    /** Trimmed value of {@code serialization.encoding} (default {@code "UTF-8"}). */
    protected String charsetName;

    /** {@link #charsetName} resolved once in {@link #initialize} so rows need no per-row lookup. */
    private Charset charset;

    public EncodingAwareRegexSerDe(){
        super();
    }

    /**
     * Initializes the underlying {@link RegexSerDe} and resolves the source
     * character set from the table properties.
     *
     * @param conf job configuration, forwarded to the superclass
     * @param tbl  table properties; {@code serialization.encoding} is read here
     * @throws SerDeException if the superclass fails to initialize, or if the
     *         configured encoding name is illegal or not supported by this JVM
     */
    @Override
    public void initialize(Configuration conf, Properties tbl) throws SerDeException {
        super.initialize(conf, tbl);
        this.charsetName = tbl.getProperty("serialization.encoding", "UTF-8").trim();
        try {
            this.charset = Charset.forName(this.charsetName);
        } catch (IllegalArgumentException e) {
            // Covers both IllegalCharsetNameException and UnsupportedCharsetException.
            // Fail at table-open time instead of silently producing empty rows later.
            throw new SerDeException(
                "Unsupported serialization.encoding: '" + this.charsetName + "'", e);
        }
    }

    /**
     * Decodes the row from the configured character set and delegates the
     * regex parsing to {@link RegexSerDe#deserialize}.
     *
     * @param blob the raw row; expected to be a {@link Text}
     * @return the deserialized row as produced by the superclass
     * @throws SerDeException if the superclass fails to deserialize the row
     */
    @Override
    public Object deserialize(Writable blob) throws SerDeException {
        if (StandardCharsets.UTF_8.equals(this.charset)) {
            // Already in the encoding the superclass expects; skip the extra copy.
            return super.deserialize(blob);
        }
        Text rowText = (Text) blob;
        return super.deserialize(transformTextToUtf8(rowText, this.charset));
    }

    /**
     * Re-wraps the bytes of {@code text}, interpreted in {@code sourceCharset},
     * as a new {@link Text}.
     */
    private static Text transformTextToUtf8(Text text, Charset sourceCharset) {
        // Only the first getLength() bytes of the backing array belong to this row.
        String decoded = new String(text.getBytes(), 0, text.getLength(), sourceCharset);
        return new Text(decoded);
    }
}
使用自定義序列化器
將上述自定義的類打成jar包後,即可使用(注意:類需聲明在建表語句所引用的包 com.unionpay.bigdataTest.hive.serdes 下)
操作hive shell
-- Register the jar that contains the custom SerDe class referenced by
-- ROW FORMAT SERDE in the table definition below.
hive> add jar /home/dw_hbkal/przhang/hive-custom-serdes-1.0-SNAPSHOT.jar;
-- Define a partitioned external table over fixed-format text files, parsed by
-- EncodingAwareRegexSerDe. The "......" line stands for columns left out of
-- this listing. "input.regex" contains one capture group per field, and
-- "serialization.encoding"="GBK" is the property the custom SerDe reads in
-- initialize() to decide how to decode each row.
CREATE EXTERNAL TABLE IF NOT EXISTS test_tooldb.ind01acoma_tmp(
acq_ins_id_cd STRING,
fwd_settle_at DECIMAL(12, 0),
repl_at DECIMAL(12, 0),
......
card_accptr_nm_addr STRING,
resv5 STRING
)PARTITIONED BY(ins_id_cd STRING, hp_settle_dt STRING)
ROW FORMAT SERDE 'com.unionpay.bigdataTest.hive.serdes.EncodingAwareRegexSerDe'
with serdeproperties (
"input.regex"="(.{11}) (.{11}) (.{6}) (.{10}) (.{19}) (.{12}) (.{12}) (.{12}) (.{4}) (.{6}) (.{4}) (.{8}) (.{15}) (.{12}) (.{2}) (.{6}) (.{11}) (.{6}) (.{2}) (.{3}) (.{12}) (.{12}) (.{12}) (.{1}) (.{3}) (.{1}) (.{1}) (.{10}) (.{11}) (.{1}) (.{2}) (.{2}) (.{12}) (.{1})(.{2})(.{1})(.{1})(.{2})(.{1})(.{1})(.{2})(.{1})(.{2}) (.{11}) (.{11}) (.{1}) (.{1}) (.{4}) (.{2}) (.{1,40}) (.{3}) (.{9}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{11}) (.{9}) (.{9}) (.{9}) (.{9}) (.{19}) (.{2}) (.{40}) (.{4}) (.{1}) (.{2}) (.{10}) (.{6}) (.{1}) (.{12}) (.{193})",
"serialization.encoding"="GBK"
)
STORED AS TEXTFILE
LOCATION '/user/dw_hbkal/db/test_tooldb/ind01acoma_tmp';
-- Load a local data file into one partition of the table, replacing that
-- partition's existing contents (OVERWRITE).
load data local inpath '/home/dw_hbkal/przhang/IND18071032ACOMA' overwrite into table test_tooldb.ind01acoma_tmp partition(ins_id_cd='01055800',hp_settle_dt='20180710');