通過HA方式操作HDFS


之前操作hdfs的時候,都是固定namenode的地址,然后去操作。這個時候就必須判斷namenode的狀態為active還是standby,比較繁瑣,如果集群使用了HA的形式,就很方便了

直接上代碼,看注釋:

package com.ideal.template.openbigdata.util;

import java.io.IOException;
import java.net.URI;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;

import java.util.LinkedList;
import java.util.List;

//import org.anarres.lzo.LzoAlgorithm;
//import org.anarres.lzo.LzoDecompressor;
//import org.anarres.lzo.LzoInputStream;
//import org.anarres.lzo.LzoLibrary;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

public class HadoopUse
{
    private static Log log = LogFactory.getLog(HadoopUse.class);
    
    /**
	 * 設置hdfs配置信息
	 * @return
	 */
	private static Configuration getConf()
	{
		Configuration conf = new Configuration();
		
		//設置配置相關的信息,分別對應hdfs-site.xml core-site.xml
		conf.set("fs.defaultFS", "hdfs://dragoncluster");
		conf.set("dfs.nameservices", "dragoncluster");
		conf.set("dfs.ha.namenodes.dragoncluster", "nn1,nn2");
		conf.set("dfs.namenode.rpc-address.dragoncluster.nn1", "n01.dragon.com:8020");
		conf.set("dfs.namenode.rpc-address.dragoncluster.nn2", "n02.dragon.com:8020");
		conf.set("dfs.client.failover.proxy.provider.dragoncluster", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
		
		//設置實現類,因為會出現類覆蓋的問題
		conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
		conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
		return conf;
	}
	
	/**
	 * 設置kerberos認證
	 * @param conf
	 * @throws Exception
	 */
	private static void kerberosLogin(Configuration conf) throws Exception
	{
		conf.set("hadoop.security.authentication", "kerberos");
		UserGroupInformation.setConfiguration(conf);
		UserGroupInformation.loginUserFromKeytab("openbigdata@DRAGON.COM", "/etc/security/keytabs/openbigdata.keytab");
	}
	
	public static long getSize(String uri, String user)
	{
		Path path = new Path(URI.create(uri));

		Configuration conf = new Configuration();
		try
		{
			FileSystem fs = FileSystem.get(URI.create(uri), conf, user);
			return fs.getContentSummary(path).getLength() / 1024 / 1024; // 單位為MB
		}
		catch (Exception ex)
		{
			log.error("HadoopUse.getSize" + ex.getMessage(), ex);
			return 0;
		}
	}

	/**
	 * 在hdfs上創建文件,並寫入內容
	 * 
	 * @param uri
	 * @param content
	 * @param user
	 * @return
	 */
	public static boolean createHdfsFile(String uri, String user, String fullName, String content)
	{
		if (fullName == null || fullName.length() == 0)
		{// 本地路徑不正確
			return false;
		}
		if (content == null || content.length() == 0)
		{// hdfs路徑不正確
			return false;
		}

		try
		{
			Configuration conf = new Configuration();

			FileSystem fs = FileSystem.get(URI.create(uri), conf, user);
			FSDataOutputStream os = null;

			if (fs.exists(new Path(fullName)) == true)
			{// 如果該路徑存在
				// os = fs.append(new Path(fullName));
				fs.delete(new Path(fullName), true);
			}
			os = fs.create(new Path(fullName));
			os.write(content.getBytes());
			os.close();
			fs.close();
			return true;
		}
		catch (Exception ex)
		{
			log.error("HadoopUse.createHdfsFile" + ex.getMessage(), ex);
			return false;
		}
	}
	
	/**
	 * 刪除hdfs上的文件
	 * @param uri
	 * @param user
	 * @param fullName
	 * @return
	 */
	public static boolean deleteHdfsFile(String uri, String user, String fullName)
	{
		if (fullName == null || fullName.length() == 0)
		{// 本地路徑不正確
			log.error("HadoopUse.deleteHdfsFile文件名不合法");
			return false;
		}

		try
		{
			Configuration conf = new Configuration();

			FileSystem fs = FileSystem.get(URI.create(uri), conf, user);
			//FSDataOutputStream os = null;

			if (fs.exists(new Path(fullName)) == true)
			{// 如果該路徑存在
				// os = fs.append(new Path(fullName));
				fs.delete(new Path(fullName), true);
			}
			return true;
		}
		catch (Exception ex)
		{
			log.error("HadoopUse.createHdfsFile" + ex.getMessage(), ex);
		}
		return false;
	}
	
	/**
	 * 根據resultset將值寫入到hdfs上
	 * @param uri
	 * @param user
	 * @param fullName
	 * @param resultSet
	 * @param terminated
	 * @return
	 * @throws InterruptedException 
	 * @throws IOException 
	 * @throws SQLException 
	 */
    public void createHdfsFile(String fullName, ResultSet resultSet, String terminated, FlagUtil flag)
        throws IOException, InterruptedException, SQLException, Exception
    {
        if (resultSet == null)
        { // 如果查詢出來的游標為空,直接退出
            return;
        }
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        
        FileSystem fs = null;
        FSDataOutputStream out = null;
        Configuration conf = getConf();
        kerberosLogin(conf);
        
        fs = FileSystem.get(conf);
        if (fs.exists(new Path(fullName)) == true)
        {// 如果該路徑存在
            fs.delete(new Path(fullName), true);
        }
        
        // 獲取文件句柄
        out = fs.create(new Path(fullName));
        
        // 寫入文件內容
        ResultSetMetaData rsmd = resultSet.getMetaData();
        int rowCnt = rsmd.getColumnCount();
        int count = 0;
        while (resultSet.next())
        {
        	count++;
        	if(count  >= 1000)
        	{//每1000條記錄檢查一次需要終止任務
        		if(flag.getTeminalStatus() == true)
        		{
        			break;
        		}
        		count = 0;
        	}
        	
            for (int i = 1; i <= rowCnt; i++)
            {
                if (resultSet.getObject(i) == null)
                {// 如果是空的數據
                    out.write("".getBytes("utf-8"));
                }
                else
                {
                	String item = null;
                	if("DATE".equals(rsmd.getColumnTypeName(i).toUpperCase()))
                	{//如果是日期類型
                		Timestamp date = resultSet.getTimestamp(i);
                		item = sdf.format(date);
                	}
                	else
                	{
                		item = String.valueOf(resultSet.getObject(i));
                	}
					if (item != null)
					{
						out.write(item.getBytes("utf-8"));
					}
					else
					{
						out.write("".getBytes("utf-8"));
					}
                }
                if (i < rowCnt)
                {// 如果寫完一列,則插入分隔符
                    out.write(terminated.getBytes("utf-8"));
                }
            }
            // 切換到下一行
            out.write("\r\n".getBytes("utf-8"));
        }
        log.info("fullName:" + fullName + "寫入成功");
        
        if (out != null)
        {
            out.flush();
            out.close();
        }
        if (fs != null)
        {
            fs.close();
        }
    }
    
    /**
	 * 查詢路徑
	 * @param path
	 * @return
	 * @throws Exception
	 */
	public static List<String> listDir(String path) throws Exception
	{
		Configuration conf = getConf();
		kerberosLogin(conf);
		FileSystem fs = FileSystem.get(conf);
		
		Path hdfs = new Path(path);
		List<String> pathList = null;
		FileStatus files[] = fs.listStatus(hdfs);
		if(files!=null && files.length >0)
		{
			pathList = new LinkedList<String>();
			for (FileStatus file : files)
			{
				pathList.add(file.getPath().toString());
			}
		}
		return pathList;
	}

	public static void main(String[] args) throws Exception
	{
		List<String> pathList = listDir(args[0]);
		for(String path: pathList)
		{
			System.out.println(path);
		}
	}
}

 注意,這用到了HA,以及kerberos認證,


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM