Python_多进程
Python 多进程库 multiprocessing ,支持子进程、通信、数据共享、执行不同形式的同步
多进程,绕过gil ,实现多核的利用,多进程也是原生进程,由操作系统维护
在pycharm中,可能没有办法正常使用multiprocessing.Process,最好是在Linux中运行
Process | 用于创建进程模块 |
Pool | 用于创建管理进程池 |
Queue | 用于进程通信,资源共享 |
Pipe | 用于管道通信 |
Manager | 用于资源共享,同步进程 |
1.Process类
Process(group = None,target =None,name=None, args=[ ], kwargs={ })
group | 线程组 |
target | 要执行的方法 |
name | 进程名 |
args/kwargs | 要传入方法的参数 |
process属性&方法:
authkey | 进程的身份验证密钥 |
daemon | 同thread的setDaemon,守护进程 |
exitcode | 进程运行时为None,若为—N,则表示被信号N结束 |
pid | 进程号 |
name | 进程名 |
is_alive() | 返回进程是否正在运行 |
join([timeout]) | 阻塞到线程结束或到timeout值 |
start() | 进程准备就绪,等待CPU调度 |
run() | start()调用run方法,如果实例进程时未制定传入target,start执行默认run()方法。 |
terminate() | 不管任务是否完成,立即停止工作进程 |
多进程的创建:
#!/usr/bin/python # -*- coding:utf-8 -*- '''多进程的创建''' from multiprocessing import Process import time def fun(name): time.sleep(1) print('hello,%s' % name) print('----') if __name__ =='__main__': for i in range(5): # 进程同步 p = Process(target=fun, args=('Presley',)) p.start() p.join() print('结束。')多进程
进程id :
#!/usr/bin/python3 # -*- coding:utf-8 -*- from multiprocessing import Process import os def info(title): print(title) print('moudle name :',__name__) print('parent process id ', os.getppid()) print('process id ', os.getpid()) if __name__ =='__main__': info('hei. ') # pycharm id和 主进程id for i in range(3): p = Process(target=info, args=('Presley',)) # 主进程id 和 info 子进程id p.start() p.join()多进程id
hei. moudle name : __main__ parent process id 1610 process id 1826 Presley moudle name : __main__ parent process id 1826 process id 1827 Presley moudle name : __main__ parent process id 1826 process id 1828 Presley moudle name : __main__ parent process id 1826 process id 1829result
2.Queue类
不同进程间内存是不共享的,想要实现两个进程间的数据交换,可以用Queue进行进程间通讯
queue是在多进程中做了一层封装的队列,以保证在当前进程里进程安全
方法:queue
进程中的队,以保证进程安全
from multiprocessing import Process,Queue def info(q): # global q # 错误,queue中 ,global 不行,因为子进程无法访问父进程的内存数据 q.put([34, None, 'yes']) if __name__ =='__main__': q = Queue() for i in range(3): p = Process(target=info, args=[q,]) # 多个子进程的数据可以都可以放父进程数据 p.start() print('来自父进程%s:%s'%(i, q.get())) p.join()多进程_queue
来自父进程0:[34, None, 'yes'] 来自父进程1:[34, None, 'yes'] 来自父进程2:[34, None, 'yes']result
3.Pipe类
管道操作(双向队列):会返回一对对象,管道的两端分别赋给子进程和父进程
和队列操作差不多,所以一般运用队列较多
方法:
send() | 发送序列 |
recv() | 接收序列 |
fileno() | 返回一个整型的文件描述符 |
close() | 退出 |
poll() | 判断子进程是否结束 |
send_bytes() | 以bytes格式发送序列 |
recv_bytes() | 以bytes格式接收序列 |
from multiprocessing import Process,Pipe import time def info(conn): time.sleep(0.5) conn.send([32,None,'ni hao wa']) conn.close() if __name__=='__main__': conn_parent ,conn_child = Pipe() print(conn_parent.fileno()) for i in range(3): p = Process(target=info,args=(conn_child,)) print(bool(conn_child.poll)) # 进程是否结束 p.start() # 如果没有消息可接收,recv方法会一直阻塞。如果管道已经被关闭,那么recv方法会抛出EOFError。 print('父端接收%s:%s'% (i,conn_parent.recv())) p.join()多进程_Pipe
200 True 父端接收0:[32, None, 'ni hao wa'] True 父端接收1:[32, None, 'ni hao wa'] True 父端接收2:[32, None, 'ni hao wa']result
4.Manager
通过Manager可以简单的使用list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barries,Value+Arrary等类型的高级接口
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全
例:对list,dict的应用例子:
#!/usr/bin/python3 # -*- coding:utf-8 -*- from multiprocessing import Process,Manager def fun(d,l,n): d[2] = '3' d['e'] = 'e' d[34] = None l.append(n) print(l) if __name__ == '__main__': with Manager() as manager: d = manager.dict() l = manager.list() join_list = [] for i in range(6): p = Process(target=fun, args=(d,l,i)) p.start() join_list.append(p) for res in join_list: res.join() print(l) print(d)example
[5] [5, 2] [5, 2, 3] [5, 2, 3, 0] [5, 2, 3, 0, 4] [5, 2, 3, 0, 4, 1] [5, 2, 3, 0, 4, 1] [5, 2, 3, 0, 4, 1] [5, 2, 3, 0, 4, 1] [5, 2, 3, 0, 4, 1] [5, 2, 3, 0, 4, 1] [5, 2, 3, 0, 4, 1] {2: '3', 'e': 'e', 34: None}result
Manager的详细参考:https://www.aliyun.com/jiaocheng/490316.html
5.Pool 类(进程池)
当进程数过多时,用于限制进程数
异步:进程并行
同步:进程串行
方法:
apply_async(func,args,kwds,callback) |
进程异步,并行(func:执行一个函数,args/ dwds:进程参数,callback:Foo执行结果返回到callback执行的函数中) |
apply(func,args,kwds) | 进程同步,串行 |
close() | 关闭进程池 |
terminate() | 结束工作进程,不在处理未完成的任务 |
join() | 主进程阻塞,等待子进程执行完毕 |
from multiprocessing import Pool,freeze_support import time def Foo(i): time.sleep(1) print('exec..') return i+100 # 返回到Bar中 def Bar(arg): print('来自Foo 的i :',arg) # 接收 Foo中 的返回值 if __name__ == '__main__': freeze_support() # 仅在Windows上才导入此模块进程程序才不会出错,Linux上不用 pool = Pool(5) # 限制每次进行的进程数为 5 for i in range(10): pool.apply_async(func=Foo, args=(i,),callback=Bar) # 进程异步 # callback 把前面func的放在Bar中打印 # pool.apply(func=Foo, args=(i,)) # 同步,串行 # 没有callback属性 print('结束。。') pool.close() # 注意:join必须放在close()后面,否则将不会等待子进程打印结束,而直接结束 pool.join()进程池
结束。。 exec.. exec.. exec.. exec.. exec.. 来自Foo 的i : 104 来自Foo 的i : 102 来自Foo 的i : 103 来自Foo 的i : 100 来自Foo 的i : 101 exec.. exec.. exec.. exec.. exec.. 来自Foo 的i : 105 来自Foo 的i : 106 来自Foo 的i : 107 来自Foo 的i : 108 来自Foo 的i : 109异步结果
exec.. exec.. exec.. exec.. exec.. exec.. exec.. exec.. exec.. exec.. 结束。。同步结果