因為論文關系要用到pyspark,具體情形如下:
有一個list=['aaa','bbb','ccc','ddd'],然后有一個rdd內數據類型是str,eg:'abcdefg',正常如果是需要篩選數組包含的rdd數據可以定義一個broadcast,然后寫成:
broadcastvalue = sc.broadcast(list) rdd.filter(lambda x:x in broadcastvalue.value).collect()
我的需求是要篩選str中包含有list中任意一個數據的那些數據,eg:如果str ='aaaxxxxxx',因為list[0]='aaa' in str,所以這個數據是我需要的,開始時嘗試寫成:
def choice(data,list): for i in list: if i in data: return True return False broadcastvalue = sc.broadcast(list) rdd.filter(lambda x:choice(x,broadcastvalue.value)).collect()
但是這樣會報錯broadcast is not iterable,這是說明broadcast是一個不可迭代的對象,搜索無果后想到了解決方案,竟然不可以迭代那么我就用非迭代的方式遍歷就行了:
def choice(data,list): for i in range(len(list)): if list[i] in data: return True return False broadcastvalue = sc.broadcast(list) rdd.filter(lambda x:choice(x,broadcastvalue.value)).collect()
其實修改很簡單,只是不再用它作為一個迭代對象來遍歷了。
廢話語錄:
在做這個的時候python3碰上了許多的UnicodeError問題,解決思路:
1.肯定是編碼問題
2.讀取數據庫的先判斷數據庫的編碼,表編碼、字段編碼,讀取csv的先判斷csv的編碼
3.再判斷python的腳本編碼
4.再判斷各個字符串的編碼情況(str.encode('utf-8'))
最后發現是在存csv的時候字段編碼是ascii,然后用.encode('utf-8')報了UnicodeError錯誤,最后找到了解決方法.encode('utf-8',errors='ignore')