場景:使用Spark Streaming接收HDFS上的文件數據與關系型數據庫中的表進行相關的查詢操作;
使用技術:Spark Streaming + Spark JDBC External DataSources
HDFS上文件的數據格式為:id、name、cityId,分隔符為tab
1 zhangsan 1 2 lisi 1 3 wangwu 2 4 zhaoliu 3
MySQL的表city結構為:id int, name varchar
1 bj 2 sz 3 sh
本案例的結果為:select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id;
示例代碼:
package com.asiainfo.ocdc case class Student(id: Int, name: String, cityId: Int)
package com.asiainfo.ocdc import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.SparkContext import org.apache.spark.sql.hive.HiveContext /** * Spark Streaming處理HDFS上的數據並結合Spark JDBC外部數據源處理 * * @author luogankun */ object HDFSStreaming { def main(args: Array[String]) { if (args.length < 1) { System.err.println("Usage: HDFSStreaming <path>") System.exit(1) } val location = args(0) val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(5)) val sqlContext = new HiveContext(sc) import sqlContext._ import com.luogankun.spark.jdbc._ //使用External Data Sources處理MySQL中的數據 val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root", "root", "select id, name from city") //將cities RDD注冊成city臨時表 cities.registerTempTable("city") val inputs = ssc.textFileStream(location) inputs.foreachRDD(rdd => { if (rdd.partitions.length > 0) { //將Streaming中接收到的數據注冊成student臨時表 rdd.map(_.split("\t")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student"); //關聯Streaming和MySQL表進行查詢操作 sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id").collect().foreach(println) } }) ssc.start() ssc.awaitTermination() } }
提交到集群執行腳本:sparkstreaming_hdfs_jdbc.sh
#!/bin/sh . /etc/profile set -x cd $SPARK_HOME/bin spark-submit \ --name HDFSStreaming \ --class com.asiainfo.ocdc.HDFSStreaming \ --master spark://hadoop000:7077 \ --executor-memory 1G \ --total-executor-cores 1 \ /home/spark/software/source/streaming-app/target/streaming-app-V00B01C00-SNAPSHOT-jar-with-dependencies.jar \ hdfs://hadoop000:8020/data/hdfs