1. Reading data from MySQL. When reading through JdbcRDD you must pass lower and upper bounds for the query; both bounds are of type Long, so a primary-key column is usually the recommended partition column. If you partition on a time column instead, convert it to a Unix timestamp first.
Reference demo: https://github.com/asker124143222/spark-demo
package com.home.spark

import java.sql.DriverManager
import java.time.{LocalDateTime, ZoneOffset}

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

object Ex_mysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark mysql demo")
    val sc = new SparkContext(conf)

    val driverClassName = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/busdata?characterEncoding=utf8&useSSL=false"
    val user = "root"
    val password = "root"

    // The createTime column in MySQL is DATETIME, so the bound parameters are passed in as Unix timestamps
    val sql = "select userId,userName,name from user where createTime > from_unixtime(?) and createTime < from_unixtime(?)"

    val connection = () => {
      Class.forName(driverClassName)
      DriverManager.getConnection(url, user, password)
    }

    val startTime = LocalDateTime.of(2018, 11, 3, 0, 0, 0)
    val endTime = LocalDateTime.of(2018, 11, 4, 0, 0)

    // MySQL's from_unixtime expects a 10-digit timestamp (seconds); Java epoch millis have 13 digits, so divide by 1000
    val startTimeStamp = startTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli / 1000
    val endTimeStamp = endTime.toInstant(ZoneOffset.ofHours(8)).toEpochMilli / 1000
    println("startTime: " + startTime + ", endTime: " + endTime)
    println("startTime: " + startTimeStamp + ", endTime: " + endTimeStamp)

    // Read
    val result: JdbcRDD[(Int, String, String)] = new JdbcRDD[(Int, String, String)](
      sc,
      connection,
      sql,
      startTimeStamp,
      endTimeStamp,
      2,
      rs => {
        val userId = rs.getInt(1)
        val userName = rs.getString(2)
        val name = rs.getString(3)
        // println(s"id:${userId},userName:${userName},name:${name}")
        (userId, userName, name)
      }
    )

    result.collect().foreach(println)

    sc.stop()
  }
}
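As noted above, a numeric primary-key column is usually the better choice for the JdbcRDD bounds. The following is a minimal sketch of the same read partitioned on the user table's userId column; it reuses the sc and connection values from the demo above, and the bound values 1 and 10000 are assumptions made purely for illustration.

// Sketch: partition the JdbcRDD on a numeric primary key instead of a time column.
// The userId range [1, 10000] is an assumed value range, not taken from the original demo.
val sqlByPk = "select userId,userName,name from user where userId >= ? and userId <= ?"

val resultByPk = new JdbcRDD[(Int, String, String)](
  sc,
  connection,   // same connection factory as in the demo above
  sqlByPk,
  1L,           // lowerBound: smallest userId to read (assumed)
  10000L,       // upperBound: largest userId to read (assumed)
  4,            // numPartitions: each partition queries a sub-range of [lowerBound, upperBound]
  rs => (rs.getInt(1), rs.getString(2), rs.getString(3))
)
resultByPk.collect().foreach(println)

Because the bounds are plain Long values, no from_unixtime conversion is needed in the SQL, and Spark splits the key range evenly across the partitions.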
2. Writing to MySQL. To reduce the number of connections created, use foreachPartition instead of foreach, so that one connection serves an entire partition rather than a single row.
package com.home.spark

import java.sql.{DriverManager, PreparedStatement}
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

object Ex_mysql2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true).setMaster("local[*]").setAppName("spark mysql demo")
    val sc = new SparkContext(conf)

    val driverClassName = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost:3306/busdata?characterEncoding=utf8&useSSL=false"
    val user = "root"
    val password = "root"

    // Build some test data to write
    val logBuffer = mutable.ListBuffer[(String, String, String, String, String, String)]()
    // Use HH (24-hour clock) rather than hh, otherwise afternoon times are formatted incorrectly
    val ofPattern = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    for (i <- 1 to 100) {
      logBuffer += (("write" + i, "write test " + i, "localhost" + i,
        LocalDateTime.now().format(ofPattern), "spark", LocalDateTime.now().format(ofPattern)))
    }
    // logBuffer.foreach(println)

    val logRDD: RDD[(String, String, String, String, String, String)] = sc.makeRDD(logBuffer)

    // To reduce the number of connections created, use foreachPartition instead of foreach.
    // Caveat: any approach that hands a whole partition's iterator to one task carries an OOM risk.
    logRDD.foreachPartition(logData => {
      Class.forName(driverClassName)
      val connection = DriverManager.getConnection(url, user, password)
      val sql = "insert into syslog(action, event, host, insertTime, userName, update_Time) values(?,?,?,?,?,?)"
      val statement: PreparedStatement = connection.prepareStatement(sql)
      try {
        logData.foreach {
          case (action, event, host, insertTime, userName, updateTime) =>
            statement.setString(1, action)
            statement.setString(2, event)
            statement.setString(3, host)
            statement.setString(4, insertTime)
            statement.setString(5, userName)
            statement.setString(6, updateTime)
            statement.executeUpdate()
        }
      } finally {
        if (statement != null) statement.close()
        if (connection != null) connection.close()
      }
    })

    sc.stop()
  }
}
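When a partition holds many rows, the per-row executeUpdate calls above can be replaced with JDBC batching to cut round trips to MySQL. Below is a minimal sketch of the same foreachPartition write using addBatch/executeBatch; the batch size of 500 is an arbitrary assumption, and it relies on the same logRDD, url, user, password and driverClassName defined above.

// Sketch: buffer rows with addBatch and flush them with executeBatch.
// The batch size (500) is an assumed value, not from the original demo.
logRDD.foreachPartition(logData => {
  Class.forName(driverClassName)
  val connection = DriverManager.getConnection(url, user, password)
  val sql = "insert into syslog(action, event, host, insertTime, userName, update_Time) values(?,?,?,?,?,?)"
  val statement = connection.prepareStatement(sql)
  try {
    var count = 0
    logData.foreach { case (action, event, host, insertTime, userName, updateTime) =>
      statement.setString(1, action)
      statement.setString(2, event)
      statement.setString(3, host)
      statement.setString(4, insertTime)
      statement.setString(5, userName)
      statement.setString(6, updateTime)
      statement.addBatch()
      count += 1
      if (count % 500 == 0) statement.executeBatch() // flush every 500 rows
    }
    statement.executeBatch() // flush the remaining rows
  } finally {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
})

With MySQL Connector/J, adding rewriteBatchedStatements=true to the JDBC URL usually helps further, since the driver can then rewrite a batch into multi-row INSERT statements.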