使用 C# 調用 Hadoop HttpFS


使用 C# 調用 Hadoop HttpFS

HttpFS 是 Hadoop 的 RESTful Web API,Java 可以直接調用 Hadoop 的 API,其它語言則要通過 WebHDFS 調用,貌似 Azure 有對應的 API?我也不懂,就自己寫了

開啟 HttpFS 步驟

參考鏈接:https://juejin.cn/post/7015007338830495780

怎么用這玩意兒可以去看看官方文檔,但是寫的真的太爛了,沒案例,沒講解,網上基本也是用 Java 直接調用的 Hadoop API ,我只能用 Postman 慢慢試,只寫了幾個常用的,完整代碼在下面
(都整分布式了,確實應該直接用 Java ,但是寫 C# 實在是太爽了)

完整 C# 代碼

IConfiguration 讀取 appsettings.json
List a Directory、Iteratively List a Directory、List a File,這三個我沒有寫,因為暫時用不上,而且這幾個也挺簡單的,如果用上了會補上代碼

/// <summary>
/// Static helper that talks to Hadoop through the HttpFS (WebHDFS-compatible) REST API.
/// Host name, port and user name are read from the "Hadoop" section of appsettings.json.
/// </summary>
public class HadoopHelper
{
    private static IConfiguration _configuration = new ConfigurationBuilder().AddJsonFile("appsettings.json").Build();

    // One shared client for every call: creating a new HttpClient per request leaks
    // sockets, and HttpClient is safe to use concurrently for sending requests.
    private static readonly HttpClient _httpClient = new HttpClient();

    /// <summary>
    /// Builds "http://host:port/webhdfs/v1/{path}/{name}?user.name={user}&amp;op={operation}".
    /// </summary>
    /// <param name="path">Directory part, without a leading or trailing "/".</param>
    /// <param name="name">File or folder name inside <paramref name="path"/>.</param>
    /// <param name="operation">WebHDFS operation name, e.g. "MKDIRS".</param>
    private static string BuildRequestUrl(string path, string name, string operation)
    {
        string hostName = _configuration["Hadoop:HostName"];
        string httpfsPort = _configuration["Hadoop:HttpfsPort"];
        string username = _configuration["Hadoop:Username"];

        return $"http://{hostName}:{httpfsPort}/webhdfs/v1/{path}/{name}?user.name={username}&op={operation}";
    }

    /// <summary>
    /// Interprets a response whose body is expected to look like {"boolean": true}.
    /// </summary>
    /// <returns>
    /// false when the status is not 200 OK or the "boolean" key is missing;
    /// otherwise the parsed value of the "boolean" key.
    /// </returns>
    private static async Task<bool> ReadBooleanResultAsync(HttpResponseMessage response)
    {
        if (response.StatusCode != HttpStatusCode.OK)
        {
            return false;
        }

        string responseBody = await response.Content.ReadAsStringAsync();

        // Guard against a body without the "boolean" key instead of dereferencing null.
        var flag = JObject.Parse(responseBody)["boolean"];
        return flag != null && Boolean.Parse(flag.ToString());
    }

    /// <summary>
    /// Sends <paramref name="content"/> as application/octet-stream and compares the answer's status code.
    /// </summary>
    /// <returns>true when the status equals <paramref name="expectedStatus"/>; false otherwise or when sending throws.</returns>
    private static async Task<bool> UploadAsync(HttpMethod method, string requestUrl, HttpContent content, HttpStatusCode expectedStatus)
    {
        // Uploads must carry this content type.
        content.Headers.ContentType = new MediaTypeHeaderValue("application/octet-stream");

        try
        {
            using (HttpRequestMessage request = new HttpRequestMessage(method, requestUrl) { Content = content })
            using (HttpResponseMessage response = await _httpClient.SendAsync(request))
            {
                return response.StatusCode == expectedStatus;
            }
        }
        catch (Exception)
        {
            return false;
        }
    }

    /// <summary>
    /// Checks whether a file exists in the given directory (GETFILESTATUS).
    /// </summary>
    /// <param name="fileName">Name of the file to look up, including its extension.</param>
    /// <param name="path">Directory that contains the file, without a leading or trailing "/".</param>
    /// <returns>
    /// false only when the server answers 404 Not Found; true for every other status code
    /// and also when the request itself throws.
    /// </returns>
    public static async Task<bool> IsExistenceAsync(string fileName, string path)
    {
        string requestUrl = BuildRequestUrl(path, fileName, "GETFILESTATUS");

        try
        {
            using (HttpResponseMessage response = await _httpClient.GetAsync(requestUrl))
            {
                return response.StatusCode != HttpStatusCode.NotFound;
            }
        }
        catch
        {
            // Behaviour kept from the original: a failed request is reported as "exists".
            // NOTE(review): confirm callers really want true here rather than an error.
            return true;
        }
    }

    /// <summary>
    /// Creates a folder inside the given directory (MKDIRS).
    /// </summary>
    /// <param name="folderName">Name of the folder to create.</param>
    /// <param name="path">Parent directory, without a leading or trailing "/".</param>
    /// <returns>
    /// true when the server reports success (creating an already existing folder also yields true);
    /// false on any failure, without indicating which error occurred.
    /// </returns>
    public static async Task<bool> CreateFolderAsync(string folderName, string path)
    {
        string requestUrl = BuildRequestUrl(path, folderName, "MKDIRS");

        using (HttpResponseMessage response = await _httpClient.PutAsync(requestUrl, null))
        {
            return await ReadBooleanResultAsync(response);
        }
    }

    /// <summary>
    /// Renames a file or folder inside the given directory (RENAME).
    /// </summary>
    /// <param name="oldFileOrFolderName">Current name, including the extension.</param>
    /// <param name="newFileOrFolderName">New name, including the extension.</param>
    /// <param name="path">Directory that contains the item, without a leading or trailing "/".</param>
    /// <returns>
    /// true when the rename succeeded; false otherwise, e.g. when the item does not exist
    /// or the old and new names are identical.
    /// </returns>
    public static async Task<bool> RenameFileOrFolderAsync(string oldFileOrFolderName, string newFileOrFolderName, string path)
    {
        // The destination is sent as an absolute path, hence the leading "/".
        string requestUrl = BuildRequestUrl(path, oldFileOrFolderName, "RENAME")
            + $"&destination=/{path}/{newFileOrFolderName}";

        using (HttpResponseMessage response = await _httpClient.PutAsync(requestUrl, null))
        {
            return await ReadBooleanResultAsync(response);
        }
    }

    /// <summary>
    /// Deletes a file or folder inside the given directory (DELETE, without the recursive flag).
    /// </summary>
    /// <param name="fileOrFolderName">Name of the item to delete, including the extension.</param>
    /// <param name="path">Directory that contains the item.</param>
    /// <returns>true when the delete succeeded; false otherwise, e.g. when the item does not exist.</returns>
    public static async Task<bool> DeleteFileOrFolderAsync(string fileOrFolderName, string path)
    {
        string requestUrl = BuildRequestUrl(path, fileOrFolderName, "DELETE");

        using (HttpResponseMessage response = await _httpClient.DeleteAsync(requestUrl))
        {
            return await ReadBooleanResultAsync(response);
        }
    }

    /// <summary>
    /// Reads a whole file into memory (OPEN). Intended for small files only.
    /// </summary>
    /// <param name="fileName">Name of the file, including its extension.</param>
    /// <param name="path">Directory that contains the file, without a leading or trailing "/".</param>
    /// <returns>The file content as a byte array.</returns>
    /// <exception cref="HttpRequestException">The request failed or the server returned a non-success status.</exception>
    public static async Task<byte[]> OpenAndReadFileAsync(string fileName, string path)
    {
        string requestUrl = BuildRequestUrl(path, fileName, "OPEN");

        // A single request is enough; the original issued an extra GET whose result was discarded.
        return await _httpClient.GetByteArrayAsync(requestUrl);
    }

    /// <summary>
    /// Creates a JSON file and writes <paramref name="message"/> into it (CREATE).
    /// </summary>
    /// <param name="fileName">Name of the file, including its extension.</param>
    /// <param name="path">Directory that will contain the file, without a leading or trailing "/".</param>
    /// <param name="message">JSON text to store.</param>
    /// <returns>true when the server answers 201 Created; false otherwise or when the request throws.</returns>
    public static async Task<bool> CreateAndWriteJsonFileAsync(string fileName, string path, string message)
    {
        string requestUrl = BuildRequestUrl(path, fileName, "CREATE");
        StringContent payload = new StringContent(message);

        return await UploadAsync(HttpMethod.Put, requestUrl, payload, HttpStatusCode.Created);
    }

    /// <summary>
    /// Appends <paramref name="message"/> to an existing JSON file (APPEND).
    /// </summary>
    /// <param name="fileName">Name of the file, including its extension.</param>
    /// <param name="path">Directory that contains the file, without a leading or trailing "/".</param>
    /// <param name="message">Text to append.</param>
    /// <returns>true when the server answers 200 OK; false otherwise or when the request throws.</returns>
    public static async Task<bool> AppendWriteJsonFileAsync(string fileName, string path, string message)
    {
        string requestUrl = BuildRequestUrl(path, fileName, "APPEND");
        StringContent payload = new StringContent(message);

        // APPEND answers 200 OK, not 201 Created; the original compared against Created
        // and therefore reported every successful append as a failure.
        return await UploadAsync(HttpMethod.Post, requestUrl, payload, HttpStatusCode.OK);
    }

    /// <summary>
    /// Uploads a PNG image by creating the file and writing <paramref name="bytes"/> into it (CREATE).
    /// Intended for small files only.
    /// </summary>
    /// <param name="fileName">Name of the file, including its extension.</param>
    /// <param name="path">Directory that will contain the file, without a leading or trailing "/".</param>
    /// <param name="bytes">Raw image data.</param>
    /// <returns>true when the server answers 201 Created; false otherwise or when the request throws.</returns>
    public static async Task<bool> CreateAndWritePngFileAsync(string fileName, string path, byte[] bytes)
    {
        string requestUrl = BuildRequestUrl(path, fileName, "CREATE");
        ByteArrayContent payload = new ByteArrayContent(bytes);

        return await UploadAsync(HttpMethod.Put, requestUrl, payload, HttpStatusCode.Created);
    }
}

WebHDFS REST API

Status of a File/Directory

查看文件/文件夾的狀態,GET請求,返回 404就是文件/文件夾不存在,文件/文件夾存在則返回具體信息的 Json 字符串

http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=GETFILESTATUS
/// <summary>
/// Checks whether a file exists in the given directory by sending a GETFILESTATUS request.
/// </summary>
/// <param name="fileName">Name of the file to look up, including its extension.</param>
/// <param name="path">Directory that contains the file, without a leading or trailing "/".</param>
/// <returns>
/// false only when the server answers 404 Not Found; true for every other status code
/// and also when the request itself throws.
/// </returns>
public static async Task<bool> IsExistenceAsync(string fileName, string path)
{
    string hostName = _configuration["Hadoop:HostName"];
    string httpfsPort = _configuration["Hadoop:HttpfsPort"];
    string username = _configuration["Hadoop:Username"];

    string requestUrl = $"http://{hostName}:{httpfsPort}/webhdfs/v1/{path}/{fileName}?user.name={username}&op=GETFILESTATUS";

    try
    {
        // Dispose the client and the response so the connection is released.
        using (HttpClient httpClient = new HttpClient())
        using (HttpResponseMessage response = await httpClient.GetAsync(requestUrl))
        {
            // 404 is the only status treated as "does not exist".
            return response.StatusCode != HttpStatusCode.NotFound;
        }
    }
    catch
    {
        // Behaviour kept from the original: a failed request is reported as "exists".
        // NOTE(review): confirm callers really want true here rather than an error.
        return true;
    }
}

Make a Directory

創建文件夾,PUT請求,這個返回值是一個 bool 的 Json 字符串

http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=MKDIRS
                    [&permission=<OCTAL>]
/// <summary>
/// Creates a folder inside the given directory by sending a MKDIRS request.
/// </summary>
/// <param name="folderName">Name of the folder to create.</param>
/// <param name="path">Parent directory, without a leading or trailing "/".</param>
/// <returns>
/// false when the status is not 200 OK or the response has no "boolean" key;
/// otherwise the value of the "boolean" key in the JSON response.
/// </returns>
public static async Task<bool> CreateFolderAsync(string folderName, string path)
{
    string hostName = _configuration["Hadoop:HostName"];
    string httpfsPort = _configuration["Hadoop:HttpfsPort"];
    string username = _configuration["Hadoop:Username"];

    string requestUrl = $"http://{hostName}:{httpfsPort}/webhdfs/v1/{path}/{folderName}?user.name={username}&op=MKDIRS";

    // Dispose the client and the response so the connection is released.
    using (HttpClient httpClient = new HttpClient())
    using (HttpResponseMessage response = await httpClient.PutAsync(requestUrl, null))
    {
        if (response.StatusCode != HttpStatusCode.OK)
        {
            return false;
        }

        string responseBody = await response.Content.ReadAsStringAsync();

        // Guard against a body without the "boolean" key instead of dereferencing null.
        var flag = JObject.Parse(responseBody)["boolean"];
        return flag != null && Boolean.Parse(flag.ToString());
    }
}

Rename a File/Directory

重命名文件/文件夾,PUT請求,這個返回值是一個 bool 的 Json 字符串

http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=RENAME&destination=<PATH>
/// <summary>
/// Renames a file or folder inside the given directory by sending a RENAME request.
/// </summary>
/// <param name="oldFileOrFolderName">Current name, including the extension.</param>
/// <param name="newFileOrFolderName">New name, including the extension.</param>
/// <param name="path">Directory that contains the item, without a leading or trailing "/".</param>
/// <returns>
/// false when the status is not 200 OK or the response has no "boolean" key;
/// otherwise the value of the "boolean" key in the JSON response.
/// </returns>
public static async Task<bool> RenameFileOrFolderAsync(string oldFileOrFolderName, string newFileOrFolderName, string path)
{
    string hostName = _configuration["Hadoop:HostName"];
    string httpfsPort = _configuration["Hadoop:HttpfsPort"];
    string username = _configuration["Hadoop:Username"];

    // The destination is sent as an absolute path, hence the leading "/".
    string requestUrl = $"http://{hostName}:{httpfsPort}/webhdfs/v1/{path}/{oldFileOrFolderName}?user.name={username}"
        + $"&op=RENAME&destination=/{path}/{newFileOrFolderName}";

    // Dispose the client and the response so the connection is released.
    using (HttpClient httpClient = new HttpClient())
    using (HttpResponseMessage response = await httpClient.PutAsync(requestUrl, null))
    {
        if (response.StatusCode != HttpStatusCode.OK)
        {
            return false;
        }

        string responseBody = await response.Content.ReadAsStringAsync();

        // Guard against a body without the "boolean" key instead of dereferencing null.
        var flag = JObject.Parse(responseBody)["boolean"];
        return flag != null && Boolean.Parse(flag.ToString());
    }
}

Delete a File/Directory

刪除文件/文件夾,DELETE請求,這個返回值是一個 bool 的 Json 字符串

http://<host>:<port>/webhdfs/v1/<path>?op=DELETE
                    [&recursive=<true |false>]
/// <summary>
/// Deletes a file or folder inside the given directory by sending a DELETE request
/// (the optional recursive flag is not sent).
/// </summary>
/// <param name="fileOrFolderName">Name of the item to delete, including the extension.</param>
/// <param name="path">Directory that contains the item.</param>
/// <returns>
/// false when the status is not 200 OK or the response has no "boolean" key;
/// otherwise the value of the "boolean" key in the JSON response.
/// </returns>
public static async Task<bool> DeleteFileOrFolderAsync(string fileOrFolderName, string path)
{
    string hostName = _configuration["Hadoop:HostName"];
    string httpfsPort = _configuration["Hadoop:HttpfsPort"];
    string username = _configuration["Hadoop:Username"];

    string requestUrl = $"http://{hostName}:{httpfsPort}/webhdfs/v1/{path}/{fileOrFolderName}?user.name={username}&op=DELETE";

    // Dispose the client and the response so the connection is released.
    using (HttpClient httpClient = new HttpClient())
    using (HttpResponseMessage response = await httpClient.DeleteAsync(requestUrl))
    {
        if (response.StatusCode != HttpStatusCode.OK)
        {
            return false;
        }

        string responseBody = await response.Content.ReadAsStringAsync();

        // Guard against a body without the "boolean" key instead of dereferencing null.
        var flag = JObject.Parse(responseBody)["boolean"];
        return flag != null && Boolean.Parse(flag.ToString());
    }
}

Open and Read a File

打開並且讀取文件,GET請求,返回值可能是一些文件的信息,但是我不在乎

http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=OPEN
                    [&offset=<LONG>][&length=<LONG>][&buffersize=<INT>][&noredirect=<true|false>]
/// <summary>
/// Reads a whole file into memory by sending an OPEN request. Intended for small files only.
/// </summary>
/// <param name="fileName">Name of the file, including its extension.</param>
/// <param name="path">Directory that contains the file, without a leading or trailing "/".</param>
/// <returns>The file content as a byte array.</returns>
/// <exception cref="HttpRequestException">The request failed or the server returned a non-success status.</exception>
public static async Task<byte[]> OpenAndReadFileAsync(string fileName, string path)
{
    string hostName = _configuration["Hadoop:HostName"];
    string httpfsPort = _configuration["Hadoop:HttpfsPort"];
    string username = _configuration["Hadoop:Username"];

    string requestUrl = $"http://{hostName}:{httpfsPort}/webhdfs/v1/{path}/{fileName}?user.name={username}&op=OPEN";

    using (HttpClient httpClient = new HttpClient())
    {
        // A single request is enough; the original issued an extra GET whose result
        // was discarded, downloading the file twice.
        return await httpClient.GetByteArrayAsync(requestUrl);
    }
}

Create and Write to a File 和 Append to a File

創建並寫入文件,PUT請求

http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=CREATE
                  [&overwrite=<true |false>][&blocksize=<LONG>][&replication=<SHORT>]
                  [&permission=<OCTAL>][&buffersize=<INT>][&noredirect=<true|false>]

追加寫入文件,POST請求

http://<HOST>:<PORT>/webhdfs/v1/<PATH>?op=APPEND
                    [&buffersize=<INT>][&noredirect=<true|false>]

這兩個的代碼就比較多樣了,就稍微講講,它們有一個 noredirect 字段,用來控制是否自動重定向:如果為 true 就需要發送兩次請求,重定向之后的鏈接多了一個字段 data=true,就是發送數據用的;noredirect 為 false 則只要一次就可以了,因為會自動重定向

使用 C# 調用 Hadoop HttpFS


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM