1 package com.zjlantone.hive 2 3 import java.util.Properties 4 5 import com.zjlantone.hive.SparkOperaterHive.sparkSession 6 import org.apache.spark.rdd.RDD 7 import org.apache.spark.sql.types.StructType 8 import org.apache.spark.{SparkConf, SparkContext} 9 import org.apache.spark.sql._ 10 case class ManxingweiyanLis(diseaseName: String,cardId: String, lisName: String,lisResult:String,lisAndResult:String) 11 object jangganHive { 12 val sparkConf: SparkConf = new SparkConf().setAppName(jangganHive.getClass.getSimpleName) 13 val sparkSession: SparkSession = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate() 14 val url = "jdbc:mysql://192.168.4.732:3306/jianggan?Unicode=true&characterEncoding=utf8&autoReconnect=true&failOverReadOnly=false"; 15 def main(args: Array[String]): Unit = { 16 assc 17 sparkSession.stop() 18 } 19 20 def assc: Unit = { 21 import sparkSession.implicits._ 22 import sparkSession.sql 23 val df: DataFrame = sql("select cardId,lisName,lisresult,lisbet from janggan.gaozhixuelis where lisbet !=\"\" and lisName !=\"清洁度\"") 24 val rdd: RDD[Row] = df.rdd 25 //计算化验结果 26 val operatorLis: RDD[(String, String)] = rdd.map(row => { 27 var i = "" 28 val cardID: String = row.get(0).toString 29 val lisName: String = row.get(1).toString 30 try { 31 val lisResult: String = row.get(2).toString 32 val lisBet: String = row.get(3).toString 33 if (lisResult.contains("+")) { 34 (cardID + "&" + lisName, "阳性") 35 } else if(lisResult.contains("阴性") || lisResult.contains("-")){ 36 (cardID + "&" + lisName, "阴性") 37 }else { 38 val splits: Array[String] = lisBet.split("-|-") 39 if (lisResult.toDouble > splits(1).toDouble) { 40 i = "升高" 41 } else if (lisResult.toDouble < splits(0).toDouble) { 42 i = "降低" 43 }else{ 44 i="正常" 45 } 46 (cardID + "&" + lisName, i) 47 } 48 } catch { 49 case e: Exception => { 50 (cardID + "&" + lisName, "数据异常") 51 } 52 } 53 }) 54 55 val frame: DataFrame = operatorLis.map(x => { 56 ManxingweiyanLis("高脂血症",x._1.split("&")(0), x._1.split("&")(1), x._2,x._1.split("&")(1)+x._2) 57 }).toDF() 58 val proprttity=new Properties() 59 proprttity.put("user", "root") 60 proprttity.put("password", "123456") 61 proprttity.put("driver", "com.mysql.jdbc.Driver") 62 frame.write.mode(SaveMode.Append).jdbc(url, "exceptionLis", proprttity) 63 } 64 }