集成kerberos的kudu(1.10.0) 訪問


集成kerberos的kudu 訪問

kudu Api (java)

1. 首先需要進行kerberos的驗證(需要將相應用戶的keytab文件引入本地)

代碼如下:

public class KuduKerberosAuth {

    /**
     * Initializes the Kerberos environment and performs a keytab login
     * for the configured principal via Hadoop's UserGroupInformation.
     *
     * @param debug whether to enable Kerberos debug output
     *              (a {@code null} value is treated as {@code false})
     */
    public static void initKerberosENV(Boolean debug) {
        try {
            System.setProperty("java.security.krb5.conf","D:\\cdh\\kudu\\src\\main\\kerberos\\krb5.conf");
//            System.setProperty("java.security.krb5.conf","/lvm/data3/zhc/krb5.conf");
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            // Bug fix: Boolean.TRUE.equals guards against a null Boolean argument;
            // the original auto-unboxing would throw NullPointerException.
            if (Boolean.TRUE.equals(debug)) {
                System.setProperty("sun.security.krb5.debug", "true");
            }
            UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "D:\\cdh\\kudu\\src\\main\\kerberos\\gree1.keytab");
//            UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "/lvm/data3/zhc/gree1.keytab");
            System.out.println(UserGroupInformation.getCurrentUser());
        } catch(Exception e) {
            e.printStackTrace();
        }
    }
}

2.Maven 依賴

    
  <properties>
        <kudu-version>1.10.0-cdh6.3.0</kudu-version>
  </properties>
<dependencies>
<!--認證依賴-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.0.0-cdh6.3.0</version>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>3.0.0-cdh6.3.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-client</artifactId>
                <version>${kudu-version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.25</version>
            </dependency>
</dependencies>

3.引入hadoop 配置文件

配置文件需放到 resources 文件夾的根目錄下

1.core-site.xml

4. 進行訪問

代碼如下:

獲取kudu客戶端

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.client.KuduClient;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class GetKuduClient {

    // Kudu master addresses; override with -DkuduMasters=host1:7051,host2:7051,...
    private static final String KUDU_MASTERS = System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");

    /**
     * Builds a {@link KuduClient} as the currently logged-in (Kerberos) user.
     * Kerberos login must have been performed beforehand
     * (e.g. via {@code KuduKerberosAuth.initKerberosENV}).
     *
     * @return the client, or {@code null} if it could not be created
     */
    public static KuduClient getKuduClient() {
        KuduClient client = null;
        try {
            client = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
                        }
                    }
            );
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Bug fix: restore the interrupt flag instead of swallowing it,
            // so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return client;
    }
}

main函數

import kudutest.KuduApiTest;
import kudutest.client.GetKuduClient;
import org.apache.kudu.client.KuduClient;
import kudujavautil.KuduKerberosAuth;


public class KuduApiMain {

    /**
     * Entry point: authenticates through Kerberos, obtains a Kudu client and
     * exercises the table operations (most calls are kept commented out as
     * usage examples).
     */
    public static void main(String[] args) {
        /*
        * Authenticate through Kerberos
        * */
        KuduKerberosAuth.initKerberosENV(false);
        /*
        * Obtain the Kudu client
        * */
        KuduClient client= GetKuduClient.getKuduClient();
        /*
        * Query a column of the table
        * */
        KuduApiTest.getTableData(client,"kudutest","zhckudutest1","id");
        /*
        * Create a table
        * */
//        KuduApiTest.createTableData(client,"zhckudutest1");
        /*
        * List all tables in Kudu
        * */
//        KuduApiTest.tableListShow(client);
        /*
        * Upsert rows into the given Kudu table
        * */
//        KuduApiTest.upsertTableData(client,"zhckudutest1",10);
        /*
        * Drop a Kudu table
        * */
//        KuduApiTest.dropTableData(client,"zhckudutest");
    }
}

操作kudu表

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

import java.util.ArrayList;
import java.util.List;

public class KuduApiTest {

    /**
     * Scans the given Kudu table and prints one string column for every row.
     *
     * NOTE(review): the {@code database} parameter is currently unused -- the
     * table is opened by name only. For Impala-managed tables the full name
     * would be "impala::" + database + "." + table; confirm with callers
     * before wiring it in.
     *
     * The passed-in client is closed before returning, so it must not be
     * reused afterwards (this preserves the original behavior).
     */
    public static void getTableData(KuduClient client, String database, String table, String columns) {
        try {
            KuduTable kudutable = client.openTable( table);
            KuduScanner kuduScanner = client.newScannerBuilder(kudutable).build();
            try {
                while (kuduScanner.hasMoreRows()) {
                    RowResultIterator rowResultIterator = kuduScanner.nextRows();
                    while (rowResultIterator.hasNext()) {
                        RowResult rowResult = rowResultIterator.next();
                        System.out.println(rowResult.getString(columns));
                    }
                }
            } finally {
                // Bug fix: close the scanner so the tablet server can release
                // its scanner resources (the original leaked it).
                kuduScanner.close();
            }
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Upserts {@code numRows} rows of fixed sample data into the given table.
     */
    public static void upsertTableData(KuduClient client, String tableName, int numRows ) {
        try {
            KuduTable kuduTable = client.openTable(tableName);
            KuduSession kuduSession = client.newSession();
            // Manual flush: rows are buffered and sent in one batch below
            // (the default mode flushes automatically on apply()).
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            try {
                for(int i =0; i < numRows; i++) {
                    String userInfo_str = "abcdef,ghigk";
                    // Bug fix: use newUpsert() instead of newInsert() so that
                    // re-running the method does not fail on duplicate keys,
                    // matching the method's upsert semantics.
                    Upsert upsert = kuduTable.newUpsert();
                    PartialRow row = upsert.getRow();
                    String[] userInfo = userInfo_str.split(",");
                    if(userInfo.length == 2) {
                        row.addString("id", userInfo[0]);
                        row.addString("name", userInfo[1]);
                    }
                    kuduSession.apply(upsert);
                }
                kuduSession.flush();
            } finally {
                // Always close the session, even if apply()/flush() failed.
                kuduSession.close();
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Creates a Kudu table with string columns (id, name), hash-partitioned
     * on id into 8 buckets. Does nothing if the table already exists.
     */
    public static void createTableData(KuduClient client, String tableName) {
        List<ColumnSchema> columns = new ArrayList<>();
        // A compression algorithm can be chosen per column when it is added.
        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        Schema schema = new Schema(columns);
        CreateTableOptions createTableOptions = new CreateTableOptions();
        List<String> hashKeys = new ArrayList<>();
        hashKeys.add("id");
        int numBuckets = 8;
        createTableOptions.addHashPartitions(hashKeys, numBuckets);

        try {
            if (!client.tableExists(tableName)) {
                client.createTable(tableName, schema, createTableOptions);
                // Bug fix: only report success when the table was actually
                // created (the original printed it unconditionally).
                System.out.println("成功創建Kudu表:" + tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Prints the names of all tables in the Kudu cluster.
     * @param client an open Kudu client
     */
    public static void tableListShow(KuduClient client) {
        try {
            ListTablesResponse listTablesResponse = client.getTablesList();
            List<String> tblist = listTablesResponse.getTablesList();
            for(String tableName : tblist) {
                System.out.println(tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }

    /**
     * Drops the given Kudu table.
     * */
    public static void dropTableData(KuduClient client, String tableName) {
        try {
            client.deleteTable(tableName);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
}

package kudutest;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.client.*;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class KuduClientTest {

    // Kudu master addresses; override with -DkuduMasters=host1:7051,host2:7051,...
    private static final String KUDU_MASTERS = System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");

    //    private static final String KUDU_MASTERS = System.getProperty("kuduMasters", "sns-cdh-namenode2:7051,sns-cdh-namenode1:7051,sns-cdh-datanode1:7051");

    /**
     * Connects to Kudu as the logged-in (Kerberos) user and prints one string
     * column of every row of a hard-coded test table.
     * */
    static void getTableData(){
        System.out.println("-----------------------------------------------");
        System.out.println("Will try to connect to Kudu master(s) at " + KUDU_MASTERS);
        System.out.println("-----------------------------------------------");
        try {
            KuduClient client = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
                        }
                    }
            );
            try {
                KuduTable table = client.openTable("impala::kudutest.kudu_table");
//                KuduTable table = client.openTable("impala::test.test");
                KuduScanner kuduScanner = client.newScannerBuilder(table).build();
                try {
                    while (kuduScanner.hasMoreRows()) {
                        RowResultIterator rowResultIterator = kuduScanner.nextRows();
                        while (rowResultIterator.hasNext()) {
                            RowResult rowResult = rowResultIterator.next();
                            System.out.println(rowResult.getString("name"));
//                            System.out.println(rowResult.getString("t1"));
                        }
                    }
                } finally {
                    // Bug fix: close the scanner so the server releases its resources.
                    kuduScanner.close();
                }
            } finally {
                // Bug fix: close the client even when the scan throws
                // (the original skipped close() on any exception).
                try {
                    client.close();
                } catch (KuduException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Bug fix: restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}

kudu Impala JDBC (java)

1.kerberos驗證或者LDAP 驗證(需要將ssl證書文件引入本地)

LDAP驗證代碼如下:

public class GetImpalaClient {
    // Impala JDBC driver class name (field name "diiver" is a typo kept as-is:
    // it is referenced by getKuduClientKerberos elsewhere in this class).
    private static String diiver = "com.cloudera.impala.jdbc41.Driver";
    // LDAP authentication URL (AuthMech=3) with SSL truststore.
    private static String ldap_URL = "jdbc:impala://cdh-master03:25004/default;AuthMech=3;SSL=1;SSLTrustStore=/lvm/data3/zhc/cm-auto-global_truststore.jks";
    // NOTE(review): hard-coded credentials in source code are a security risk;
    // load them from configuration or a secrets store in real deployments.
    private  static String user="gree1";
    private  static String password="000000";
    // Kerberos authentication URL (AuthMech=1) with realm, FQDN and SSL truststore.
    private static String kerberos_URL = "jdbc:impala://cdh-master03:25004/default;AuthMech=1;KrbRealm=GREE.IO;KrbHostFQDN=cdh-master03;KrbServiceName=impala;SSL=1;SSLTrustStore=D:/cdh/kudu/src/main/ssl/cm-auto-global_truststore.jks";
    
// LDAP authentication: opens an Impala JDBC connection with username/password.
    public  static  Connection getKuduClientLDAP() throws ClassNotFoundException, SQLException {
        Class.forName(diiver);
        Connection connection= DriverManager.getConnection(ldap_URL,user,password);
        System.out.println("這是LDAP認證");
        return  connection;
    }
}

kerberos驗證代碼如下:

//Kerberos authentication: opens an Impala JDBC connection as the logged-in user.
public static Connection getKuduClientKerberos() throws IOException {
        // Perform the Kerberos keytab login first.
        KuduKerberosAuth.initKerberosENV(false);
        Connection client = null;
        try {
            // Typed action removes the unchecked (Connection) cast of the original.
            client = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<Connection>() {
                        @Override
                        public Connection run() throws Exception {
                            Class.forName(diiver);
                            return DriverManager.getConnection(kerberos_URL);
                        }
                    }
            );
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Bug fix: restore the interrupt flag instead of swallowing it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        System.out.println("這是KERBEROS認證");
        return client;
    }

2.maven 依賴

  <dependencies>
           <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.0.0-cdh6.3.0</version>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>3.0.0-cdh6.3.0</version>
            </dependency>
      
        <dependency>
            <groupId>com.cloudera.impala.jdbc</groupId>
            <artifactId>ImpalaJDBC41</artifactId>
            <version>2.5.43</version>
        </dependency>
      
        <dependency>
            <groupId>com.cloudera.impala.jdbc</groupId>
            <artifactId>hive_metastore</artifactId>
            <version>2.5.43</version>
        </dependency>
      
        <dependency>
            <groupId>com.cloudera.impala.jdbc</groupId>
            <artifactId>hive_service</artifactId>
            <version>2.5.43</version>
        </dependency>
      
        <dependency>
            <groupId>com.cloudera.impala.jdbc</groupId>
            <artifactId>ql</artifactId>
            <version>2.5.43</version>
        </dependency>
      
        <dependency>
            <groupId>com.cloudera.impala.jdbc</groupId>
            <artifactId>TCLIServiceClient</artifactId>
            <version>2.5.43</version>
        </dependency>
      
        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.0</version>
        </dependency> 
      
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
</dependencies>

3.hadoop文件

文件放到resources 文件夾里面要放到根目錄下面

1.core-site.xml

4. 代碼訪問

//main method: demonstrates reading a Kudu table through Impala JDBC.
    public static void main(String[] args) throws SQLException, ClassNotFoundException, IOException {
        /*
         * Obtain an Impala connection (Kerberos authentication)
         * */
//        Connection kerberosConn = GetImpalaClient.getKuduClientKerberos();

        /*
         * Obtain an Impala connection (LDAP authentication)
         * */

        Connection ldapConn = GetImpalaClient.getKuduClientLDAP();

        /*
         * Read the Kudu table through Impala (Kerberos authentication)
         * */

//        KuduImpalaTest.getKuduData(kerberosConn, "kudutest", "zhckudutest1");

        /*
         * Read the Kudu table through Impala (LDAP authentication)
         * */

        KuduImpalaTest.getKuduData(ldapConn, "kudutest", "zhckudutest1");

    }
//Reads all rows of database.tableName via the Impala JDBC connection and prints the first two columns.
public static void getKuduData( Connection connection,String database,String tableName) throws SQLException, ClassNotFoundException, IOException {

        // NOTE(review): database/tableName are concatenated into the SQL text;
        // identifiers cannot be bound as JDBC parameters, so only pass trusted values.
        // Bug fix: try-with-resources closes the statement and result set
        // (the original leaked both).
        try (PreparedStatement ps = connection.prepareStatement("select  * from "+database+"."+tableName);
             ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + "  ******  " + rs.getString(2));
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Preserve original behavior of closing the caller's connection here,
            // now also on the exception path.
            try{
                connection.close();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }

kudu Spark  (scala)

1.kerberos 認證(需要將相應用戶的keytab文件引入本地)

 /**
  * Initializes the Kerberos environment and performs a keytab login via
  * Hadoop's UserGroupInformation.
  *
  * @param debug enable sun.security.krb5 debug output when true
  */
 def kerberosAuth(debug: Boolean): Unit = {
    try {
      System.setProperty("java.security.krb5.conf", "D:\\cdh\\kudu\\src\\main\\kerberos\\krb5.conf")
      //            System.setProperty("java.security.krb5.conf","/lvm/data3/zhc/krb5.conf");
      System.setProperty("javax.security.auth.useSubjectCredsOnly", "false")
      if (debug) System.setProperty("sun.security.krb5.debug", "true")
      UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "D:\\cdh\\kudu\\src\\main\\kerberos\\gree1.keytab")
      //            UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "/lvm/data3/zhc/gree1.keytab");
      System.out.println(UserGroupInformation.getCurrentUser)
    } catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }

2.maven依賴

  
 <properties>
    	    <kudu-version>1.10.0-cdh6.3.0</kudu-version>
 </properties>
 <dependencies>
       <!--認證依賴-->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.0.0-cdh6.3.0</version>
            </dependency>

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>3.0.0-cdh6.3.0</version>
            </dependency>
      	<dependency>
            <groupId>org.apache.kudu</groupId>
            <artifactId>kudu-spark2_2.11</artifactId>
            <version>${kudu-version}</version>
        </dependency>
      <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0-cdh6.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0-cdh6.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
</dependencies>     

3.hadoop文件

文件放到resources 文件夾里面要放到根目錄下面

1.core-site.xml

4.代碼訪問(scala)

訪問代碼如下:

import org.apache.spark.sql.SparkSession
import kuduscalautil.{GetKuduConnect, KerberosAuth}
object KuduSparkTest {
// main: authenticates via Kerberos, builds a local SparkSession and a
// KuduContext, then creates a test table.
  def main(args: Array[String]): Unit = {

    new KerberosAuth().kerberosAuth(false);

    val spark = SparkSession.builder.appName("zhc_SparkTest").master("local[*]").getOrCreate();

    val kuduContext = new GetKuduConnect().getKuduContext(spark.sqlContext.sparkContext);

    // Create the table (last argument: delete it again afterwards when true).
    new KuduSparkFunction().createTable(kuduContext,spark,"impala_kudu.zhcTestKudu",false);
   }
}
import java.io.IOException
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkContext
class GetKuduConnect {
  // Kudu master addresses; override with -DkuduMasters=host1:7051,host2:7051,...
  val kuduMaster: String = System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");

  /**
   * Builds a KuduContext as the currently logged-in (Kerberos) user.
   *
   * @param sparkcontext the active SparkContext
   * @return the KuduContext, or null if it could not be created
   */
  def getKuduContext(sparkcontext: SparkContext): KuduContext = {
    var kuduContext: KuduContext = null
    try {
      kuduContext = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduContext]() {
        @throws[Exception]
        override def run: KuduContext = new KuduContext(kuduMaster, sparkcontext)
      })
    } catch {
      case e: IOException =>
        e.printStackTrace()
      case e: InterruptedException =>
        // Bug fix: restore the interrupt flag instead of swallowing it.
        Thread.currentThread().interrupt()
        e.printStackTrace()
    }
    kuduContext
  }
}

import org.apache.kudu.client
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.slf4j.LoggerFactory
import scala.collection.JavaConverters._
class KuduSparkFunction {
  // Number of replicas for new tables; override with -DtableNumReplicas=N.
  val tableNumReplicas: Int = Integer.getInteger("tableNumReplicas", 1)
  val nameCol = "name";
  val idCol = "id";
  val logger = LoggerFactory.getLogger(KuduSparkTest.getClass)

  /*
   * Creates a Kudu table with string columns (id, name), hash-partitioned on
   * id into 3 buckets. When delectTable (sic -- name kept for compatibility
   * with named-argument callers) is true, the table is deleted again in the
   * finally block. The SparkSession is closed before returning.
   * */
  def createTable(kuduContext:KuduContext,spark:SparkSession,tableName:String,delectTable:Boolean):Unit={

    val schema = StructType(
      List(
        StructField(idCol, StringType, false),
        StructField(nameCol, StringType, false)
      )
    )

    try {
      if (kuduContext.tableExists(tableName)) {
        throw new RuntimeException(tableName + ":table already exists")
      }
      println(s"開始創建表$tableName")
      kuduContext.createTable(tableName, schema, Seq(idCol),
        new client.CreateTableOptions().addHashPartitions(List(idCol).asJava, 3).setNumReplicas(tableNumReplicas))
      println("創建成功")
    } catch {
      // Bug fix: pass the throwable as the second argument so SLF4J records
      // the stack trace (the original concatenated it into the message).
      case unknown: Throwable => logger.error("got an exception", unknown)
    } finally {
      if (delectTable) {
        logger.info(s"deleting table '$tableName'")
        kuduContext.deleteTable(tableName)
      }
      // "dowm" typo in the original log message fixed.
      logger.info(s"closing down the session")
      spark.close()
    }
  }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM