spark 數據 寫入與讀取


//入庫
object Spark2 {
def main(args: Array[String]): Unit = {
val conf=new SparkConf()
conf.setMaster("local")
conf.setAppName("hu")
// 創建Spark上下文對象
val sc=new SparkContext(conf)
// 創建JDBCRDD,方法數據庫
val url = "jdbc:mysql://hadoop01:3306/spark?characterEncoding=utf-8"
val username = "root"
val pass = "123456"
val list=List((1,"張三",20),(2,"李四",30),(3,"王五",40))
val rdd: RDD[(Int, String, Int)] = sc.makeRDD(list)
rdd.foreachPartition(data => {
val connection = java.sql.DriverManager.getConnection(url, username, pass)
data.foreach {
case (id, name, age) => {
val prep = connection.prepareStatement("create table if not exists user(id int auto_increment,name varchar(10),age int,primary key(id)) charset='utf8'")
val prep2= connection.prepareStatement("insert into user (id,name, age) values (?, ?, ?)")
prep.execute()
prep2.setInt(1,id)
prep2.setString(2,name)
prep2.setInt(3,age)
prep2.executeUpdate()

prep.close()
prep2.close()
}
}
connection.close()
})
}
}

//出庫
object JDBCRDD{
def main(args: Array[String]): Unit = {
val conf=new SparkConf()
conf.setMaster("local")
conf.setAppName("hu")
// 創建JDBCRDD,方法數據庫
val sc=new SparkContext(conf)
//連接數據庫jdbc
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://hadoop01:3306/spark"
val uname = "root"
val pass = "123456"
// 查詢數據
val sql = "select name,age from user where id >=? and id <=?"
// val sql = "select name ,age from user " // 查詢數據要帶范圍,要不會報錯
val jdbcRDD: JdbcRDD[Unit] = new JdbcRDD(
  //  sc: SparkContext,  //上下文
  // getConnection: () => Connection, //連接數據庫
  // sql: String,  //sql語句
  // lowerBound: Long,  //下限
  // upperBound: Long,  //上限
  // numPartitions: Int,  //分區數
  // mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _
      sc,
() => {
Class.forName(driver)
java.sql.DriverManager.getConnection(url, uname, pass)
},
sql,
1, //下限
3, //上限
2, //分區數
(rs) => {
println(rs.getString(1) + " , " + rs.getString(2)) //數字是列索引
})
jdbcRDD.collect
}}
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM