In earlier posts we set up local Hadoop and HBase development environments (see: "Installing Hadoop in standalone mode on 64-bit Windows 7 and setting up the development environment" and "Installing HBase in standalone mode on 64-bit Windows 7"). In most cases the MapReduce model uses HDFS as its underlying storage. In HDFS, the NameNode keeps each file's metadata while the DataNodes hold the actual file data. For large files this works fine, but for a large number of small files the weakness is obvious: the metadata for all those small files consumes a great deal of NameNode memory and can severely degrade HDFS performance. One workable approach is to store the small files in HBase instead. This post and the ones that follow discuss, with HBase as the store, how to import small and large files into HBase: each small file is stored in its own cell, while a large file is first uploaded to HDFS and then read into HBase with a MapReduce job.
Scenario:
For small files, I assume here that each one is under 10 MB, so we do not need to split a file across multiple cells. In HBase, the intersection of a row and a column is called a cell, and its default size limit is 10 MB. This limit can be adjusted in the configuration file via the property "hbase.client.keyvalue.maxsize", whose default value is 10485760. The file source can be a local file; this test case uses a local email file of only about 15 KB. We will create a local Java project that reads the file and then saves it to HBase through the client API. Another possible scenario is to turn this local program into a RESTful API that an external system calls remotely to store data in HBase; that API would then connect the two independent systems.
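If you do need cells larger than the default cap, the property can be raised on the client side, for example in hbase-site.xml. This is a sketch only; the 20 MB value below is an illustrative choice, not part of the original setup:

```xml
<!-- hbase-site.xml: raise the per-cell size limit -->
<property>
    <name>hbase.client.keyvalue.maxsize</name>
    <!-- In bytes; default is 10485760 (10 MB). 20971520 = 20 MB, an illustrative value. -->
    <value>20971520</value>
</property>
```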
Project steps:
1: Create a Java Maven project in IDEA

2: Edit pom.xml to add the HBase 1.2.6 client dependency, since we will use the API to operate on HBase
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>ImportFile</groupId>
    <artifactId>ImportFile</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>apache</id>
            <url>http://maven.apache.org</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions
                                         when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
3: Add an HBaseUtility class for working with HBase, with methods for creating the table and for putting, deleting, and getting data
package examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;

public class HBaseUtility {

    static String TABLE_NAME = "email";
    static String[] COLUMN_FAMILY = {"cf1"};
    static String[] COLUMNS = {"message"};

    public static void HBaseOperation(String fileContent) {
        Connection conn = null;
        Admin admin = null;
        String rowKey = "row1";
        try {
            Configuration conf = HBaseConfiguration.create();
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();
            TableName tableName = TableName.valueOf(TABLE_NAME);

            CreateTable(admin, tableName);
            DeleteData(conn, tableName);
            PutData(conn, tableName, rowKey, fileContent);
            GetData(conn, tableName, rowKey);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (admin != null) {
                    admin.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
Add the HBase operation methods to the class above:
// Create the HBase table
private static void CreateTable(Admin admin, TableName tableName) {
    try {
        if (admin.tableExists(tableName)) {
            System.out.println(tableName + " table already exists!");
        } else {
            HTableDescriptor tableDesc = new HTableDescriptor(tableName);
            for (String column : COLUMN_FAMILY) {
                tableDesc.addFamily(new HColumnDescriptor(column));
            }
            admin.createTable(tableDesc);
            System.out.println(tableName + " is created successfully!");
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

// Put data
private static void PutData(Connection conn, TableName tableName, String rowKey, String fileContent) {
    System.out.println("PUT value..............................................");
    try {
        Table table = conn.getTable(tableName);
        Put put = new Put(rowKey.getBytes());
        put.addColumn(COLUMN_FAMILY[0].getBytes(), COLUMNS[0].getBytes(), fileContent.getBytes());
        table.put(put);
        table.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

// Get data
private static void GetData(Connection conn, TableName tableName, String rowKey) {
    System.out.println("GET value..............................................");
    try {
        Table table = conn.getTable(tableName);
        Get get = new Get(rowKey.getBytes());
        //get.addFamily(COLUMN_FAMILY_NAME.getBytes());
        //get.addColumn(COLUMN_FAMILY_NAME.getBytes(), COLUMNS[0].getBytes());
        Result result = table.get(get);
        // Print every column/value pair in the column family
        result.getFamilyMap(COLUMN_FAMILY[0].getBytes())
              .forEach((k, v) -> System.out.println(new String(k) + ":" + new String(v)));
        table.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

// Delete data
private static void DeleteData(Connection conn, TableName tableName) {
    System.out.println("DELETE value..............................................");
    String rowKey = "row1";
    try {
        Table table = conn.getTable(tableName);
        Delete delete = new Delete(rowKey.getBytes());
        delete.addColumn(COLUMN_FAMILY[0].getBytes(), COLUMNS[0].getBytes());
        table.delete(delete);
        table.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
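The methods above use a fixed rowkey ("row1"), so every import overwrites the same cell. In practice each file needs its own rowkey; one common approach, shown here as a sketch that is not part of the original code, is to derive the rowkey by hashing the file name:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class RowKeyDemo {

    // Derive a stable rowkey from a file name by MD5-hashing it
    // and hex-encoding the 16-byte digest (always 32 hex characters).
    static String rowKeyFor(String fileName) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("MD5");
        byte[] digest = md.digest(fileName.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        System.out.println(rowKeyFor("Test email successful.eml"));
    }
}
```

A side benefit of hashed rowkeys is that writes spread evenly across regions, avoiding the region hot-spotting that sequential keys can cause.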
4: Add the program entry point, which reads a local file and calls the HBaseUtility method
package examples;

import java.io.File;
import java.io.FileReader;
import java.io.FileNotFoundException;
import java.io.IOException;

public class ImportFile {

    public static void main(String[] args) {
        File file = new File("D:\\MyEmail\\Test email successful.eml");
        String fileContent = ReadFile(file);
        HBaseUtility.HBaseOperation(fileContent);
    }

    private static String ReadFile(File file) {
        FileReader fr = null;
        String fileContent = null;
        try {
            StringBuffer sb = new StringBuffer();
            fr = new FileReader(file);
            char[] buf = new char[1024]; // read buffer
            int len = 0;                 // number of characters read per call
            while ((len = fr.read(buf)) != -1) {
                System.out.println(new String(buf, 0, len));
                sb.append(buf, 0, len);
            }
            fileContent = new String(sb);
            System.out.println(fileContent);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (fr != null) {
                try {
                    fr.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
        return fileContent;
    }
}
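As an aside, on Java 7 and later the hand-rolled ReadFile loop can be replaced with java.nio.file.Files, which reads a small file in a single call. This is a sketch; it writes its own temporary file so it is self-contained rather than depending on the .eml path above:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ReadFileNio {

    // Read an entire small file into a String in one call.
    static String readFile(Path path) throws IOException {
        return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Create a throwaway file so the demo does not depend on local paths.
        Path tmp = Files.createTempFile("demo", ".eml");
        Files.write(tmp, "hello hbase".getBytes(StandardCharsets.UTF_8));
        System.out.println(readFile(tmp)); // prints: hello hbase
        Files.delete(tmp);
    }
}
```

This is fine for files under the 10 MB cell limit discussed earlier; for truly large files you would stream instead of reading the whole content into memory.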
5: Test
Start Hadoop and HBase:
cd D:\Application\hadoop-2.7.4\sbin
start-all.cmd
cd D:\Application\hbase-1.2.6\bin
start-hbase.cmd
hbase shell
You should then see the following command-line window:

Now set breakpoints in IDEA and click the Debug button to start debugging:

