Spark Streaming、HDFS結合Spark JDBC External DataSouces處理案例


場景:使用Spark Streaming接收HDFS上的文件數據與關系型數據庫中的表進行相關的查詢操作;

使用技術:Spark Streaming + Spark JDBC External DataSources

 
HDFS上文件的數據格式為:id、name、cityId,分隔符為tab 
1       zhangsan        1
2       lisi    1
3       wangwu  2
4       zhaoliu 3

 

MySQL的表city結構為:id int, name varchar
1    bj
2    sz
3    sh
本案例的結果為:select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id;

 

示例代碼:

package com.asiainfo.ocdc

case class Student(id: Int, name: String, cityId: Int)
package com.asiainfo.ocdc

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

/**
 * Spark Streaming處理HDFS上的數據並結合Spark JDBC外部數據源處理
 *
 * @author luogankun
 */
object HDFSStreaming {
  def main(args: Array[String]) {

    if (args.length < 1) {
      System.err.println("Usage: HDFSStreaming <path>")
      System.exit(1)
    }

    val location = args(0)

    val sparkConf = new SparkConf()
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))

    val sqlContext = new HiveContext(sc)
    import sqlContext._

    import com.luogankun.spark.jdbc._
    //使用External Data Sources處理MySQL中的數據
    val cities = sqlContext.jdbcTable("jdbc:mysql://hadoop000:3306/test", "root", "root", "select id, name from city")
    //將cities RDD注冊成city臨時表
    cities.registerTempTable("city")

    val inputs = ssc.textFileStream(location)
    inputs.foreachRDD(rdd => {
      if (rdd.partitions.length > 0) {
        //將Streaming中接收到的數據注冊成student臨時表
        rdd.map(_.split("\t")).map(x => Student(x(0).toInt, x(1), x(2).toInt)).registerTempTable("student");

        //關聯Streaming和MySQL表進行查詢操作
        sqlContext.sql("select s.id, s.name, s.cityId, c.name from student s join city c on s.cityId=c.id").collect().foreach(println)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

 

提交到集群執行腳本:sparkstreaming_hdfs_jdbc.sh

#!/bin/sh
. /etc/profile
set -x

cd $SPARK_HOME/bin

spark-submit \
--name HDFSStreaming \
--class com.asiainfo.ocdc.HDFSStreaming \
--master spark://hadoop000:7077 \
--executor-memory 1G \
--total-executor-cores 1 \
/home/spark/software/source/streaming-app/target/streaming-app-V00B01C00-SNAPSHOT-jar-with-dependencies.jar \
hdfs://hadoop000:8020/data/hdfs 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM