pyspark如何遍歷broadcast


因為論文關系要用到pyspark,具體情形如下:

有一個list=['aaa','bbb','ccc','ddd'],然后有一個rdd內數據類型是str,eg:'abcdefg',正常如果是需要篩選數組包含的rdd數據可以定義一個broadcast,然后寫成:

broadcastvalue = sc.broadcast(list)

rdd.filter(lambda x:x in broadcastvalue.value).collect()

我的需求是要篩選str中包含有list中任意一個數據的那些數據,eg:如果str ='aaaxxxxxx',因為list[0]='aaa' in str,所以這個數據是我需要的,開始時嘗試寫成:

def choice(data,list):
    for i in list:
        if i in data:
            return True
    return False
broadcastvalue = sc.broadcast(list)
rdd.filter(lambda x:choice(x,broadcastvalue.value)).collect()

但是這樣會報錯broadcast is not iterable,這是說明broadcast是一個不可迭代的對象,搜索無果后想到了解決方案,竟然不可以迭代那么我就用非迭代的方式遍歷就行了:

def choice(data,list):
    for i in range(len(list)):
        if list[i] in data:
            return True
    return False
broadcastvalue = sc.broadcast(list)
rdd.filter(lambda x:choice(x,broadcastvalue.value)).collect()

其實修改很簡單,只是不再用它作為一個迭代對象來遍歷了。

廢話語錄:

在做這個的時候python3碰上了許多的UnicodeError問題,解決思路:

1.肯定是編碼問題

2.讀取數據庫的先判斷數據庫的編碼,表編碼、字段編碼,讀取csv的先判斷csv的編碼

3.再判斷python的腳本編碼

4.再判斷各個字符串的編碼情況(str.encode('utf-8'))

最后發現是在存csv的時候字段編碼是ascii,然后用.encode('utf-8')報了UnicodeError錯誤,最后找到了解決方法.encode('utf-8',errors='ignore')


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM