使用spark與MySQL進行數據交互的方法


在項目中,遇到一個場景是,需要從Hive數據倉庫中拉取數據,進行過濾、裁剪或者聚合之后生成中間結果導入MySQL。

對於這樣一個極其普通的離線計算場景,有多種技術選型可以實現。例如,sqoop,MR,HSQL。

 

我們這里使用的spark,優點來說是兩個:一是靈活性高,二是代碼簡潔。

1)靈活性高

相比sqoop和HSQL,spark可以更靈活的控制過濾和裁剪邏輯,甚至你可以通過外部的配置或者參數,來動態的調整spark的計算行為,提供定制化。

2)代碼簡潔

相比MR來說,代碼量上少了很多。也無需實現MySQL客戶端。

 

我抽象了一下需求,做了如下一個demo。

涉及的數據源有兩個:Hive&MySQL;計算引擎:spark&spark-sql。我們的demo中分為兩個步驟:

1)從Hive中讀取數據,交給spark計算,最終輸出到MySQL;

2)從MySQL中讀取數據,交給spark計算,最終再輸出到MySQL另一張表。

 

1、 數據准備

 

創建了Hive外部分區表

關於分區和外部表這里不說了。

CREATE EXTERNAL TABLE `gulfstream_test.accounts`(
  `id` string COMMENT '用戶id', 
  `order_id` string COMMENT '訂單id', 
  `status` bigint COMMENT '用戶狀態', 
  `count` decimal(16,9) COMMENT '訂單數')
COMMENT '用戶信息'
PARTITIONED BY ( 
  `year` string, 
  `month` string, 
  `day` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '\t' 
STORED AS INPUTFORMAT 
  'org.autonavi.udf.CustomInputFormat' 
OUTPUTFORMAT 
  'org.autonavi.udf.CustomHiveOutputFormat'
LOCATION
  'hdfs://mycluster-tj/***/acounts'
TBLPROPERTIES (
  'LEVEL'='1', 
  'TTL'='60', 
  'last_modified_by'='yangfan', 
  'last_modified_time'='2017-10-23', 
  'transient_lastDdlTime'='1508746808')

 

建立分區,並指定分區路徑

這里分區使用的年月日三級分區。通過下面的命令將year=2017/month=10/day=23這個Hive分區的數據指向了location=hdfs://mycluster-tj/***/acounts/2017/10/23

hive> alter table gulfstream_test.accounts add partition(year='2017', month='10', day='23') location 'hdfs://mycluster-tj/***/acounts/2017/10/23';

 

查詢一下分區是否建立成功

可以看到分區已經有了。

show partitions gulfstream_test.accounts;
OK
partition
year=2017/month=10/day=23

 

上傳本地測試數據到hdfs

hadoop fs -put a.txt  hdfs://mycluster-tj/***/acounts/2017/10/23

看一下數據,取了前10行,原諒我數據比較假。

[data_monitor@bigdata-arch-client10 target]$ hadoop fs -cat hdfs://mycluster-tj/***/acounts/2017/10/23/a | head -10
0       0       0       0
1       1       1       1
2       2       2       2
3       3       3       3
4       4       4       4
5       5       5       5
6       6       6       6
7       7       7       7
8       8       8       8
9       9       9       9

在Hive中,也查一下前10條,是一樣的。只是多了分區字段。

hive (default)> select * from gulfstream_test.accounts where year=2017 and month=10 and day=23 limit 10;
OK
accounts.id     accounts.order_id       accounts.status accounts.count  accounts.year   accounts.month  accounts.day
0       0       0       0       2017    10      23
1       1       1       1       2017    10      23
2       2       2       2       2017    10      23
3       3       3       3       2017    10      23
4       4       4       4       2017    10      23
5       5       5       5       2017    10      23
6       6       6       6       2017    10      23
7       7       7       7       2017    10      23
8       8       8       8       2017    10      23
9       9       9       9       2017    10      23
Time taken: 1.38 seconds, Fetched: 10 row(s)

至此,測試數據准備好了。一共1000000條,1百萬。

 

2、代碼

 

1)POM依賴

可以通過pom依賴來看一下筆者使用的組件版本。

這里就不贅述了。

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>

打包方式

<build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <!--這里要替換成jar包main方法所在類 -->
                            <mainClass>com.kangaroo.studio.algorithms.filter.LoadDB</mainClass>

                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- 指定在打包節點執行jar包合並操作 -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

 

2)java spark代碼

先貼上代碼,再說明

package com.kangaroo.studio.algorithms.filter;


import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.hive.HiveContext;

import java.io.Serializable;
import java.util.Properties;


public class LoadDB implements Serializable {

    private SparkConf sparkConf;
    private JavaSparkContext javaSparkContext;
    private HiveContext hiveContext;
    private SQLContext sqlContext;

    /*
    *   初始化Load
    *   創建sparkContext, sqlContext, hiveContext
    * */
    public LoadDB() {
        initSparckContext();
        initSQLContext();
        initHiveContext();
    }

    /*
    *   創建sparkContext
    * */
    private void initSparckContext() {
        String warehouseLocation = System.getProperty("user.dir");
        sparkConf = new SparkConf()
                .setAppName("from-to-mysql")
                .set("spark.sql.warehouse.dir", warehouseLocation)
                .setMaster("yarn-client");
        javaSparkContext = new JavaSparkContext(sparkConf);
    }

    /*
    *   創建hiveContext
    *   用於讀取Hive中的數據
    * */
    private void initHiveContext() {
        hiveContext = new HiveContext(javaSparkContext);
    }

    /*
    *   創建sqlContext
    *   用於讀寫MySQL中的數據
    * */
    private void initSQLContext() {
        sqlContext = new SQLContext(javaSparkContext);
    }

    /*
    *   使用spark-sql從hive中讀取數據, 然后寫入mysql對應表.
    * */
    public void hive2db() {
        String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
        String table = "accounts";
        Properties props = new Properties();
        props.put("user", "root");
        props.put("password", "1234");
        String query = "select * from gulfstream_test.accounts where year=2017 and month=10 and day=23";
        DataFrame rows = hiveContext.sql(query).select("id", "order_id", "status", "count");;
        rows.write().mode(SaveMode.Append).jdbc(url, table, props);
    }

    /*
    *   使用spark-sql從db中讀取數據, 處理后再回寫到db
    * */
    public void db2db() {
        String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8";
        String fromTable = "accounts";
        String toTable = "accountsPart";
        Properties props = new Properties();
        props.put("user", "root");
        props.put("password", "1234");
        DataFrame rows = sqlContext.read().jdbc(url, fromTable, props).where("count < 1000");
        rows.write().mode(SaveMode.Append).jdbc(url, toTable, props);
    }


    public static void main(String[] args) {
        LoadDB loadDB = new LoadDB();
        System.out.println(" ---------------------- start hive2db ------------------------");
        loadDB.hive2db();
        System.out.println(" ---------------------- finish hive2db ------------------------");
        System.out.println(" ---------------------- start db2db ------------------------");
        loadDB.db2db();
        System.out.println(" ---------------------- finish db2db ------------------------");
    }
}

說明:

  • hive2db

核心動作是使用hiveContext.sql(query)執行了hiveSQL,過濾出Hive表中year=2017/month=10/day=23分鍾的數據,返回一個DataFrame對象。

DataFrame是spark-sql數據處理的核心。對DataFrame的操作推薦這樣一篇博客。你可以去使用這些方法,實現復雜的邏輯。

對DataFrame對象,我們使用了select裁剪了其中4列數據(id, order_id, status, count)出來,不過不裁剪的話,會有7列(加上分區的year,month,day)。

然后將數據以SaveMode.Append的方式,寫入了mysql中的accounts表。

SaveMode.Append方式,數據會追加,而不會覆蓋。如果想覆蓋,還有一個常用的SaveMode.Overwrite。推薦這樣一篇博客

最終accounts中的數據有1000000條,百萬。

 

  • db2db

db2db從剛剛生成的MySQL表accounts中讀取出數據,也是返回了一個dataframe對象,通過執行where過濾除了其中id<1000的數據,這里正好是1000條。

然后寫入了accountsPart。最終accountsPart數據應該有1000條。

 

3)編譯和執行

 編譯完成后,生成jar包from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar 

使用默認參數提交到yarn隊列。

spark-submit --queue=root.zhiliangbu_prod_datamonitor from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar 

 片刻之后,觀察輸出。已經全部finish了。

 

4)查看一下結果

我們到mysql中瞅一瞅。

 

accounts表

有沒有注意到,其實不用建立mysql表!這個過程會自動給你創建,相當於if not exists。

細心的你可能已經注意到了,hive里的string類型,到了MySQL中變成了Text。有個兄弟說,如果你手動創建了表,並且字段設置為String會報錯,我沒有試,只是記錄了一下。

CREATE TABLE `accounts` (
  `id` text,
  `order_id` text,
  `status` bigint(20) DEFAULT NULL,
  `count` decimal(16,9) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

簡單看一下里面有多少數據。1百萬

MariaDB [big_data]> select count(1) from accounts ;    
+----------+
| count(1) |
+----------+
|  1000000 |
+----------+
1 row in set (0.32 sec)

 

acountsPart表

 CREATE TABLE `accountsPart` (
  `id` text,
  `order_id` text,
  `status` bigint(20) DEFAULT NULL,
  `count` decimal(16,9) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

查看有多少數據,1000條,果然是沒有問題的

MariaDB [big_data]> select count(1) from accountsPart;
+----------+
| count(1) |
+----------+
|     1000 |
+----------+
1 row in set (0.00 sec)

 

到此為止。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM