在項目中,遇到一個場景是,需要從Hive數據倉庫中拉取數據,進行過濾、裁剪或者聚合之后生成中間結果導入MySQL。
對於這樣一個極其普通的離線計算場景,有多種技術選型可以實現。例如,sqoop,MR,HSQL。
我們這里使用的spark,優點來說是兩個:一是靈活性高,二是代碼簡潔。
1)靈活性高
相比sqoop和HSQL,spark可以更靈活的控制過濾和裁剪邏輯,甚至你可以通過外部的配置或者參數,來動態的調整spark的計算行為,提供定制化。
2)代碼簡潔
相比MR來說,代碼量上少了很多。也無需實現MySQL客戶端。
我抽象了一下需求,做了如下一個demo。
涉及的數據源有兩個:Hive&MySQL;計算引擎:spark&spark-sql。我們的demo中分為兩個步驟:
1)從Hive中讀取數據,交給spark計算,最終輸出到MySQL;
2)從MySQL中讀取數據,交給spark計算,最終再輸出到MySQL另一張表。
1、 數據准備
創建了Hive外部分區表
關於分區和外部表這里不說了。
CREATE EXTERNAL TABLE `gulfstream_test.accounts`( `id` string COMMENT '用戶id', `order_id` string COMMENT '訂單id', `status` bigint COMMENT '用戶狀態', `count` decimal(16,9) COMMENT '訂單數') COMMENT '用戶信息' PARTITIONED BY ( `year` string, `month` string, `day` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS INPUTFORMAT 'org.autonavi.udf.CustomInputFormat' OUTPUTFORMAT 'org.autonavi.udf.CustomHiveOutputFormat' LOCATION 'hdfs://mycluster-tj/***/acounts' TBLPROPERTIES ( 'LEVEL'='1', 'TTL'='60', 'last_modified_by'='yangfan', 'last_modified_time'='2017-10-23', 'transient_lastDdlTime'='1508746808')
建立分區,並指定分區路徑
這里分區使用的年月日三級分區。通過下面的命令將year=2017/month=10/day=23這個Hive分區的數據指向了location=hdfs://mycluster-tj/***/acounts/2017/10/23
hive> alter table gulfstream_test.accounts add partition(year='2017', month='10', day='23') location 'hdfs://mycluster-tj/***/acounts/2017/10/23';
查詢一下分區是否建立成功
可以看到分區已經有了。
show partitions gulfstream_test.accounts; OK partition year=2017/month=10/day=23
上傳本地測試數據到hdfs
hadoop fs -put a.txt hdfs://mycluster-tj/***/acounts/2017/10/23
看一下數據,取了前10行,原諒我數據比較假。
[data_monitor@bigdata-arch-client10 target]$ hadoop fs -cat hdfs://mycluster-tj/***/acounts/2017/10/23/a | head -10 0 0 0 0 1 1 1 1 2 2 2 2 3 3 3 3 4 4 4 4 5 5 5 5 6 6 6 6 7 7 7 7 8 8 8 8 9 9 9 9
在Hive中,也查一下前10條,是一樣的。只是多了分區字段。
hive (default)> select * from gulfstream_test.accounts where year=2017 and month=10 and day=23 limit 10; OK accounts.id accounts.order_id accounts.status accounts.count accounts.year accounts.month accounts.day 0 0 0 0 2017 10 23 1 1 1 1 2017 10 23 2 2 2 2 2017 10 23 3 3 3 3 2017 10 23 4 4 4 4 2017 10 23 5 5 5 5 2017 10 23 6 6 6 6 2017 10 23 7 7 7 7 2017 10 23 8 8 8 8 2017 10 23 9 9 9 9 2017 10 23 Time taken: 1.38 seconds, Fetched: 10 row(s)
至此,測試數據准備好了。一共1000000條,1百萬。
2、代碼
1)POM依賴
可以通過pom依賴來看一下筆者使用的組件版本。
這里就不贅述了。
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.6.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.6.0</version> <scope>provided</scope> </dependency>
打包方式
<build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <archive> <manifest> <!--這里要替換成jar包main方法所在類 --> <mainClass>com.kangaroo.studio.algorithms.filter.LoadDB</mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <!-- this is used for inheritance merges --> <phase>package</phase> <!-- 指定在打包節點執行jar包合並操作 --> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build>
2)java spark代碼
先貼上代碼,再說明
package com.kangaroo.studio.algorithms.filter; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.hive.HiveContext; import java.io.Serializable; import java.util.Properties; public class LoadDB implements Serializable { private SparkConf sparkConf; private JavaSparkContext javaSparkContext; private HiveContext hiveContext; private SQLContext sqlContext; /* * 初始化Load * 創建sparkContext, sqlContext, hiveContext * */ public LoadDB() { initSparckContext(); initSQLContext(); initHiveContext(); } /* * 創建sparkContext * */ private void initSparckContext() { String warehouseLocation = System.getProperty("user.dir"); sparkConf = new SparkConf() .setAppName("from-to-mysql") .set("spark.sql.warehouse.dir", warehouseLocation) .setMaster("yarn-client"); javaSparkContext = new JavaSparkContext(sparkConf); } /* * 創建hiveContext * 用於讀取Hive中的數據 * */ private void initHiveContext() { hiveContext = new HiveContext(javaSparkContext); } /* * 創建sqlContext * 用於讀寫MySQL中的數據 * */ private void initSQLContext() { sqlContext = new SQLContext(javaSparkContext); } /* * 使用spark-sql從hive中讀取數據, 然后寫入mysql對應表. * */ public void hive2db() { String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8"; String table = "accounts"; Properties props = new Properties(); props.put("user", "root"); props.put("password", "1234"); String query = "select * from gulfstream_test.accounts where year=2017 and month=10 and day=23"; DataFrame rows = hiveContext.sql(query).select("id", "order_id", "status", "count");; rows.write().mode(SaveMode.Append).jdbc(url, table, props); } /* * 使用spark-sql從db中讀取數據, 處理后再回寫到db * */ public void db2db() { String url = "jdbc:mysql://10.93.84.53:3306/big_data?characterEncoding=UTF-8"; String fromTable = "accounts"; String toTable = "accountsPart"; Properties props = new Properties(); props.put("user", "root"); props.put("password", "1234"); DataFrame rows = sqlContext.read().jdbc(url, fromTable, props).where("count < 1000"); rows.write().mode(SaveMode.Append).jdbc(url, toTable, props); } public static void main(String[] args) { LoadDB loadDB = new LoadDB(); System.out.println(" ---------------------- start hive2db ------------------------"); loadDB.hive2db(); System.out.println(" ---------------------- finish hive2db ------------------------"); System.out.println(" ---------------------- start db2db ------------------------"); loadDB.db2db(); System.out.println(" ---------------------- finish db2db ------------------------"); } }
說明:
- hive2db
核心動作是使用hiveContext.sql(query)執行了hiveSQL,過濾出Hive表中year=2017/month=10/day=23分鍾的數據,返回一個DataFrame對象。
DataFrame是spark-sql數據處理的核心。對DataFrame的操作推薦這樣一篇博客。你可以去使用這些方法,實現復雜的邏輯。
對DataFrame對象,我們使用了select裁剪了其中4列數據(id, order_id, status, count)出來,不過不裁剪的話,會有7列(加上分區的year,month,day)。
然后將數據以SaveMode.Append的方式,寫入了mysql中的accounts表。
SaveMode.Append方式,數據會追加,而不會覆蓋。如果想覆蓋,還有一個常用的SaveMode.Overwrite。推薦這樣一篇博客。
最終accounts中的數據有1000000條,百萬。
- db2db
db2db從剛剛生成的MySQL表accounts中讀取出數據,也是返回了一個dataframe對象,通過執行where過濾除了其中id<1000的數據,這里正好是1000條。
然后寫入了accountsPart。最終accountsPart數據應該有1000條。
3)編譯和執行
編譯完成后,生成jar包from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar
使用默認參數提交到yarn隊列。
spark-submit --queue=root.zhiliangbu_prod_datamonitor from-to-mysql-1.0-SNAPSHOT-jar-with-dependencies.jar
片刻之后,觀察輸出。已經全部finish了。
4)查看一下結果
我們到mysql中瞅一瞅。
accounts表
有沒有注意到,其實不用建立mysql表!這個過程會自動給你創建,相當於if not exists。
細心的你可能已經注意到了,hive里的string類型,到了MySQL中變成了Text。有個兄弟說,如果你手動創建了表,並且字段設置為String會報錯,我沒有試,只是記錄了一下。
CREATE TABLE `accounts` ( `id` text, `order_id` text, `status` bigint(20) DEFAULT NULL, `count` decimal(16,9) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8
簡單看一下里面有多少數據。1百萬
MariaDB [big_data]> select count(1) from accounts ; +----------+ | count(1) | +----------+ | 1000000 | +----------+ 1 row in set (0.32 sec)
acountsPart表
CREATE TABLE `accountsPart` ( `id` text, `order_id` text, `status` bigint(20) DEFAULT NULL, `count` decimal(16,9) DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8
查看有多少數據,1000條,果然是沒有問題的
MariaDB [big_data]> select count(1) from accountsPart; +----------+ | count(1) | +----------+ | 1000 | +----------+ 1 row in set (0.00 sec)
到此為止。