最近使用Pyspark的時候,遇到一些新的問題,希望記錄下來,解決的我會補充。
1. WARN DomainSocketFactory: The short-circuit local reads feature cannot be used
2. pyspark TypeError: 'PipelinedRDD' object does not support indexing
該格式的RDD不能直接索引,但是可以通過其他方式實現:
方法一:使用take之后,再索引 —— some_rdd.take(10)[5] :即表示取前10個中的索引為5的元素;
方法二:如果數據量較少,可以先 collect —— some_rdd.collect() 轉化為array格式的數據,再索引;
方法三:通多lambda函數和map函數可以實現 —— some_rdd.map(lambda x: x)
3.WARN DFSClient: Failed to connect to /ip:port for block, add to deadNodes
據說是防火牆原因,但是本人尚未嘗試。
4. WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
本人使用過:spark-submit --executor-memory 512M --total-executor-cores 2 test.py
但是這個方法沒有解決這個問題,還在查找中。
原因:可能是內存不足造成的,可以用 free -m 查看一下節點的內存使用情況。
解決方法:
可以嘗試方法一:在spark-env.sh中添加環境變量 —— export SPARK_EXECUTOR_MEMORY=512m
然后重啟之后再執行。
可以嘗試方法二:先清理內存,再執行,即依次執行以下三條命令:
sync #寫緩存到文件系統 echo 3 > /proc/sys/vm/drop_caches #手動釋放內存 # 其中: # 0:不釋放(系統默認值) # 1:釋放頁緩存 # 2:釋放dentries和inodes # 3:釋放所有緩存,即清除頁面緩存、目錄項和節點; free -h #查看是否已經清理
# 注:指定內存和核,--executor-memory 需要大於450MB, 也就是471859200B
5. java.io.IOException not a file: hdfs:// XXXX java.sql.SQLException
解決方法:在spark-sql命令行中,設置參數,即執行:
SET mapred.input.dir.recursive=true;
SET hive.mapred.supports.subdirectories=true;
原因:猜測是因為要讀取的文件或者表在子目錄導致。
6. java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being availableto try 在pyspark+kafka+sparkstreaming 測試時報錯
解決方法:
方法一:修改hdfs-site.xml
修改hdfs-site.xml文件,添加或修改如下兩項:
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
<value>true</value>
</property>
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
<value>NEVER</value>
</property>
方法二:修改程序,加入配置
import os from pyspark import SparkContext, SparkConf from pyspark.sql.session import SparkSession from pyspark.sql import HiveContext from pyspark.sql import SQLContext from pyspark.storagelevel import StorageLevel from pyspark.sql.types import StructField, StructType, StringType from pyspark.streaming import StreamingContext #sparkstreaming from pyspark.streaming.kafka import KafkaUtils import warnings warnings.filterwarnings("ignore") # cluster模式 conf = SparkConf().setAppName('test') conf.set(" "," ") # 各種spark配置 conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER") conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", True) # ...... sc = SparkContext(conf=conf)
......
參考:
https://blog.csdn.net/xwc35047/article/details/53933265
https://jingyan.baidu.com/article/375c8e1971d00864f3a22902.html
https://blog.csdn.net/Gavinmiaoc/article/details/80527717
http://www.bubuko.com/infodetail-1789319.html
https://blog.csdn.net/caiandyong/article/details/44730031