Importing Files into HBase via the API (Small-File Handling)


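Earlier we set up local Hadoop and HBase development environments (see the previous articles: "Installing Hadoop in Standalone Mode and Setting Up the Development Environment on 64-bit Windows 7" and "Installing HBase in Standalone Mode on 64-bit Windows 7"). In most cases the MapReduce model uses HDFS as its underlying storage. In HDFS, the NameNode holds the file metadata and the DataNodes hold the actual file data. This works well for large data files, but it has an obvious weakness with large numbers of small files. The metadata of every file is kept in NameNode memory, so a huge number of small files consumes a great deal of that memory and seriously degrades HDFS performance. One workable solution is to store the small files in HBase. This article and the ones that follow use HBase as the storage layer and discuss how to import small or large files into it. Each small file is stored in its own cell. A large file is first uploaded to HDFS, and its content is then read into HBase with MapReduce.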
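To make the NameNode memory argument concrete, here is a back-of-the-envelope calculation. It assumes the commonly quoted rule of thumb that each namespace object (one per file, one per block) costs roughly 150 bytes of NameNode heap. The real figure varies with the Hadoop version and path lengths, so treat the result as an order-of-magnitude estimate. The class and method names are hypothetical.

```java
/**
 * Back-of-the-envelope estimate of NameNode heap used by file metadata.
 * ASSUMPTION: roughly 150 bytes per namespace object (one per file, one per block),
 * a commonly quoted rule of thumb rather than an exact figure.
 */
final class NameNodeMemoryEstimate {
    // Assumed heap cost of one namespace object (file or block) on the NameNode.
    static final long BYTES_PER_OBJECT = 150L;

    /** Each file counts as one file object plus one object per block. */
    static long estimateBytes(long fileCount, long blocksPerFile) {
        return fileCount * (1 + blocksPerFile) * BYTES_PER_OBJECT;
    }

    public static void main(String[] args) {
        // 10 million small files, each occupying a single block
        long bytes = estimateBytes(10_000_000L, 1);
        System.out.println(bytes + " bytes of NameNode heap (estimated)");
    }
}
```

Under this assumption, 10 million single-block files need about 3,000,000,000 bytes (roughly 3 GB) of NameNode heap for metadata alone. Storing the same data as cells in HBase keeps it inside a much smaller number of large HFiles on HDFS.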

 

Scenario Description

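For small files, I assume here that each file is under 10 MB, so we do not need to split a file and store it across several cells. In HBase, the intersection of a row and a column is called a cell. By default the HBase client rejects a cell larger than 10 MB. This limit can be changed in the configuration file through the property "hbase.client.keyvalue.maxsize", whose default value is 10485760. The files can come from the local file system. This test case uses a local email file of only about 15 KB. We will create a local Java project that reads the file and then saves it to HBase through the API. Another possible scenario is to turn the local program into a RESTful API. External systems call this API remotely to store data in HBase, so the API can connect two independent systems.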
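The 10 MB limit applies to the whole cell, not just the file content. The following sketch checks whether a file of a given size fits. It assumes, based on my understanding of the HBase 1.x KeyValue layout, that the client compares the serialized KeyValue length against hbase.client.keyvalue.maxsize. That length is 20 bytes of fixed overhead plus the row key, family, qualifier, and value, assuming no cell tags. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/**
 * Checks whether a value fits under hbase.client.keyvalue.maxsize.
 * ASSUMPTION: the limit is compared against the serialized KeyValue length,
 * i.e. 20 fixed bytes + row + family + qualifier + value (no tags).
 */
final class CellSizeCheck {
    // Default value of hbase.client.keyvalue.maxsize (10 MB).
    static final long DEFAULT_MAX_KEYVALUE_SIZE = 10_485_760L;

    // Fixed overhead: key length (4) + value length (4) + row length (2)
    // + family length (1) + timestamp (8) + type (1).
    static final int FIXED_OVERHEAD = 20;

    static long keyValueLength(String row, String family, String qualifier, long valueLength) {
        return FIXED_OVERHEAD
                + row.getBytes(StandardCharsets.UTF_8).length
                + family.getBytes(StandardCharsets.UTF_8).length
                + qualifier.getBytes(StandardCharsets.UTF_8).length
                + valueLength;
    }

    static boolean fitsInOneCell(String row, String family, String qualifier,
                                 long valueLength, long maxKeyValueSize) {
        return keyValueLength(row, family, qualifier, valueLength) <= maxKeyValueSize;
    }

    public static void main(String[] args) {
        // A ~15 KB email stored under row1 / cf1:message
        System.out.println(fitsInOneCell("row1", "cf1", "message", 15 * 1024, DEFAULT_MAX_KEYVALUE_SIZE));
        // A value of exactly 10 MB: the key adds 34 more bytes
        System.out.println(fitsInOneCell("row1", "cf1", "message", 10_485_760L, DEFAULT_MAX_KEYVALUE_SIZE));
    }
}
```

With the row key "row1" and column cf1:message used later in this article, the key adds 34 bytes. Under the default limit, the largest file that fits is therefore 10,485,726 bytes. To raise the limit on the client side, set the property in hbase-site.xml or call conf.setInt("hbase.client.keyvalue.maxsize", ...) on the Configuration before creating the connection.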

 

Project steps:

1: Create a Java Maven project in IDEA.

2: Edit pom.xml to add the HBase 1.2.6 client dependency, since we will use its API to work with HBase:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>ImportFile</groupId>
    <artifactId>ImportFile</artifactId>
    <version>1.0-SNAPSHOT</version>

    <repositories>
        <repository>
            <id>apache</id>
            <url>https://repo.maven.apache.org/maven2</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.6</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>

                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

        </plugins>
    </build>
</project>

3: Add a class, HBaseUtility, that handles HBase. It contains methods for creating a table and for adding, deleting, and querying data:

package examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;


public class HBaseUtility {
    static String TABLE_NAME = "email";
    static String[] COLUMN_FAMILY = {"cf1"};
    static String[] COLUMNS = {"message"};

    public static void HBaseOperation(String fileContent) {
        //*******************************************************//
        Connection conn = null;
        Admin admin = null;
        try {
            Configuration conf = HBaseConfiguration.create();
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();

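            TableName tableName = TableName.valueOf(TABLE_NAME);
            // Row key for the imported file (DeleteData also clears "row1")
            String rowKey = "row1";
            CreateTable(admin, tableName);
            DeleteData(conn, tableName);
            PutData(conn, tableName, rowKey, fileContent);
            GetData(conn, tableName, rowKey);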
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        finally
        {
            try {
                if (admin != null) {
                    admin.close();
                }

                if (conn != null) {
                    conn.close();
                }
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }
}

Then add the HBase operation methods to the class above:

    // Create the HBase table if it does not already exist
    private static void CreateTable(Admin admin, TableName tableName)
    {
        try {
            if (admin.tableExists(tableName)) {
                System.out.println(tableName + " table already exists!");
            } else {
                HTableDescriptor tableDesc = new HTableDescriptor(tableName);
                for (String column : COLUMN_FAMILY) {
                    tableDesc.addFamily(new HColumnDescriptor(column));
                }
                admin.createTable(tableDesc);
                System.out.println(tableName + " is created successfully!");
            }
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }

    }

    // Save data: store the file content in cf1:message of the given row
    private static void PutData(Connection conn, TableName tableName, String rowKey, String fileContent) {
        System.out.println("PUT value..............................................");
        // try-with-resources closes the table even if put() throws
        try (Table table = conn.getTable(tableName))
        {
            Put put = new Put(rowKey.getBytes());
            put.addColumn(COLUMN_FAMILY[0].getBytes(), COLUMNS[0].getBytes(), fileContent.getBytes());
            table.put(put);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

    // Query data: print every qualifier:value in column family cf1 of the row
    private static void GetData(Connection conn, TableName tableName, String rowKey) {
        System.out.println("GET value..............................................");
        try (Table table = conn.getTable(tableName))
        {
            Get get = new Get(rowKey.getBytes());
            // Optionally restrict the Get to one family or one column:
            //get.addFamily(COLUMN_FAMILY[0].getBytes());
            //get.addColumn(COLUMN_FAMILY[0].getBytes(), COLUMNS[0].getBytes());
            Result result = table.get(get);
            if (result.isEmpty()) {
                System.out.println("No data found for row " + rowKey);
                return;
            }
            // getFamilyMap maps each qualifier to its latest value in the family
            result.getFamilyMap(COLUMN_FAMILY[0].getBytes()).forEach((k,v) -> System.out.println(new String(k) + ":" + new String(v)));
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }

    // Delete data: remove cf1:message from the fixed row "row1"
    private static void DeleteData(Connection conn, TableName tableName)
    {
        System.out.println("DELETE value..............................................");
        String rowKey = "row1";
        try (Table table = conn.getTable(tableName))
        {
            Delete delete = new Delete(rowKey.getBytes());
            // addColumn deletes the latest version of the column; addColumns deletes all versions
            delete.addColumn(COLUMN_FAMILY[0].getBytes(), COLUMNS[0].getBytes());
            table.delete(delete);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
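The methods above always write to the fixed row key "row1", so importing a second file would overwrite the first. If you import many small files, each file needs its own row. The following is a minimal sketch of one possible row-key scheme, written as a hypothetical helper that is not part of the original project. It prefixes the file name with the first 8 hex characters of its MD5 hash.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/**
 * Hypothetical helper: builds a distinct row key per imported file.
 * Layout: first 8 hex chars of MD5(fileName) + "_" + fileName.
 */
final class RowKeys {
    static String rowKeyFor(String fileName) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(fileName.getBytes(StandardCharsets.UTF_8));
            StringBuilder key = new StringBuilder();
            // First 4 digest bytes -> 8 lowercase hex characters
            for (int i = 0; i < 4; i++) {
                key.append(String.format("%02x", digest[i] & 0xff));
            }
            return key.append('_').append(fileName).toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support MD5
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(rowKeyFor("Test email successful.eml"));
    }
}
```

The hash prefix spreads file names that share a common prefix across different regions instead of sending all writes to one region. Keeping the file name in the key makes it readable and unique for distinct names. The result can be passed as the rowKey argument of PutData and GetData.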

 

4: Add the program entry point, which reads the local file and calls the HBaseUtility method:

package examples;

import java.io.File;
import java.io.FileReader;
import java.io.FileNotFoundException;
import java.io.IOException;

public class ImportFile {
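    public static void main (String[] args)
    {
        File file = new File("D:\\MyEmail\\Test email successful.eml");
        String fileContent = ReadFile(file);
        // Stop if the file could not be read; otherwise PutData would throw a NullPointerException
        if (fileContent == null) {
            System.err.println("Could not read " + file.getAbsolutePath());
            return;
        }
        HBaseUtility.HBaseOperation(fileContent);
    }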

    private static String ReadFile(File file) {
        String fileContent = null;
        // try-with-resources closes the reader automatically
        try (FileReader fr = new FileReader(file)) {
            StringBuilder sb = new StringBuilder();
            char[] buf = new char[1024]; // read buffer
            int len; // number of characters read in this pass
            while ((len = fr.read(buf)) != -1) {
                sb.append(buf, 0, len);
            }
            fileContent = sb.toString();
            System.out.println(fileContent);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        return fileContent;
    }
}
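FileReader decodes the email with the platform's default charset, and String.getBytes() in PutData re-encodes it. An email with a different encoding or binary attachments may not survive that round trip unchanged. An alternative, sketched below under that assumption, is to read the file as raw bytes and check its size before storing it. The class name and the 1 KB margin reserved for the key are hypothetical choices.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Alternative sketch: read a small file as raw bytes (no charset decoding)
 * and refuse files that would not fit in one cell.
 */
final class RawFileReader {
    // ASSUMPTION: 10 MB default keyvalue limit minus a 1 KB margin for row key, family and qualifier
    static final long MAX_FILE_BYTES = 10L * 1024 * 1024 - 1024;

    static byte[] readSmallFile(Path path) throws IOException {
        long size = Files.size(path);
        if (size > MAX_FILE_BYTES) {
            throw new IOException("File too large for a single cell: " + path + " (" + size + " bytes)");
        }
        return Files.readAllBytes(path);
    }

    public static void main(String[] args) throws IOException {
        byte[] content = readSmallFile(Paths.get("D:\\MyEmail\\Test email successful.eml"));
        System.out.println(content.length + " bytes read");
    }
}
```

The resulting byte array can be passed straight to Put.addColumn(family, qualifier, value), which takes a byte[] value, so the stored cell matches the file byte for byte.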

5: Testing

Start Hadoop and HBase:

cd D:\Application\hadoop-2.7.4\sbin
start-all.cmd

cd D:\Application\hbase-1.2.6\bin
start-hbase.cmd

hbase shell

The HBase shell then opens in the command-line window.

 

Now you can set breakpoints in IDEA and click the Debug button to start debugging.

 

 

