運行錯誤:_pickle.PicklingError: Can't pickle <function <lambda> at 0x000002BAAEF12F28>: attribute lookup <lambda> on __main__ failed
代碼如下:
1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 4 import random, time, queue 5 from multiprocessing.managers import BaseManager 6 7 # 發送任務的隊列: 8 task_queue = queue.Queue() 9 # 接收結果的隊列: 10 result_queue = queue.Queue() 11 12 # 從BaseManager繼承的QueueManager: 13 class QueueManager(BaseManager): 14 pass 15 16 # 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象: 17 QueueManager.register('get_task_queue', callable=lambda: task_queue) 18 QueueManager.register('get_result_queue', callable=lambda: result_queue) 19 20 # 綁定端口5000, 設置驗證碼'abc': 21 manager = QueueManager(address=('', 5000), authkey=b'abc') 22 23 # 啟動Queue: 24 manager.start() 25 26 # 獲得通過網絡訪問的Queue對象: 27 task = manager.get_task_queue() 28 result = manager.get_result_queue() 29 30 # 放幾個任務進去: 31 for i in range(10): 32 n = random.randint(0, 10000) 33 print('Put task %d...' % n) 34 task.put(n) 35 36 # 從result隊列讀取結果: 37 print('Try get results...') 38 for i in range(10): 39 r = result.get(timeout=10) 40 print('Result: %s' % r) 41 42 # 關閉: 43 manager.shutdown() 44 print('master exit.')
報錯信息:
1 Traceback (most recent call last): 2 File "task_master.py", line 22, in <module> 3 manager.start() 4 File "E:\Anaconda\Anaconda3\lib\multiprocessing\managers.py", line 513, in start 5 self._process.start() 6 File "E:\Anaconda\Anaconda3\lib\multiprocessing\process.py", line 105, in start 7 self._popen = self._Popen(self) 8 File "E:\Anaconda\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen 9 return Popen(process_obj) 10 File "E:\Anaconda\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__ 11 reduction.dump(process_obj, to_child) 12 File "E:\Anaconda\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump 13 ForkingPickler(file, protocol).dump(obj) 14 _pickle.PicklingError: Can't pickle <function <lambda> at 0x000002BAAEF12F28>: attribute lookup <lambda> on __main__ failed
錯誤原因:pickle模塊不能序列化lambda,需要自定義函數
修改代碼如下:
1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 4 import random, time, queue 5 from multiprocessing.managers import BaseManager 6 7 # 發送任務的隊列: 8 task_queue = queue.Queue() 9 # 接收結果的隊列: 10 result_queue = queue.Queue() 11 12 # 自定義函數re_task_queue 13 def re_task_queue(): 14 global task_queue 15 return task_queue 16 17 # 自定義函數re_result_queue 18 def re_result_queue(): 19 global result_queue 20 return result_queue 21 22 # 從BaseManager繼承的QueueManager: 23 class QueueManager(BaseManager): 24 pass 25 26 if __name__ == '__main__': 27 28 # 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象: 29 QueueManager.register('get_task_queue', callable=re_task_queue) 30 QueueManager.register('get_result_queue', callable=re_result_queue) 31 32 # 綁定端口5000, 設置驗證碼'abc': 33 manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') 34 35 # 啟動Queue: 36 manager.start() 37 38 # 獲得通過網絡訪問的Queue對象: 39 task = manager.get_task_queue() 40 result = manager.get_result_queue() 41 42 # 放幾個任務進去: 43 for i in range(10): 44 n = random.randint(0, 10000) 45 print('Put task %d...' % n) 46 task.put(n) 47 48 # 從result隊列讀取結果: 49 print('Try get results...') 50 for i in range(10): 51 r = result.get(timeout=10) 52 print('Result: %s' % r) 53 54 # 關閉: 55 manager.shutdown() 56 print('master exit.')
運行結果:
C:\Users\Lucky丶M\python>python task_master.py Put task 4962... Put task 3460... Put task 4774... Put task 4301... Put task 9120... Put task 7183... Put task 4915... Put task 3173... Put task 9138... Put task 5798... Try get results...