1. Connecting Spark to MySQL
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable.ArrayBuffer

object AcceptSQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SQL").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // Define the JDBC URL; specify the character encoding to avoid garbled text
    val url = "jdbc:mysql://192.168.1.*:3306/test?useUnicode=true&characterEncoding=utf8"
    // Load the table from the database
    val jdbcDF = spark.read.format("jdbc").options(
      Map("url" -> url,
        "user" -> "**k",
        "password" -> "**k",
        "dbtable" -> "test.pre_accept")).load()
    // JDBC writes go through a java.util.Properties object; the credentials can also be put directly in the URL
    val prop = new java.util.Properties
    prop.setProperty("user", "**k")
    prop.setProperty("password", "**k")
    //---1 Register a temporary view
    jdbcDF.createOrReplaceTempView("Accept_Number")
    // Number of cases handled by each acceptance clerk
    val Accept_Number = spark.sql("select ACCEPT_MAN name_institution, COUNT(*) Accept_Number FROM Accept_Number GROUP BY ACCEPT_MAN")
    // Accept_Number.show()
    // Accept_Number.write.mode(SaveMode.Overwrite).jdbc(url, "data.Accept_NumberSQL", prop)
  }
}
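The comments above mention that, instead of putting the credentials in the options map, you can pass the java.util.Properties object (or put them in the URL). A minimal sketch of that variant, reusing the spark, url and prop values defined in the block above (table and target names are taken from the same example):

// Sketch of the Properties-based read/write mentioned in the comments above
val byProps = spark.read.jdbc(url, "test.pre_accept", prop)   // read via java.util.Properties
byProps.createOrReplaceTempView("Accept_Number")
spark.sql("SELECT ACCEPT_MAN, COUNT(*) AS accept_number FROM Accept_Number GROUP BY ACCEPT_MAN")
  .write.mode(SaveMode.Overwrite)
  .jdbc(url, "data.Accept_NumberSQL", prop)                   // write back with the same credentials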
2. Connecting Spark to Oracle
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}
object Ceshi {
def main(args: Array[String]): Unit = {
Class.forName("oracle.jdbc.driver.OracleDriver") var theConf = new SparkConf().setAppName("AAAAAA").setMaster("local") var theSC = new SparkContext(theConf) var theSC2 = new SQLContext(theSC) //用戶/密碼@//ip地址:端口/實例名 var theJdbcDF = theSC2.load("jdbc", Map("url" -> "jdbc:oracle:thin:test/test@//192.168.1.***:1521/orcl", "dbtable" -> "(select * from **_BUS_PASSENGER_UPDOWN_PRE5) a", "driver" -> "oracle.jdbc.driver.OracleDriver")) theJdbcDF.registerTempTable("myuser") var theDT = theSC2.sql("select * from myuser") theDT.show(); theDT.registerTempTable("tempsum") } }
Below is another way to connect to Oracle; I have not used it myself:
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

def getConnection() = {
  Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()
  DriverManager.getConnection("jdbc:oracle:thin:@192.168.1.***:1521:orcl", "root", "123456")
}

val rdd = new JdbcRDD(sc, getConnection,
  "SELECT * FROM table WHERE ? <= ID AND ID <= ?",
  Long.MinValue, Long.MaxValue, 2,
  (r: ResultSet) => { r.getInt("id") + "\t" + r.getString("name") })
The Oracle connections above require the Oracle JDBC driver jar, which you have to add to the project yourself (see the snippet after the pom below).
Maven file (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.1</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.codehaus.jettison</groupId>
            <artifactId>jettison</artifactId>
            <version>1.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.41</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- Adapter between the SLF4J logging facade (slf4j-api.jar) and the concrete log4j implementation -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
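The pom above does not contain the Oracle JDBC driver, because that jar is not published to Maven Central. One common approach, sketched here with assumed coordinates matching the ojdbc14 jar used later in this post, is to install the jar into the local repository and then reference it:

# Install the Oracle driver jar into the local Maven repository (coordinates are assumptions)
mvn install:install-file -Dfile=ojdbc14-10.2.0.4.0.jar \
  -DgroupId=com.oracle -DartifactId=ojdbc14 -Dversion=10.2.0.4.0 -Dpackaging=jar

<!-- Then add it to the <dependencies> section of the pom (same assumed coordinates) -->
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc14</artifactId>
    <version>10.2.0.4.0</version>
</dependency>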
3. A simpler way to connect Spark to Oracle
package JiSuanPai

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Ceshi03 {
  def main(args: Array[String]): Unit = {
    Class.forName("oracle.jdbc.driver.OracleDriver")
    val url = "jdbc:oracle:thin:test/test@//192.168.1.238:1521/orcl"
    // Load the table from the database
    val conf = new SparkConf().setAppName("SQL").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val jdbcDF = spark.read.format("jdbc").options(
      Map("url" -> url,
        "user" -> "test",
        "password" -> "test",
        "dbtable" -> "TM_BUS_PASSENGER_UPDOWN")).load()
    // JDBC writes go through a java.util.Properties object; the credentials can also be put directly in the URL
    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")
    // Register a temporary view
    jdbcDF.createOrReplaceTempView("AAAA03")
    // Your business logic goes below
    val AAAA03 = spark.sql("select * FROM AAAA03")
    AAAA03.show()
    AAAA03.write.mode(SaveMode.Overwrite).jdbc(url, "TM_BUS_PASSENGER_UPDOWN04", prop)
    spark.close()
  }
}
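By default this JDBC read pulls the whole table through a single connection. For a large Oracle table the read can be split across partitions with the standard JDBC options; a sketch reusing the spark and url values above (the partition column ID and the bounds are illustrative assumptions, not from the original example):

// Optional: parallelize the JDBC read over 4 partitions on a numeric column.
// "ID" and the bounds are assumptions; use a real indexed numeric column.
val partitionedDF = spark.read.format("jdbc").options(Map(
  "url" -> url,
  "user" -> "test",
  "password" -> "test",
  "dbtable" -> "TM_BUS_PASSENGER_UPDOWN",
  "partitionColumn" -> "ID",
  "lowerBound" -> "0",
  "upperBound" -> "1000000",
  "numPartitions" -> "4")).load()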
----------------------------------------------------------------Below: running Spark SQL on the cluster against Oracle data and creating a table-----------------------------------------------------------------------
Code 1. Below is the Scala file
package JiSuanPai

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Ceshi03 {
  def main(args: Array[String]): Unit = {
    Class.forName("oracle.jdbc.driver.OracleDriver")
    val url = "jdbc:oracle:thin:test/test@//192.168.1.238:1521/orcl"
    // Load the table from the database
    val conf = new SparkConf().setAppName("jiqun").setMaster("spark://192.168.1.116:7077")
    // When submitting on the cluster, the commented setJars line below is not needed
    //  .setJars(List("D:\\IDEA2017.2.6\\workplace\\test\\out\\artifacts\\test_jar\\test.jar"))
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val jdbcDF = spark.read.format("jdbc").options(
      Map("url" -> url,
        "user" -> "test",
        "password" -> "test",
        "dbtable" -> "TM_BUS_PASSENGER_UPDOWN_PRE5")).load()
    // JDBC writes go through a java.util.Properties object; the credentials can also be put directly in the URL
    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")
    // Register a temporary view
    jdbcDF.createOrReplaceTempView("AAAA03")
    // Your business logic goes below
    val AAAA03 = spark.sql("select * FROM AAAA03")
    // AAAA03.show()
    AAAA03.write.mode(SaveMode.Overwrite).jdbc(url, "U00003", prop)
    spark.close()
  }
}
2. Package the project into a jar in IDEA
3. Run it on the cluster
1) This assumes the Hadoop cluster is already installed. In my setup, Spark is installed on the four nodes hadoop-2, hadoop-3, hadoop-4, and hadoop-5, with hadoop-2 as the Spark master node (it runs the Master process).
2) Copy the jar you just built to the Spark master node (the hadoop-2 node). I renamed my jar to test05.jar because a test.jar already existed in that directory.
// Below is the directory layout on my hadoop-2 node, the current working directory, and the path where I installed Spark; note the test05.jar file and the Spark installation path.
[root@hadoop-2 ~]# ll
total 329040
-rw-------. 1 root root      1262 Feb 22 16:04 anaconda-ks.cfg
-rw-r--r--  1 root root       676 May  4 18:00 derby.log
drwxr-xr-x  6 root root      4096 Mar 12 17:26 logstash-2.3.4
drwxr-xr-x  5 root root       126 May  4 18:00 metastore_db
-rw-r--r--  1 root root 113412083 Feb 23 14:56 scala-2.11.8.rpm
-rw-r--r--  1 root root 200934340 Feb 23 15:04 spark-2.2.1-bin-hadoop2.7.tgz
-rw-r--r--  1 root root     21122 Feb 28 17:59 spark221.jar
drwxr-xr-x  2 root root         6 Feb 28 17:52 spark-warehouse
-rw-r--r--  1 root root     35139 Feb 23 15:42 test01.jar
-rw-r--r--  1 root root    136845 May 30 10:06 test05.jar
-rw-r--r--  1 root root     23942 Mar 20 14:44 test3.jar
-rw-r--r--  1 root root     56504 May  4 14:28 test.jar
-rw-r--r--. 1 root root  22261552 Feb 22 17:47 zookeeper-3.4.8.tar.gz
-rw-r--r--. 1 root root     25974 May 30 09:07 zookeeper.out
[root@hadoop-2 ~]# pwd
/root
[root@hadoop-2 ~]# cd /usr/local/spark/spark-2.2.1-bin-hadoop2.7/
3) Place the Oracle driver jar in this directory: /usr/local/spark/spark-2.2.1-bin-hadoop2.7/jars
4) Run the command below to submit the Spark job
// Spark master URL  // your own Oracle driver jar  // driver classpath  // code: package + class  // location of the jar
[root@hadoop-2 spark-2.2.1-bin-hadoop2.7]# bin/spark-submit --master spark://hadoop-2:7077 --jars jars/ojdbc14-10.2.0.4.0.jar --driver-class-path jars/ojdbc14-10.2.0.4.0.jar --class JiSuanPai.Ceshi03 file:///root/test05.jar
The command above worked for me. The ones below did not, but they may still be useful as a reference.
[root@hadoop-2 bin]# spark-submit --class JiSuanPai.Ceshi03 --master spark://hadoop-2:7077 file:///root/test05.jar JiSuanPai.Ceshi03
[root@hadoop-2 spark-2.2.1-bin-hadoop2.7]# bin/spark-submit --master spark://hadoop-2:7077 --driver-class-path /usr/local/spark/spark-2.2.1-bin-hadoop2.7/jars/ojdbc14-10.2.0.4.0.jar --class JiSuanPai.Ceshi03 file:///root/test05.jar
*Side note: to access the Spark web UI, start spark-shell on the master node and then visit 192.168.1.116:4040.