調研系列第二篇:HCatalog簡介


1.         一般的hdfs讀寫

傳統的對於hdfs的讀寫都是直接設置inputPath  outPath ,而且對於數據都是以文件的形式訪問的,不涉及到結構化/半結構化的東東,及時如hive存儲在hdfs的的結構化數據,外部系統訪問也只能自己去了解具體的結構是如何存儲的,然后自己讀文件再訪問,傳統訪問hdfs的方式如下:

使用InputFormatSplitRecordReader用於讀,具體方式如下:

List<InputSplit> InputFormat.getSplits() ; 

RecordReader  InputFormat. createRecordReader(InputSplit split) ;

RecordReader. nextKeyValue();

RecordReader. getCurrentValue();

這樣的流程來訪問 

使用OutputFormatRecordWriter來寫入hdfs數據,具體如下:

OutputFormat. getRecordWriter();

RecordWriter.write(K key, V value)

對於一般的table結構數據,都是將Key值置成NullWritable,作為一個無用的值來存儲。

2.         Hcataloghdfs的結合:定義了文件的schema

HCatalog將每份結構化的hdfs數據定義schema和訪問信息(dbtablepartition),然后讀和寫的時候使用dbtablepartition(對於無partition這個可以為空)這三部分信息來訪問相應的表數據,屏蔽掉表底層InputFormatOutFormat以及path信息,讀&&寫時候只許關系以下幾個訪問類即可:

HCatInputFormat<WritableComparable, HCatRecord>

HCatRecordReader<WritableComparable, HCatRecord>

 

HCatOutputFormat<WritableComparable<?>, HCatRecord>

FileRecordWriterContainer<WritableComparable<?>, HCatRecord>

幾個類就行了,讀寫的接口Value均是HCatRecord對象,Key值是WritableComparable即可,不過一般key值都是NullWritable,並無實際用途 

3.         對象HCatRecord 

內部存儲這Object[]數組,分別存儲一行數據中每列的值,為hive基本類型的Java對象(intStringlistMap等),對於復雜結構可以嵌套,主要訪問接口如下 

List<Object> getAll();

Object get(int fieldIndex);

Object get(String fieldName, HCatSchema recordSchema)

通過列名稱來訪問具體列的值,但是需要tableschema信息HCatSchema(具體每列的colName以及每個colName對應的fieldIndex

這點設計不是很好,感覺可以將HCatSchema內置到HCatRecord  中,這樣封裝性可能會更好一些 

4.         HCatalog的元數據存儲 

HCatalog存儲元數據和Hive的元數據是一致的,對於元數據的存儲底層也是調用hive底層HiveMetaStoreClient的接口(我們常用的Hive其實是在這個類上封裝了一層),其提供對於元數據庫的各種操作(creat drop alter load insert等等 

直接調用Hive接口/hive的客戶端 寫入的元數據在HCatalog中可以直接用,同樣HCatalog寫入的元數據以及數據在hive中也是可以直接使用的。

5.         HCatInputFormat:讀取HCatalog中數據的接口

讀取的時候需要dbNametableNamepartition(這個可以沒有)tableProperties(可以沒有)信息,存儲於InputJobInfo中,InputJobInfo是讀取時候的信息存儲封裝,在jobclient端初始化以后,使用Java的序列化Serializable轉換成byte[],然后將byte[]編碼成一個字符串存儲(參見HCatUtilencodeBytesdecodeBytes方法)hadoop的配置Configuration來在整個mr任務中傳遞。

InputJobInfo初始化創建的時候只需要dbNametableNamepartition(這個可以沒有)tableProperties(可以沒有)信息,在InitializeInput.getInputJobInfo方法中去連接元數據庫,補齊InputJobInfo中的HCatTableInfopartitions信息,對於不是分區的table,構造一個partition信息,將相應數據存儲在里面。

getSplits(JobContext jobContext)方法,從conf配置中反序列化出InputJobInfo信息,然后根據partition中的信息,利用inputFormatClasslocationsplit成真正的底層數據的split,在封裝成HCatSplit

HCatSplit結構

最終讀取的時候還是要利用PartInfo中的SerDe來讀取相應的數據,后面介紹

 

createRecordReader(InputSplit split, TaskAttemptContext taskContext)方法

返回的是封裝了一層的HCatRecordReader對象,然后通過HCatRecordReader. Initialize(split,taskContext)來初始化具體的讀信息baseRecordReader 

一切初始化完畢,就可以開始調用經典的nextKeyValue的方法了

baseRecordReader是底層的RecordReader ,讀出來數據后使用相應的SerDedeserialize方法反序列化成ObjectInspector(一般是StructObjectInspector)可以訪問的數據格式,然后通過相應的colnum ObjectInspector得到每列的具體Java object值,然后封裝到HCatRecord對象中 

 對於hive基本類型轉換成Java的基本類型,對於structlist轉換成JavaListmap轉換成Javamap

6.         HcatalogMR程序中的輸出類HCatOutputFormat

input對應的,OutputJobInfo是存儲用於輸出數據時候的信息的,其結構如下:

主要是table的相關信息

OutputJobInfo outPutInfo=OutputJobInfo.create("analyse_db","contline_revenue_day_cut", partitions);

      也是需要dbNametblNamepartition就可以初始化,通過HCatOutputFormat.setOutput(conf, credentials,outPutInfo)中連接metastore,然后補充其余的schemaSerdeoutputFormat等一系列信息 ,目前Hcata中對於hive的自建Index、內置壓縮、分桶和sort的特性還是不支持的,然后同樣在利用序列化這個對象成String,存儲與MR的配置conf ,提供給以后的程序共享使用。

getRecordWriter(TaskAttemptContext context)方法

先得到一個封裝后的OutputFormatContainer,這個其實就是內部包裝了一個這個table數據真正的OutputFormat對象,以FileOutputFormatContainer為例,以下是FileOutputFormatContainer. getRecordWriter(TaskAttemptContext context)方法 

主要是做了一些MR程序中輸出文件夾初始化的工作,然后就是返回了一個FileRecordWriterContainer對象,該對象和上面的OutputFormat一樣,內部封裝了一個table實際存儲的OutputFormat返回的RecordWriter baseWriter ,最終真正用於向hdfs寫數據的對象,FileRecordWriterContainer如下:

其中初始化了幾個變量

serDe :用於序列化寫到hdfs的數據

objectInspector :用於訪問HCatRecord 

這里面還是利用了hive中的這一套SerdeObjectInspector的封裝來做的 

7.         單機關於讀/寫的兩個Java demo 

write

 1 package com.baidu.rigel.hactalog;
 2 
 3 import java.io.IOException;
 4 import java.util.HashMap;
 5 import java.util.Map;
 6 
 7 import org.apache.hadoop.conf.Configuration;
 8 import org.apache.hadoop.io.WritableComparable;
 9 import org.apache.hadoop.mapreduce.Job;
10 import org.apache.hadoop.mapreduce.OutputCommitter;
11 import org.apache.hadoop.mapreduce.RecordWriter;
12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
13 import org.apache.hadoop.mapreduce.TaskAttemptID;
14 import org.apache.hive.hcatalog.data.DefaultHCatRecord;
15 import org.apache.hive.hcatalog.data.HCatRecord;
16 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema.Type;
17 import org.apache.hive.hcatalog.data.schema.HCatSchema;
18 import org.apache.hive.hcatalog.mapreduce.FileOutputCommitterContainer;
19 import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
20 import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;
21 
22 public class HcataLogWriteTestMain {
23 
24     public static void main(String[] args) throws IOException, InterruptedException {
25         Map<String,String> partitions = new HashMap<String, String>(1);
26         partitions.put("pdate", "2011-09-24");
27         Configuration conf =new Configuration();
28         Job job = new Job(conf, "GroupByAge");
29         OutputJobInfo outPutInfo=OutputJobInfo.create("analyse_db","contline_revenue_day_cut", partitions);
30         HCatOutputFormat.setOutput(job, outPutInfo);
31         HCatSchema s = HCatOutputFormat.getTableSchema(job.getConfiguration());
32         System.out.println("INFO: output schema explicitly set for writing:"
33                 + s);
34         //同樣,序列化,然后設置到conf配置中  
35         HCatOutputFormat.setSchema(job, s);
36         HCatOutputFormat outFormat= new HCatOutputFormat();
37         // attempt_20140328173000_500575_m000_000000_0     task_20131314_0758_r_000100 
38         job.getConfiguration().set("mapred.task.partition", "1");
39         TaskAttemptID tmp= new TaskAttemptID("20140328173000",500575,true,0,0);
40         job.getConfiguration().set("mapred.task.id", "attempt_20140328173000_500575_m_000000_0");
41         TaskAttemptContext taskTmp=new TaskAttemptContext(job.getConfiguration(),tmp);
42         RecordWriter<WritableComparable<?>, HCatRecord> hcataWrite=outFormat.getRecordWriter(taskTmp);
43         hcataWrite.close(null);
44         for(int i=0;i<100;i++){
45             HCatRecord record = new DefaultHCatRecord(s.getFields().size());
46             for(int k=0;k<s.getFields().size();k++){
47                 if(s.get(k).getType()==Type.BIGINT)
48                     record.set(k, Long.parseLong(i+""));
49                 else if(s.get(k).getType()==Type.STRING)
50                     record.set(k, i+"_ddctest");
51             }
52             hcataWrite.write(null, record);
53         }
54         hcataWrite.close(taskTmp);
55         
56 //        System.out.println(outPutInfo.getDatabaseName());
57 //        System.out.println("******************************************************");
58 //        System.out.println(outPutInfo.getHarRequested());
59 //        System.out.println("******************************************************");
60 //        System.out.println(outPutInfo.getLocation());
61 //        System.out.println("******************************************************");
62 //        System.out.println(outPutInfo.getMaxDynamicPartitions());
63 //        System.out.println("******************************************************");
64 //        System.out.println(outPutInfo.getTableName());
65 //        System.out.println("******************************************************");
66 //        System.out.println(outPutInfo.getOutputSchema());
67 //        System.out.println("******************************************************");
68 //        System.out.println(outPutInfo.getPartitionValues());
69 //        System.out.println("******************************************************");
70 //        System.out.println(outPutInfo.getProperties());
71 //        System.out.println("******************************************************");
72 //        System.out.println(outPutInfo.getTableInfo());
73     }
74 
75 }

readTest

 1 package com.baidu.rigel.hactalog;
 2 
 3 import java.io.IOException;
 4 import java.util.List;
 5 
 6 import org.apache.hadoop.conf.Configuration;
 7 import org.apache.hadoop.io.WritableComparable;
 8 import org.apache.hadoop.mapred.JobConf;
 9 import org.apache.hadoop.mapreduce.InputSplit;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.RecordReader;
12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
13 import org.apache.hadoop.mapreduce.TaskAttemptID;
14 import org.apache.hive.hcatalog.common.HCatConstants;
15 import org.apache.hive.hcatalog.data.HCatRecord;
16 import org.apache.hive.hcatalog.data.schema.HCatSchema;
17 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
18 
19 public class HcataLogReadTestMain {
20 
21     public static void main(String[] args) throws IOException, InterruptedException {
22         Configuration conf = new  Configuration();
23         JobConf job=new JobConf(conf);
24 //        InputJobInfo inputInfo=InputJobInfo.create("analyse_db", "contline_revenue_day","pdate=\"2014-03-09\"",new Properties());
25         HCatInputFormat hIntput=HCatInputFormat.setInput(job, "analyse_db", "contline_revenue_day");
26         hIntput.setFilter("pdate=\"2014-03-09\"");
27         Job jobContext=new Job(job);
28         System.out.println(job.get(HCatConstants.HCAT_KEY_JOB_INFO));
29         hIntput=new HCatInputFormat();
30         List<InputSplit> splitList=hIntput.getSplits(jobContext);
31         HCatSchema hCatSchema=HCatInputFormat.getTableSchema(job);
32         System.out.println(splitList.size());
33         TaskAttemptID tmp= new TaskAttemptID("20131314",758,false,100,200);
34         TaskAttemptContext taskTmp=new TaskAttemptContext(job,tmp);
35         RecordReader<WritableComparable, HCatRecord> reader=hIntput.createRecordReader(splitList.get(0), taskTmp);
36         reader.initialize(splitList.get(0), taskTmp);
37         int cout=0 ;
38         while(reader.nextKeyValue()){
39             HCatRecord row=reader.getCurrentValue();
40             System.out.println(row.get("click_amt", hCatSchema)+"    "+row.get("pdate", hCatSchema));
41             cout++  ;
42         }
43         System.out.println(cout);
44     }
45 
46 }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM