一、spark連接mysql數據庫的第一種方式:
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder().master("local").appName("createdataframefrommysql")
.config("spark.sql.shuffle.partitions", 1).getOrCreate()
/**
* 讀取mysql的第一中方式
*
*/
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","123")
val person: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark","person",properties)
person.show()
spark.read.jdbc("jdbc:mysql://192.168.126.111:3306/spark","(select person.id,person.name,person.age,score.score from person,score where person.id = score.id) T",properties).show()
二、第二種讀取mysql數據的方式
val map: Map[String, String] = Map[String, String]( elems = "url" -> "jdbc:mysql://192.168.126.111:3306/spark", "driver" -> "com.mysql.jdbc.Driver", "user" -> "root", "password" -> "123", "dbtable" -> "score" ) val score: DataFrame = spark.read.format("jdbc").options(map).load score.show()
三、第三種讀取mysql 的方式
val reader: DataFrameReader = spark.read.format("jdbc") .option("url", "jdbc:mysql://192.168.126.111:3306/spark") .option("driver", "com.mysql.jdbc.Driver") .option("user", "root") .option("password", "123") .option("dbtable", "score") val source2: DataFrame = reader.load() source2.show()
四、將spark中的數據傳輸到mysql數據庫
//將以上兩張表注冊為臨時表,進行關聯查詢 person.createOrReplaceTempView("person") score2.createOrReplaceTempView("score") val result = spark.sql("select person.id,person.name,person.age,score.score from person,score where person.id=score.id ") result.show() //將查詢出的結果保存到mysql表之中 result.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.126.111:3306/spark","result",properties)
一個重要的參數:
參數: spark.sql.shuffle.partitions指定sql執行時,解析成sparkjob的分區數。
spark-sql將hive中的數據加載成為Dataframe
通過配置讓spark找到hive所在的位置:
1、啟動hive hive --service metastore &
2、將mynode3節點hive的配置文件發送到spark的配置節點 scp ./hive-site-xml mynode4:/software/spark-2.3.1/conf/
3、修改hive-site-xml中的配置參數 將其與的配置都刪了,只保留
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://mynode1:9083</value>
</property>
</configuration>
配置這個文件的作用:讓spark可以找到hive中的元數據 ,找到元數據也就找到了hive
------------------------------------------------------------------------------------------------------------------------
用spark-sql查詢hive中的數據:
1、啟動hadoop 2、啟動hive 3、啟動spark /software/spark-2.3.1/sbin/ ./start-all.sh
./spark-shell --master spark://mynode1:7077,mynode2:7077 --通過這個命令啟動spark的服務
spark.sql("show databases").show()
使用MR和spaek sql 測試對同一批數據的查詢速度
spark代碼在本地運行的時候,沒有SparkSession.master()屬性的設置,運行一定會報錯
spark-sql讀取hive中的數據:
val spark = SparkSession.builder().appName("CreateDataFrameFromHive").enableHiveSupport().getOrCreate() spark.sql("use spark") spark.sql("drop table if exists student_infos") spark.sql("create table if not exists student_infos (name string,age int) row format delimited fields terminated by '\t'") spark.sql("load data local inpath '/root/test/student_infos' into table student_infos") spark.sql("drop table if exists student_scores") spark.sql("create table if not exists student_scores (name string,age int) row format delimited fields terminated by '\t'") spark.sql("load data local inpath'/root/test/student_scores' into table student_scores") val frame: DataFrame = spark.table("student_infos") frame.show() val df: DataFrame = spark.sql("select si.name,si.age,ss.score from student_infos si,student_scores ss where si.name = ss.name") df.show() spark.sql("drop table if exists good_student_infos") /** * 將結果保存到hive 表之中 * */ df.write.mode(SaveMode.Overwrite).saveAsTable("good_student_infos")
用maven將spark讀取hive中的數據進行打包,首先 clear 之前項目中的tarfget文件就會消失,之后package 對數據進行打包
將打包完成的jar包上傳到linux從服務器,
用一下命令讀取hive中的數據,並在hive中完成創建表或者刪除一張表
在spark bin 目錄下
./spark-submit --master spark://mynode1:7077,mynode2:7077 --calss 報名.類名 jar在linux服務器中所在的位置
UDF : 用戶自定義函數
使用UDF是一對一的關系,讀取一條數據處理得到一條數據
注冊UDF: spark.udf.register("udf name ",function)\
使用UDF: sparkSession.sql("select xx,udf Name from tableName ....")
實例代碼:
val spark: SparkSession = SparkSession.builder().master("local").appName("test").getOrCreate() val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhouyu", "lili") import spark.implicits._ val nameDF: DataFrame = nameList.toDF("name") nameDF.createOrReplaceTempView("students") nameDF.show() /** * 注冊並自定義函數 * */ spark.udf.register("STRLEN",(name:String)=> {name.length} ) spark.sql("select name,STRLEN(name) as length from students order by length desc").show(100)
UDAF: 用戶自定義聚合函數
主要是引用一個繼承了UserDefinedAggregateFunction 類的類
繼承這個類需要實現八個方法 ,以及每個方法所實現的作用
initialize : 1、在Map端每個RDD分區內,按照group by 的字段分組,每個分組都有初始化的值
2、在reduce 端給每個group by 的分組做初始值
update : 每個組,有新的值進來的時候,進行分組對應的聚合值的計算
merge : 在reduce階段,有新的數據進來的時候,對該數據進行聚合
bufferSchema: 聚合操作的時候,所處理數據的類型
dataType : //最終函數返回值得數據類型
deterministic: 多次運行相同的輸入總是有相同的輸出
evaluate:最后返回一個最終的聚合值要和dataType的類型一致
UDAF:用戶自定義聚合函數 的主要作用就是可以實現自己的聚合操作的具體內容的控制,具體實現需要按照業務的不同需求,去重新定義繼承類的八個方法
開窗函數 :
def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("over").enableHiveSupport().getOrCreate() spark.sql("use spark") spark.sql("create table if not exists sales (riqi string,leibie string,jine Int) " + "row format delimited fields terminated by '\t'") spark.sql("load data local inpath '/root/test/sales' into table sales") /** * rank 在每個組內從1開始 * 5 A 200 --- 1 * 3 A 100 ---2 * 4 A 80 ---3 * 7 A 60 ---4 * * 1 B 100 ---1 * 8 B 90 ---2 * 6 B 80 ---3 * 1 B 70 ---4 */ val result = spark.sql( "select" +" riqi,leibie,jine " + "from (" + "select " +"riqi,leibie,jine,row_number() over (partition by leibie order by jine desc) rank " + "from sales) t " + "where t.rank<=3") result.write.mode(SaveMode.Append).saveAsTable("salesResult") result.show(100)
用java語言實現讀取mysql中的數據:
SparkConf conf = new SparkConf(); conf.setMaster("local").setAppName("mysql"); conf.set("spark.sql.shuffle.partitions","1"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext sqlContext = new SQLContext(sc); Map<String,String> map = new HashMap<String,String>(); map.put("driver","com.mysql.jdbc.Driver"); map.put("url","jdbc:mysql://192.168.126.111:3306/spark"); map.put("user","root"); map.put("password","123"); map.put("dbtable","person"); Dataset<Row> df = sqlContext.read().options(map).format("jdbc").load(); //df.show(); df.registerTempTable("person1"); /** * 第二種連接JDBC的方式 * */ DataFrameReader read = sqlContext.read(); read.option("driver","com.mysql.jdbc.Driver"); read.option("url","jdbc:mysql://192.168.126.111:3306/spark"); read.option("user","root"); read.option("password","123"); read.option("dbtable","score"); Dataset<Row> jdbc = read.format("jdbc").load(); jdbc.registerTempTable("score1"); Dataset<Row> result = sqlContext.sql("select person1.id,person1.name,person1.age,score1.score from person1 join score1 on person1.name = score1.name "); result.show(); Properties prop = new Properties(); prop.put("user","root"); prop.put("password","123"); result.write().mode(SaveMode.Overwrite).jdbc("jdbc:mysql://192.168.126.111:3306/spark","result1",prop); //jdbc.show(); sc.stop();
用Java言語實現讀取hive 中的數據 :
SparkConf conf = new SparkConf(); conf.setAppName("hive"); JavaSparkContext sc = new JavaSparkContext(conf); SQLContext hc = new SQLContext(sc); //創建表並加載數據 hc.sql("use spark"); hc.sql("create table student_info(name string,age int) row format delimited fields terminated by ','"); hc.sql("load data local inpath '/root/data/student_infos' into table student_info"); hc.sql("create table student_scores(name string,score int) row format delimited fields terminated by ','"); hc.sql("load data local inpath '/root/data/student_scores' into table student_score"); //得到表連接結果 Dataset<Row> sql = hc.sql("select t1.name,t1.age,t2.score from student_info t1 join student_score t2 on t1.name = t2.name"); //將結果寫回到hive sql.write().mode(SaveMode.Overwrite).saveAsTable("student_result");