1. Connecting Spark to MySQL
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.collection.mutable.ArrayBuffer

object AcceptSQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SQL").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    // JDBC URL; specify the character encoding explicitly to avoid garbled text
    val url = "jdbc:mysql://192.168.1.*:3306/test?useUnicode=true&characterEncoding=utf8"
    // Load the table from the database
    val jdbcDF = spark.read.format("jdbc").options(
      Map("url" -> url,
          "user" -> "**k",
          "password" -> "**k",
          "dbtable" -> "test.pre_accept")).load()
    // Connection properties for the JDBC write; they can also be put directly into the URL
    val prop = new java.util.Properties
    prop.setProperty("user", "**k")
    prop.setProperty("password", "**k")
    //---1
    jdbcDF.createOrReplaceTempView("Accept_Number")
    // Number of applications handled by each acceptance clerk
    val Accept_Number = spark.sql("select ACCEPT_MAN name_institution, COUNT(*) Accept_Number FROM Accept_Number GROUP BY ACCEPT_MAN")
    // Accept_Number.show()
    // Accept_Number.write.mode(SaveMode.Overwrite).jdbc(url, "data.Accept_NumberSQL", prop)
}
}
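As the comment in the code notes, the JDBC credentials can also be supplied through the Properties object (or directly in the URL). A shorter, equivalent sketch of the same read using the read.jdbc overload, reusing the url, table and prop defined above:

    val jdbcDF2 = spark.read.jdbc(url, "test.pre_accept", prop)

The commented-out last line shows the matching write path: write.mode(SaveMode.Overwrite).jdbc(url, table, prop) drops and recreates the target table before inserting the result.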
2. Connecting Spark to Oracle
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SparkSession}
object Ceshi {
def main(args: Array[String]): Unit = {
Class.forName("oracle.jdbc.driver.OracleDriver") var theConf = new SparkConf().setAppName("AAAAAA").setMaster("local") var theSC = new SparkContext(theConf) var theSC2 = new SQLContext(theSC) //用户/密码@//ip地址:端口/实例名 var theJdbcDF = theSC2.load("jdbc", Map("url" -> "jdbc:oracle:thin:test/test@//192.168.1.***:1521/orcl", "dbtable" -> "(select * from **_BUS_PASSENGER_UPDOWN_PRE5) a", "driver" -> "oracle.jdbc.driver.OracleDriver")) theJdbcDF.registerTempTable("myuser") var theDT = theSC2.sql("select * from myuser") theDT.show(); theDT.registerTempTable("tempsum") } }
Below is another way to connect to Oracle (I have not used this one myself):
import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

def getConnection() = {
  Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()
  DriverManager.getConnection("jdbc:oracle:thin:@192.168.1.***:1521:orcl", "root", "123456")
}

val rdd = new JdbcRDD(sc, getConnection,
  "SELECT * FROM table WHERE ? <= ID AND ID <= ?",
  Long.MinValue, Long.MaxValue, 2,
  (r: ResultSet) => { r.getInt("id") + "\t" + r.getString("name") })
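Note that JdbcRDD fills the two ? placeholders with the lower and upper bound of each partition's key range, which is why the query must keep the "? <= ID AND ID <= ?" predicate. A minimal usage sketch that just materializes the rows (sc is assumed to be an existing SparkContext):

    rdd.collect().foreach(println)   // or rdd.count() to check the row count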
The Oracle connections above require the Oracle JDBC driver jar, which you have to add to the project yourself.
Maven file (pom.xml):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.2.1</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.codehaus.jettison</groupId>
            <artifactId>jettison</artifactId>
            <version>1.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.41</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- adapter between the SLF4J logging facade (slf4j-api.jar) and the concrete log4j implementation -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
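The pom above only declares the MySQL driver. The Oracle driver is not published to Maven Central, so one common approach (a sketch; the groupId/artifactId/version below are simply the coordinates chosen when installing the jar, not official ones) is to install the jar into the local repository and then declare it as a dependency:

    mvn install:install-file -Dfile=ojdbc14-10.2.0.4.0.jar -DgroupId=com.oracle -DartifactId=ojdbc14 -Dversion=10.2.0.4.0 -Dpackaging=jar

    <dependency>
        <groupId>com.oracle</groupId>
        <artifactId>ojdbc14</artifactId>
        <version>10.2.0.4.0</version>
    </dependency>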
3. A simpler way to connect Spark to Oracle
package JiSuanPai

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Ceshi03 {
  def main(args: Array[String]): Unit = {
    Class.forName("oracle.jdbc.driver.OracleDriver")
    val url = "jdbc:oracle:thin:test/test@//192.168.1.238:1521/orcl"
    // Load the table from the database
    val conf = new SparkConf().setAppName("SQL").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val jdbcDF = spark.read.format("jdbc").options(
      Map("url" -> url,
          "user" -> "test",
          "password" -> "test",
          "dbtable" -> "TM_BUS_PASSENGER_UPDOWN")).load()
    // Connection properties for the JDBC write; they can also be put directly into the URL
    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")
    // Register the DataFrame as a temporary view
    jdbcDF.createOrReplaceTempView("AAAA03")
    // Your business logic goes below
    val AAAA03 = spark.sql("select * FROM AAAA03")
    AAAA03.show()
    AAAA03.write.mode(SaveMode.Overwrite).jdbc(url, "TM_BUS_PASSENGER_UPDOWN04", prop)
    spark.close()
  }
}
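When the Oracle table is large, the single-connection read above becomes the bottleneck. A possible variant (not part of the original code, just a sketch; PASSENGER_ID is a made-up numeric column, so replace it and the bounds with real values) uses Spark's standard JDBC partitioning options to read in parallel, reusing the url defined above:

    val jdbcParallelDF = spark.read.format("jdbc").options(
      Map("url" -> url,
          "user" -> "test",
          "password" -> "test",
          "dbtable" -> "TM_BUS_PASSENGER_UPDOWN",
          // split the read into 4 range scans over the (hypothetical) numeric column
          "partitionColumn" -> "PASSENGER_ID",
          "lowerBound" -> "1",
          "upperBound" -> "1000000",
          "numPartitions" -> "4")).load()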
----------------------------------------------------------------Below: running Spark SQL on the cluster against Oracle data and creating a table-----------------------------------------------------------------------
Code 1. The Scala file:
package JiSuanPai

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Ceshi03 {
  def main(args: Array[String]): Unit = {
    Class.forName("oracle.jdbc.driver.OracleDriver")
    val url = "jdbc:oracle:thin:test/test@//192.168.1.238:1521/orcl"
    // Load the table from the database
    val conf = new SparkConf().setAppName("jiqun").setMaster("spark://192.168.1.116:7077")
    // When running on the cluster, the setJars call below is not needed
    //.setJars(List("D:\\IDEA2017.2.6\\workplace\\test\\out\\artifacts\\test_jar\\test.jar"))
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val jdbcDF = spark.read.format("jdbc").options(
      Map("url" -> url,
          "user" -> "test",
          "password" -> "test",
          "dbtable" -> "TM_BUS_PASSENGER_UPDOWN_PRE5")).load()
    // Connection properties for the JDBC write; they can also be put directly into the URL
    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")
    // Register the DataFrame as a temporary view
    jdbcDF.createOrReplaceTempView("AAAA03")
    // Your business logic goes below
    val AAAA03 = spark.sql("select * FROM AAAA03")
    // AAAA03.show()
    AAAA03.write.mode(SaveMode.Overwrite).jdbc(url, "U00003", prop)
    spark.close()
  }
}
2. Package the project into a jar with IDEA (a Maven command-line alternative is sketched below).
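If you prefer the command line, the pom above already binds the scala compile/testCompile goals, so the jar can also be produced with Maven from the project root (the artifact should end up under target/, e.g. test-1.0-SNAPSHOT.jar):

    mvn clean package -DskipTests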
3. Run it on the cluster
1). Assuming the Hadoop cluster is already installed: in my setup Spark is installed on the four nodes hadoop-2, hadoop-3, hadoop-4 and hadoop-5, with hadoop-2 as the Spark master node (it runs the Master process).
2). Copy the jar you just built to the Spark master node (hadoop-2). I renamed my jar to test05.jar, because a test.jar already existed in that directory. (A sample scp command is sketched after the listing below.)
//Below is the directory layout on my hadoop-2 node, my current working directory, and the path where I installed Spark; note test05.jar and the Spark install path.
[root@hadoop-2 ~]# ll
total 329040
-rw-------. 1 root root      1262 Feb 22 16:04 anaconda-ks.cfg
-rw-r--r--  1 root root       676 May  4 18:00 derby.log
drwxr-xr-x  6 root root      4096 Mar 12 17:26 logstash-2.3.4
drwxr-xr-x  5 root root       126 May  4 18:00 metastore_db
-rw-r--r--  1 root root 113412083 Feb 23 14:56 scala-2.11.8.rpm
-rw-r--r--  1 root root 200934340 Feb 23 15:04 spark-2.2.1-bin-hadoop2.7.tgz
-rw-r--r--  1 root root     21122 Feb 28 17:59 spark221.jar
drwxr-xr-x  2 root root         6 Feb 28 17:52 spark-warehouse
-rw-r--r--  1 root root     35139 Feb 23 15:42 test01.jar
-rw-r--r--  1 root root    136845 May 30 10:06 test05.jar
-rw-r--r--  1 root root     23942 Mar 20 14:44 test3.jar
-rw-r--r--  1 root root     56504 May  4 14:28 test.jar
-rw-r--r--. 1 root root  22261552 Feb 22 17:47 zookeeper-3.4.8.tar.gz
-rw-r--r--. 1 root root     25974 May 30 09:07 zookeeper.out
[root@hadoop-2 ~]# pwd
/root
[root@hadoop-2 ~]# cd /usr/local/spark/spark-2.2.1-bin-hadoop2.7/
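If the jar is built on a development machine, one way to get it onto hadoop-2 is scp (run from the directory containing the built jar; host and paths here are just the ones used in this walkthrough):

    scp test.jar root@hadoop-2:/root/test05.jar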
3) Put the Oracle driver jar into this directory: /usr/local/spark/spark-2.2.1-bin-hadoop2.7/jars
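For example, if the driver jar was downloaded to /root (a hypothetical location), copying it into Spark's jars directory looks like this; for executors on other nodes the driver also has to be reachable, which is what the --jars option in the submit command below takes care of:

    cp /root/ojdbc14-10.2.0.4.0.jar /usr/local/spark/spark-2.2.1-bin-hadoop2.7/jars/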
4) Run the command to have Spark schedule the job:
//Spark master URL    //your Oracle driver jar    //driver classpath (Oracle driver)    //package + class to run    //location of the application jar
[root@hadoop-2 spark-2.2.1-bin-hadoop2.7]# bin/spark-submit --master spark://hadoop-2:7077 --jars jars/ojdbc14-10.2.0.4.0.jar --driver-class-path jars/ojdbc14-10.2.0.4.0.jar --class JiSuanPai.Ceshi03 file:///root/test05.jar
The command above worked for me. The ones below did not work for me, but they may still be useful as a reference.
[root@hadoop-2 bin]# spark-submit --class JiSuanPai.Ceshi03 --master spark://hadoop-2:7077 file:///root/test05.jar JiSuanPai.Ceshi03
[root@hadoop-2 spark-2.2.1-bin-hadoop2.7]# bin/spark-submit --master spark://hadoop-2:7077 --driver-class-path /usr/local/spark/spark-2.2.1-bin-hadoop2.7/jars/ojdbc14-10.2.0.4.0.jar --class JiSuanPai.Ceshi03 file:///root/test05.jar
*Side note: to view the Spark web UI, start spark-shell on the master node (the UI on port 4040 only exists while an application is running), then open 192.168.1.116:4040.