pyspark:'PipelinedRDD' object does not support indexing、 Initial job has not accepted any resources、IOException not a file: hdfs:// XXXX java.sql、Failed to replace a bad datanode on the existing


最近使用Pyspark的時候,遇到一些新的問題,希望記錄下來,解決的我會補充。

1. WARN DomainSocketFactory: The short-circuit local reads feature cannot be used

 

2. pyspark TypeError: 'PipelinedRDD' object does not support indexing

該格式的RDD不能直接索引,但是可以通過其他方式實現:

方法一:使用take之后,再索引 —— some_rdd.take(10)[5] :即表示取前10個中的索引為5的元素;

方法二:如果數據量較少,可以先 collect —— some_rdd.collect() 轉化為array格式的數據,再索引;

方法三:通多lambda函數和map函數可以實現 —— some_rdd.map(lambda x: x)

 

3.WARN DFSClient: Failed to connect to /ip:port for block, add to deadNodes

據說是防火牆原因,但是本人尚未嘗試。

 

4. WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

本人使用過:spark-submit --executor-memory 512M --total-executor-cores 2 test.py

但是這個方法沒有解決這個問題,還在查找中。

原因:可能是內存不足造成的,可以用 free -m 查看一下節點的內存使用情況。

解決方法:

可以嘗試方法一:在spark-env.sh中添加環境變量 —— export SPARK_EXECUTOR_MEMORY=512m

然后重啟之后再執行。

可以嘗試方法二:先清理內存,再執行,即依次執行以下三條命令:

sync    #寫緩存到文件系統
echo 3 > /proc/sys/vm/drop_caches   #手動釋放內存

# 其中:
# 0:不釋放(系統默認值)
# 1:釋放頁緩存
# 2:釋放dentries和inodes
# 3:釋放所有緩存,即清除頁面緩存、目錄項和節點;

free -h     #查看是否已經清理

# 注:指定內存和核,--executor-memory 需要大於450MB, 也就是471859200B

 

5. java.io.IOException not a file: hdfs:// XXXX java.sql.SQLException

解決方法:在spark-sql命令行中,設置參數,即執行:

SET mapred.input.dir.recursive=true;

SET hive.mapred.supports.subdirectories=true;

原因:猜測是因為要讀取的文件或者表在子目錄導致。

 

6. java.io.IOException: Failed to replace a bad datanode on the existing pipeline due to no more good datanodes being availableto try 在pyspark+kafka+sparkstreaming 測試時報錯

解決方法:

方法一:修改hdfs-site.xml

 修改hdfs-site.xml文件,添加或修改如下兩項:

    <property>

        <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>        

        <value>true</value>

    </property>

   

    <property>

        <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>

        <value>NEVER</value>

    </property>

方法二:修改程序,加入配置

import os
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql import SQLContext
from pyspark.storagelevel import StorageLevel
from pyspark.sql.types import StructField, StructType, StringType

from pyspark.streaming import StreamingContext   #sparkstreaming
from pyspark.streaming.kafka import KafkaUtils

import warnings
warnings.filterwarnings("ignore")

# cluster模式
conf = SparkConf().setAppName('test')
conf.set(" "," ")    # 各種spark配置
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER")
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", True)
# ......
sc = SparkContext(conf=conf)
......

 

參考:

https://blog.csdn.net/xwc35047/article/details/53933265

https://jingyan.baidu.com/article/375c8e1971d00864f3a22902.html

https://blog.csdn.net/Gavinmiaoc/article/details/80527717

http://www.bubuko.com/infodetail-1789319.html

https://blog.csdn.net/caiandyong/article/details/44730031


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM