Spark jdbc postgresql數據庫連接和寫入操作源代碼解讀


概述:Spark postgresql jdbc 數據庫連接和寫入操作源代碼解讀。具體記錄了SparkSQL對數據庫的操作,通過java程序。在本地開發和執行。總體為,Spark建立數據庫連接,讀取數據。將DataFrame數據寫入還有一個數據庫表中。附帶完整項目源代碼(完整項目源代碼github)。

這里寫圖片描寫敘述

1.首先在postgreSQL中創建一張測試表,並插入數據。(完整項目源代碼Github)

1.1. 在postgreSQL中的postgres用戶下,創建 products

CREATE TABLE products ( product_no integer, name text, price numeric );

1.2. 在 products 插入數據

INSERT INTO products (product_no, name, price) VALUES (1, 'Cheese', 9.99), (2, 'Bread', 1.99), (3, 'Milk', 2.99);

查看數據庫寫入結果。

這里寫圖片描寫敘述

2.編寫SPARK程序。(完整項目源代碼Github

2.1.讀取Postgresql某一張表的數據為DataFrame(完整項目源代碼Github

SparkPostgresqlJdbc.java
Properties connectionProperties = new Properties();


//添加數據庫的username(user)密碼(password),指定postgresql驅動(driver)
connectionProperties.put("user","postgres");
connectionProperties.put("password","123456");
connectionProperties.put("driver","org.postgresql.Driver");

//SparkJdbc讀取Postgresql的products表內容
Dataset<Row> jdbcDF = spark.read()
        .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");

//顯示jdbcDF數據內容
jdbcDF.show();

2.2.寫入Postgresql某張表中

//將jdbcDF數據新建並寫入newproducts,append模式是連接模式,默認的是"error"模式。
jdbcDF.write().mode("append")
        .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties);

3.執行程序。並查看結果(假設在IDEA中開發不熟練。能夠看我還有一篇博文spark (java API) 在Intellij IDEA中開發並執行)。

3.1.直接在intellij IDEA(社區版)中執行。

a.在執行button的“Edit Configeration”中的VM option中加入“-Dspark.master=local”

這里寫圖片描寫敘述

3.2.在終端(Terminal)中執行。

/opt/spark-2.1.0-bin-hadoop2.7/bin/spark-submit \
  --class "SparkPostgresqlJdbc" \   --master local[4] \   --driver-class-path /home/xiaolei/.m2/repository/org/postgresql/postgresql/9.4.1212/postgresql-9.4.1212.jar \   target/SparkPostgresqlJdbc-1.0-SNAPSHOT.jar

當中 --driver-class-path 指定下載的postgresql JDBC數據
庫驅動路徑。命令執行要在項目的根文件夾中(/home/xiaolei/Data/GS/Spark/SparkPostgresqlJdbc)。

這里寫圖片描寫敘述

查看Spark寫入數據庫中的數據

這里寫圖片描寫敘述

4.下面為項目中主要源代碼(完整項目源代碼Github):

4.1.項目配置源代碼pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>wangxiaolei</groupId>
    <artifactId>SparkPostgresqlJdbc</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>9.4.1212</version>
        </dependency>
    </dependencies>
</project>

4.2.java源代碼SparkPostgresqlJdbc.java

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/** * MIT. * Author: wangxiaolei(王小雷). * Date:17-2-9. * Project:SparkPostgresqlJdbc. */
public class SparkPostgresqlJdbc {
    public static void main (String[] args) {

        SparkSession spark = SparkSession
                .builder()
                .appName("SparkPostgresqlJdbc")
                .config("spark.some.config.option","some-value")
                .getOrCreate();
    //啟動runSparkPostgresqlJdbc程序
        runSparkPostgresqlJdbc(spark);

        spark.stop();

    }

    private static void runSparkPostgresqlJdbc(SparkSession spark){
        //new一個屬性
        System.out.println("確保數據庫已經開啟,並創建了products表和插入了數據");
        Properties connectionProperties = new Properties();


        //添加數據庫的username(user)密碼(password),指定postgresql驅動(driver)
        System.out.println("添加數據庫的username(user)密碼(password),指定postgresql驅動(driver)");
        connectionProperties.put("user","postgres");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","org.postgresql.Driver");



        //SparkJdbc讀取Postgresql的products表內容
        System.out.println("SparkJdbc讀取Postgresql的products表內容");
        Dataset<Row> jdbcDF = spark.read()
                .jdbc("jdbc:postgresql://localhost:5432/postgres","products",connectionProperties).select("name","price");
        //顯示jdbcDF數據內容
        jdbcDF.show();



        //將jdbcDF數據新建並寫入newproducts,append模式是連接模式。默認的是"error"模式。

jdbcDF.write().mode("append") .jdbc("jdbc:postgresql://localhost:5432/postgres","newproducts",connectionProperties); } }

完整項目源代碼Github


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM