spark教程(九)-操作數據庫


數據庫也是 spark 數據源創建 df 的一種方式,因為比較重要,所以單獨算一節。

本文以 postgres 為例

 

安裝 JDBC

首先需要 安裝 postgres 的客戶端驅動,即 JDBC 驅動,這是官方下載地址,JDBC,根據數據庫版本下載對應的驅動

上傳至 spark 目錄下的 jars 目錄

 並設置環境變量

export SPARK_CLASSPATH = /usr/lib/spark/jars

 

編程模板

如何操作數據庫,不同的版本方法不同,網上的教程五花八門,往往嘗試不成功。

其實我們可以看 spark 自帶的樣例, 路徑為 /usr/lib/spark/examples/src/main/python/sql    【編碼時,sparkSession 需要聲明 spark jars 的驅動路徑,代碼調用 API JDBC To Other Databases

我從 datasource.py 中找到了基本的讀寫方法,其他自己可以看看

def jdbc_dataset_example(spark):
    # $example on:jdbc_dataset$
    # Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    # Loading data from a JDBC source
    jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .load()

    jdbcDF2 = spark.read \
        .jdbc("jdbc:postgresql:dbserver", "schema.tablename",
              properties={"user": "username", "password": "password"})

    # Specifying dataframe column data types on read
    jdbcDF3 = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql:dbserver") \
        .option("dbtable", "schema.tablename") \
        .option("user", "username") \
        .option("password", "password") \
        .option("customSchema", "id DECIMAL(38, 0), name STRING") \
        .load()

# Saving data to a JDBC source jdbcDF.write \ .format("jdbc") \ .option("url", "jdbc:postgresql:dbserver") \ .option("dbtable", "schema.tablename") \ .option("user", "username") \ .option("password", "password") \ .save() jdbcDF2.write \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # Specifying create table column data types on write jdbcDF.write \ .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)") \ .jdbc("jdbc:postgresql:dbserver", "schema.tablename", properties={"user": "username", "password": "password"}) # $example off:jdbc_dataset$

 

實戰案例

僅供參考,請確保 spark 能連接上數據庫

from pyspark.sql import SparkSession
import os

# 獲取 環境變量 SPARK_CLASSPATH, 當然需要你事先設定了 該變量
# 如果沒有設定 SPARK_CLASSPATH, 得到 后面的值 /usr/lib/spark/jars/*
sparkClassPath = os.getenv('SPARK_CLASSPATH', '/usr/lib/spark/jars/*')

### 創建 sparkSession
# spark.driver.extraClassPath 設定了 jdbc 驅動的路徑
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .master("local") \
    .config("spark.driver.extraClassPath", sparkClassPath) \
    .getOrCreate()

### 連接數據庫並讀取表
# airDF 已經是個 DataFrame
airDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://172.16.89.80:5432/postgres") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "road_point002") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .load()

### 打印schema
airDF.printSchema()     # df 的表結構,我們看到的就是 列名即格式等

### 只打印前20條 -- dsl 方式
airDF.select('id', 'road_number', 'speed_t').show() # id, road_number, speed_t 列名

### 把 df 轉成 table -- sql 方式
def func(x):
    print(x)

airDF.registerTempTable('pg')
spark.sql("select * from pg limit 20").foreach(func)


### 存儲為 RDBMS、xml、json等格式
## 存到數據庫
airDF.write.jdbc("jdbc:postgresql://172.16.89.80:5432/postgres" ,
                 table = "test",mode="append", properties={"user": "postgres", "password": "postgres"})       # 寫入數據庫

## 存為 json
airDF.write.format('json').save('jsoin_path')       # 存入分區文件
airDF.coalesce(1).write.format('json').save('filtered.json')  # 存入單個文件,不建議使用

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM