前言
Flink 自帶了一個SQLClient,截至目前Flink-1.13.0,Flink還沒有Flink SQL Gateway。
需求
由於需要在提供友好的用戶界面,類似於低代碼平台,因此需要一個WEB服務來調用執行用戶的SQL。
調研
Flink SQLClient 就是一個很好的樣例。
思路就是:
實現一個SQL Parser:
- 將用戶輸入的SQL文本,使用正則表達式,進行分割,轉換成一條條DDL,DML
- 一行一行地調用Flink Table API執行。
TableEnvironment:
public interface TableEnvironment {
TableResult executeSql(String statement);
}
執行過程:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
tableEnv.useCatalog("slankka");
tableEnv.useDatabase(database);
tableResult = tableEnv.executeSql(cmd);
HiveCatalog 多租戶和Kerberos
在這篇文章中,對Hive進行了修改,以支持Kerberos。
【Flink系列六】構建實時計算平台——Flink 1.10+通過Kerberos連接HiveCatalog
本篇文章將基於這個改動,提供多租戶的思路。
HiveCatalog 類的源碼如下:
//org.apache.flink.table.catalog.hive.HiveCatalog
public class HiveCatalog extends AbstractCatalog {
@VisibleForTesting HiveMetastoreClientWrapper client;
@Override
public void open() throws CatalogException {
if (client == null) {
client = HiveMetastoreClientFactory.create(hiveConf, hiveVersion);
LOG.info("Connected to Hive metastore");
}
//....
}
@Override
public void close() throws CatalogException {
if (client != null) {
client.close();
client = null;
LOG.info("Close connection to Hive metastore");
}
}
}
可以看到底層是一個HiveMetastoreClientWrapper
//HiveMetastoreClientWrapper.java
private IMetaStoreClient createMetastoreClient() {
return hiveShim.getHiveMetastoreClient(hiveConf);
}
HiveShimV100,對應hive-1.x。
//HiveShimV100.java
public class HiveShimV100 implements HiveShim {
@Override
public IMetaStoreClient getHiveMetastoreClient(HiveConf hiveConf) {
try {
return new HiveMetaStoreClient(hiveConf);
} catch (MetaException ex) {
throw new CatalogException("Failed to create Hive Metastore client", ex);
}
}
}
因此HiveCatalog.open
導致 HiveMetaStoreClient
的創建。
那么對於Kerberos代理,則需要HADOOP_PROXY_USER這個變量。
HADOOP_PROXY_USER的運行時設置
Flink 的 TableEnvironment 有這個API:
void registerCatalog(String catalogName, Catalog catalog);
這個將注冊一個Catalog,對於持久化的Catalog,Flink目前只有HiveCatalog,那么可以覆蓋HiveCatalog的子類的open/close方法來實現運行時切換用戶。
new HiveCatalog(...) {
@Override
public void open() throws CatalogException {
wrapExecution(super::open);
}
@Override
public void close() throws CatalogException {
wrapExecution(super::close);
}
public void wrapExecution(Runnable consumer) {
//大致代碼如下:
String current = System.getProperty(HADOOP_PROXY_USER);
try {
System.setProperty(HADOOP_PROXY_USER, slankkaProvidedAccount);
consumer.run();
} finally {
if (current != null) {
System.setProperty(HADOOP_PROXY_USER, current);
} else {
System.clearProperty(HADOOP_PROXY_USER);
}
}
}
}
篇幅有限,大致記錄一個思路。