數據庫也是 spark 數據源創建 df 的一種方式,因為比較重要,所以單獨算一節。
本文以 postgres 為例
安裝 JDBC
首先需要 安裝 postgres 的客戶端驅動,即 JDBC 驅動,這是官方下載地址,JDBC,根據數據庫版本下載對應的驅動
上傳至 spark 目錄下的 jars 目錄
並設置環境變量
export SPARK_CLASSPATH = /usr/lib/spark/jars
編程模板
如何操作數據庫,不同的版本方法不同,網上的教程五花八門,往往嘗試不成功。
其實我們可以看 spark 自帶的樣例, 路徑為 /usr/lib/spark/examples/src/main/python/sql 【編碼時,sparkSession 需要聲明 spark jars 的驅動路徑,代碼調用 API JDBC To Other Databases】
我從 datasource.py 中找到了基本的讀寫方法,其他自己可以看看
def jdbc_dataset_example(spark): # $example on:jdbc_dataset$ # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods # Loading data from a JDBC source jdbcDF = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .load() jdbcDF2 = spark.read \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying dataframe column data types on read jdbcDF3 = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .option("customSchema", "id DECIMAL(38, 0), name STRING") \ .load()
# Saving data to a JDBC source jdbcDF.write \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .save() jdbcDF2.write \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying create table column data types on write jdbcDF.write \ .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$
實戰案例
僅供參考,請確保 spark 能連接上數據庫
from pyspark.sql import SparkSession import os # 獲取 環境變量 SPARK_CLASSPATH, 當然需要你事先設定了 該變量 # 如果沒有設定 SPARK_CLASSPATH, 得到 后面的值 /usr/lib/spark/jars/* sparkClassPath = os.getenv('SPARK_CLASSPATH', '/usr/lib/spark/jars/*') ### 創建 sparkSession # spark.driver.extraClassPath 設定了 jdbc 驅動的路徑 spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example") \ .master("local") \ .config("spark.driver.extraClassPath", sparkClassPath) \ .getOrCreate() ### 連接數據庫並讀取表 # airDF 已經是個 DataFrame airDF = spark.read \ .format("jdbc") \ .option("url", "jdbc:postgresql://172.16.89.80:5432/postgres") \ .option("driver", "org.postgresql.Driver") \ .option("dbtable", "road_point002") \ .option("user", "postgres") \ .option("password", "postgres") \ .load() ### 打印schema airDF.printSchema() # df 的表結構,我們看到的就是 列名即格式等 ### 只打印前20條 -- dsl 方式 airDF.select('id', 'road_number', 'speed_t').show() # id, road_number, speed_t 列名 ### 把 df 轉成 table -- sql 方式 def func(x): print(x) airDF.registerTempTable('pg') spark.sql("select * from pg limit 20").foreach(func) ### 存儲為 RDBMS、xml、json等格式 ## 存到數據庫 airDF.write.jdbc("jdbc:postgresql://172.16.89.80:5432/postgres" , table = "test",mode="append", properties={"user": "postgres", "password": "postgres"}) # 寫入數據庫 ## 存為 json airDF.write.format('json').save('jsoin_path') # 存入分區文件 airDF.coalesce(1).write.format('json').save('filtered.json') # 存入單個文件,不建議使用