/** Write path: stores a small, hard-coded list of `(id, name, age)` rows into
  * the MySQL table `user`, using one JDBC connection per RDD partition.
  */
object Spark2 {

  /** Entry point.
    *
    * Builds a local Spark context, turns the sample rows into an RDD and, for
    * every non-empty partition, opens a JDBC connection, makes sure the `user`
    * table exists and inserts the partition's rows.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("hu")

    // Create the Spark context.
    val sc = new SparkContext(conf)
    try {
      // JDBC connection settings for the target database.
      val url = "jdbc:mysql://hadoop01:3306/spark?characterEncoding=utf-8"
      val username = "root"
      val pass = "123456"

      val list = List((1, "張三", 20), (2, "李四", 30), (3, "王五", 40))
      val rdd: RDD[(Int, String, Int)] = sc.makeRDD(list)

      rdd.foreachPartition { rows =>
        // Nothing to write for an empty partition, so do not open a connection at all.
        if (rows.hasNext) {
          val connection = java.sql.DriverManager.getConnection(url, username, pass)
          try {
            // Ensure the table exists once per partition instead of once per row.
            val ddl = connection.createStatement()
            try {
              ddl.execute(
                "create table if not exists user(id int auto_increment,name varchar(10),age int,primary key(id)) charset='utf8'"
              )
            } finally {
              ddl.close()
            }

            // Prepare the insert once and re-bind its parameters for every row.
            val insert =
              connection.prepareStatement("insert into user (id,name, age) values (?, ?, ?)")
            try {
              rows.foreach { case (id, name, age) =>
                insert.setInt(1, id)
                insert.setString(2, name)
                insert.setInt(3, age)
                insert.executeUpdate()
              }
            } finally {
              insert.close()
            }
          } finally {
            // Always release the connection, even when an insert fails.
            connection.close()
          }
        }
      }
    } finally {
      // Shut the Spark context down whether or not the job succeeded.
      sc.stop()
    }
  }
}
/** Read path: loads `name` and `age` from the MySQL table `user` through a
  * Spark `JdbcRDD` and prints every row as `name , age`.
  */
object JDBCRDD {

  /** Entry point.
    *
    * Builds a local Spark context, queries the rows whose `id` lies in the
    * inclusive range 1..3 using two partitions, collects the formatted rows
    * and prints them.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("hu")

    // Create the Spark context.
    val sc = new SparkContext(conf)
    try {
      // JDBC connection settings.
      val driver = "com.mysql.jdbc.Driver"
      val url = "jdbc:mysql://hadoop01:3306/spark"
      val uname = "root"
      val pass = "123456"

      // The query must carry the two `?` range placeholders: JdbcRDD binds each
      // partition's lower and upper bound to them, and a query without them
      // (e.g. "select name ,age from user ") fails.
      val sql = "select name,age from user where id >=? and id <=?"

      // Constructor arguments, in order:
      //   SparkContext, connection factory, SQL text,
      //   lower bound, upper bound, number of partitions, row mapper.
      val jdbcRDD: JdbcRDD[String] = new JdbcRDD[String](
        sc,
        () => {
          Class.forName(driver)
          java.sql.DriverManager.getConnection(url, uname, pass)
        },
        sql,
        1, // lower bound
        3, // upper bound
        2, // number of partitions
        // Turn each row into a value (the numbers are 1-based column indexes)
        // instead of printing inside the mapper, so the RDD holds real data.
        rs => rs.getString(1) + " , " + rs.getString(2)
      )

      // Bring the rows back to the driver and print them there.
      jdbcRDD.collect().foreach(println)
    } finally {
      // Shut the Spark context down whether or not the job succeeded.
      sc.stop()
    }
  }
}