pyspark可以直接將DataFrame格式數據轉成table,也可在程序中執行sql代碼。
1. 首先導入庫和環境,os.environ在系統中有多個python版本時需要設置
import os from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession from pyspark.sql import HiveContext os.environ["PYSPARK_PYTHON"]="/usr/bin/python3" conf = SparkConf().setAppName('test_sql') sc = SparkContext('local', 'test', conf=conf) spark = SparkSession(sc) hive_text = HiveContext(spark)
2. 獲取DataFrame格式數據
獲取DataFrame格式的方式有很多種:讀取sql/hive數據、讀取csv數據、讀取text數據、rdd轉DataFrame數據、Pandas數據轉DataFrame數據、讀取json數據、讀取parquet數據等等。
本例采用pandas轉DataFrame格式數據,其中生成pandas數據的方式可見:https://www.cnblogs.com/qi-yuan-008/p/12412018.html,例如
import pandas as pd df_1 = pd.DataFrame([['Jack','M',40],['Tony','M',20],['Mary','F',30],['Bob','M',25]], columns=['name','gender','age'])
然后從pandas建立DataFrame數據(spark.createDataFrame)<注:從DataFrame轉成pandas也很方便:df.toPandas()即可>:
df = spark.createDataFrame(df_1) print(df.show())
3. 將DataFrame數據轉成table:registerDataFrameAsTable
rows_data = hive_text.registerDataFrameAsTable(df, tableName='table_moment') #生成虛擬表,設置表名 data_2 = hive_text.sql("select * from table_moment") #執行sql語句 print(data_2.take(2)) print(data_2.collect()[0])
取表中某一列數據:
print(data_2.select('name').show())
取表中多列數據:
print(data_2.select(['name', 'age']).show())
注意:如果要連hive表的數據,並用sql語句操作hive表,以上方法不行,需要其他設置。
4. 代碼中操作hive表的sql語句,需要修改以上spark的配置(配置enableHiveSupport().getOrCreate()),不再需要配置sc和conf,直接配置spark,改成:
spark = SparkSession.builder.master("local").appName("SparkOnHive").enableHiveSupport().getOrCreate() hive_text = HiveContext(spark)
其中 'local' 可以修改成其他 “主機+端口號” 形式,然后可以訪問hive表:
hive_text.sql('use database_name') data_3 = hive_text.sql("select * from table_name") print(data_3.first())
而此時,就可以將上述的第3步的表:table_moment 直接寫入hive形成新的表
hive_text.sql('create table default.test_pandas select * from table_moment')
default是已有的數據庫名,test_pandas是新建表名
參考:
https://www.jianshu.com/p/d1f6678db183
https://blog.csdn.net/u011412768/article/details/93426353
##