Python分布式進程報錯:pickle模塊不能序列化lambda函數


今天在學習到廖老師Python教程的分布式進程時,遇到了一個錯誤:_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001710FDC2EA0>: attribute lookup <lambda> on __main__ failed(pickle模塊不能序列化lambda函數)

代碼如下:

 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 
 4 import random, queue
 5 from multiprocessing.managers import BaseManager
 6 
 7 # 發送任務的隊列
 8 task_queue = queue.Queue()
 9 # 接收結果的隊列
10 result_queue = queue.Queue()
11 # 從BaseManager繼承的QueueManager
12 
13 
14 class QueueManager(BaseManager):
15     pass
16 
17 
18 # 吧兩個Queue都注冊到網絡上,callable參數關聯了Queue對象
19 QueueManager.register('get_task_queue', callable=lambda: task_queue)
20 QueueManager.register('get_result_queue', callable=lambda: result_queue)
21 
22 # 綁定端口5000,設置驗證碼abc
23 manager = QueueManager(address=('', 5000), authkey=b'abc')
24 
25 # 啟動queue
26 manager.start()
27 
28 # 獲得通過網絡訪問的Queue對象
29 task = manager.get_task_queue()
30 result = manager.get_result_queue()
31 
32 # 放幾個任務
33 for i in range(10):
34     n = random.randint(0, 1000)
35     print('添加任務 %d' % n)
36     task.put(n)
37 
38 # 從result隊列讀取結果
39 print('嘗試獲取結果')
40 for i in range(10):
41     r = result.get(timeout=10)
42     print('結果是:%s' % r)
43 
44 # 關閉
45 manager.shutdown()
46 print('master exit')

 在教程中我記得有關pickle的事兒(有印象,看來思想還在線上,哈哈),翻了一下,看到:

 原來是系統問題造成的,那么,如何解決呢?在教程中我也看到,遇到這樣的情況,需要我們自己定義函數,實現序列化。

所以對代碼稍加修改,定義兩個函數return_task_queuereturn_result_queue實現序列化:

 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 
 4 import random, queue
 5 from multiprocessing.managers import BaseManager
 6 
 7 # 發送任務的隊列
 8 task_queue = queue.Queue()
 9 # 接收結果的隊列
10 result_queue = queue.Queue()
11 
12 
13 def return_task_queue():
14     global task_queue
15     return task_queue
16 
17 
18 def return_result_queue():
19     global result_queue
20     return result_queue
21 
22 
23 # 從BaseManager繼承的QueueManager
24 
25 
26 class QueueManager(BaseManager):
27     pass
28 
29 
30 if __name__ == '__main__':
31     # 吧兩個Queue都注冊到網絡上,callable參數關聯了Queue對象
32     QueueManager.register('get_task_queue', callable=return_task_queue)
33     QueueManager.register('get_result_queue', callable=return_result_queue)
34 
35     # 綁定端口5000,設置驗證碼abc
36     manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')
37 
38     # 啟動queue
39     manager.start()
40 
41     # 獲得通過網絡訪問的Queue對象
42     task = manager.get_task_queue()
43     result = manager.get_result_queue()
44 
45     # 放幾個任務
46     for i in range(10):
47         n = random.randint(0, 1000)
48         print('添加任務 %d' % n)
49         task.put(n)
50 
51     # 從result隊列讀取結果
52     print('嘗試獲取結果')
53     for i in range(10):
54         r = result.get(timeout=10)
55         print('結果是:%s' % r)
56 
57     # 關閉
58     manager.shutdown()
59     print('master exit')

運行結果:

歡迎各位來和我一起分享技術,交流學習心得


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM