Python并发复习3 - 多进程模块 multiprocessing
Python标准库为我们提供了threading(多线程模块)和multiprocessing(多进程模块)。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。
核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核,可以利用multiprocessing实现真正的平行计算。
一 、进程的调用
1.1 函数式调用
1 from multiprocessing import Process 2 import time 3 def f(name): 4 time.sleep(1) 5 print('hello', name,time.ctime()) 6 7 if __name__ == '__main__': 8 p_list=[] 9 for i in range(3): 10 p = Process(target=f, args=('alvin',)) 11 p_list.append(p) 12 p.start() 13 for i in p_list: 14 p.join() 15 print('end')
1.2 类调用
from multiprocessing import Process import time class MyProcess(Process): def __init__(self): super(MyProcess, self).__init__() #self.name = name def run(self): time.sleep(1) print ('hello', self.name,time.ctime()) if __name__ == '__main__': p_list=[] for i in range(3): p = MyProcess() p.start() p_list.append(p) for p in p_list: p.join() print('end')
二 、Process类
构造方法:
Process([group [, target [, name [, args [, kwargs]]]]])
group: 线程组,目前还没有实现,库引用中提示必须是None;
target: 要执行的方法;
name: 进程名;
args/kwargs: 要传入方法的参数。
实例方法:
is_alive():返回进程是否在运行。
join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。
start():进程准备就绪,等待CPU调度
run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。
terminate():不管任务是否完成,立即停止工作进程
属性:
daemon:和线程的setDeamon功能一样
name:进程名字。
pid:进程号。
三 、进程间通讯
1、进程对列Queue
---------- 一个流水线,各个工人共享主线程流水线产品队列数据
2、 管道pipe
1 from multiprocessing import Process, Pipe 2 3 def func(contact): 4 contact.send("这是管道测试信息") 5 contact.close() 6 7 if __name__ == '__main__': 8 a_con, b_con = Pipe() 9 p = Process(target=func, args=(a_con,)) 10 print(b_con.recv()) 11 b_con.send("管道返回信息")
3、manage
--- Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构,适用于多个进程不是源于同一个父进程的情形。
原理是:先启动一个ManagerServer进程,这个进程是阻塞的,它监听一个socket,然后其他进程(ManagerClient)通过socket来连接到ManagerServer,实现通信。
1 from multiprocessing import Process, Manager 2 from time import sleep 3 4 5 def thread_a_main(sync_data_pool): # A 进程主函数,存入100+的数 6 for ix in range(100, 105): 7 sleep(1) 8 sync_data_pool.append(ix) 9 10 11 def thread_b_main(sync_data_pool): # B 进程主函数,存入300+的数 12 for ix in range(300, 309): 13 sleep(0.6) 14 sync_data_pool.append(ix) 15 16 17 def _test_case_000(): # 测试用例 18 manager = Manager() # multiprocessing 中的 Manager 是一个工厂方法,直接获取一个 SyncManager 的实例 19 sync_data_pool = manager.list() # 利用 SyncManager 的实例来创建同步数据池 20 Process(target=thread_a_main, args=(sync_data_pool, )).start() # 创建并启动 A 进程 21 Process(target=thread_b_main, args=(sync_data_pool, )).start() # 创建并启动 B 进程 22 for ix in range(6): # C 进程(主进程)中实时的去查看数据池中的数据 23 sleep(1) 24 print(sync_data_pool) 25 26 27 if '__main__' == __name__: 28 _test_case_000()
四 、进程同步
1 from multiprocessing import Process, Lock 2 3 def f(l, i): 4 5 with l.acquire(): 6 print('hello world %s'%i) 7 8 if __name__ == '__main__': 9 lock = Lock() 10 11 for num in range(10): 12 Process(target=f, args=(lock, num)).start()