IDEA連接Spark集群執行Scala程序


1、首先安裝Scala插件,File->Settings->Plugins,搜索出Scla插件,點擊Install安裝;
2、File->New Project->maven,新建一個Maven項目,填寫GroupId和ArtifactId;


3、編輯pom.xml文件
添加項目所需要的依賴:前面幾行是系統自動生成的,我們只需要從 1.0-SNAPSHOT 之后開始添加就行。關於spark.version和scala.version需要在服務器通過啟動spark-shell查詢。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>test</groupId>
    <artifactId>SparkPi</artifactId>
    <version>1.0-SNAPSHOT</version>
    
    <properties>
        <spark.version>2.4.4</spark.version>
        <scala.version>2.11</scala.version>
    </properties>
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>

4、File->Project Structure->Libraries,選擇和Spark運行環境一致的Scala版本

5、File->Project Structure->Modules,在src/main/下面增加一個scala文件夾,並且設置成source文件夾

6、在scala文件夾下面新建一個scala文件SparkPi

SparkPi文件的代碼如下:其中,setMaster用來指定spark集群master的位置;setJars用來指定程序jar包的位置,此位置在下面1步中添加程序jar包的output directory可以看到。


import scala.math.random
import org.apache.spark._

object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("spark://222.201.187.178:7077").setJars(Seq("E:\\IdeaProjects\\SparkPi\\out\\artifacts\\SparkPi_jar\\SparkPi.jar"))
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    println("Time:" + spark.startTime)
    val n = math.min(1000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}

7、File->Project Structure->Artifacts,新建一個Jar->From modules with dependencies…,選擇Main Class,之后在Output Layput中刪掉不必要的jar


注意這里如果沒有刪除沒用的jar包,后面執行會報錯java.lang.ClassNotFoundException: SparkPi$$anonfun$1

8、在服務器集群配置文件/usr/local/spark/conf/spark-env.sh中加入以下代碼:

export SPARK_SUBMIT_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"
# address:JVM在5005端口上監聽請求,這個設定為一個不沖突的端口即可。  
# server:y表示啟動的JVM是被調試者,n表示啟動的JVM是調試器。
# suspend:y表示啟動的JVM會暫停等待,直到調試器連接上才繼續執行,n則JVM不會暫停等待。

9、在服務器Master節點主機上啟動hadoop集群,然后再啟動spark集群,最后運行jps命令檢查進程。

cd /usr/local/hadoop/
sbin/start-all.sh # 啟動hadoop集群
cd /usr/local/spark/
sbin/start-master.sh # 啟動Master節點
sbin/start-slaves.sh # 啟動所有Slave節點
jps

10、在IDEA上添加遠程配置,根據spark集群中spark-env.sh的SPARK_SUBMIT_OPTS的變量,對遠程執行進行配置,保持端口號一致

11、配置完成,右鍵run執行scala程序。初次運行報錯如下,選擇右下角彈窗中的enable auto import,然后再重新執行一次。

12、結束記得關閉spark集群

sbin/stop-master.sh # 關閉Master節點
sbin/stop-slaves.sh # 關閉Worker節點
cd /usr/local/hadoop/
sbin/stop-all.sh # 關閉Hadoop集群

參考鏈接:https://blog.csdn.net/weixin_38493025/article/details/103365712


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM