文章目录
- 前言
- 一.多进程
- 1.fork()系统调用
- 2.OS模块
- 3.multiprocessing模块
- 4.进程池(multiprocessing Pool模块)
- 5.子进程(subprocess模块)
- 6.进程间通信
- 7.小结
- 7.1.学习小结
- 7.2.Python分布式进程报错:pickle模块不能序列化lambda函数
- 二.分布式进程
- 1.Python的分布式进程是什么
- 2.如何实现分布式进程
- 编写服务进程
- 编写任务进程
- 3.小结
前言
上一文章【Python】从入门到上头— 多线程(9)已经讲了Python线程和进程的区别 以及Python线程有一个GIL锁任何线程在执行前都需要获得该锁 因此Python多线程不能有效利用多核优势实现多任务
一.多进程
1.fork()系统调用
要让Python程序实现多进程(multiprocessing
),我们先了解操作系统
的相关知识。
Unix/Linux操作系统提供了一个
fork()系统调用
,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次
,因为操作系统自动把当前进程(称为父进程
)复制了一份(称为子进程
),然后,分别在父进程和子进程内返回。
- 子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,
一个父进程可以fork出很多子进程
,所以,父进程要记下每个子进程的ID
,而子进程只需要调用getppid()就
可以拿到父进程的ID。
- 子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,
2.OS模块
Python的os模块
封装了常见的系统调用
,其中就包括fork
,可以在Python程序中轻松创建子进程:
import osprint('Process (%s) start...' % os.getpid())# Only works on Unix/Linux/Mac:pid = os.fork()if pid == 0:print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))else:print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
由于Windows没有fork调用,上面的代码在Windows上无法运行
。而Mac系统是基于BSD(Unix的一种)内核
,所以,在Mac下运行是没有问题的,推荐大家用Mac学Python
!
- 有了
fork调用
,一个进程在接到新任务时就可以复制出一个子进程来处理新任务
,常见的Apache服务器
就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求
。
3.multiprocessing模块
如果你打算编写多进程的服务程序,Unix/Linux
无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?
由于Python是跨平台的,自然也应该提供一个
跨平台的多进程支持
。multiprocessing模块就是跨平台版本的多进程模块
。- multiprocessing模块提供了一个
Process类
来代表一个进程对象
- multiprocessing模块提供了一个
如:启动一个子进程并等待其结束:
from multiprocessing import Processimport os# 子进程要执行的代码def run_proc(name):print('Run child process %s (%s)...' % (name, os.getpid()))if __name__=='__main__':print('Parent process %s.' % os.getpid())p = Process(target=run_proc, args=('test',))print('Child process will start.')p.start()p.join()print('Child process end.')
- 创建子进程时,只需要创建一个
Process实例
,传入一个执行函数和函数的参数
,用start()
方法启动,这样创建进程比fork()
还要简单。- join()方法可以
等待子进程结束后再继续往下运行
,通常用于进程间的同步
。
- join()方法可以
4.进程池(multiprocessing Pool模块)
如果要启动大量的子进程,可以用进程池
的方式批量创建子进程:
from multiprocessing import Poolimport os, time, randomdef long_time_task(name):print('Run task %s (%s)...' % (name, os.getpid()))start = time.time()time.sleep(random.random() * 3)end = time.time()print('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__ == '__main__':print('Parent process %s.' % os.getpid())#创建长度为4的进程池p = Pool(4)#循环启动进程,传入调用函数和参数for i in range(13):p.apply_async(long_time_task, args=(i,))print('Waiting for all subprocesses done...')#关闭进程池,等待进程池所有子进程执行完毕p.close()p.join()print('All subprocesses done.')
执行结果如下:
代码解读:
对Pool对象
调用 join()
方法会等待所有子进程执行完毕,调用join()之前必须先调用close(
),调用close()之后就不能继续添加新的Process了。
- 请注意输出的结果,
task 0,1,2,3 4 5
是立刻执行的,而task 6-14要等待前面某个task完成后才执行, 由于Pool的默认大小是CPU的物理核数
,我的电脑是6核的,你要提交至少7个以上子进程才能看到上面的等待效果。
,因此,创建13个子进程,最多同时执行6个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:
5.子进程(subprocess模块)
很多时候,子进程并不是自身,而是一个外部进程
。我们创建了子进程后,还需要控制子进程的输入和输出。
subprocess模块
可以让我们非常方便地启动一个子进程
,然后控制其输入和输出。
如: 在Python代码中运行命令(查询域名指定类型的解析记录)nslookup www.python.org
,这和命令行直接运行的效果是一样的:
import subprocessprint('$ nslookup www.baidu.com')r = subprocess.call(['nslookup', 'www.baidu.com'])print('Exit code:', r)
代码执行
命令行执行
如果子进程还需要输入,则可以通过communicate()
方法输入:
import subprocessprint('$ nslookup')p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)output, err = p.communicate(b'set q=mx\nbaidu.com\nexit\n')print(output.decode('gbk'))print('Exit code:', p.returncode)
上面的代码相当于在命令行执行命令nslookup,然后手动输入:
set q=mxbaidu.comexit
代码执行
命令行执行
6.进程间通信
Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes
等多种方式来交换数据。
- 以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:
from multiprocessing import Process, Queueimport os, time, random# 写数据进程执行的代码:def write(q):print('Process to write: %s' % os.getpid())#循环写入队尾for value in ['1', '2', '3', '4', '5']:print('Put %s to queue...' % value)q.put(value)time.sleep(random.random())# 读数据进程执行的代码:def read(q):print('Process to read: %s' % os.getpid())#循环读取队列队头数据while True:value = q.get(True)print('Get %s from queue.' % value)if __name__ == '__main__':# 父进程创建Queue,并传给各个子进程:q = Queue()#写进程pw = Process(target=write, args=(q,))#读进程pr = Process(target=read, args=(q,))# 启动子进程pw,写入:pw.start()# 启动子进程pr,读取:pr.start()# 等待pw结束:pw.join()# pr进程里是死循环,无法等待其结束,只能强行终止:pr.terminate()
7.小结
7.1.学习小结
在Unix/Linux下,可以使用fork()调用实现多进程。
要实现
跨平台的多进程
,可以使用multiprocessing模块。- 在Unix/Linux下,
multiprocessing模块
封装了fork()系统调用
,使我们不需要关注fork()的细节- 由于Windows没有fork调用,因此,
multiprocessing需要“模拟”出fork()的效果
,父进程所有Python对象都必须通过pickle序列化
再传到子进程去
,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了
。
- 由于Windows没有fork调用,因此,
- 在Unix/Linux下,
进程间通信是通过Queue、Pipes等实现的
7.2.Python分布式进程报错:pickle模块不能序列化lambda函数
原因:
原来是windows操作系统问题造成的,因此稍需要我们自己定义函数,实现序列化。
- 对代码稍加修改,定义两个函数
return_task_queue和return_result_queue实现序列化
# task_master.pyimport random, time, queuefrom multiprocessing.managers import BaseManager# 发送任务的队列:task_queue = queue.Queue()# 接收结果的队列:result_queue = queue.Queue()#windows要我们自己定义函数,实现序列化,然后注册到 QueueManager.register,Unix/Linux不需要def return_task_queue():global task_queuereturn task_queuedef return_result_queue():global result_queuereturn result_queue# 从BaseManager继承的QueueManager:class QueueManager(BaseManager):passif __name__ == '__main__':# 把两个Queue都注册到网络上,callable参数关联了Queue对象QueueManager.register('get_task_queue', callable=return_task_queue)QueueManager.register('get_result_queue', callable=return_result_queue)# 绑定端口5000,设置验证码abcmanager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')# 启动queuemanager.start()# 获得通过网络访问的Queue对象task = manager.get_task_queue()result = manager.get_result_queue()# 放几个任务for i in range(10):n = random.randint(0, 1000)print('添加任务 %d' % n)task.put(n)# 从result队列读取结果print('尝试获取结果')for i in range(10):r = result.get(timeout=10)print('结果是:%s' % r)# 关闭manager.shutdown()print('master exit')
二.分布式进程
1.Python的分布式进程是什么
Python在线程和进程汇总中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而线程最多只能分布到同一台机器的多个CPU上。
- Python的
multiprocessing模块
不但支持多进程
,其中**managers子模块
**还支持把多进程分布到多台机器上
。**一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。- **由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写
分布式多进程程序
。
- **由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写
举个例子:如果我们已经有一个通过Queue通信
的多进程程序在同一台机器
上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程
和处理任务的进程
分布到2台机器上。怎么用分布式进程实现?
- 原有的Queue可以继续使用,但是,
通过managers模块把Queue通过网络暴露出去
,就可以让其他机器的进程
访问Queue了。
2.如何实现分布式进程
编写服务进程
服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务
:# task_master.pyimport random, time, queuefrom multiprocessing.managers import BaseManager# 发送任务的队列:task_queue = queue.Queue()# 接收结果的队列:result_queue = queue.Queue()# windows要我们自己定义函数,实现序列化,然后注册到 QueueManager.register,Unix/Linux不需要def return_task_queue():global task_queuereturn task_queuedef return_result_queue():global result_queuereturn result_queue# 从BaseManager继承的QueueManager:class QueueManager(BaseManager):passif __name__ == '__main__':# 把两个Queue都注册到网络上, callable参数关联了Queue对象:QueueManager.register('get_task_queue', callable=return_task_queue)QueueManager.register('get_result_queue', callable=return_result_queue)# 绑定端口5000, 设置验证码'abc':manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')# 启动Queue:manager.start()# 获得通过网络访问的Queue对象:task = manager.get_task_queue()result = manager.get_result_queue()# 放几个任务进去:for i in range(10):n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n)# 从result队列读取结果:print('Try get results...')for i in range(10):r = result.get(timeout=10)print('Result: %s' % r)# 关闭:manager.shutdown()print('master exit.')
重要!!!!!
- 当我们在
一台机器
上写多进程程序时,创建的Queue可以直接拿来用
- 但是,在分布式多进程环境下,
添加任务到Queue不可以直接对原始的task_queue进行操作
,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加
。
- 但是,在分布式多进程环境下,
编写任务进程
- 在另一台机器上启动任务进程(本机上启动也可以):
# task_worker.pyimport time, sys, queuefrom multiprocessing.managers import BaseManager# 创建类似的QueueManager:class QueueManager(BaseManager):passif __name__ == '__main__':# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:QueueManager.register('get_task_queue')QueueManager.register('get_result_queue')# 连接到服务器,也就是运行task_master.py的机器:server_addr = '127.0.0.1'print('Connect to server %s...' % server_addr)# 端口和验证码注意保持与task_master.py设置的完全一致:m = QueueManager(address=(server_addr, 5000), authkey=b'abc')# 从网络连接:m.connect()# 获取Queue的对象:task = m.get_task_queue()result = m.get_result_queue()# 从task队列取任务,并把结果写入result队列:for i in range(10):try:n = task.get(timeout=1)print('run task %d * %d...' % (n, n))r = '%d * %d = %d' % (n, n, n * n)time.sleep(1)result.put(r)except queue.Empty:print('task queue is empty.')# 处理结束:print('worker exit.')
- 任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。
先启动服务进程开始执行
Put task 0...Put task 1...Put task 2...Put task 3...Put task 4...Put task 5...Put task 6...Put task 7...Put task 8...Put task 9...Try get results...#等待任务线程写入队列
- task_master.py进程发送完任务后,开始等待
result队列
的结果。
现在启动task_worker.py
进程:
Connect to server 127.0.0.1...run task 0 * 0...run task 1 * 1...run task 2 * 2...run task 3 * 3...run task 4 * 4...run task 5 * 5...run task 6 * 6...run task 7 * 7...run task 8 * 8...run task 9 * 9...worker exit.
- task_worker.py进程结束,在task_master.py进程中
会继续打印出结果
:
- 这个简单的
Master/Worker模型
有什么用?- 其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把
任务分布到几台甚至几十台机器上
,比如把计算n*n的代码换成发送邮件
,就实现了邮件队列的异步发送
。
- 其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把
Queue对象存储在哪?
- 注意到
task_worker.py
中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程
中:
- 而Queue之所以能通过
网络
访问,就是通过QueueManager
实现的。由于QueueManage管理的不止一个Queue
,所以,要给每个Queue的网络调用接口起个名字
,比如get_task_queue。
authkey有什么用?
保证两台机器正常通信,不被其他机器恶意干扰
。如果task_worker.py
的authkey和task_master.py
的authkey不一致,肯定连接不上。
3.小结
Python的分布式进程接口简单,封装良好,·适合需要把繁重任务分布到多台机器的环境下。·
注意Queue的作用是用来
传递任务和接收结果
,每个任务的描述数据量要尽量小
。比如发送一个处理日志文件的任务,就不要发送几百兆
的日志文件本身,而是发送日志文件存放的完整路径
,由Worker进程再去共享的磁盘上读取文件。