Spark: connecting to data sources and running on a Spark cluster


1. Spark connecting to MySQL

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}


object AcceptSQL {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SQL").setMaster("local")

    val spark = SparkSession.builder().config(conf).getOrCreate()
    //Define the JDBC URL; the character set must be specified to avoid garbled text
    val url = "jdbc:mysql://192.168.1.*:3306/test?useUnicode=true&characterEncoding=utf8"
    //Load the table from the database
    val jdbcDF = spark.read.format("jdbc").options(
      Map("url" -> url,
        "user" -> "**k",
        "password" -> "**k",
        "dbtable" -> "test.pre_accept")).load()


    //Since JDBC is called with a Properties object, the Java class must be imported; credentials can also be specified directly in the URL
    val prop = new java.util.Properties
    prop.setProperty("user", "**k")
    prop.setProperty("password", "**k")


    //Register the DataFrame as a temporary view
    jdbcDF.createOrReplaceTempView("Accept_Number")
    //Number of cases handled by each acceptance clerk
    val Accept_Number = spark.sql("select ACCEPT_MAN  name_institution , COUNT(*)  Accept_Number FROM  Accept_Number GROUP BY ACCEPT_MAN ")
    // Accept_Number.show();
    // Accept_Number.write.mode(SaveMode.Overwrite).jdbc(url, "data.Accept_NumberSQL", prop)
  }
}
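For reference, the same per-clerk count can also be written with the DataFrame API instead of SQL. This is only a sketch; it assumes the jdbcDF, url and prop defined in the example above.

// DataFrame API equivalent of the SQL aggregation above (sketch; assumes jdbcDF, url and prop from the example)
val acceptCounts = jdbcDF
  .groupBy("ACCEPT_MAN")
  .count()
  .withColumnRenamed("ACCEPT_MAN", "name_institution")
  .withColumnRenamed("count", "Accept_Number")

// Write the result back to MySQL, overwriting the target table if it already exists
acceptCounts.write.mode(SaveMode.Overwrite).jdbc(url, "data.Accept_NumberSQL", prop)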

2. Spark connecting to Oracle

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Ceshi {
  def main(args: Array[String]): Unit = {
    Class.forName("oracle.jdbc.driver.OracleDriver")
    val theConf = new SparkConf().setAppName("AAAAAA").setMaster("local")
    val theSC = new SparkContext(theConf)
    val theSC2 = new SQLContext(theSC)
    //URL format: user/password@//ip:port/service name
    val theJdbcDF = theSC2.read.format("jdbc").options(
      Map("url" -> "jdbc:oracle:thin:test/test@//192.168.1.***:1521/orcl",
        "dbtable" -> "(select * from **_BUS_PASSENGER_UPDOWN_PRE5) a",
        "driver" -> "oracle.jdbc.driver.OracleDriver")).load()
    theJdbcDF.registerTempTable("myuser")
    val theDT = theSC2.sql("select * from myuser")
    theDT.show()
    theDT.registerTempTable("tempsum")
  }
}

Below is another way to connect to Oracle; I have not used it myself.

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

//Assumes sc is an existing SparkContext
def getConnection() = {
  Class.forName("oracle.jdbc.driver.OracleDriver").newInstance()
  DriverManager.getConnection("jdbc:oracle:thin:@192.168.1.***:1521:orcl", "root", "123456")
}

val rdd = new JdbcRDD(sc, getConnection _,
  "SELECT * FROM table WHERE ? <= ID AND ID <= ?",
  Long.MinValue, Long.MaxValue, 2,
  (r: ResultSet) => { r.getInt("id") + "\t" + r.getString("name") }
)
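A minimal usage sketch for the JdbcRDD above, assuming sc is an existing SparkContext and the queried table really has id and name columns:

// Materialize the RDD and print each mapped row (sketch)
rdd.collect().foreach(println)
println(s"rows read: ${rdd.count()}")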

The Oracle connection above requires the Oracle JDBC driver jar; add it to the project yourself.

 

Maven file (pom.xml)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <spark.version>2.2.1</spark.version>
        <scala.version>2.11</scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.codehaus.jettison</groupId>
            <artifactId>jettison</artifactId>
            <version>1.3.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.41</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <!-- Adapter between the SLF4J logging facade (slf4j-api.jar) and the concrete log4j backend -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

3. A simpler way to connect Spark to Oracle

package JiSuanPai

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Ceshi03 {
  def main(args: Array[String]): Unit = {
    Class.forName("oracle.jdbc.driver.OracleDriver")
    val url = "jdbc:oracle:thin:test/test@//192.168.1.238:1521/orcl"

    //Load the table from the database
    val conf = new SparkConf().setAppName("SQL").setMaster("local")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val jdbcDF = spark.read.format("jdbc").options(
      Map("url" -> url,
        "user" -> "test",
        "password" -> "test",
        "dbtable" -> "TM_BUS_PASSENGER_UPDOWN")).load()

    //Since JDBC is called with a Properties object, the Java class must be imported; credentials can also be specified directly in the URL
    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")

    //Create a temporary view
    jdbcDF.createOrReplaceTempView("AAAA03")
    //Your own business logic goes below
    val AAAA03 = spark.sql("select *  FROM  AAAA03  ")
     AAAA03.show();
    AAAA03.write.mode(SaveMode.Overwrite).jdbc(url, "TM_BUS_PASSENGER_UPDOWN04", prop)

    spark.close();
  }
}
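For larger Oracle tables, the JDBC read in the example above runs as a single task. Spark's standard JDBC partitioning options can split it across several parallel queries. A minimal sketch, assuming the table has a numeric ID column (the column name and the bounds here are assumptions, not taken from the original table):

// Partitioned JDBC read (sketch): Spark splits the ID range into numPartitions slices
// and issues one query per slice instead of reading the whole table in one task.
val partitionedDF = spark.read.format("jdbc").options(
  Map("url" -> url,
    "user" -> "test",
    "password" -> "test",
    "dbtable" -> "TM_BUS_PASSENGER_UPDOWN",
    "partitionColumn" -> "ID",   // assumed numeric column
    "lowerBound" -> "1",         // assumed ID range
    "upperBound" -> "1000000",
    "numPartitions" -> "8",
    "fetchsize" -> "1000")).load()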

----------------------------------------------------------------Below: Spark SQL running against Oracle data on the cluster and creating a table----------------------------------------------------------------

Code 1. Below is the Scala file

package JiSuanPai

import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

object Ceshi03 {
  def main(args: Array[String]): Unit = {


    Class.forName("oracle.jdbc.driver.OracleDriver")
    val url = "jdbc:oracle:thin:test/test@//192.168.1.238:1521/orcl"


    //Load the table from the database
    val conf = new SparkConf().setAppName("jiqun").setMaster("spark://192.168.1.116:7077")
    //When running on the cluster, the commented setJars line below is not needed
    //.setJars(List("D:\\IDEA2017.2.6\\workplace\\test\\out\\artifacts\\test_jar\\test.jar"))
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val jdbcDF = spark.read.format("jdbc").options(
      Map("url" -> url,
        "user" -> "test",
        "password" -> "test",
        "dbtable" -> "TM_BUS_PASSENGER_UPDOWN_PRE5")).load()

    //Since JDBC is called with a Properties object, the Java class must be imported; credentials can also be specified directly in the URL
    val prop = new java.util.Properties
    prop.setProperty("user", "test")
    prop.setProperty("password", "test")

    //Create a temporary view
    jdbcDF.createOrReplaceTempView("AAAA03")
    //Your own business logic goes below
    val AAAA03 = spark.sql("select * FROM AAAA03")
    // AAAA03.show();
    AAAA03.write.mode(SaveMode.Overwrite).jdbc(url, "U00003", prop)

    spark.close()
  }
}
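As the comment notes, setJars is unnecessary when the jar is submitted to the cluster. A common alternative (a sketch, not part of the original code) is to leave the master out of the code as well and pass it with spark-submit's --master flag, so the same jar runs locally or on the cluster without recompiling:

// Master not hard-coded; it is supplied by spark-submit --master ... (sketch)
val spark = SparkSession.builder()
  .appName("jiqun")
  .getOrCreate()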

2. Package the project into a jar with IDEA

3. Run it on the cluster

  1) Assume the Hadoop cluster is already installed. In my setup Spark is installed on the four nodes hadoop-2, hadoop-3, hadoop-4 and hadoop-5, with hadoop-2 as the Spark master node (it runs the Master process).

   2) Copy the jar you just built to the Spark master node (the hadoop-2 node). I renamed my jar to test05.jar because a test.jar already existed in that directory.

//Below is the directory listing on my hadoop-2 node, the current working directory, and the path where I installed Spark; note the test05.jar entry and the Spark install path
[root@hadoop-2 ~]# ll
total 329040
-rw-------. 1 root root      1262 Feb 22 16:04 anaconda-ks.cfg
-rw-r--r--  1 root root       676 May  4 18:00 derby.log
drwxr-xr-x  6 root root      4096 Mar 12 17:26 logstash-2.3.4
drwxr-xr-x  5 root root       126 May  4 18:00 metastore_db
-rw-r--r--  1 root root 113412083 Feb 23 14:56 scala-2.11.8.rpm
-rw-r--r--  1 root root 200934340 Feb 23 15:04 spark-2.2.1-bin-hadoop2.7.tgz
-rw-r--r--  1 root root     21122 Feb 28 17:59 spark221.jar
drwxr-xr-x  2 root root         6 Feb 28 17:52 spark-warehouse
-rw-r--r--  1 root root     35139 Feb 23 15:42 test01.jar
-rw-r--r--  1 root root    136845 May 30 10:06 test05.jar
-rw-r--r--  1 root root     23942 Mar 20 14:44 test3.jar
-rw-r--r--  1 root root     56504 May  4 14:28 test.jar
-rw-r--r--. 1 root root  22261552 Feb 22 17:47 zookeeper-3.4.8.tar.gz
-rw-r--r--. 1 root root     25974 May 30 09:07 zookeeper.out
[root@hadoop-2 ~]# pwd
/root
[root@hadoop-2 ~]# cd /usr/local/spark/spark-2.2.1-bin-hadoop2.7/

3) Put the Oracle JDBC driver jar into this directory: /usr/local/spark/spark-2.2.1-bin-hadoop2.7/jars

4) Run the command to submit the job to Spark

// --master:            the Spark master URL of the cluster
// --jars:              your own Oracle driver jar
// --driver-class-path: put the driver jar on the driver's classpath
// --class:             package + class containing your code
// final argument:      location of the jar file
[root@hadoop-2 spark-2.2.1-bin-hadoop2.7]# bin/spark-submit --master spark://hadoop-2:7077 --jars jars/ojdbc14-10.2.0.4.0.jar --driver-class-path jars/ojdbc14-10.2.0.4.0.jar --class JiSuanPai.Ceshi03 file:///root/test05.jar

The command above ran successfully for me. The ones below did not work for me, but may still be useful as a reference.

[root@hadoop-2 bin]# spark-submit --class JiSuanPai.Ceshi03 --master spark:hadoop-2:7077 file:///root/test05.jar  JiSuanPai.Ceshi03

[root@hadoop-2 spark-2.2.1-bin-hadoop2.7]# bin/spark-submit --master spark:hadoop-2:7077 --driver-class-path /usr/local/spark/spark-2.2.1-bin-hadoop2.7/jars/ojdbc14-10.2.0.4.0.jar --class JiSuanPai.Ceshi03 file:///root/test05.jar

*Side note: to access the Spark web UI, start spark-shell on the master node and then visit 192.168.1.116:4040.

 

