*以下内容由《Spark快速大数据分析》整理所得。
读书笔记的第六部分是讲的是Spark SQL和Beeline。
Spark SQL是Spark用来操作结构化和半结构化数据的接口。
一、在应用中使用Spark SQL
二、Spark SQL UDF VS. Hive UDF
三、Beeline
一、在应用中使用Spark SQL
Spark SQL提供了一种特殊的RDD,叫作SchemaRDD。SchemaRDD是存放Row对象的RDD,每个Row对象代表一行记录。SchemaRDD还包含记录的结构信息(即数据字段)。有了SchemaRDD,我们就可以运行 SQL 查询。
在应用中使用Spark SQL:
(1) 初始化Spark SQL
# 导入Spark SQL(支持Hive) from pyspark.sql import HiveContext, Row # 当不能引入hive依赖时 (不支持Hive) from pyspark.sql import SQLContext, Row # 在 Python 中创建SQL上下文环境(支持Hive) hiveCtx = HiveContext(sc) # 在 Python 中创建SQL上下文环境(不支持Hive) hiveCtx = SparkContext(sc)
(2) 基本查询
# 例子:在Python中读取并查询推文 # 读取Json文件 input = hiveCtx.jsonFile(inputFile) # 注册输入的SchemaRDD(将上面读取得到的SchemaRDD放入临时表,应用application退出时会自动删去它) input.registerTempTable("tweets") # 依据retweetCount(转发计数)选出推文(执行查询) topTweets = hiveCtx.sql("""SELECT text, retweetCount FROM tweets ORDER BY retweetCount LIMIT 10""")
在上面例子中,我们是读取了JSON文件,但我们还可以读取其它数据格式的文件,例如: hiveCtx.parquetFile(parquetFile) 。
二、Spark SQL UDF VS. Hive UDF
(1) Spark SQL UDF
使用hiveCtx.registerFunction()自定义函数。
# 例子:Python 版本耳朵字符串长度 UDF # 写一个求字符串长度的UDF hiveCtx.registerFunction("strLenPython", lambda x: len(x), IntegerType()) lengthSchemaRDD = hiveCtx.sql("SELECT strLenPython('text') FROM tweets LIMIT 10")
(2) Hive UDF
只需调用 hiveCtx.sql("CREATE TEMPORARY FUNCTION name AS class.function") 。
三、Beeline
Beeline是Hive 0.11版本引入的新命令行客户端工具,基于SQLline CLI的JDBC(Java Database Connectivity: Java语言中用来规范客户端程序如何访问数据库的应用程序接口)客户端。在Beeline客户端中,你可以使用标准的HiveQL命令来创建、列举以及查询数据表。Beeline shell的好处是:在多用户间共享的缓存数据表上进行快速的数据探索。