[python](windows)分布式進程問題:pickle模塊不能序列化lambda函數


運行錯誤:_pickle.PicklingError: Can't pickle <function <lambda> at 0x000002BAAEF12F28>: attribute lookup <lambda> on __main__ failed

代碼如下:

 1 #!/usr/bin/env python3
 2 # -*- coding: utf-8 -*-
 3 
 4 import random, time, queue
 5 from multiprocessing.managers import BaseManager
 6 
 7 # 發送任務的隊列:
 8 task_queue = queue.Queue()
 9 # 接收結果的隊列:
10 result_queue = queue.Queue()
11 
12 # 從BaseManager繼承的QueueManager:
13 class QueueManager(BaseManager):
14     pass
15 
16 # 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:
17 QueueManager.register('get_task_queue', callable=lambda: task_queue)
18 QueueManager.register('get_result_queue', callable=lambda: result_queue)
19 
20 # 綁定端口5000, 設置驗證碼'abc':
21 manager = QueueManager(address=('', 5000), authkey=b'abc')
22 
23 # 啟動Queue:
24 manager.start()
25 
26 # 獲得通過網絡訪問的Queue對象:
27 task = manager.get_task_queue()
28 result = manager.get_result_queue()
29 
30 # 放幾個任務進去:
31 for i in range(10):
32     n = random.randint(0, 10000)
33     print('Put task %d...' % n)
34     task.put(n)
35 
36 # 從result隊列讀取結果:
37 print('Try get results...')
38 for i in range(10):
39     r = result.get(timeout=10)
40     print('Result: %s' % r)
41 
42 # 關閉:
43 manager.shutdown()
44 print('master exit.')

報錯信息:

 1 Traceback (most recent call last):
 2   File "task_master.py", line 22, in <module>
 3     manager.start()
 4   File "E:\Anaconda\Anaconda3\lib\multiprocessing\managers.py", line 513, in start
 5     self._process.start()
 6   File "E:\Anaconda\Anaconda3\lib\multiprocessing\process.py", line 105, in start
 7     self._popen = self._Popen(self)
 8   File "E:\Anaconda\Anaconda3\lib\multiprocessing\context.py", line 322, in _Popen
 9     return Popen(process_obj)
10   File "E:\Anaconda\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 65, in __init__
11     reduction.dump(process_obj, to_child)
12   File "E:\Anaconda\Anaconda3\lib\multiprocessing\reduction.py", line 60, in dump
13     ForkingPickler(file, protocol).dump(obj)
14 _pickle.PicklingError: Can't pickle <function <lambda> at 0x000002BAAEF12F28>: attribute lookup <lambda> on __main__ failed

錯誤原因:pickle模塊不能序列化lambda,需要自定義函數

修改代碼如下:

 

 1 #!/usr/bin/env python3
 2 # -*- coding: utf-8 -*-
 3 
 4 import random, time, queue
 5 from multiprocessing.managers import BaseManager
 6 
 7 # 發送任務的隊列:
 8 task_queue = queue.Queue()
 9 # 接收結果的隊列:
10 result_queue = queue.Queue()
11 
12 # 自定義函數re_task_queue
13 def re_task_queue():  
14     global task_queue  
15     return task_queue  
16 
17 # 自定義函數re_result_queue
18 def re_result_queue():  
19     global result_queue  
20     return result_queue
21 
22 # 從BaseManager繼承的QueueManager:
23 class QueueManager(BaseManager):
24     pass
25 
26 if __name__ == '__main__':
27 
28     # 把兩個Queue都注冊到網絡上, callable參數關聯了Queue對象:
29     QueueManager.register('get_task_queue', callable=re_task_queue)
30     QueueManager.register('get_result_queue', callable=re_result_queue)
31 
32     # 綁定端口5000, 設置驗證碼'abc':
33     manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
34 
35     # 啟動Queue:
36     manager.start()
37 
38     # 獲得通過網絡訪問的Queue對象:
39     task = manager.get_task_queue()
40     result = manager.get_result_queue()
41 
42     # 放幾個任務進去:
43     for i in range(10):
44         n = random.randint(0, 10000)
45         print('Put task %d...' % n)
46         task.put(n)
47 
48     # 從result隊列讀取結果:
49     print('Try get results...')
50     for i in range(10):
51         r = result.get(timeout=10)
52         print('Result: %s' % r)
53         
54     # 關閉:
55     manager.shutdown()
56     print('master exit.')

 

運行結果:

C:\Users\Lucky丶M\python>python task_master.py
Put task 4962...
Put task 3460...
Put task 4774...
Put task 4301...
Put task 9120...
Put task 7183...
Put task 4915...
Put task 3173...
Put task 9138...
Put task 5798...
Try get results...

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM