【Flink系列十一】FlinkSQL Gateway以及支持Kerberos多租戶的實現思路


前言

Flink 自帶了一個SQLClient,截至目前Flink-1.13.0,Flink還沒有Flink SQL Gateway。

需求

由於需要在提供友好的用戶界面,類似於低代碼平台,因此需要一個WEB服務來調用執行用戶的SQL。

調研

Flink SQLClient 就是一個很好的樣例。

思路就是:
實現一個SQL Parser:

  1. 將用戶輸入的SQL文本,使用正則表達式,進行分割,轉換成一條條DDL,DML
  2. 一行一行地調用Flink Table API執行。

TableEnvironment:

public interface TableEnvironment {
    TableResult executeSql(String statement);
}

執行過程:

EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.useCatalog("slankka");
tableEnv.useDatabase(database);

tableResult = tableEnv.executeSql(cmd);

HiveCatalog 多租戶和Kerberos

在這篇文章中,對Hive進行了修改,以支持Kerberos。
【Flink系列六】構建實時計算平台——Flink 1.10+通過Kerberos連接HiveCatalog
本篇文章將基於這個改動,提供多租戶的思路。

HiveCatalog 類的源碼如下:

//org.apache.flink.table.catalog.hive.HiveCatalog
public class HiveCatalog extends AbstractCatalog {
    @VisibleForTesting HiveMetastoreClientWrapper client;

     @Override
    public void open() throws CatalogException {
        if (client == null) {
            client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion);
            LOG.info("Connected to Hive metastore");
        }
        //....
    }

    @Override
    public void close() throws CatalogException {
        if (client != null) {
            client.close();
            client = null;
            LOG.info("Close connection to Hive metastore");
        }
    }
}

可以看到底層是一個HiveMetastoreClientWrapper

//HiveMetastoreClientWrapper.java

    private IMetaStoreClient createMetastoreClient() {
        return hiveShim.getHiveMetastoreClient(hiveConf);
    }

HiveShimV100,對應hive-1.x。

//HiveShimV100.java
public class HiveShimV100 implements HiveShim {
    @Override
    public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
        try {
            return new HiveMetaStoreClient(hiveConf);
        } catch (MetaException ex) {
            throw new CatalogException("Failed to create Hive Metastore client", ex);
        }
    }
}

因此HiveCatalog.open 導致 HiveMetaStoreClient的創建。

那么對於Kerberos代理,則需要HADOOP_PROXY_USER這個變量。

HADOOP_PROXY_USER的運行時設置

Flink 的 TableEnvironment 有這個API:

void registerCatalog(String catalogName, Catalog catalog);

這個將注冊一個Catalog,對於持久化的Catalog,Flink目前只有HiveCatalog,那么可以覆蓋HiveCatalog的子類的open/close方法來實現運行時切換用戶。

new HiveCatalog(...) {
      @Override
      public void open() throws CatalogException {
        wrapExecution(super::open);
      }
      @Override
      public void close() throws CatalogException {
        wrapExecution(super::close);
      }
      
      public void wrapExecution(Runnable consumer) {
          //大致代碼如下:
          String current = System.getProperty(HADOOP_PROXY_USER);
          try {
             System.setProperty(HADOOP_PROXY_USER, slankkaProvidedAccount);
             consumer.run();
          } finally {
             if (current != null) {
                System.setProperty(HADOOP_PROXY_USER, current);
             } else {
                System.clearProperty(HADOOP_PROXY_USER);
             }
          }
      }
}

篇幅有限,大致記錄一個思路。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM