SparkSQL從2.0開始已經不再支持ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment], ...)
這種語法了(下文簡稱add columns語法)。如果你的Spark項目中用到了SparkSQL+Hive這種模式,從Spark1.x升級到2.x很有可能遇到這個問題。
為了解決這個問題,我們一般有3種方案可以選擇:
- 啟動一個hiveserver2服務,通過jdbc直接調用hive,讓hive執行add columns語句。這種應該是改起來最為方便的一種方式了,缺點就是,我們還需要在啟動一個hiveserver服務,多一個服務依賴,會增加整個系統的維護成本。
- SparkSQL+Hive這種模式,要求我們啟動一個HiveMetastore服務,給SparkSQL用,我們也可以在代碼中直接直接連接HiveMetastore去執行add columns語句。這種方式的好處是不需要額外依賴其他服務,缺點就是我們要自己調用HiveMetastore相關接口,自己管理SessionState,用起來比較麻煩。
- 最后一種方式就是直接修改Spark,讓他支持add columns語法。這種方式最大的好處就是我們原有的業務邏輯代碼不用動,問題就在於,要求對Spark源碼有一定的了解,否則改起來還是挺費勁的。這也是我寫這篇文章的目的:讓大家能夠參考本文自行為Spark添加add columns語法支持。
OK,接下來,我們進入主題。
為Spark添加add columns語法支持
本文基於最新版的Spark 2.1.0,源碼地址:https://github.com/apache/spark/tree/branch-2.1
1. 改進語法定義
Spark2.1開始使用ANTLR來解析SQL語法,它的語法定義文件借鑒的Presto項目,我們在Spark源碼中找到這個文件sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
,做如下改動:
@@ -127,6 +127,8 @@ statement
('(' key=tablePropertyKey ')')? #showTblProperties
| SHOW COLUMNS (FROM | IN) tableIdentifier
((FROM | IN) db=identifier)? #showColumns
+ | ALTER TABLE tableIdentifier ADD COLUMNS
+ ('(' columns=colTypeList ')')? #addColumns
| SHOW PARTITIONS tableIdentifier partitionSpec? #showPartitions
| SHOW identifier? FUNCTIONS
(LIKE? (qualifiedName | pattern=STRING))? #showFunctions
@@ -191,7 +193,6 @@ unsupportedHiveNativeCommands
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=SET kw4=FILEFORMAT
- | kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CHANGE kw4=COLUMN?
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=REPLACE kw4=COLUMNS
| kw1=START kw2=TRANSACTION
194行的kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=ADD kw4=COLUMNS
是在unsupportedHiveNativeCommands
列表中,我們首先把它去掉。
為了讓Spark能解析ALTER TABLE table_name ADD COLUMNS (col_name data_type [COMMENT col_comment], ...)
,我們還需要在129行處新增| ALTER TABLE tableIdentifier ADD COLUMNS ('(' columns=colTypeList ')')? #addColumns
最后的#addColumns
是為了讓ANTLR插件(這個插件定義在sql/catalyst/pom.xml中)為我們自動生成addColumns相關方法,便於我們做語法解析處理。這個語法中有2個參數需要我們處理table_name和columns。
2. 改進SparkSqlAstBuilder,使其能處理addColumns
SparkSqlAstBuilder
的作用是將ANTLR的語法樹翻譯為LogicalPlan/Expression/TableIdentifier
要修改的文件為:sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
,我們在178行處,新增如下方法:
override def visitAddColumns(ctx: AddColumnsContext): LogicalPlan = withOrigin(ctx) {
val tableName = visitTableIdentifier(ctx.tableIdentifier())
val dataCols = Option(ctx.columns).map(visitColTypeList).getOrElse(Nil)
AlterTableAddColumnsCommand(tableName, dataCols)
}
visitAddColumns方法是ANTLR插件自動為我們生成的方法,定義在SparkSqlAstBuilder的父類AstBuilder中(AST,Abstract Syntax Tree ,抽象語法樹),這個方法用來處理我們在SqlBase.g4中定義的| ALTER TABLE tableIdentifier ADD COLUMNS ('(' columns=colTypeList ')')? #addColumns
,我們這里重載了visitAddColumns方法用來提取表名及新增的字段列表,並返回一個LogicalPlan:AlterTableAddColumnsCommand,這個類我們接下來會說明。
3. 新增一個為表添加字段的命令
修改sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
,在120行處,新增AlterTableAddColumnsCommand類:
case class AlterTableAddColumnsCommand(
tableName: TableIdentifier,
newColumns: Seq[StructField]) extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
DDLUtils.verifyAlterTableType(catalog, table, isView = false)
val newSchema = StructType(table.schema.fields ++ newColumns)
val newTable = table.copy(schema = newSchema)
catalog.alterTable(newTable)
Seq.empty[Row]
}
}
RunnableCommand類繼承自LogicalPlan,run方法用於執行addColumns語法對應的執行邏輯。這個類的處理邏輯比較簡單,就不詳細介紹了。
4. 修復HiveExternalCatalog無法修改表schema的問題
我們在第3步的AlterTableAddColumnsCommand中,雖然調用了catalog.alterTable(newTable)
來修改表信息,但實際上並不能將新的字段添加到表中,因為Spark代碼寫死了,不能改Hive表的schema,我們還需要修改HiveExternalCatalog類(sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala),改動如下:
@@ -588,7 +588,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val newTableProps = oldDataSourceProps ++ withStatsProps.properties + partitionProviderProp
val newDef = withStatsProps.copy(
storage = newStorage,
- schema = oldTableDef.schema,
+ // allow `alter table xxx add columns(xx)`
+ schema = tableDefinition.schema,
partitionColumnNames = oldTableDef.partitionColumnNames,
bucketSpec = oldTableDef.bucketSpec,
properties = newTableProps)
我們將591行的schema = oldTableDef.schema
替換為schema = tableDefinition.schema
即可。
至此,我們完成了整個代碼的調整。
最后參考Spark的編譯文檔:http://spark.apache.org/docs/latest/building-spark.html#building-a-runnable-distribution,將Spark編譯打包即可。
Spark 2.x會將編譯后的assembly放到jars目錄下,我們這次的改動會影響到以下幾個jar包:
- spark-catalyst_2.11-2.1.0.jar
- spark-sql_2.11-2.1.0.jar
- spark-hive_2.11-2.1.0.jar
如果Spark已經部署過了,可以直接將以上3個jar替換掉。
更新Spark后,我們就可以使用alter table xxx add columns(xx)
了。