找了很多文章,没有详细说明如何在docker上搭建spark,写一篇随笔做记录
一,搭建spark
二,运行一个wordcount
硬件:centos 8.0 64位 阿里云ECS服务器
安装docker-compose
sudo pip install docker-compose==1.4.0
拉取镜像:
docker pull singularities/spark:latest
我这里使用的是:singularities/spark这个镜像 https://registry.hub.docker.com/r/singularities/spark
创建目录
mkdir /root/docker-spark
创建对应master的卷目录
mkdir /root/docker-spark/data-volumes-master
创建对应worker的卷目录
mkdir /root/docker-spark/data-volumes-worker
创建docker-compose.yml
version: "2" services: master: image: singularities/spark command: start-spark master hostname: master ports: - "6066:6066" - "7070:7070" - "8080:8080" - "50070:50070" volumes: - /root/docker-spark/data-volumes-master:/data-volumes worker: image: singularities/spark command: start-spark worker master environment: SPARK_WORKER_CORES: 1 SPARK_WORKER_MEMORY: 2g links: - master volumes: - /root/docker-spark/data-volumes-worker:/data-volumes
有如下:
建立JavaWordCount.java
/** * @ClassName JavaWordCount * @Author JK * @Description * @Date 2020-11-19 13:17 * @Version 1.0 */ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.SparkSession; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; /** * Created by xiaosi on 17-2-13. * * Spark 测试程序 WordCount * */ public final class JavaWordCount { private static final Pattern SPACE = Pattern.compile(" "); public static void main(String[] args) throws Exception { if (args.length < 1) { System.err.println("Usage: JavaWordCount <file>"); System.exit(1); } SparkSession spark = SparkSession.builder().appName("JavaWordCount").getOrCreate(); JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD(); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) { return Arrays.asList(SPACE.split(s)).iterator(); } }); JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); List<Tuple2<String, Integer>> output = counts.collect(); for (Tuple2<?, ?> tuple : output) { System.out.println(tuple._1() + ": " + tuple._2()); } spark.stop(); } }
其pom.xml如下
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.jk</groupId> <artifactId>TestSpark</artifactId> <version>1.0-SNAPSHOT</version> <properties> <spark.version>2.2.1</spark.version> </properties> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>6</source> <target>6</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> </dependencies> </project>
build成jar包:略
然后建立word.txt做数据(自己喜欢怎么填都行,空格隔开)
aa bb cc dd we adrq gdsgwet sda aweq
打包好的jar包和txt数据,上传到之前的master卷中
使用docker exec 命令进入master容器
我们在hadoop中的根目录下创建目录data,并把work.txt上传到hadoop里面
#建立目录
hadoop fs -mkdir /data
#上传文件
hadoop fs -put word.txt hdfs://master:8020/data/word.txt
在 yourserver's ip:50070下的browse directory可以看到word.txt
然后在你的bash下执行
spark-submit --master spark://master:7077 --name MyWordCount --class JavaWordCount /data-volumes/TestSpark.jar hdfs://master:8020/data/word.txt --executor-memory 512M --executor-core 1 --num-executors 1 --driver-cores 1
spark-summit的参数参考:
--master master 的地址,提交任务到哪里执行,例如 spark://host:port, yarn, local --deploy-mode 在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client --class 应用程序的主类,仅针对 java 或 scala 应用 --name 应用程序的名称 --jars 用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下 --packages 包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标 --exclude-packages 为了避免冲突 而指定不包含的 package --repositories 远程 repository --conf PROP=VALUE 指定 spark 配置属性的值,例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m" --properties-file 加载的配置文件,默认为 conf/spark-defaults.conf --driver-memory Driver内存,默认 1G --driver-java-options 传给 driver 的额外的 Java 选项 --driver-library-path 传给 driver 的额外的库路径 --driver-class-path 传给 driver 的额外的类路径 --driver-cores Driver 的核数,默认是1。在 yarn 或者 standalone 下使用 --executor-memory 每个 executor 的内存,默认是1G --total-executor-cores 所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用 --num-executors 启动的 executor 数量。默认为2。在 yarn 下使用 --executor-core 每个 executor 的核数。在yarn或者standalone下使用
你就会在你的 ip:8080看到正在running的application
但是!!!!!!还是被执行时kill掉
如果把命令的参数从 --master spark://master:7077 换成 --master local
spark-submit --master local --name MyWordCount --class JavaWordCount /data-volumes/TestSpark.jar hdfs://master:8020/data/word.txt --executor-memory 512M --executor-core 1 --num-executors 1 --driver-cores 1
即可运行顺利
分析原因:
local模式下 驱动程序driver就是执行了一个Spark Application的main函数和创建Spark Context的进程,它包含了这个application的全部代码。
而如果在standalone下,估计是Spark Application 检测到1个core,导致在Registered StateStoreCoordinator endpoint就发生错误,kill掉了
具体原因仍在排查,欢迎指出不对!
补充原因:
以下是错误日志:
内存状态:初步估计内存问题,待有机会升级服务器再尝试解决问题