1. Starting multiple processes: a simple example where the worker function takes no arguments

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


def worker():
    print('Working')


if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        p.start()
Sample output
[root@ mnt]# python3 multiprocessing_simple.py
Working
Working
Working
Working
Working
2. Starting multiple processes: a simple example where the worker function takes arguments

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


def worker(num):
    print('Worker id: %s' % num)


if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        p.start()
Sample output
[root@ mnt]# python3 multiprocessing_simple_args.py
Worker id: 1
Worker id: 2
Worker id: 3
Worker id: 4
Worker id: 0
Note that the ids print in arbitrary order; process scheduling is nondeterministic.
3. Running a worker function imported from another module

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# multiprocessing_import_worker.py


def worker():
    print('Working')
    return

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# multiprocessing_import_main.py
import multiprocessing

import multiprocessing_import_worker

if __name__ == '__main__':
    for i in range(5):
        p = multiprocessing.Process(
            target=multiprocessing_import_worker.worker,
        )
        p.start()
Sample output
[root@ mnt]# python3 multiprocessing_import_main.py
Working
Working
Working
Working
Working
4. Giving processes custom names

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import time

logging.basicConfig(
    level=logging.DEBUG,
    format="(%(threadName)-10s) %(message)s",
)


def worker():
    name = multiprocessing.current_process().name
    logging.debug('%s starting' % name)
    time.sleep(3)
    logging.debug('%s exiting' % name)


def my_service():
    name = multiprocessing.current_process().name
    logging.debug('%s starting' % name)
    time.sleep(3)
    logging.debug('%s exiting' % name)


if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='worker_1',
        target=worker,
    )
    # Not given a name, so it gets a default one (Process-N).
    worker_2 = multiprocessing.Process(
        target=worker,
    )

    service.start()
    worker_1.start()
    worker_2.start()
Run result
[root@ mnt]# python3 multiprocessing_names.py
(MainThread) worker_1 starting
(MainThread) Process-3 starting
(MainThread) my_service starting
(MainThread) worker_1 exiting
(MainThread) Process-3 exiting
(MainThread) my_service exiting
5. Daemon processes: exiting without waiting

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import time

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)


def daemon():
    p = multiprocessing.current_process()
    logging.debug('%s %s starting' % (p.name, p.pid))
    time.sleep(2)
    logging.debug('%s %s exiting' % (p.name, p.pid))


def no_daemon():
    p = multiprocessing.current_process()
    logging.debug('%s %s starting' % (p.name, p.pid))
    logging.debug('%s %s exiting' % (p.name, p.pid))


if __name__ == '__main__':
    daemon_obj = multiprocessing.Process(
        target=daemon,
        name='daemon',
    )
    daemon_obj.daemon = True

    no_daemon_obj = multiprocessing.Process(
        target=no_daemon,
        name='no_daemon',
    )
    no_daemon_obj.daemon = False

    daemon_obj.start()
    time.sleep(1)
    no_daemon_obj.start()
Run result
[root@ mnt]# python3 multiprocessing_daemon.py
(MainThread) daemon 21931 starting
(MainThread) no_daemon 21932 starting
(MainThread) no_daemon 21932 exiting
The daemon process never prints its "exiting" line: it is killed automatically when the main program exits, before its two-second sleep finishes.
6. Daemon processes: waiting for all processes to finish

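The code block for this example appears to be missing from the original post. A minimal sketch that matches the output below, assuming it is the section 5 program with join() calls added at the end:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import time

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)


def daemon():
    p = multiprocessing.current_process()
    logging.debug('%s %s starting' % (p.name, p.pid))
    time.sleep(2)
    logging.debug('%s %s exiting' % (p.name, p.pid))


def no_daemon():
    p = multiprocessing.current_process()
    logging.debug('%s %s starting' % (p.name, p.pid))
    logging.debug('%s %s exiting' % (p.name, p.pid))


if __name__ == '__main__':
    daemon_obj = multiprocessing.Process(target=daemon, name='daemon')
    daemon_obj.daemon = True

    no_daemon_obj = multiprocessing.Process(target=no_daemon, name='no_daemon')
    no_daemon_obj.daemon = False

    daemon_obj.start()
    time.sleep(1)
    no_daemon_obj.start()

    # Unlike section 5, explicitly wait for both processes, so the
    # daemon gets a chance to finish before the main program exits.
    daemon_obj.join()
    no_daemon_obj.join()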
Sample output
[root@ mnt]# python3 multiprocessing_daemon_join.py
(MainThread) daemon 21948 starting
(MainThread) no_daemon 21949 starting
(MainThread) no_daemon 21949 exiting
(MainThread) daemon 21948 exiting
7. Daemon processes: joining with a timeout

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import time

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)


def daemon():
    p = multiprocessing.current_process()
    logging.debug('%s %s starting' % (p.name, p.pid))
    time.sleep(2)
    logging.debug('%s %s exiting' % (p.name, p.pid))


def no_daemon():
    p = multiprocessing.current_process()
    logging.debug('%s %s starting' % (p.name, p.pid))
    logging.debug('%s %s exiting' % (p.name, p.pid))


if __name__ == '__main__':
    daemon_obj = multiprocessing.Process(
        target=daemon,
        name='daemon',
    )
    daemon_obj.daemon = True

    no_daemon_obj = multiprocessing.Process(
        target=no_daemon,
        name='no_daemon',
    )
    no_daemon_obj.daemon = False

    daemon_obj.start()
    time.sleep(1)
    no_daemon_obj.start()

    # Give the daemon at most one second, then check whether it is
    # still running.
    daemon_obj.join(1)
    logging.debug('daemon_obj.is_alive():%s' % daemon_obj.is_alive())
    no_daemon_obj.join()
Sample output
[root@ mnt]# python3 multiprocessing_daemon_join_timeout.py
(MainThread) daemon 21997 starting
(MainThread) no_daemon 21998 starting
(MainThread) no_daemon 21998 exiting
(MainThread) daemon_obj.is_alive():True
Because join(1) times out before the daemon's two-second sleep ends, the daemon is still alive when is_alive() is checked.
8. Terminating a process. Note: after calling terminate(), join() the process to make sure the termination has actually completed.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import time

logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)


def slow_worker():
    print('Starting work')
    time.sleep(0.1)
    print('Finished work')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    logging.debug('Status before start(): %s' % p.is_alive())
    p.start()
    logging.debug('Status while running: %s' % p.is_alive())
    p.terminate()
    logging.debug('Status right after terminate(): %s' % p.is_alive())
    p.join()
    logging.debug('Status after join(): %s' % p.is_alive())
Run result
[root@ mnt]# python3 multiprocessing_terminate.py
(MainThread) Status before start(): False
(MainThread) Status while running: True
(MainThread) Status right after terminate(): True
(MainThread) Status after join(): False
9. Process exit status codes

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import sys
import time


def exit_error():
    sys.exit(1)


def exit_ok():
    return


def return_value():
    return 1


def raises():
    raise RuntimeError('Runtime error')


def terminated():
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    funcs = [
        exit_error,
        exit_ok,
        return_value,
        raises,
        terminated,
    ]
    for func in funcs:
        print('Running process for %s' % func.__name__)
        j = multiprocessing.Process(target=func, name=func.__name__)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('{:>15}.exitcode={}'.format(j.name, j.exitcode))
Sample output
[root@ mnt]# python3 multiprocessing_exitcode.py
Running process for exit_error
Running process for exit_ok
Running process for return_value
Running process for raises
Running process for terminated
Process raises:
exit_error.exitcode=1
exit_ok.exitcode=0
return_value.exitcode=0
Traceback (most recent call last):
  File "/usr/local/Python-3.6.6/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/local/Python-3.6.6/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "multiprocessing_exitcode.py", line 25, in raises
    raise RuntimeError('Runtime error')
RuntimeError: Runtime error
Note: when a process raises an unhandled exception, its exit code defaults to 1.
raises.exitcode=1
terminated.exitcode=-15
10. Enabling multiprocessing's built-in logging

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import sys


def worker():
    print('Working...')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
Sample output
[root@ mnt]# python3 multiprocessing_log_to_stderr.py
[INFO/Process-1] child process calling self.run()
Working...
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
11. Setting the level for multiprocessing's logging

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
import multiprocessing
import sys


def worker():
    print('Working...')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
Test output
[root@ mnt]# python3 multiprocessing_get_logger.py
[INFO/Process-1] child process calling self.run()
Working...
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
12. Subclassing multiprocessing.Process for a worker that takes no arguments

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


class Worker(multiprocessing.Process):

    def run(self):
        print('Current process name: %s' % self.name)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()
Sample output
[root@ mnt]# python3 multiprocessing_subclass.py
Current process name: Worker-2
Current process name: Worker-3
Current process name: Worker-4
Current process name: Worker-5
Current process name: Worker-1
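Since the overridden run() takes no parameters, any per-process data has to go in through the constructor instead. A minimal sketch of that variation (the num attribute and job wording are illustrative, not from the original post):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


class Worker(multiprocessing.Process):

    def __init__(self, num, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self.num = num  # stash the argument before the process starts

    def run(self):
        print('%s handling job %s' % (self.name, self.num))


if __name__ == '__main__':
    jobs = [Worker(i) for i in range(5)]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()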
13. Using multiprocessing.Queue()

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


class MyFancyClass(object):

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Current process: %s, instance name: %s' % (proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    # Start the worker and hand it the queue; the queue is still empty,
    # so the worker blocks waiting for data to arrive.
    p = multiprocessing.Process(
        target=worker,
        args=(queue,),
    )
    p.start()

    # Put an item on the queue.
    queue.put(MyFancyClass('Mrs Suk'))

    queue.close()
    # Wait for the queue's feeder thread to flush all buffered data.
    queue.join_thread()
    p.join()
Sample output
[root@ mnt]# python3 multiprocessing_queue.py
Current process: Process-1, instance name: Mrs Suk
14. Using multiprocessing.JoinableQueue(): multiply numbers in worker processes, store the results in a queue, then read them back out and print them

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import time


class Consumer(multiprocessing.Process):
    """Consumer process."""

    def __init__(self, task_queue, result_queue, *args, **kwargs):
        super(Consumer, self).__init__(*args, **kwargs)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name  # the process name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # A None is the poison pill: this consumer should exit.
                print('%s exiting' % proc_name)
                self.task_queue.task_done()
                break
            print('{}:{}'.format(proc_name, next_task))
            answer = next_task()  # invokes Task.__call__()
            # Mark the task finished; without task_done() the later
            # tasks.join() would block forever.
            self.task_queue.task_done()
            # Store the result on the results queue.
            self.result_queue.put(answer)


class Task(object):

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self, *args, **kwargs):
        time.sleep(0.1)
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # JoinableQueue adds two methods to Queue: task_done() and join().
    tasks = multiprocessing.JoinableQueue()
    # Queue holding the results.
    results = multiprocessing.Queue()

    # Scale the number of consumers with the number of CPU cores.
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creating {} consumers'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]

    # Start the consumer processes.
    for w in consumers:
        w.start()

    # Enqueue the jobs.
    num_jobs = 10
    for i in range(10):
        tasks.put(Task(i, i))

    # Add one poison pill (None) per consumer.
    for i in range(num_consumers):
        tasks.put(None)

    # Wait until all tasks are marked done.
    tasks.join()

    # Print the results.
    while num_jobs:
        result = results.get()
        print('Result:', result)
        num_jobs -= 1
Run result
[root@ mnt]# python3 multiprocessing_producer_consumer.py
Creating 2 consumers
Consumer-1:0 * 0
Consumer-2:1 * 1
Consumer-1:2 * 2
Consumer-2:3 * 3
Consumer-1:4 * 4
Consumer-2:5 * 5
Consumer-1:6 * 6
Consumer-2:7 * 7
Consumer-1:8 * 8
Consumer-2:9 * 9
Consumer-1 exiting
Consumer-2 exiting
Result: 1 * 1 = 1
Result: 0 * 0 = 0
Result: 2 * 2 = 4
Result: 3 * 3 = 9
Result: 5 * 5 = 25
Result: 4 * 4 = 16
Result: 7 * 7 = 49
Result: 6 * 6 = 36
Result: 8 * 8 = 64
Result: 9 * 9 = 81
Only two consumers were created here: the test machine reports a single CPU core, so cpu_count() * 2 evaluates to 2.
15. Signalling between processes with an Event

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import time


def wait_for_event(event_obj):
    print('Waiting for the event (no timeout)')
    event_obj.wait()
    print('Blocking wait, event state:', event_obj.is_set())


def wait_for_event_timeout(event_obj, timeout):
    print('Waiting for the event (with timeout)')
    event_obj.wait(timeout)
    print('Non-blocking wait, event state:', event_obj.is_set())


if __name__ == '__main__':
    event_obj = multiprocessing.Event()

    block_task = multiprocessing.Process(
        name='block_task',
        target=wait_for_event,
        args=(event_obj,),
    )
    block_task.start()

    non_block_task = multiprocessing.Process(
        name='non_block_task',
        target=wait_for_event_timeout,
        args=(event_obj, 2),
    )
    non_block_task.start()

    print('Sleeping 3 seconds so both processes have started')
    time.sleep(3)
    event_obj.set()
    print('Event set: event.is_set() is now True')
Sample output
[root@ mnt]# python3 multiprocessing_event.py
Sleeping 3 seconds so both processes have started
Waiting for the event (with timeout)
Waiting for the event (no timeout)
Non-blocking wait, event state: False
Event set: event.is_set() is now True
Blocking wait, event state: True
16. Controlling access to a shared resource with a Lock

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import sys


def worker_with(lock, stream):
    with lock:
        stream.write('Acquired the lock via with\n')


def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Acquired the lock via lock.acquire()\n')
    finally:
        lock.release()


if __name__ == '__main__':
    lock = multiprocessing.Lock()

    w = multiprocessing.Process(
        target=worker_with,
        args=(lock, sys.stdout),
    )
    nw = multiprocessing.Process(
        target=worker_no_with,
        args=(lock, sys.stdout),
    )

    w.start()
    nw.start()
    w.join()
    nw.join()
Sample output
[root@ mnt]# python3 multiprocessing_lock.py
Acquired the lock via lock.acquire()
Acquired the lock via with
17. Synchronizing processes with multiprocessing.Condition()

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import time


def task_1(condition_obj):
    proc_name = multiprocessing.current_process().name
    print('Starting %s' % proc_name)
    with condition_obj:
        print('%s done, task_2 may now run' % proc_name)
        condition_obj.notify_all()


def task_2(condition_obj):
    proc_name = multiprocessing.current_process().name
    print('Starting %s' % proc_name)
    with condition_obj:
        condition_obj.wait()
        print('task_2 %s done' % proc_name)


if __name__ == '__main__':
    condition_obj = multiprocessing.Condition()

    s1 = multiprocessing.Process(
        name='s1',
        target=task_1,
        args=(condition_obj,),
    )
    s2_clients = [
        multiprocessing.Process(
            name='task_2[{}]'.format(i),
            target=task_2,
            args=(condition_obj,),
        )
        for i in range(1, 3)
    ]

    for c in s2_clients:
        c.start()
    time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()
Sample output
[root@ mnt]# python3 multiprocessing_condition.py
Starting task_2[1]
Starting task_2[2]
Starting s1
s1 done, task_2 may now run
task_2 task_2[1] done
task_2 task_2[2] done
18. Limiting concurrent access to a resource with multiprocessing.Semaphore()

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import random
import time


class ActivePool:

    def __init__(self, *args, **kwargs):
        super(ActivePool, self).__init__(*args, **kwargs)
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)

    def __str__(self):
        with self.lock:
            return str(self.active)


def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print('Activating {} now running {}'.format(name, pool))
        time.sleep(random.random())
        pool.makeInactive(name)


if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        for i in range(10)
    ]

    for j in jobs:
        j.start()

    while True:
        alive = 0
        for j in jobs:
            if j.is_alive():
                alive += 1
                j.join(timeout=0.1)
                print('Now running {}'.format(pool))
        if alive == 0:
            # all done
            break
Sample output
[root@ mnt]# python3 multiprocessing_semaphore.py
Activating 9 now running ['9']
Activating 5 now running ['9', '5']
Activating 4 now running ['9', '5', '4']
Activating 1 now running ['9', '5', '1']
Now running ['9', '5', '1']
Now running ['9', '5', '1']
Now running ['9', '5', '1']
Now running ['9', '5', '1']
Activating 2 now running ['9', '1', '2']
Now running ['9', '1', '2']
Now running ['9', '1', '2']
Now running ['9', '1', '2']
Now running ['9', '1', '2']
Activating 6 now running ['9', '2', '6']
Now running ['9', '2', '6']
Now running ['9', '2', '6']
Activating 7 now running ['2', '6', '7']
Activating 8 now running ['2', '7', '8']
Now running ['2', '7', '8']
Now running ['2', '7', '8']
Now running ['2', '7', '8']
Now running ['2', '7', '8']
Activating 3 now running ['7', '8', '3']
Now running ['7', '8', '3']
Now running ['7', '8', '3']
Activating 0 now running ['7', '3', '0']
Now running ['7', '0']
Now running ['7']
Now running ['7']
Now running ['7']
Now running []
19. Sharing a dict or list between processes with multiprocessing.Manager()

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


def worker(dict_obj, key, value):
    dict_obj[key] = value


if __name__ == '__main__':
    # Create a dict shared across processes; every process sees its contents.
    mgr = multiprocessing.Manager()
    mgr_dict = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(mgr_dict, i, i * 2),
        )
        for i in range(10)
    ]

    # Start the worker tasks.
    for j in jobs:
        j.start()
    # Wait for the worker tasks to finish.
    for j in jobs:
        j.join()

    print('Results:', mgr_dict)
Sample output
[root@ mnt]# python3 multiprocessing_manager_dict.py
Results: {5: 10, 6: 12, 1: 2, 2: 4, 3: 6, 7: 14, 8: 16, 9: 18, 4: 8, 0: 0}
20. Sharing a Manager Namespace: scalar values such as strings are visible to all processes

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing
import time


def producer(namespace_obj, event):
    """Producer."""
    namespace_obj.value = 'Value set through the namespace: 1234'
    event.set()


def consumer(namespace_obj, event):
    """Consumer.

    When the consumer starts, namespace_obj.value does not exist yet,
    so the first access raises an exception. Once the producer calls
    event.set(), event.wait() stops blocking and the value can be read.
    """
    try:
        print('Before event: {}'.format(namespace_obj.value))
    except Exception as err:
        print('Error before event:', str(err))
    event.wait()
    print('After event:', namespace_obj.value)


if __name__ == '__main__':
    # Create a shared manager.
    mgr = multiprocessing.Manager()
    # Create a shared Namespace.
    namespace = mgr.Namespace()
    # Create the event used to synchronize the two processes.
    event = multiprocessing.Event()

    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    time.sleep(1)
    p.start()

    c.join()
    p.join()
Sample output
[root@ mnt]# python3 multiprocessing_namespace.py
Error before event: 'Namespace' object has no attribute 'value'
After event: Value set through the namespace: 1234
21. Sharing a Manager Namespace: in-place updates to mutable values such as lists are not visible to other processes

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


def producer(namespace_obj, event):
    """Producer: mutates the list in place; the change is NOT propagated."""
    namespace_obj.my_list.append('Value set through the namespace: 1234')
    event.set()


def consumer(namespace_obj, event):
    """Consumer: prints the shared list before and after the event."""
    try:
        print('Before event: {}'.format(namespace_obj.my_list))
    except Exception as err:
        print('Error before event:', str(err))
    event.wait()
    print('After event:', namespace_obj.my_list)


if __name__ == '__main__':
    # Create a shared manager.
    mgr = multiprocessing.Manager()
    # Create a shared Namespace.
    namespace = mgr.Namespace()
    # Mutable values: appending to this list is not visible globally.
    namespace.my_list = []

    # Create the event used to synchronize the two processes.
    event = multiprocessing.Event()

    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()
Sample output
[root@ mnt]# python3 multiprocessing_namespace_mutable.py
Before event: []
After event: []
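To actually share list updates across processes, either rebind the namespace attribute to a new list (so the manager sees an assignment), or skip the namespace and use a manager list, whose methods are themselves proxied. A minimal sketch of both workarounds; this is not part of the original post, and the shared_list name is illustrative:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


def producer(namespace_obj, shared_list, event):
    # Workaround 1: rebind the attribute instead of mutating in place,
    # so the assignment goes through the Namespace proxy.
    namespace_obj.my_list = namespace_obj.my_list + ['1234']
    # Workaround 2: a mgr.list() proxies append() directly.
    shared_list.append('1234')
    event.set()


def consumer(namespace_obj, shared_list, event):
    event.wait()
    print('Namespace list:', namespace_obj.my_list)
    print('Manager list:', list(shared_list))


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []
    shared_list = mgr.list()
    event = multiprocessing.Event()

    p = multiprocessing.Process(target=producer, args=(namespace, shared_list, event))
    c = multiprocessing.Process(target=consumer, args=(namespace, shared_list, event))
    c.start()
    p.start()
    c.join()
    p.join()

Both print statements should now show the appended value.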
22. Using a process pool to operate on a list of numbers

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('inputs :', inputs)

    # Compute the same result with the built-in map() for comparison.
    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', list(builtin_outputs))

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    # Compute using the process pool.
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the workers to exit

    print('Pool :', pool_outputs)
Sample output
[root@ mnt]# python3 multiprocessing_pool.py
inputs : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Starting ForkPoolWorker-2
Starting ForkPoolWorker-1
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
23. Restarting pool workers after a fixed number of tasks (maxtasksperchild), to keep long-running workers from accumulating system resources

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('Starting', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('inputs :', inputs)

    # Compute the same result with the built-in map() for comparison.
    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', list(builtin_outputs))

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        # Replace each worker with a fresh process after it has handled
        # two tasks.
        maxtasksperchild=2,
    )
    # Compute using the process pool.
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()
    pool.join()

    print('Pool :', pool_outputs)
Sample output
[root@ mnt]# python3 multiprocessing_pool_maxtasksperchild.py
inputs : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Starting ForkPoolWorker-2
Starting ForkPoolWorker-1
Starting ForkPoolWorker-4
Starting ForkPoolWorker-3
Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Workers 3 and 4 are the replacement processes started after workers 1 and 2 reached their two-task limit.
24. A simple MapReduce built on a process pool: read files and count the words in them

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# multiprocessing_mapreduce.py
import collections
import itertools
import multiprocessing


class SimpleMapReduce:

    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        :param map_func: maps one input to a list of (key, value) pairs,
            e.g. file_to_words(filename) below
        :param reduce_func: reduces one (key, [values]) item to a single
            (key, result) pair, e.g. count_words(item) below
        :param num_workers: pool size (defaults to cpu_count())
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        """Group the mapped (key, value) pairs into a dict of lists."""
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()

    def __call__(self, inputs, chunksize=1):
        """
        :param inputs: the filenames to process
        :param chunksize: how many inputs each worker takes at a time
        """
        # Each map call returns a list like [(word, 1), ...].
        map_responses = self.pool.map(
            self.map_func,
            inputs,
            chunksize=chunksize,
        )
        # Flatten the per-file lists and group the values by key,
        # giving dict_items of (word, [1, 1, ...]).
        partitioned_data = self.partition(
            itertools.chain(*map_responses)
        )
        # Feed each grouped item to the reduce function, so that its
        # sum() aggregation can take effect.
        reduced_values = self.pool.map(
            self.reduce_func,
            partitioned_data,
        )
        return reduced_values

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# multiprocessing_wordcount.py
import multiprocessing
import string

from multiprocessing_mapreduce import SimpleMapReduce


def file_to_words(filename):
    """Read a file and return a list of (word, 1) pairs."""
    # Words to exclude from the count.
    STOP_WORDS = set([
        'a', 'an', 'and', 'are', 'as', 'be', 'by', 'for', 'if',
        'in', 'is', 'it', 'of', 'or', 'py', 'rst', 'that', 'the',
        'to', 'with',
    ])
    # Translation table mapping punctuation to spaces.
    TR = str.maketrans({
        p: ' '
        for p in string.punctuation
    })

    print('Process {} reading file {}'.format(
        multiprocessing.current_process().name, filename))
    output = []

    with open(filename, 'rt', encoding='utf-8') as f:
        for line in f:
            # Skip reStructuredText comment lines starting with "..".
            if line.lstrip().startswith('..'):
                continue
            line = line.translate(TR)  # strip punctuation
            for word in line.split():  # split on whitespace
                word = word.lower()
                if word.isalpha() and word not in STOP_WORDS:
                    output.append((word, 1))
    return output


def count_words(item):
    """Reduce function: sum the occurrences of one word."""
    word, occurrences = item
    return (word, sum(occurrences))


if __name__ == '__main__':
    import operator
    import glob

    # Find all files ending in *.rst in the current directory.
    input_files = glob.glob('*.rst')

    # Build the MapReduce object and run it; calling mapper(...) invokes
    # SimpleMapReduce.__call__().
    mapper = SimpleMapReduce(file_to_words, count_words)
    word_counts = mapper(input_files)

    # Sort by count (element 1 of each tuple), highest first.
    word_counts.sort(key=operator.itemgetter(1))
    word_counts.reverse()

    print('\nTOP 20 WORDS BY FREQUENCY\n')
    top20 = word_counts[:20]
    longest = max(len(word) for word, count in top20)
    for word, count in top20:
        print('{word:<{len}}: {count:5}'.format(
            len=longest + 1,
            word=word,
            count=count,
        ))

Contents of test.rst, the sample input file used in the run below:

If there is a relationship() from Parent to Child, but there is not a reverse-relationship that links a particular Child to each Parent, SQLAlchemy will not have any awareness that when deleting this particular Child object, it needs to maintain the “secondary” table that links it to the Parent. No delete of the “secondary” table will occur. If there is a relationship that links a particular Child to each Parent, suppose it’s called Child.parents, SQLAlchemy by default will load in the Child.parents collection to locate all Parent objects, and remove each row from the “secondary” table which establishes this link. Note that this relationship does not need to be bidirectional; SQLAlchemy is strictly looking at every relationship() associated with the Child object being deleted. A higher performing option here is to use ON DELETE CASCADE directives with the foreign keys used by the database. Assuming the database supports this feature, the database itself can be made to automatically delete rows in the “secondary” table as referencing rows in “child” are deleted. SQLAlchemy can be instructed to forego actively loading in the Child.parents collection in this case using the passive_deletes directive on relationship(); see Using Passive Deletes for more details on this. Note again, these behaviors are only relevant to the secondary option used with relationship(). If dealing with association tables that are mapped explicitly and are not present in the secondary option of a relevant relationship(), cascade rules can be used instead to automatically delete entities in reaction to a related entity being deleted - see Cascades for information on this feature.
Sample output
[root@python-mysql mnt]# python3 multiprocessing_wordcount.py
Process SpawnPoolWorker-1 reading file test.rst

TOP 20 WORDS BY FREQUENCY

child        :     8
relationship :     8
this         :     7
parent       :     5
on           :     4
delete       :     4
table        :     4
sqlalchemy   :     4
not          :     4
can          :     3
database     :     3
used         :     3
option       :     3
deleted      :     3
parents      :     3
will         :     3
each         :     3
particular   :     3
links        :     3
there        :     3