1 概述
Spark的on Yarn模式。其資源分配是交給Yarn的ResourceManager來進行管理的。可是眼下的Spark版本號,Application日志的查看,僅僅能通過Yarn的yarn logs命令實現。
在部署和執行Spark Application的過程中,假設不注意一些小的細節,或許會導致一些問題的出現。
2 防火牆
部署好Spark的包和配置文件,on yarn的兩種模式都無法執行,在NodeManager端的日志都是說Connection Refused,連接不上Driver所在的client節點,可是client的80port能夠正常訪問!同一時候,在日志中有類似信息出現:
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
內存肯定是夠的。但就是無法獲取資源。檢查防火牆,果然client僅僅開啟的對80port的訪問,其它都禁止了!
假設你的程序在執行的時候也有類似連接被拒絕的情況。最好也是先檢查下防火牆的配置!
3 Spark Driver程序host的指定
部署完Spark后,分別使用yarn-cluster模式和yarn-client模式執行Spark自帶的計算pi的演示樣例。
Spark的一些配置文件除了一些基本屬性外,均未做配置,結果執行的時候兩種執行模式出現了不同的狀況。
yarn-cluster模式可以正常執行,yarn-client模式總是執行失敗。查看ResourceManager、NodeManager端的日志。發現程序總是找不到ApplicationMaster,這就奇怪了!
而且,客戶端的Driver程序開啟的port。在NodeManager端訪問被拒絕。非Spark的其它MR任務。可以正常執行。
檢查client配置文件。發現原來在client的/etc/hosts文件里。client的一個IP相應了多個Host,Driver程序會默認去取最后相應的那個Host,比方是hostB,可是在NodeManager端是配置的其它Host。hostA。所以導致程序無法訪問。為了不影響其它的程序使用client的Host列表,這里在Spark配置文件spark-defaults.conf中使用屬性spark.driver.host來指定yarn-client模式執行中和Yarn通信的DriverHost。此時yarn-client模式能夠正常執行。
上面配置完了之后,發現yarn-cluster模式又不能執行了!
想想原因,肯定是上面那個配置參數搞的鬼,凝視掉之后,yarn-cluster模式能夠繼續執行。原因是,yarn-cluster模式下。spark的入口函數是在client執行,可是Driver的其它功能是在ApplicationMaster中執行的。上面的那個配置相當於指定了ApplicationMaster的地址,實際上的ApplicationMaster在yarn-master模式下是由ResourceManager隨機指定的。
4 on Yarn日志的查看
測試環境下,通過yarn logs -applicationId xxx能夠查看執行結束的Application的日志,可是搞到還有一個環境下發現使用上述命令查看日志時,總是提演示樣例如以下信息:
Logs not available at /tmp/nm/remote/logs/hadoop/logs/application_xxx_xxx
Log aggregation has not completed or is not enabled.
去相應的NodeManger文件夾下,確實找不到日志文件。
可是/tmp/nm/remote/logs卻是在yarn-site.xml中指定了的文件夾。這個是對的,究竟什么原因呢?難道是Yarn的日志聚集沒有起作用?
去NodeManager上查看相應Application的日志:
2014-08-04 09:14:47,513 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Starting aggregate log-file for app application_xxx_xxx at /tmp/nm/remote/logs/spark/logs/application_xxx_xxx/hostB.tmp 2014-08-04 09:14:47,525 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Uploading logs for container container_xxx_xxx_01_000007. Current good log dirs are /data/nm/log 2014-08-04 09:14:47,526 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Uploading logs for container container_xxx_xxx_000001. Current good log dirs are /data/nm/log 2014-08-04 09:14:47,526 INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Deleting path : /data/nm/log/application_xxx_xxx 2014-08-04 09:14:47,607 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregatorImpl: Finished aggregate log-file for app application_xxx_xxx
可見,日志聚集確實起作用了,可是為什么通過命令不能查看!
猛然見看到日志中“/tmp/nm/remote/logs/spark/logs/ application_xxx_xxx/hostB.tmp”。日志的路徑有問題,在使用yarn logs命令查看的時候。用的是hadoop用戶。實際Spark Application的提交運行用的是spark用戶,而yarn logs命令默認去找的是當前用戶的路徑,這就是查看不到日志的原因。切換到spark用戶再查看,日志最終出來了。
5 LZO相關問題
假設在Spark中使用了LZO作為EventLog的的壓縮算法等,就得實現安裝好LZO這個東東,否則會出現類似於例如以下的異常:
Caused by: java.lang.IllegalArgumentException: Compression codec com.hadoop.compression.lzo.LzoCodec not found. at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:134) at org.apache.hadoop.io.compress.CompressionCodecFactory.<init>(CompressionCodecFactory.java:174) at org.apache.hadoop.mapred.TextInputFormat.configure(TextInputFormat.java:45) ... 66 more Caused by: java.lang.ClassNotFoundException: Class com.hadoop.compression.lzo.LzoCodec not found at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1680) at org.apache.hadoop.io.compress.CompressionCodecFactory.getCodecClasses(CompressionCodecFactory.java:127) ... 68 more
或者
[ERROR] [2014-08-05 10:34:41 933] com.hadoop.compression.lzo.GPLNativeCodeLoader [main] (GPLNativeCodeLoader.java:36) Could not load native gpl library java.lang.UnsatisfiedLinkError: no gplcompression in java.library.path
解決的方法就是得安裝好LZO,而且在HDFS、SPARK中配置好相關的包、文件等,詳細步驟見:
http://find.searchhub.org/document/a128707a98fe4ec6
https://github.com/twitter/hadoop-lzo/blob/master/README.md
http://hsiamin.com/posts/2014/05/03/enable-lzo-compression-on-hadoop-pig-and-spark/
6 Spark Hive無法訪問Mysql的問題
生產環境下,節點之間肯定是有防火牆限制的。並且Hive的元數據庫Mysql,更是對請求的IP和用戶等限制的嚴格,假設在Spark集群中使用yarn-cluster模式進行提交Spark的Application。其執行時Driver是和ApplicationMaster執行在一起。由Yarn的ResourceManager負責分配到集群中的某個NodeManager節點上,假設在Hive-site.xml中僅僅配置了Mysql數據庫而沒有配置MetaStore的話,或許會遇到連接元數據庫失敗的問題,此時,就得看下Hive-site.xml的配置,是否Mysql的相關權限配置正確、MetaStore服務能否夠正常連接。
7 內存溢出問題
在Spark中使用hql方法運行hive語句時。因為其在查詢過程中調用的是Hive的獲取元數據信息、SQL解析,而且使用Cglib等進行序列化反序列化。中間可能產生較多的class文件,導致JVM中的持久代使用較多,假設配置不當,可能引起類似於例如以下的OOM問題:
Exception in thread "Thread-2" java.lang.OutOfMemoryError: PermGen space
原因是實際使用時。假設用的是JDK1.6版本號,Server模式的持久代默認大小是64M,Client模式的持久代默認大小是32M,而Driver端進行SQL處理時。其持久代的使用可能會達到90M,導致OOM溢出。任務失敗。
解決方法就是在Spark的conf文件夾中的spark-defaults.conf里。添加對Driver的JVM配置。由於Driver才負責SQL的解析和元數據獲取。
配置例如以下:
spark.driver.extraJavaOptions -XX:PermSize=128M -XX:MaxPermSize=256M
可是,上述情況是在yarn-cluster模式下出現。yarn-client模式執行時倒是正常的。原來在$SPARK_HOME/bin/spark-class文件里已經設置了持久代大小:
JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"
當以yarn-client模式執行時。driver就執行在客戶端的spark-submit進程中。其JVM參數是取的spark-class文件里的設置,所謂未出現持久代溢出現象。
總結一下Spark中各個角色的JVM參數設置:
(1)Driver的JVM參數:
-Xmx。-Xms,假設是yarn-client模式,則默認讀取spark-env文件里的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值一樣大小;假設是yarn-cluster模式。則讀取的是spark-default.conf文件里的spark.driver.extraJavaOptions相應的JVM參數值。
PermSize,假設是yarn-client模式,則是默認讀取spark-class文件里的JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"值;假設是yarn-cluster模式。讀取的是spark-default.conf文件里的spark.driver.extraJavaOptions相應的JVM參數值。
GC方式,假設是yarn-client模式,默認讀取的是spark-class文件里的JAVA_OPTS。假設是yarn-cluster模式,則讀取的是spark-default.conf文件里的spark.driver.extraJavaOptions相應的參數值。
以上值最后均可被spark-submit工具中的--driver-java-options參數覆蓋。
(2)Executor的JVM參數:
-Xmx,-Xms。假設是yarn-client模式,則默認讀取spark-env文件里的SPARK_EXECUTOR_MEMORY值,-Xmx。-Xms值一樣大小。假設是yarn-cluster模式,則讀取的是spark-default.conf文件里的spark.executor.extraJavaOptions相應的JVM參數值。
PermSize。兩種模式都是讀取的是spark-default.conf文件里的spark.executor.extraJavaOptions相應的JVM參數值。
GC方式。兩種模式都是讀取的是spark-default.conf文件里的spark.executor.extraJavaOptions相應的JVM參數值。
(3)Executor數目及所占CPU個數
假設是yarn-client模式。Executor數目由spark-env中的SPARK_EXECUTOR_INSTANCES指定,每一個實例的數目由SPARK_EXECUTOR_CORES指定;假設是yarn-cluster模式。Executor的數目由spark-submit工具的--num-executors參數指定,默認是2個實例,而每一個Executor使用的CPU數目由--executor-cores指定,默覺得1核。
每一個Executor執行時的信息能夠通過yarn logs命令查看到,類似於例如以下:
14/08/13 18:12:59 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Setting up executor with commands: List($JAVA_HOME/bin/java, -server, -XX:OnOutOfMemoryError='kill %p', -Xms1024m -Xmx1024m , -XX:PermSize=256M -XX:MaxPermSize=256M -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintHeapAtGC -Xloggc:/tmp/spark_gc.log, -Djava.io.tmpdir=$PWD/tmp, -Dlog4j.configuration=log4j-spark-container.properties, org.apache.spark.executor.CoarseGrainedExecutorBackend, akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler, 1, sparktest2, 3, 1>, <LOG_DIR>/stdout, 2>, <LOG_DIR>/stderr)
當中。akka.tcp://spark@sparktest1:41606/user/CoarseGrainedScheduler表示當前的Executor進程所在節點,后面的1表示Executor編號。sparktest2表示ApplicationMaster的host,接着的3表示當前Executor所占用的CPU數目。
8 序列化異常
在Spark上運行hive語句的時候,出現類似於例如以下的異常:
org.apache.spark.SparkDriverExecutionException: Execution error at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:849) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1231) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.ClassCastException: scala.collection.mutable.HashSet cannot be cast to scala.collection.mutable.BitSet at org.apache.spark.sql.execution.BroadcastNestedLoopJoin$$anonfun$7.apply(joins.scala:336) at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:813) at org.apache.spark.rdd.RDD$$anonfun$19.apply(RDD.scala:810) at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56) at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:845)
排查其前后的日志。發現大都是序列化的東西:
14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Serialized task 8.0:3 as 20849 bytes in 0 ms 14/08/13 11:10:01 INFO org.apache.spark.Logging$class.logInfo(Logging.scala:58): Finished TID 813 in 25 ms on sparktest0 (progress: 3/200)
而在spark-default.conf中,事先設置了序列化方式為Kryo:
spark.serializer org.apache.spark.serializer.KryoSerializer
依據異常信息,可見是HashSet轉為BitSet類型轉換失敗。Kryo把松散的HashSet轉換為了緊湊的BitSet,把序列化方式凝視掉之后,任務能夠正常運行。難道Spark的Kryo序列化做的還不到位?此問題須要進一步跟蹤。
9 Executor僵死問題
執行一個Spark任務,發現其執行速度遠遠慢於執行相同SQL語句的Hive的執行,甚至出現了OOM的錯誤,最后卡住達幾小時!
而且Executor進程在瘋狂GC。
截取其一Task的OOM異常信息:
能夠看到這是在序列化過程中發生的OOM。
依據節點信息。找到相應的Executor進程,觀察其Jstack信息:
Thread 36169: (state = BLOCKED) - java.lang.Long.valueOf(long) @bci=27, line=557 (Compiled frame) - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=5, line=113 (Compiled frame) - com.esotericsoftware.kryo.serializers.DefaultSerializers$LongSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=103 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame) - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=158, line=338 (Compiled frame) - com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=4, line=293 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame) - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame) - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readObject(com.esotericsoftware.kryo.io.Input, java.lang.Class, com.esotericsoftware.kryo.Serializer) @bci=136, line=651 (Compiled frame) - com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(com.esotericsoftware.kryo.io.Input, java.lang.Object) @bci=143, line=605 (Compiled frame) - com.esotericsoftware.kryo.serializers.FieldSerializer.read(com.esotericsoftware.kryo.Kryo, com.esotericsoftware.kryo.io.Input, java.lang.Class) @bci=44, line=221 (Compiled frame) - com.esotericsoftware.kryo.Kryo.readClassAndObject(com.esotericsoftware.kryo.io.Input) @bci=158, line=732 (Compiled frame) - org.apache.spark.serializer.KryoDeserializationStream.readObject(scala.reflect.ClassTag) @bci=8, line=118 (Compiled frame) - org.apache.spark.serializer.DeserializationStream$$anon$1.getNext() @bci=10, line=125 (Compiled frame) - org.apache.spark.util.NextIterator.hasNext() @bci=16, line=71 (Compiled frame) - org.apache.spark.storage.BlockManager$LazyProxyIterator$1.hasNext() @bci=4, line=1031 (Compiled frame) - scala.collection.Iterator$$anon$13.hasNext() @bci=4, line=371 (Compiled frame) - org.apache.spark.util.CompletionIterator.hasNext() @bci=4, line=30 (Compiled frame) - org.apache.spark.InterruptibleIterator.hasNext() @bci=22, line=39 (Compiled frame) - scala.collection.Iterator$$anon$11.hasNext() @bci=4, line=327 (Compiled frame) - org.apache.spark.sql.execution.HashJoin$$anonfun$execute$1.apply(scala.collection.Iterator, scala.collection.Iterator) @bci=14, line=77 (Compiled frame) - org.apache.spark.sql.execution.HashJoin$$anonfun$execute$1.apply(java.lang.Object, java.lang.Object) @bci=9, line=71 (Interpreted frame) - org.apache.spark.rdd.ZippedPartitionsRDD2.compute(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=48, line=87 (Interpreted frame) - org.apache.spark.rdd.RDD.computeOrReadCheckpoint(org.apache.spark.Partition, org.apache.spark.TaskContext) @bci=26, line=262 (Interpreted frame)
有大量的BLOCKED線程,繼續觀察GC信息。發現大量的FULL GC。
分析。在插入Hive表的時候,實際上須要寫HDFS,在此過程的HashJoin時,伴隨着大量的Shuffle寫操作。JVM的新生代不斷GC,Eden Space寫滿了就往Survivor Space寫,同一時候超過一定大小的數據會直接寫到老生代。當新生代寫滿了之后,也會把老的數據搞到老生代。假設老生代空間不足了。就觸發FULL GC,還是空間不夠,那就OOM錯誤了,此時線程被Blocked。導致整個Executor處理數據的進程被卡住。
當處理大數據的時候,假設JVM配置不當就easy引起上述問題。解決辦法就是增大Executor的使用內存。合理配置新生代和老生代的大小,能夠將老生代的空間適當的調大點。
10 小節
問題是比較嚴重,Application都直接無法執行了。可是引起問題的解決辦法都比較小,歸根結底還是部署的時候環境較為復雜,不夠細致!再接再礪!
以后遇到相關的問題,會再這里持續更新,方便自己,也方便遇到類似問題的朋友們!
-------------------------------------------------------------------------------
假設您看了本篇博客,認為對您有所收獲。請點擊下方的 [頂]
假設您想轉載本博客。請注明出處
假設您對本文有意見或者建議,歡迎留言
感謝您的閱讀。請關注我的興許博客