Spark 官方文檔(5)——Spark SQL,DataFrames和Datasets 指南


Spark版本:1.6.2

概覽

Spark SQL用於處理結構化數據,與Spark RDD API不同,它提供更多關於數據結構信息和計算任務運行信息的接口,Spark SQL內部使用這些額外的信息完成特殊優化。可以通過SQL、DataFrames API、Datasets API與Spark SQL進行交互,無論使用何種方式,SparkSQL使用統一的執行引擎記性處理。用戶可以根據自己喜好,在不同API中選擇合適的進行處理。本章中所有用例均可以在spark-shell、pyspark shell、sparkR中執行。

SQL

執行SQL語句的方法有多種:

  • 可以使用基礎SQL語法或HiveQL語法在Spark SQL上執行查詢,SparkSQL可以從已安裝的Hive中讀取數據。當使用其他編程語言時,結果集以DataFrame類型返回
  • 通過SQL命令行進行交互(spark-sql)
  • 可以通過JDBC/ODBC驅動進行交互

DataFrames

DataFrame是由分布式數據集合組成的一系列命名列,它與關系數據庫的表類似,但有很多優化的地方。DataFrame支持多種數據源,包括結構化數據、Hive的表、外部數據庫、RDDs等。DataFrame API支持scala 、java、Python和R語言。

Datasets

數據集接口在Spark1.6才加入,它可以使用Spark SQL的優化器對RDD操作進行優化。Dataset有JVM對象構建,並可以進行map、flatMap、filter等操作。Dataset API統一接口支持java和scala語言。

開始

程序入口: SQLContext

SQLContext是Spark SQL所有功能的入口,通過SparkContext可以創建該對象的實例:

val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._

除了SQLContext,還可以創建HiveContext對象,它包含更多的功能,例如HiveQL解析器支持更完善的語法、使用Hive用戶自定義函數UDFs、從Hive表中讀取數據等。HiveContext不依賴Hive是否安裝,Spark默認支持HiveContext。從Spark1.3以后,推薦使用HiveContext,未來SQLContext會包含HiveContext中的功能。
可以通過spark.sql.dialect選項更改SQL解析器,這個參數可以再SQLContext的setConf方法設置,也可以通過SQL的ky=value語法設計。在SQLContext中dialect只支持一種簡單的SQL解析器“sql”。HiveContext默認解析器是“hiveql”,同時支持“sql”,但一般推薦hiveql,因為它語法更全。

創建DataFrames

DataFrames的數據源多種多樣,例如RDD、Hive table或者其他數據源。 下面代碼從JSON文件創建了一個DataFrame

JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);  DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show();

DataFrame 操作

DataFrame支持結構化數據領域常用的數據操作,支持Scala、Java、Python和R語言,下面是一些基本操作示例:

JavaSparkContext sc // An existing SparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Show the content of the DataFrame df.show(); // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df.col("name"), df.col("age").plus(1)).show(); // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df.col("age").gt(21)).show(); // age name // 30 Andy // Count people by age df.groupBy("age").count().show(); // age count // null 1 // 19 1 // 30 1

對於DataFrame的所有操作類型可以參考API文檔。除了簡單的列操作,DataFrame還支持字符串操作、日期算法、數據操作等等,可以參考DataFrame函數文檔

編碼實現SQL查詢

SQLContext的sql方法支持運行sql語法的查詢,並返回DataFrame類型的結果集:

SQLContext sqlContext = ... // An existing SQLContext DataFrame df = sqlContext.sql("SELECT * FROM table")

創建Datasets

Dataset與RDD類似,但它不適用java序列化也不適用Kryo,而是使用特定的Encoder作為序列化工具。Encoder可以對Spark對象進行序列化和反序列化,同時不需要反序列化在字節級別就能支持filtering、sorting和hashing等操作。

