Spark 讀取文件,Spark SQL 分析數據,然後批量寫入 Oracle(附:Spark SQL 不支持時間戳差值計算的解決辦法)


本人 1943695812@qq.com。附上導入的包,不像有些文章不帶導入包,臨了還導錯了。不謝。一起說聲擦擦擦,喲喲喲
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object Yy01 {

// Shared DateDe helper instance; its getCz method backs the function value below.
val dt = new DateDe

// Two-argument function value that forwards both strings to dt.getCz.
// main registers it with Spark SQL under the UDF name "datecz".
// NOTE(review): DateDe is defined elsewhere — presumably getCz returns the
// difference between two timestamps; confirm the unit and argument order there.
val tm_jg = (first: String, second: String) => dt.getCz(first, second)
// Input location read by main through SparkContext.textFile (glob over all files).
val hdfsurl: String = "hdfs://jacky:9000/data/yy/*"

// JDBC connection settings passed to DriverManager.getConnection.
// NOTE(review): credentials are hard-coded in source — consider loading them
// from configuration or the environment instead.
val url: String = "jdbc:oracle:thin:@192.168.1.233:1521:ecom"
val user: String = "TEST"
val password: String = "lishizhou"

// Target table name interpolated into the INSERT statements.
val dbtable: String = "TMP_VOICE_CELLDATA_180123"

// Date tag: emitted as the "ndt" column by the aggregation query in main and
// bound as CELL_DATE by insertInto.
val nowdate: String = "20180125"
///1943695812@qq.com lisz
/**
 * Entry point: reads the comma-separated records under `hdfsurl`, keeps the
 * rows accepted by `inspect`, aggregates them per (lac, ci) with Spark SQL and
 * batch-inserts the aggregated rows into the table named by `dbtable`.
 *
 * The SparkSession is always stopped on exit, including when the job fails.
 *
 * @param args command-line arguments (unused)
 */
def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder()
    .master("local[4]")
    .appName("yy")
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
    .config("spark.files.ignoreCorruptFiles", value = true)
    .config("hive.groupby.orderby.position.alias", value = true)
    .enableHiveSupport()
    .getOrCreate()

  try {
    // Sample input line:
    // 180113,00,13077762198,15878286278,20180113172440,20180113172519,59373,11083,990002758737030,460017767005286
    val rawLines = spark.sparkContext.textFile(hdfsurl)

    // Schema of the cleaned rows: fields 4..7 of each input line.
    val schema = StructType(List(
      StructField("starttime", StringType, nullable = false),
      StructField("endtime", StringType, nullable = false),
      StructField("lac", StringType, nullable = false),
      StructField("ci", StringType, nullable = false)
    ))

    // Initial cleaning: drop malformed lines, keep only the four needed fields.
    val cleanedRows = rawLines
      .map(_.split(","))
      .filter(fields => inspect(fields))
      .map(fields => Row(fields(4), fields(5), fields(6), fields(7)))

    spark.createDataFrame(cleanedRows, schema).createOrReplaceTempView("jg")

    // The timestamp difference is computed by the user-defined function
    // "datecz" (backed by tm_jg) rather than by a built-in SQL function.
    spark.udf.register("datecz", tm_jg)

    val aggregated = spark.sql(s"""
      |select $nowdate ndt,lac,ci, round(sum(datecz(endtime,starttime)),1) sc,count(*) pc from jg
      |group by lac,ci""".stripMargin)

    // Write each partition of the result to Oracle as one JDBC batch.
    aggregated.foreachPartition((rows: Iterator[Row]) => writePartition(rows))
  } finally {
    spark.stop()
  }
}

/**
 * Inserts one partition of aggregated rows into `dbtable` as a single JDBC batch.
 *
 * Expects rows shaped as (ndt, lac, ci, sc, pc), as produced by the query in `main`.
 * The statement and connection are closed even if binding or executing the batch
 * fails; empty partitions do not open a connection at all.
 */
private def writePartition(rows: Iterator[Row]): Unit = {
  if (rows.hasNext) {
    // Loading the class is enough to register the driver; no instance is needed.
    Class.forName("oracle.jdbc.driver.OracleDriver")

    val connection: Connection = DriverManager.getConnection(url, user, password)
    try {
      val statement: PreparedStatement = connection.prepareStatement(s"insert into $dbtable(CELL_DATE, LAC, CELLID,CELL_COUNT,DURATION)values(?,?,?,?,?)")
      try {
        rows.foreach { row =>
          statement.setString(1, row.get(0).toString) // ndt -> CELL_DATE
          statement.setString(2, row.get(1).toString) // lac -> LAC
          statement.setString(3, row.get(2).toString) // ci  -> CELLID
          statement.setString(4, row.get(4).toString) // pc  -> CELL_COUNT
          statement.setString(5, row.get(3).toString) // sc  -> DURATION
          statement.addBatch()
        }
        statement.executeBatch()
      } finally {
        statement.close()
      }
    } finally {
      connection.close()
    }
  }
}


// NOTE: insertInto and time_interval below are not called by main (only inspect is); the working pipeline is the code above.
/**
 * Inserts the given tuples into `dbtable` over JDBC as a single batch.
 *
 * Each row is written as (CELL_DATE, LAC, CELLID, CELL_COUNT, DURATION), with
 * CELL_DATE taken from `nowdate` and the remaining four columns taken from the
 * tuple elements in order.
 * NOTE(review): binding `_3` to CELL_COUNT and `_4` to DURATION is positional;
 * the original left the fifth placeholder unbound — confirm the intended mapping.
 *
 * Failures are reported with `printStackTrace` and not rethrown (best effort).
 * The statement and the connection are always closed.
 *
 * @param iterator rows to insert
 */
def insertInto(iterator: Iterator[(String, String, String, Int)]): Unit = {
  val sql = s"insert into $dbtable( CELL_DATE, LAC, CELLID,CELL_COUNT,DURATION) values (?,?,?,?,?)"

  try {
    Class.forName("oracle.jdbc.driver.OracleDriver")
    val conn: Connection = DriverManager.getConnection(url, user, password)
    try {
      // Prepare once and reuse for every row instead of preparing per row.
      val ps: PreparedStatement = conn.prepareStatement(sql)
      try {
        iterator.foreach { data =>
          ps.setString(1, nowdate)
          ps.setString(2, data._1)
          ps.setString(3, data._2)
          ps.setString(4, data._3)
          // All five placeholders must be bound or the driver rejects the statement.
          ps.setInt(5, data._4)
          ps.addBatch()
        }
        ps.executeBatch()
      } finally {
        ps.close()
      }
    } finally {
      conn.close()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  }
}

// Computes the interval between two yyyyMMddHHmmss timestamps; the result is a duration in whole minutes.
/**
 * Returns the elapsed time from `startime` to `endTime` in whole minutes.
 *
 * Both arguments are parsed with the pattern `yyyyMMddHHmmss`. The result is
 * `endTime - startime`, so it is non-negative when the end is not before the
 * start; partial minutes are truncated toward zero.
 *
 * @param startime start timestamp, formatted as yyyyMMddHHmmss
 * @param endTime  end timestamp, formatted as yyyyMMddHHmmss
 * @return the number of whole minutes between the two timestamps, as a string
 * @throws java.text.ParseException if either argument cannot be parsed
 */
def time_interval(startime: String, endTime: String): String = {
  val fm = new SimpleDateFormat("yyyyMMddHHmmss")
  val startMillis = fm.parse(startime).getTime
  val endMillis = fm.parse(endTime).getTime
  // End minus start: the original subtracted the other way round, which made
  // every ordinary interval negative.
  ((endMillis - startMillis) / (1000 * 60)).toString
}

//檢驗數據的合格性
/**
 * Checks whether a split input record is usable.
 *
 * A record is accepted when it has more than 9 fields and both field 4 and
 * field 5, after trimming, are exactly 14 characters long.
 *
 * @param str the fields of one input line
 * @return true if the record passes both checks, false otherwise
 */
def inspect(str: Array[String]): Boolean =
  str.length > 9 && {
    // Measure both fields before comparing, as the field-count guard has passed.
    val firstLen = str(4).trim.length
    val secondLen = str(5).trim.length
    firstLen == 14 && secondLen == 14
  }

//處理邏輯


}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM