Spark: Connecting to MySQL and MongoDB


During Spark computations we often need to connect to different kinds of databases to read or write data. This post covers how Spark connects to MySQL and MongoDB.

1. Connecting to MySQL. Spark 1.3 introduced a new abstraction, the DataFrame, so the approach below returns a DataFrame; it can be converted to a JavaRDD with JavaRDD<Row> rows = jdbcDF.toJavaRDD().

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;

public class Main implements Serializable {

    private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(Main.class);

    private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    private static final String MYSQL_USERNAME = "expertuser";
    private static final String MYSQL_PWD = "expertuser123";
    private static final String MYSQL_CONNECTION_URL =
            "jdbc:mysql://localhost:3306/employees?user=" + MYSQL_USERNAME + "&password=" + MYSQL_PWD;

    private static final JavaSparkContext sc =
            new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));

    private static final SQLContext sqlContext = new SQLContext(sc);

    public static void main(String[] args) {
        //Data source options
        Map<String, String> options = new HashMap<>();
        options.put("driver", MYSQL_DRIVER);
        options.put("url", MYSQL_CONNECTION_URL); //the JDBC data source opens and closes connections itself
        //dbtable can be a table name or, as here, a parenthesized subquery with an alias
        options.put("dbtable",
                    "(select emp_no, concat_ws(' ', first_name, last_name) as full_name from employees) as employees_name");
        options.put("partitionColumn", "emp_no"); //numeric table column used to split the query into partitions
        options.put("lowerBound", "10001");
        //lowerBound, upperBound and numPartitions control only how the query is split, not which rows
        //are returned; e.g. lowerBound 1, upperBound 20, numPartitions 2 gives the ranges (1, 10) and (11, 20)
        options.put("upperBound", "499999");
        options.put("numPartitions", "10");

        //Load MySQL query result as DataFrame
        DataFrame jdbcDF = sqlContext.load("jdbc", options);
        JavaRDD<Row> rows = jdbcDF.toJavaRDD();

        List<Row> employeeFullNameRows = jdbcDF.collectAsList();
        for (Row employeeFullNameRow : employeeFullNameRows) {
            LOGGER.info(employeeFullNameRow);
        }
    }
}
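The partitioning options above only decide how Spark splits the query across tasks; every matching row is still read. As a rough, self-contained illustration of the idea (not Spark's exact internal algorithm), evenly-strided ranges can be derived like this:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {

    //Returns inclusive [start, end] ranges covering [lower, upper],
    //split into numPartitions evenly-strided pieces; the last range
    //absorbs any remainder so no values are lost.
    public static List<long[]> ranges(long lower, long upper, int numPartitions) {
        List<long[]> result = new ArrayList<>();
        long stride = (upper - lower + 1) / numPartitions;
        long start = lower;
        for (int i = 0; i < numPartitions; i++) {
            long end = (i == numPartitions - 1) ? upper : start + stride - 1;
            result.add(new long[]{start, end});
            start = end + 1;
        }
        return result;
    }

    public static void main(String[] args) {
        //The example from the comments above: lowerBound 1, upperBound 20,
        //numPartitions 2 yields (1, 10) and (11, 20)
        for (long[] r : ranges(1, 20, 2)) {
            System.out.println(r[0] + ".." + r[1]);
        }
    }
}
```

Each range becomes one task's WHERE clause on partitionColumn, so rows with emp_no outside [lowerBound, upperBound] still appear, just skewed into the first or last partition.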

2. Connecting to MongoDB

See https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage
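Following that wiki, a minimal sketch of reading a collection through the mongo-hadoop connector looks roughly like the following; the connection URI, database, and collection names are placeholders you would replace with your own:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.bson.BSONObject;
import com.mongodb.hadoop.MongoInputFormat;

public class MongoRead {

    public static void main(String[] args) {
        JavaSparkContext sc =
                new JavaSparkContext(new SparkConf().setAppName("SparkMongo").setMaster("local[*]"));

        //Tell the connector which collection to read; "db.collection" is a placeholder
        Configuration mongodbConfig = new Configuration();
        mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
        mongodbConfig.set("mongo.input.uri", "mongodb://localhost:27017/db.collection");

        //Each record arrives as a (document id, BSON document) pair
        JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
                mongodbConfig, MongoInputFormat.class, Object.class, BSONObject.class);

        System.out.println("documents: " + documents.count());
        sc.stop();
    }
}
```

This requires the mongo-hadoop-core and mongo-java-driver jars on the classpath and a running MongoDB instance, so it is an integration sketch rather than a standalone test.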

 