// Encoders for most common types are automatically provided by importing sqlContext.implicits._ val ds = Seq(1, 2, 3).toDS() ds.map(_ + 1).collect() // Returns: Array(2, 3, 4) // Encoders are also created for case classes. case class Person(name: String, age: Long) val ds = Seq(Person("Andy", 32)).toDS() // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path).as[Person]

RDD交互操作

在Spark SQL中有兩種方式可以在DataFrame和RDD進行轉換,第一種方法是利用反射機制,推導包含某種類型的RDD,通過反射將其轉換為指定類型的DataFrame,適用於提前知道RDD的schema。
第二種方法通過編程接口與RDD進行交互獲取schema,並動態創建DataFrame,在運行時決定列及其類型。

使用反射推斷Schema

Scala支持使用case class類型導入RDD轉換為DataFrame,通過case class創建schema,case class的參數名稱會被利用反射機制作為列名。case class可以嵌套組合成Sequences或者Array。這種RDD可以高效的轉換為DataFrame並注冊為表。

// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface. case class Person(name: String, age: Int) // Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index: teenagers.map(t => "Name: " + t(0)).collect().foreach(println) // or by field name: teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println) // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println) // Map("name" -> "Justin", "age" -> 19)

編程指定schema

當case class不能提前定義好時,可以通過以下三步通過代碼創建DataFrame

  • 將RDD轉為包含row對象的RDD
  • 基於structType類型創建schema,與第一步創建的RDD相匹配
  • 通過SQLContext的createDataFrame方法對第一步的RDD應用schema
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create an RDD val people = sc.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Import Row. import org.apache.spark.sql.Row; // Import Spark SQL data types import org.apache.spark.sql.types.{StructType,StructField,StringType}; // Generate the schema based on the string of schema val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // Convert records of the RDD (people) to Rows. val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // Apply the schema to the RDD. val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) // Register the DataFrames as a table. peopleDataFrame.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val results = sqlContext.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index or by field name. results.map(t => "Name: " + t(0)).collect().foreach(println)

數據源

DataFrame接口支持一系列的數據源,它可以按照普通RDD進行操作,也能被注冊為臨時表進行操作。注冊臨時表后可以使用SQL查詢操作數據集,本章節介紹了常用加載保存數據的方法,同時給出了內部數據源的特殊操作。

常規Load/Save函數

未配置spark.sql.sources.default情況下,默認使用parquet數據源處理所有操作。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

手動指定選項

用戶可以手動指定數據源加載的選項,對於數據源類型需要使用完整名稱指定例如(org.apache.spark.sql.parquet),但對於內部類型可以使用簡稱,例如(json parquet jdbc等)。可以通過以上方法在不同DataFrame之間進行轉換。

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

在文件上直接執行SQL

除了需要將文件加載到DataFrame再執行sql以外,還可以直接執行sql

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

保存模式

Save通過SaveMode指定如何維護現有的數據。需要注意的是savemode未對數據加鎖,因而不是源自操作。若使用overwrite模式時,原有數據會先被清空。

Scala/Java Any Language 含義
SaveMode.ErrorIfExists (default) "error" (default) 當數據輸出的位置已存在時,拋出此異常
SaveMode.Append "append" 當數據輸出的位置已存在時,在文件后面追加
SaveMode.Overwrite "overwrite" 當數據輸出的位置已存在時,重寫
SaveMode.Ignore "ignore" 當數據輸出的位置已存在時,不執行任何操作,與 CREATE IF NOT EXISTS類似

保存到持久化表中

使用HiveContext時,DataFrame可以使用saveAsTable方法保存到持久化表中。與registerTempTable不同,saveASTable會為其真正創建數據區並創建指向該區域的指針放入HiveMetaStore中。在持有同一個metastore的連接期間,持久化的數據會一直存在,即使spark程序重啟也不影響。可以通過SQLContext的table方法創建用於持久化表的DataFrame。
默認的saveASTable會創建“managed table”,其數據位置會被metastore維護,被管理的表數據會在表被刪除時清空。

Parquet文件

parquet是一種流行的列式存儲格式。SparkSQL支持對parquet的讀寫以及schema和數據的維護。在寫parquet文件時,為了兼容,所有列都會轉換為nullable格式。

編程實現數據加載

// sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. // The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet. people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. val parquetFile = sqlContext.read.parquet("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分區推斷

表分區是Hive等系統的常用優化手段。在一個分區表中,數據經常分布在不同目錄下,分區列的值相同的數據分布在同一目錄中。目前支持對parquet文件進行自動推斷分區。例如我們可以將之前的數據增加兩列gender和country,並將兩列作為分區列進行數據分區。

path
└── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...

將數據路徑傳給SQLContext后,可以自動推斷DataFrame數據的分區信息。注意,數據的分區列是自動推斷出來你的,目前分區列支持數值類型和string類型。若用戶不希望自動推斷分區列時,可以通過spark.sql.sources.partitionColumnTypeInference.enabled配置禁止自動推斷,此時會使用string類型列進行分區。 分區類型會根據傳入的路徑進行推斷,但用戶可以配置數據源的basePath屬性設置分析的路徑。

Schema合並

parquet支持列增加等操作,當出現多個互相兼容的schemas時,parquet可以自動檢測並合並這些文件的schema。由於schema 合並會消耗大量的資源,默認關閉該操作,可以通過以下方法打開:

  • 設置數據源mergeSchema屬性為true
  • 設置SQL的選項spark.sql.parquet.mergeSchema為true
// sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Create a simple DataFrame, stored into a partition directory val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths. // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true)

Hive metasotre Parquet表轉化

SparkSQL使用內部庫而不是Hive SerDe,對Hive metasotre Parquet表進行讀寫,性能很好,可以通過spark.sql.hive.convertMetastoreParquet配置。

Hive/Parquet Schema Reconciliation

由於Hive和Parquet的元數據處理方式不同,如下所示

  • Hive忽略大小寫,而Parquet沒有
  • Hive所有字段都是nullable,而parquet中null是有意義的值(避免理解錯誤,貼上原文:Hive considers all columns nullable, while nullability in Parquet is significant)

將Hive metastore Parquet table轉換為Spark SQL parquet表時,遵從以下規則:

  • 相同名稱的字段的數據類型必須相同,nullable類型被忽略。由於融合的數據類型需要在parquet中有對應的類型,所以nullability類型需要處理。

  • 融合后schema中包含了Hive元數據中定義的值

    • 任何只在Parquet schema中出現的字段被拋棄
    • 任何旨在Hive元數據中出現的字段作為nullable增加到融合后元數據中

元數據刷新

Spark SQL會緩存parquet元數據以便提高性能。若Hive metastore Parquet table轉換被啟用,則轉換的表元數據也會被cache。若這些元數據被外部工具修改,則需要手動更新緩存元數據保持一致性。

// sqlContext is an existing HiveContext sqlContext.refreshTable("my_table")

配置

與parquet相關的配置參數如下所示

參數 默認值 描述
spark.sql.parquet.binaryAsString false 該選項讓SparkSQL將string安裝二進制數據按照字符串處理,以便兼容老系統
spark.sql.parquet.int96AsTimestamp true Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
spark.sql.parquet.cacheMetadata true 緩存Parquet的Schema元數據,提高查詢靜態數據效率
spark.sql.parquet.compression.codec gzip 設置Parquet文件的壓縮編碼方式,支持 uncompressed, snappy, gzip, lzo.
spark.sql.parquet.filterPushdown true 啟用過濾謂詞下推優化,將過濾下推到抽取數據時,取得性能的提升
spark.sql.hive.convertMetastoreParquet true 若設為false,Spark SQL使用Hive SerDe支持對Parquet tables的操作.
spark.sql.parquet.output.committer.class org.apache.parquet.hadoop.ParquetOutputCommitter The output committer class used by Parquet. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter.
spark.sql.parquet.mergeSchema false 是否開啟Schema合並

JSON數據集

SQLContext.read.josn()接口可以自動推斷JSON文件的schema。SparkSQL支持的JSON文件中每一行需要是一個完整的JSON對象,不支持跨行的json對象。

// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Hive 表

Spark SQL支持從Hive中讀取數據,但由於Hive依賴過多,默認不支持Hive,需要在編譯時添加-Phive -Phive-thriftserver選項。由於用到Hive的序列化和反序列化需要保證Hive包在各個worker中都存在。

將hive-site.xml、core-site.xml和hdfs-site.xml放入conf目錄下配置Hive環境。在Yarn集群上面運行時,需要確定datanucleus jar包和hive-site.xml在driver和所有executor上面都存在。可以通過spark-submit的--jars和--file參數檢查是否存在。
若通過Spark SQL操作Hive需要創建HiveContext,增加元數據功能及HiveQL支持。若沒有部署Hive環境同樣可以創建HiveContext。若沒有在hive-site.xml中配置,會自動在當前目錄創建metastore_db並在/user/hive/warehouse創建倉儲目錄,需要給hive對/user/hive/warehouse的寫權限。

// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

與不同版本Hive Metastore交互

由於Spark SQL可以與不同版本的Hive Metastor(而不是Hive的版本)進行交互,只需要修改部分的配置信息,相關配置如下:

屬性 默認值 描述
spark.sql.hive.metastore.version 1.2.1 Hive metastore的版本信息,從0.12.0到1.2.1
spark.sql.hive.metastore.jars builtin 指定metastore的Jar包位置,builtin:該jar被打包到spark應用程序中;maven:使用maven遠程倉儲下載;類路徑:需要包含hive所有的依賴包
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc 一個逗號分隔的類名前綴列表,這些類使用classloader加載,且可以在Spark SQL和特定版本的Hive間共享。例如,用來訪問hive metastore 的JDBC的driver就需要這種共享。其他需要共享的類,是與某些已經共享的類有交互的類。例如,自定義的log4j appender。
spark.sql.hive.metastore.barrierPrefixes (empty) 使用逗號分隔的類名前綴列表,Spark SQL所訪問的每個Hive版本都會被顯式的reload這些類。

JDBC連接其他數據庫

SparkSQL通過JdbcRDD實現對支持jdbc的數據庫進行數據加載,將其作為DataFrame進行操作。JDBC加載的數據源不需要提供classTag。使用前需要將JDBC Driver包含在spark的classpath中。例如連接postgres需要如下設置

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

數據庫中的表可以作為DataFrame或SparkSQL的臨時表加載,支持以下的選項:

屬性 描述
url JDBC連接URL
dbtable 需要讀取的JDBC表。任何在From子句中的元素都可以,例如表或者子查詢等。
partitionColumn, lowerBound, upperBound, numPartitions 這些選項需要同時制定,他們制定了如何並發讀取數據的同時進行分區。lowerBound, upperBound僅用於確定分區邊界不用於過濾數據,所有數據都會被分區
fetchSize 決定了每次數據取多少行
val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()

疑難問題

  • JDBC的driver類需要在所有executor可見,因為Java的DriverManager會進行安全檢查,忽略所有不可見的類。可以通過修改每個worker節點的compute_classpath.sh以便包含Jar包
  • 有些數據庫例如H2的名稱是大寫,需要在SparkSQL中同樣使用大寫

性能調優

對於一些負載可以通過內存緩存數據或者調整參數提高性能。

內存緩存數據

Spark SQL可以通過sqlContext.cacheTable("tableName") 或 dataFrame.cache()接口將RDD數據緩存到內存中。SparkSql可以近掃描需要的列並自動壓縮、進行垃圾回收等。可以通過sqlContext.uncacheTable("Tablename")從內存中移除表。

屬性 默認值 描述
spark.sql.inMemoryColumnarStorage.compressed true 若設為true,Spark SQL會基於列的統計數據自動選擇壓縮器進行數據壓縮
spark.sql.inMemoryColumnarStorage.batchSize 10000 控制列緩存的每批次的數據大小,數據越大則內存利用率及壓縮比例越大,但OOM風險也越大

其他配置信息

可以通過修改以下配置提高查詢執行的性能,以后可能會棄用以下設置,而變為自動進行最優化配置。

屬性 默認值 描述
spark.sql.autoBroadcastJoinThreshold 10485760 (10 MB) 配置做join操作時被廣播變量的表的大小。當設為-1時禁用廣播。目前只有Hive元數據支持統計信息,可以通過ANALYZE TABLE <tablename> COMPUTE STATISTICS進行信息統計
spark.sql.tungsten.enabled true 若為true,或使用tungsten物理優化執行,顯式地管理內存並動態生成表達式計算的字節碼
spark.sql.shuffle.partitions 200 配置shuffle操作時的分區數量

分布式SQL引擎

當使用JDBC/ODBC或者命令行進行交互時,SparkSQL可以作為分布式查詢引擎執行。在這種模式下,Spark SQL的應用能夠不寫代碼便執行查詢。

運行Thrift JDBC/ODBC驅動

這里的實現與HiveServer2類似,可以通過beeline測試Spakr或者Hive1.2.1的JDBC驅動。通過以下命令啟動jdbc驅動

./sbin/start-thriftserver.sh

這腳本支持所有的spark-submit的參數,還支持--hiveconf指定特定的Hive屬性。可以通過--help查看本腳本具體參數。默認server監聽的端口是10000,可以覆蓋一些環境變量:

export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...

或者修改系統屬性

./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...

可以通過beeline測試Thrift JDBC/ODBC驅動

./bin/beeline

連接JDBC/ODBC驅動

beeline> !connect jdbc:hive2://localhost:10000

可能需要輸入用戶和密碼進行安全驗證,在非安全模式下,只需要本機的用戶名和空密碼即可。通過hive-site.xml, core-site.xml 和 hdfs-site.xml配置Hive。ThriftJDBC驅動同時支持通過HTTP端口發送thrift RPC消息。通過hive-site.xml中的配置開啟HTTP模式作為系統屬性:

hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice

beeline可以通過http模式連接JDBC/ODBC

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

通過Spark SQL CLI運行

CLI是在單點模式下執行Hive元數據服務和查詢的命令工具,但它不能與Thrift JDBC驅動進行會話。

./bin/spark-sql

與Apache Hive的兼容性

Spark SQL設計時考慮對Hive metastore,SerDes以及UDF的兼容。目前是基於Hive-1.2.1版本,並且Spark SQL可以連到不同版本(0.12.0到1.2.1)的Hive metastore。Spark SQL Thrift JDBC可以直接在已經部署Hive的環境運行。

不支持的Hive功能

  • bucket表:butcket是Hive的哈希分區
  • Union功能
  • unique join
  • 字段統計信息
  • Hadoop歸檔文件
  • Hive的部分優化功能

參考

數據類型

Spark SQL和DataFrame支持以下數據類型

  • numeric類型
    • ByteType:單字節有符號整數
    • ShortType:2個字節的有符號整數
    • IntegerType:4字節整數
    • LongType:8字節整數
    • FloatType:4字節單精度浮點數
    • DoubleType:8字節雙精度浮點數
    • DecimalType:任意精度有符號帶小數的數值
  • String類型
  • Binary二進制類型
  • Boolean布爾類型
  • Datetime時間類型
    • TimestampType:時間戳類型
    • DateType:日期類型,只包含年月日
  • Complex復雜類型
    • ArrayType:數組類型
    • MapType:map類型
    • StructType:包含StructField序列的結構體

所有的數據類型都在org.apache.spark.sql.types中。

NaN含義

NaN是not a number的簡寫,用於處理不符合浮點數格式的float和double數據,其語義需要特殊處理:

  • NaN = NaN返回true
  • 聚集過程中,所有NaN會被放到同一分組中
  • NaN在join過程中被看成普通的值
  • NaN在升序排序時放到最后,被認為是最大的數值


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM