Python并发复习3 - 多进程模块 multiprocessing

  

 

Python标准库为我们提供了threading(多线程模块)和multiprocessing(多进程模块)。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor 两个类,实现了对threading和multiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。

核心原理是:concurrent.futures会以子进程的形式,平行的运行多个python解释器,从而令python程序可以利用多核CPU来提升执行速度。由于子进程与主解释器相分离,所以他们的全局解释器锁也是相互独立的。每个子进程都能够完整的使用一个CPU内核,可以利用multiprocessing实现真正的平行计算。


一 、进程的调用

1.1  函数式调用

 1 from multiprocessing import Process
 2 import time
 3 def f(name):
 4     time.sleep(1)
 5     print('hello', name,time.ctime())
 6 
 7 if __name__ == '__main__':
 8     p_list=[]
 9     for i in range(3):
10         p = Process(target=f, args=('alvin',))
11         p_list.append(p)
12         p.start()
13     for i in p_list:
14         p.join()
15     print('end')

1.2 类调用

from multiprocessing import Process
import time

class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()
        #self.name = name

    def run(self):
        time.sleep(1)
        print ('hello', self.name,time.ctime())


if __name__ == '__main__':
    p_list=[]
    for i in range(3):
        p = MyProcess()
        p.start()
        p_list.append(p)

    for p in p_list:
        p.join()

    print('end')

 

二 、Process类 

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None; 
  target: 要执行的方法; 
  name: 进程名; 
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  daemon:和线程的setDeamon功能一样

  name:进程名字。

  pid:进程号。

三 、进程间通讯 

1、进程对列Queue

   ----------  一个流水线,各个工人共享主线程流水线产品队列数据

2、 管道pipe

 1 from multiprocessing import Process, Pipe
 2 
 3 def func(contact):
 4     contact.send("这是管道测试信息")
 5     contact.close()
 6     
 7 if __name__ == '__main__':
 8     a_con, b_con = Pipe()
 9     p = Process(target=func, args=(a_con,))
10     print(b_con.recv())
11     b_con.send("管道返回信息")

3、manage

 --- Manager是一种较为高级的多进程通信方式,它能支持Python支持的的任何数据结构,适用于多个进程不是源于同一个父进程的情形。

     原理是:先启动一个ManagerServer进程,这个进程是阻塞的,它监听一个socket,然后其他进程(ManagerClient)通过socket来连接到ManagerServer,实现通信。

 1 from multiprocessing import Process, Manager
 2 from time import sleep
 3 
 4 
 5 def thread_a_main(sync_data_pool):  # A 进程主函数,存入100+的数
 6     for ix in range(100, 105):
 7         sleep(1)
 8         sync_data_pool.append(ix)
 9 
10 
11 def thread_b_main(sync_data_pool):  # B 进程主函数,存入300+的数
12     for ix in range(300, 309):
13         sleep(0.6)
14         sync_data_pool.append(ix)
15 
16 
17 def _test_case_000():  # 测试用例
18     manager = Manager()  # multiprocessing 中的 Manager 是一个工厂方法,直接获取一个 SyncManager 的实例
19     sync_data_pool = manager.list()  # 利用 SyncManager 的实例来创建同步数据池
20     Process(target=thread_a_main, args=(sync_data_pool, )).start()  # 创建并启动 A 进程
21     Process(target=thread_b_main, args=(sync_data_pool, )).start()  # 创建并启动 B 进程
22     for ix in range(6):  # C 进程(主进程)中实时的去查看数据池中的数据
23         sleep(1)
24         print(sync_data_pool)
25 
26 
27 if '__main__' == __name__: 
28     _test_case_000()

四 、进程同步

 1 from multiprocessing import Process, Lock
 2 
 3 def f(l, i):
 4   
 5     with l.acquire():
 6         print('hello world %s'%i)
 7 
 8 if __name__ == '__main__':
 9     lock = Lock()
10 
11     for num in range(10):
12         Process(target=f, args=(lock, num)).start()

 

相关文章