Spark版本:1.6.2
概覽
Spark SQL用於處理結構化數據,與Spark RDD API不同,它提供更多關於數據結構信息和計算任務運行信息的接口,Spark SQL內部使用這些額外的信息完成特殊優化。可以通過SQL、DataFrames API、Datasets API與Spark SQL進行交互,無論使用何種方式,SparkSQL使用統一的執行引擎記性處理。用戶可以根據自己喜好,在不同API中選擇合適的進行處理。本章中所有用例均可以在spark-shell、pyspark shell、sparkR中執行。
SQL
執行SQL語句的方法有多種:
- 可以使用基礎SQL語法或HiveQL語法在Spark SQL上執行查詢,SparkSQL可以從已安裝的Hive中讀取數據。當使用其他編程語言時,結果集以DataFrame類型返回
- 通過SQL命令行進行交互(spark-sql)
- 可以通過JDBC/ODBC驅動進行交互
DataFrames
DataFrame是由分布式數據集合組成的一系列命名列,它與關系數據庫的表類似,但有很多優化的地方。DataFrame支持多種數據源,包括結構化數據、Hive的表、外部數據庫、RDDs等。DataFrame API支持scala 、java、Python和R語言。
Datasets
數據集接口在Spark1.6才加入,它可以使用Spark SQL的優化器對RDD操作進行優化。Dataset有JVM對象構建,並可以進行map、flatMap、filter等操作。Dataset API統一接口支持java和scala語言。
開始
程序入口: SQLContext
SQLContext是Spark SQL所有功能的入口,通過SparkContext可以創建該對象的實例:
val sc: SparkContext // An existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._
除了SQLContext,還可以創建HiveContext對象,它包含更多的功能,例如HiveQL解析器支持更完善的語法、使用Hive用戶自定義函數UDFs、從Hive表中讀取數據等。HiveContext不依賴Hive是否安裝,Spark默認支持HiveContext。從Spark1.3以后,推薦使用HiveContext,未來SQLContext會包含HiveContext中的功能。
可以通過spark.sql.dialect選項更改SQL解析器,這個參數可以再SQLContext的setConf方法設置,也可以通過SQL的ky=value語法設計。在SQLContext中dialect只支持一種簡單的SQL解析器“sql”。HiveContext默認解析器是“hiveql”,同時支持“sql”,但一般推薦hiveql,因為它語法更全。
創建DataFrames
DataFrames的數據源多種多樣,例如RDD、Hive table或者其他數據源。 下面代碼從JSON文件創建了一個DataFrame
JavaSparkContext sc = ...; // An existing JavaSparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show();
DataFrame 操作
DataFrame支持結構化數據領域常用的數據操作,支持Scala、Java、Python和R語言,下面是一些基本操作示例:
JavaSparkContext sc // An existing SparkContext. SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create the DataFrame DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); // Show the content of the DataFrame df.show(); // age name // null Michael // 30 Andy // 19 Justin // Print the schema in a tree format df.printSchema(); // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show(); // name // Michael // Andy // Justin // Select everybody, but increment the age by 1 df.select(df.col("name"), df.col("age").plus(1)).show(); // name (age + 1) // Michael null // Andy 31 // Justin 20 // Select people older than 21 df.filter(df.col("age").gt(21)).show(); // age name // 30 Andy // Count people by age df.groupBy("age").count().show(); // age count // null 1 // 19 1 // 30 1
對於DataFrame的所有操作類型可以參考API文檔。除了簡單的列操作,DataFrame還支持字符串操作、日期算法、數據操作等等,可以參考DataFrame函數文檔
編碼實現SQL查詢
SQLContext的sql方法支持運行sql語法的查詢,並返回DataFrame類型的結果集:
SQLContext sqlContext = ... // An existing SQLContext DataFrame df = sqlContext.sql("SELECT * FROM table")
創建Datasets
Dataset與RDD類似,但它不適用java序列化也不適用Kryo,而是使用特定的Encoder作為序列化工具。Encoder可以對Spark對象進行序列化和反序列化,同時不需要反序列化在字節級別就能支持filtering、sorting和hashing等操作。
// Encoders for most common types are automatically provided by importing sqlContext.implicits._ val ds = Seq(1, 2, 3).toDS() ds.map(_ + 1).collect() // Returns: Array(2, 3, 4) // Encoders are also created for case classes. case class Person(name: String, age: Long) val ds = Seq(Person("Andy", 32)).toDS() // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path).as[Person]
RDD交互操作
在Spark SQL中有兩種方式可以在DataFrame和RDD進行轉換,第一種方法是利用反射機制,推導包含某種類型的RDD,通過反射將其轉換為指定類型的DataFrame,適用於提前知道RDD的schema。
第二種方法通過編程接口與RDD進行交互獲取schema,並動態創建DataFrame,在運行時決定列及其類型。
使用反射推斷Schema
Scala支持使用case class類型導入RDD轉換為DataFrame,通過case class創建schema,case class的參數名稱會被利用反射機制作為列名。case class可以嵌套組合成Sequences或者Array。這種RDD可以高效的轉換為DataFrame並注冊為表。
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // this is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface. case class Person(name: String, age: Int) // Create an RDD of Person objects and register it as a table. val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index: teenagers.map(t => "Name: " + t(0)).collect().foreach(println) // or by field name: teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println) // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println) // Map("name" -> "Justin", "age" -> 19)
編程指定schema
當case class不能提前定義好時,可以通過以下三步通過代碼創建DataFrame
- 將RDD轉為包含row對象的RDD
- 基於structType類型創建schema,與第一步創建的RDD相匹配
- 通過SQLContext的createDataFrame方法對第一步的RDD應用schema
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // Create an RDD val people = sc.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Import Row. import org.apache.spark.sql.Row; // Import Spark SQL data types import org.apache.spark.sql.types.{StructType,StructField,StringType}; // Generate the schema based on the string of schema val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true))) // Convert records of the RDD (people) to Rows. val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // Apply the schema to the RDD. val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) // Register the DataFrames as a table. peopleDataFrame.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val results = sqlContext.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index or by field name. results.map(t => "Name: " + t(0)).collect().foreach(println)
數據源
DataFrame接口支持一系列的數據源,它可以按照普通RDD進行操作,也能被注冊為臨時表進行操作。注冊臨時表后可以使用SQL查詢操作數據集,本章節介紹了常用加載保存數據的方法,同時給出了內部數據源的特殊操作。
常規Load/Save函數
未配置spark.sql.sources.default情況下,默認使用parquet數據源處理所有操作。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
手動指定選項
用戶可以手動指定數據源加載的選項,對於數據源類型需要使用完整名稱指定例如(org.apache.spark.sql.parquet),但對於內部類型可以使用簡稱,例如(json parquet jdbc等)。可以通過以上方法在不同DataFrame之間進行轉換。
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
在文件上直接執行SQL
除了需要將文件加載到DataFrame再執行sql以外,還可以直接執行sql
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
保存模式
Save通過SaveMode指定如何維護現有的數據。需要注意的是savemode未對數據加鎖,因而不是源自操作。若使用overwrite模式時,原有數據會先被清空。
Scala/Java | Any Language | 含義 |
---|---|---|
SaveMode.ErrorIfExists (default) | "error" (default) | 當數據輸出的位置已存在時,拋出此異常 |
SaveMode.Append | "append" | 當數據輸出的位置已存在時,在文件后面追加 |
SaveMode.Overwrite | "overwrite" | 當數據輸出的位置已存在時,重寫 |
SaveMode.Ignore | "ignore" | 當數據輸出的位置已存在時,不執行任何操作,與 CREATE IF NOT EXISTS類似 |
保存到持久化表中
使用HiveContext時,DataFrame可以使用saveAsTable方法保存到持久化表中。與registerTempTable不同,saveASTable會為其真正創建數據區並創建指向該區域的指針放入HiveMetaStore中。在持有同一個metastore的連接期間,持久化的數據會一直存在,即使spark程序重啟也不影響。可以通過SQLContext的table方法創建用於持久化表的DataFrame。
默認的saveASTable會創建“managed table”,其數據位置會被metastore維護,被管理的表數據會在表被刪除時清空。
Parquet文件
parquet是一種流行的列式存儲格式。SparkSQL支持對parquet的讀寫以及schema和數據的維護。在寫parquet文件時,為了兼容,所有列都會轉換為nullable格式。
編程實現數據加載
// sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. // The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet. people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. val parquetFile = sqlContext.read.parquet("people.parquet") //Parquet files can also be registered as tables and then used in SQL statements. parquetFile.registerTempTable("parquetFile") val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
分區推斷
表分區是Hive等系統的常用優化手段。在一個分區表中,數據經常分布在不同目錄下,分區列的值相同的數據分布在同一目錄中。目前支持對parquet文件進行自動推斷分區。例如我們可以將之前的數據增加兩列gender和country,並將兩列作為分區列進行數據分區。
path
└── to └── table ├── gender=male │ ├── ... │ │ │ ├── country=US │ │ └── data.parquet │ ├── country=CN │ │ └── data.parquet │ └── ... └── gender=female ├── ... │ ├── country=US │ └── data.parquet ├── country=CN │ └── data.parquet └── ...
將數據路徑傳給SQLContext后,可以自動推斷DataFrame數據的分區信息。注意,數據的分區列是自動推斷出來你的,目前分區列支持數值類型和string類型。若用戶不希望自動推斷分區列時,可以通過spark.sql.sources.partitionColumnTypeInference.enabled配置禁止自動推斷,此時會使用string類型列進行分區。 分區類型會根據傳入的路徑進行推斷,但用戶可以配置數據源的basePath屬性設置分析的路徑。
Schema合並
parquet支持列增加等操作,當出現多個互相兼容的schemas時,parquet可以自動檢測並合並這些文件的schema。由於schema 合並會消耗大量的資源,默認關閉該操作,可以通過以下方法打開:
- 設置數據源mergeSchema屬性為true
- 設置SQL的選項spark.sql.parquet.mergeSchema為true
// sqlContext from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. import sqlContext.implicits._ // Create a simple DataFrame, stored into a partition directory val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") df1.write.parquet("data/test_table/key=1") // Create another DataFrame in a new partition directory, // adding a new column and dropping an existing column val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together // with the partitioning column appeared in the partition directory paths. // root // |-- single: int (nullable = true) // |-- double: int (nullable = true) // |-- triple: int (nullable = true) // |-- key : int (nullable = true)
Hive metasotre Parquet表轉化
SparkSQL使用內部庫而不是Hive SerDe,對Hive metasotre Parquet表進行讀寫,性能很好,可以通過spark.sql.hive.convertMetastoreParquet配置。
Hive/Parquet Schema Reconciliation
由於Hive和Parquet的元數據處理方式不同,如下所示
- Hive忽略大小寫,而Parquet沒有
- Hive所有字段都是nullable,而parquet中null是有意義的值(避免理解錯誤,貼上原文:Hive considers all columns nullable, while nullability in Parquet is significant)
將Hive metastore Parquet table轉換為Spark SQL parquet表時,遵從以下規則:
-
相同名稱的字段的數據類型必須相同,nullable類型被忽略。由於融合的數據類型需要在parquet中有對應的類型,所以nullability類型需要處理。
-
融合后schema中包含了Hive元數據中定義的值
- 任何只在Parquet schema中出現的字段被拋棄
- 任何旨在Hive元數據中出現的字段作為nullable增加到融合后元數據中
元數據刷新
Spark SQL會緩存parquet元數據以便提高性能。若Hive metastore Parquet table轉換被啟用,則轉換的表元數據也會被cache。若這些元數據被外部工具修改,則需要手動更新緩存元數據保持一致性。
// sqlContext is an existing HiveContext sqlContext.refreshTable("my_table")
配置
與parquet相關的配置參數如下所示
參數 | 默認值 | 描述 |
---|---|---|
spark.sql.parquet.binaryAsString | false | 該選項讓SparkSQL將string安裝二進制數據按照字符串處理,以便兼容老系統 |
spark.sql.parquet.int96AsTimestamp | true | Some Parquet-producing systems, in particular Impala and Hive, store Timestamp into INT96. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. |
spark.sql.parquet.cacheMetadata | true | 緩存Parquet的Schema元數據,提高查詢靜態數據效率 |
spark.sql.parquet.compression.codec | gzip | 設置Parquet文件的壓縮編碼方式,支持 uncompressed, snappy, gzip, lzo. |
spark.sql.parquet.filterPushdown | true | 啟用過濾謂詞下推優化,將過濾下推到抽取數據時,取得性能的提升 |
spark.sql.hive.convertMetastoreParquet | true | 若設為false,Spark SQL使用Hive SerDe支持對Parquet tables的操作. |
spark.sql.parquet.output.committer.class | org.apache.parquet.hadoop.ParquetOutputCommitter | The output committer class used by Parquet. The specified class needs to be a subclass of org.apache.hadoop.mapreduce.OutputCommitter. Typically, it's also a subclass of org.apache.parquet.hadoop.ParquetOutputCommitter. |
spark.sql.parquet.mergeSchema | false | 是否開啟Schema合並 |
JSON數據集
SQLContext.read.josn()接口可以自動推斷JSON文件的schema。SparkSQL支持的JSON文件中每一行需要是一個完整的JSON對象,不支持跨行的json對象。
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.SQLContext(sc) // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" val people = sqlContext.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() // root // |-- age: integer (nullable = true) // |-- name: string (nullable = true) // Register this DataFrame as a table. people.registerTempTable("people") // SQL statements can be run by using the sql methods provided by sqlContext. val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Hive 表
Spark SQL支持從Hive中讀取數據,但由於Hive依賴過多,默認不支持Hive,需要在編譯時添加-Phive -Phive-thriftserver
選項。由於用到Hive的序列化和反序列化需要保證Hive包在各個worker中都存在。
將hive-site.xml、core-site.xml和hdfs-site.xml放入conf目錄下配置Hive環境。在Yarn集群上面運行時,需要確定datanucleus jar包和hive-site.xml在driver和所有executor上面都存在。可以通過spark-submit的--jars和--file參數檢查是否存在。
若通過Spark SQL操作Hive需要創建HiveContext,增加元數據功能及HiveQL支持。若沒有部署Hive環境同樣可以創建HiveContext。若沒有在hive-site.xml中配置,會自動在當前目錄創建metastore_db並在/user/hive/warehouse創建倉儲目錄,需要給hive對/user/hive/warehouse的寫權限。
// sc is an existing SparkContext. val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
與不同版本Hive Metastore交互
由於Spark SQL可以與不同版本的Hive Metastor(而不是Hive的版本)進行交互,只需要修改部分的配置信息,相關配置如下:
屬性 | 默認值 | 描述 |
---|---|---|
spark.sql.hive.metastore.version | 1.2.1 | Hive metastore的版本信息,從0.12.0到1.2.1 |
spark.sql.hive.metastore.jars | builtin | 指定metastore的Jar包位置,builtin:該jar被打包到spark應用程序中;maven:使用maven遠程倉儲下載;類路徑:需要包含hive所有的依賴包 |
spark.sql.hive.metastore.sharedPrefixes | com.mysql.jdbc,org.postgresql,com.microsoft.sqlserver,oracle.jdbc | 一個逗號分隔的類名前綴列表,這些類使用classloader加載,且可以在Spark SQL和特定版本的Hive間共享。例如,用來訪問hive metastore 的JDBC的driver就需要這種共享。其他需要共享的類,是與某些已經共享的類有交互的類。例如,自定義的log4j appender。 |
spark.sql.hive.metastore.barrierPrefixes | (empty) | 使用逗號分隔的類名前綴列表,Spark SQL所訪問的每個Hive版本都會被顯式的reload這些類。 |
JDBC連接其他數據庫
SparkSQL通過JdbcRDD實現對支持jdbc的數據庫進行數據加載,將其作為DataFrame進行操作。JDBC加載的數據源不需要提供classTag。使用前需要將JDBC Driver包含在spark的classpath中。例如連接postgres需要如下設置
SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell
數據庫中的表可以作為DataFrame或SparkSQL的臨時表加載,支持以下的選項:
屬性 | 描述 |
---|---|
url | JDBC連接URL |
dbtable | 需要讀取的JDBC表。任何在From子句中的元素都可以,例如表或者子查詢等。 |
partitionColumn, lowerBound, upperBound, numPartitions | 這些選項需要同時制定,他們制定了如何並發讀取數據的同時進行分區。lowerBound, upperBound僅用於確定分區邊界不用於過濾數據,所有數據都會被分區 |
fetchSize | 決定了每次數據取多少行 |
val jdbcDF = sqlContext.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load()
疑難問題
- JDBC的driver類需要在所有executor可見,因為Java的DriverManager會進行安全檢查,忽略所有不可見的類。可以通過修改每個worker節點的compute_classpath.sh以便包含Jar包
- 有些數據庫例如H2的名稱是大寫,需要在SparkSQL中同樣使用大寫
性能調優
對於一些負載可以通過內存緩存數據或者調整參數提高性能。
內存緩存數據
Spark SQL可以通過sqlContext.cacheTable("tableName") 或 dataFrame.cache()接口將RDD數據緩存到內存中。SparkSql可以近掃描需要的列並自動壓縮、進行垃圾回收等。可以通過sqlContext.uncacheTable("Tablename")從內存中移除表。
屬性 | 默認值 | 描述 |
---|---|---|
spark.sql.inMemoryColumnarStorage.compressed | true | 若設為true,Spark SQL會基於列的統計數據自動選擇壓縮器進行數據壓縮 |
spark.sql.inMemoryColumnarStorage.batchSize | 10000 | 控制列緩存的每批次的數據大小,數據越大則內存利用率及壓縮比例越大,但OOM風險也越大 |
其他配置信息
可以通過修改以下配置提高查詢執行的性能,以后可能會棄用以下設置,而變為自動進行最優化配置。
屬性 | 默認值 | 描述 |
---|---|---|
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | 配置做join操作時被廣播變量的表的大小。當設為-1時禁用廣播。目前只有Hive元數據支持統計信息,可以通過ANALYZE TABLE <tablename> COMPUTE STATISTICS 進行信息統計 |
spark.sql.tungsten.enabled | true | 若為true,或使用tungsten物理優化執行,顯式地管理內存並動態生成表達式計算的字節碼 |
spark.sql.shuffle.partitions | 200 | 配置shuffle操作時的分區數量 |
分布式SQL引擎
當使用JDBC/ODBC或者命令行進行交互時,SparkSQL可以作為分布式查詢引擎執行。在這種模式下,Spark SQL的應用能夠不寫代碼便執行查詢。
運行Thrift JDBC/ODBC驅動
這里的實現與HiveServer2類似,可以通過beeline測試Spakr或者Hive1.2.1的JDBC驅動。通過以下命令啟動jdbc驅動
./sbin/start-thriftserver.sh
這腳本支持所有的spark-submit的參數,還支持--hiveconf指定特定的Hive屬性。可以通過--help查看本腳本具體參數。默認server監聽的端口是10000,可以覆蓋一些環境變量:
export HIVE_SERVER2_THRIFT_PORT=<listening-port> export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host> ./sbin/start-thriftserver.sh \ --master <master-uri> \ ...
或者修改系統屬性
./sbin/start-thriftserver.sh \ --hiveconf hive.server2.thrift.port=<listening-port> \ --hiveconf hive.server2.thrift.bind.host=<listening-host> \ --master <master-uri> ...
可以通過beeline測試Thrift JDBC/ODBC驅動
./bin/beeline
連接JDBC/ODBC驅動
beeline> !connect jdbc:hive2://localhost:10000
可能需要輸入用戶和密碼進行安全驗證,在非安全模式下,只需要本機的用戶名和空密碼即可。通過hive-site.xml, core-site.xml 和 hdfs-site.xml配置Hive。ThriftJDBC驅動同時支持通過HTTP端口發送thrift RPC消息。通過hive-site.xml中的配置開啟HTTP模式作為系統屬性:
hive.server2.transport.mode - Set this to value: http hive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001 hive.server2.http.endpoint - HTTP endpoint; default is cliservice
beeline可以通過http模式連接JDBC/ODBC
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
通過Spark SQL CLI運行
CLI是在單點模式下執行Hive元數據服務和查詢的命令工具,但它不能與Thrift JDBC驅動進行會話。
./bin/spark-sql
與Apache Hive的兼容性
Spark SQL設計時考慮對Hive metastore,SerDes以及UDF的兼容。目前是基於Hive-1.2.1版本,並且Spark SQL可以連到不同版本(0.12.0到1.2.1)的Hive metastore。Spark SQL Thrift JDBC可以直接在已經部署Hive的環境運行。
不支持的Hive功能
- bucket表:butcket是Hive的哈希分區
- Union功能
- unique join
- 字段統計信息
- Hadoop歸檔文件
- Hive的部分優化功能
參考
數據類型
Spark SQL和DataFrame支持以下數據類型
- numeric類型
- ByteType:單字節有符號整數
- ShortType:2個字節的有符號整數
- IntegerType:4字節整數
- LongType:8字節整數
- FloatType:4字節單精度浮點數
- DoubleType:8字節雙精度浮點數
- DecimalType:任意精度有符號帶小數的數值
- String類型
- Binary二進制類型
- Boolean布爾類型
- Datetime時間類型
- TimestampType:時間戳類型
- DateType:日期類型,只包含年月日
- Complex復雜類型
- ArrayType:數組類型
- MapType:map類型
- StructType:包含StructField序列的結構體
所有的數據類型都在org.apache.spark.sql.types中。
NaN含義
NaN是not a number的簡寫,用於處理不符合浮點數格式的float和double數據,其語義需要特殊處理:
- NaN = NaN返回true
- 聚集過程中,所有NaN會被放到同一分組中
- NaN在join過程中被看成普通的值
- NaN在升序排序時放到最后,被認為是最大的數值