1. 一般的hdfs讀寫
傳統的對於hdfs的讀寫都是直接設置inputPath 和 outPath ,而且對於數據都是以文件的形式訪問的,不涉及到結構化/半結構化的東東,及時如hive存儲在hdfs的的結構化數據,外部系統訪問也只能自己去了解具體的結構是如何存儲的,然后自己讀文件再訪問,傳統訪問hdfs的方式如下:
使用InputFormat、Split、RecordReader用於讀,具體方式如下:
List<InputSplit> InputFormat.getSplits() ;
RecordReader InputFormat. createRecordReader(InputSplit split) ;
RecordReader. nextKeyValue();
RecordReader. getCurrentValue();
這樣的流程來訪問 。
使用OutputFormat、RecordWriter來寫入hdfs數據,具體如下:
OutputFormat. getRecordWriter();
RecordWriter.write(K key, V value)
對於一般的table結構數據,都是將Key值置成NullWritable,作為一個無用的值來存儲。
2. Hcatalog與hdfs的結合:定義了文件的schema
HCatalog將每份結構化的hdfs數據定義schema和訪問信息(db、table、partition),然后讀和寫的時候使用db、table、partition(對於無partition這個可以為空)這三部分信息來訪問相應的表數據,屏蔽掉表底層InputFormat、OutFormat以及path信息,讀&&寫時候只許關系以下幾個訪問類即可:
HCatInputFormat<WritableComparable, HCatRecord>
HCatRecordReader<WritableComparable, HCatRecord>
HCatOutputFormat<WritableComparable<?>, HCatRecord>
FileRecordWriterContainer<WritableComparable<?>, HCatRecord>
幾個類就行了,讀寫的接口Value均是HCatRecord對象,Key值是WritableComparable即可,不過一般key值都是NullWritable,並無實際用途 。
3. 對象HCatRecord
內部存儲這Object[]數組,分別存儲一行數據中每列的值,為hive基本類型的Java對象(int、String、list、Map等),對於復雜結構可以嵌套,主要訪問接口如下 :
List<Object> getAll();
Object get(int fieldIndex);
Object get(String fieldName, HCatSchema recordSchema)
通過列名稱來訪問具體列的值,但是需要table的schema信息HCatSchema(具體每列的colName以及每個colName對應的fieldIndex)
這點設計不是很好,感覺可以將HCatSchema內置到HCatRecord 中,這樣封裝性可能會更好一些 、
4. HCatalog的元數據存儲
HCatalog存儲元數據和Hive的元數據是一致的,對於元數據的存儲底層也是調用hive底層HiveMetaStoreClient的接口(我們常用的Hive其實是在這個類上封裝了一層),其提供對於元數據庫的各種操作(creat drop alter load insert等等 )
直接調用Hive接口/hive的客戶端 寫入的元數據在HCatalog中可以直接用,同樣HCatalog寫入的元數據以及數據在hive中也是可以直接使用的。
5. HCatInputFormat:讀取HCatalog中數據的接口
讀取的時候需要dbName、tableName、partition(這個可以沒有)、tableProperties(可以沒有)信息,存儲於InputJobInfo中,InputJobInfo是讀取時候的信息存儲封裝,在jobclient端初始化以后,使用Java的序列化Serializable轉換成byte[],然后將byte[]編碼成一個字符串存儲(參見HCatUtil的encodeBytes和decodeBytes方法)到hadoop的配置Configuration來在整個mr任務中傳遞。
InputJobInfo初始化創建的時候只需要dbName、tableName、partition(這個可以沒有)、tableProperties(可以沒有)信息,在InitializeInput.getInputJobInfo方法中去連接元數據庫,補齊InputJobInfo中的HCatTableInfo、partitions信息,對於不是分區的table,構造一個partition信息,將相應數據存儲在里面。
getSplits(JobContext jobContext)方法,從conf配置中反序列化出InputJobInfo信息,然后根據partition中的信息,利用inputFormatClass、location來split成真正的底層數據的split,在封裝成HCatSplit。
HCatSplit結構
最終讀取的時候還是要利用PartInfo中的SerDe來讀取相應的數據,后面介紹
createRecordReader(InputSplit split, TaskAttemptContext taskContext)方法
返回的是封裝了一層的HCatRecordReader對象,然后通過HCatRecordReader. Initialize(split,taskContext)來初始化具體的讀信息baseRecordReader等 。
一切初始化完畢,就可以開始調用經典的nextKeyValue的方法了
baseRecordReader是底層的RecordReader ,讀出來數據后使用相應的SerDe的deserialize方法反序列化成ObjectInspector(一般是StructObjectInspector)可以訪問的數據格式,然后通過相應的colnum ObjectInspector得到每列的具體Java object值,然后封裝到HCatRecord對象中 。
對於hive基本類型轉換成Java的基本類型,對於struct、list轉換成Java的List,map轉換成Java的map。
6. Hcatalog在MR程序中的輸出類HCatOutputFormat
和input對應的,OutputJobInfo是存儲用於輸出數據時候的信息的,其結構如下:
主要是table的相關信息
OutputJobInfo outPutInfo=OutputJobInfo.create("analyse_db","contline_revenue_day_cut", partitions);
也是需要dbName、tblName、partition就可以初始化,通過HCatOutputFormat.setOutput(conf, credentials,outPutInfo)中連接metastore,然后補充其余的schema、Serde、outputFormat等一系列信息 ,目前Hcata中對於hive的自建Index、內置壓縮、分桶和sort的特性還是不支持的,然后同樣在利用序列化這個對象成String,存儲與MR的配置conf中 ,提供給以后的程序共享使用。
getRecordWriter(TaskAttemptContext context)方法
先得到一個封裝后的OutputFormatContainer,這個其實就是內部包裝了一個這個table數據真正的OutputFormat對象,以FileOutputFormatContainer為例,以下是FileOutputFormatContainer. getRecordWriter(TaskAttemptContext context)方法
主要是做了一些MR程序中輸出文件夾初始化的工作,然后就是返回了一個FileRecordWriterContainer對象,該對象和上面的OutputFormat一樣,內部封裝了一個table實際存儲的OutputFormat返回的RecordWriter baseWriter ,最終真正用於向hdfs寫數據的對象,FileRecordWriterContainer如下:
其中初始化了幾個變量
serDe :用於序列化寫到hdfs的數據
objectInspector :用於訪問HCatRecord的 。
這里面還是利用了hive中的這一套Serde和ObjectInspector的封裝來做的 。
7. 單機關於讀/寫的兩個Java demo
write
1 package com.baidu.rigel.hactalog; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Map; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.io.WritableComparable; 9 import org.apache.hadoop.mapreduce.Job; 10 import org.apache.hadoop.mapreduce.OutputCommitter; 11 import org.apache.hadoop.mapreduce.RecordWriter; 12 import org.apache.hadoop.mapreduce.TaskAttemptContext; 13 import org.apache.hadoop.mapreduce.TaskAttemptID; 14 import org.apache.hive.hcatalog.data.DefaultHCatRecord; 15 import org.apache.hive.hcatalog.data.HCatRecord; 16 import org.apache.hive.hcatalog.data.schema.HCatFieldSchema.Type; 17 import org.apache.hive.hcatalog.data.schema.HCatSchema; 18 import org.apache.hive.hcatalog.mapreduce.FileOutputCommitterContainer; 19 import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat; 20 import org.apache.hive.hcatalog.mapreduce.OutputJobInfo; 21 22 public class HcataLogWriteTestMain { 23 24 public static void main(String[] args) throws IOException, InterruptedException { 25 Map<String,String> partitions = new HashMap<String, String>(1); 26 partitions.put("pdate", "2011-09-24"); 27 Configuration conf =new Configuration(); 28 Job job = new Job(conf, "GroupByAge"); 29 OutputJobInfo outPutInfo=OutputJobInfo.create("analyse_db","contline_revenue_day_cut", partitions); 30 HCatOutputFormat.setOutput(job, outPutInfo); 31 HCatSchema s = HCatOutputFormat.getTableSchema(job.getConfiguration()); 32 System.out.println("INFO: output schema explicitly set for writing:" 33 + s); 34 //同樣,序列化,然后設置到conf配置中 35 HCatOutputFormat.setSchema(job, s); 36 HCatOutputFormat outFormat= new HCatOutputFormat(); 37 // attempt_20140328173000_500575_m000_000000_0 task_20131314_0758_r_000100 38 job.getConfiguration().set("mapred.task.partition", "1"); 39 TaskAttemptID tmp= new TaskAttemptID("20140328173000",500575,true,0,0); 40 job.getConfiguration().set("mapred.task.id", "attempt_20140328173000_500575_m_000000_0"); 41 TaskAttemptContext taskTmp=new TaskAttemptContext(job.getConfiguration(),tmp); 42 RecordWriter<WritableComparable<?>, HCatRecord> hcataWrite=outFormat.getRecordWriter(taskTmp); 43 hcataWrite.close(null); 44 for(int i=0;i<100;i++){ 45 HCatRecord record = new DefaultHCatRecord(s.getFields().size()); 46 for(int k=0;k<s.getFields().size();k++){ 47 if(s.get(k).getType()==Type.BIGINT) 48 record.set(k, Long.parseLong(i+"")); 49 else if(s.get(k).getType()==Type.STRING) 50 record.set(k, i+"_ddctest"); 51 } 52 hcataWrite.write(null, record); 53 } 54 hcataWrite.close(taskTmp); 55 56 // System.out.println(outPutInfo.getDatabaseName()); 57 // System.out.println("******************************************************"); 58 // System.out.println(outPutInfo.getHarRequested()); 59 // System.out.println("******************************************************"); 60 // System.out.println(outPutInfo.getLocation()); 61 // System.out.println("******************************************************"); 62 // System.out.println(outPutInfo.getMaxDynamicPartitions()); 63 // System.out.println("******************************************************"); 64 // System.out.println(outPutInfo.getTableName()); 65 // System.out.println("******************************************************"); 66 // System.out.println(outPutInfo.getOutputSchema()); 67 // System.out.println("******************************************************"); 68 // System.out.println(outPutInfo.getPartitionValues()); 69 // System.out.println("******************************************************"); 70 // System.out.println(outPutInfo.getProperties()); 71 // System.out.println("******************************************************"); 72 // System.out.println(outPutInfo.getTableInfo()); 73 } 74 75 }
readTest
1 package com.baidu.rigel.hactalog; 2 3 import java.io.IOException; 4 import java.util.List; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.io.WritableComparable; 8 import org.apache.hadoop.mapred.JobConf; 9 import org.apache.hadoop.mapreduce.InputSplit; 10 import org.apache.hadoop.mapreduce.Job; 11 import org.apache.hadoop.mapreduce.RecordReader; 12 import org.apache.hadoop.mapreduce.TaskAttemptContext; 13 import org.apache.hadoop.mapreduce.TaskAttemptID; 14 import org.apache.hive.hcatalog.common.HCatConstants; 15 import org.apache.hive.hcatalog.data.HCatRecord; 16 import org.apache.hive.hcatalog.data.schema.HCatSchema; 17 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; 18 19 public class HcataLogReadTestMain { 20 21 public static void main(String[] args) throws IOException, InterruptedException { 22 Configuration conf = new Configuration(); 23 JobConf job=new JobConf(conf); 24 // InputJobInfo inputInfo=InputJobInfo.create("analyse_db", "contline_revenue_day","pdate=\"2014-03-09\"",new Properties()); 25 HCatInputFormat hIntput=HCatInputFormat.setInput(job, "analyse_db", "contline_revenue_day"); 26 hIntput.setFilter("pdate=\"2014-03-09\""); 27 Job jobContext=new Job(job); 28 System.out.println(job.get(HCatConstants.HCAT_KEY_JOB_INFO)); 29 hIntput=new HCatInputFormat(); 30 List<InputSplit> splitList=hIntput.getSplits(jobContext); 31 HCatSchema hCatSchema=HCatInputFormat.getTableSchema(job); 32 System.out.println(splitList.size()); 33 TaskAttemptID tmp= new TaskAttemptID("20131314",758,false,100,200); 34 TaskAttemptContext taskTmp=new TaskAttemptContext(job,tmp); 35 RecordReader<WritableComparable, HCatRecord> reader=hIntput.createRecordReader(splitList.get(0), taskTmp); 36 reader.initialize(splitList.get(0), taskTmp); 37 int cout=0 ; 38 while(reader.nextKeyValue()){ 39 HCatRecord row=reader.getCurrentValue(); 40 System.out.println(row.get("click_amt", hCatSchema)+" "+row.get("pdate", hCatSchema)); 41 cout++ ; 42 } 43 System.out.println(cout); 44 } 45 46 }