【Python】从入门到上头— 多进程与分布式进程(10)

文章目录

  • 前言
  • 一.多进程
    • 1.fork()系统调用
    • 2.OS模块
    • 3.multiprocessing模块
    • 4.进程池(multiprocessing Pool模块)
    • 5.子进程(subprocess模块)
    • 6.进程间通信
    • 7.小结
      • 7.1.学习小结
      • 7.2.Python分布式进程报错:pickle模块不能序列化lambda函数
  • 二.分布式进程
    • 1.Python的分布式进程是什么
    • 2.如何实现分布式进程
      • 编写服务进程
      • 编写任务进程
    • 3.小结

前言

上一文章【Python】从入门到上头— 多线程(9)已经讲了Python线程和进程的区别 以及Python线程有一个GIL锁任何线程在执行前都需要获得该锁 因此Python多线程不能有效利用多核优势实现多任务

图片[1] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

一.多进程

1.fork()系统调用

要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。

  • Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

    • 子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

2.OS模块

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

import osprint('Process (%s) start...' % os.getpid())# Only works on Unix/Linux/Mac:pid = os.fork()if pid == 0:print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))else:print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

图片[2] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

由于Windows没有fork调用,上面的代码在Windows上无法运行。而Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的,推荐大家用Mac学Python

  • 有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求

3.multiprocessing模块

如果你打算编写多进程的服务程序,Unix/Linux无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?

  • 由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持multiprocessing模块就是跨平台版本的多进程模块

    • multiprocessing模块提供了一个Process类来代表一个进程对象

如:启动一个子进程并等待其结束:

from multiprocessing import Processimport os# 子进程要执行的代码def run_proc(name):print('Run child process %s (%s)...' % (name, os.getpid()))if __name__=='__main__':print('Parent process %s.' % os.getpid())p = Process(target=run_proc, args=('test',))print('Child process will start.')p.start()p.join()print('Child process end.')

图片[3] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

  • 创建子进程时,只需要创建一个Process实例,传入一个执行函数和函数的参数,用start()方法启动,这样创建进程比fork()还要简单。
    • join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步

4.进程池(multiprocessing Pool模块)

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Poolimport os, time, randomdef long_time_task(name):print('Run task %s (%s)...' % (name, os.getpid()))start = time.time()time.sleep(random.random() * 3)end = time.time()print('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__ == '__main__':print('Parent process %s.' % os.getpid())#创建长度为4的进程池p = Pool(4)#循环启动进程,传入调用函数和参数for i in range(13):p.apply_async(long_time_task, args=(i,))print('Waiting for all subprocesses done...')#关闭进程池,等待进程池所有子进程执行完毕p.close()p.join()print('All subprocesses done.')

执行结果如下:

图片[4] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

代码解读:

Pool对象调用 join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了

  • 请注意输出的结果,task 0,1,2,3 4 5是立刻执行的,而task 6-14要等待前面某个task完成后才执行, 由于Pool的默认大小是CPU的物理核数,我的电脑是6核的,你要提交至少7个以上子进程才能看到上面的等待效果。
    ,因此,创建13个子进程,最多同时执行6个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:

5.子进程(subprocess模块)

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出。

  • subprocess模块可以让我们非常方便地启动一个子进程,然后控制其输入和输出。

如: 在Python代码中运行命令(查询域名指定类型的解析记录)nslookup www.python.org,这和命令行直接运行的效果是一样的:

import subprocessprint('$ nslookup www.baidu.com')r = subprocess.call(['nslookup', 'www.baidu.com'])print('Exit code:', r)

代码执行
图片[5] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL
命令行执行
图片[6] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

如果子进程还需要输入,则可以通过communicate()方法输入:

import subprocessprint('$ nslookup')p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)output, err = p.communicate(b'set q=mx\nbaidu.com\nexit\n')print(output.decode('gbk'))print('Exit code:', p.returncode)

上面的代码相当于在命令行执行命令nslookup,然后手动输入:

set q=mxbaidu.comexit

代码执行
图片[7] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

命令行执行
图片[8] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

6.进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Python的multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

  • 以Queue为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:
from multiprocessing import Process, Queueimport os, time, random# 写数据进程执行的代码:def write(q):print('Process to write: %s' % os.getpid())#循环写入队尾for value in ['1', '2', '3', '4', '5']:print('Put %s to queue...' % value)q.put(value)time.sleep(random.random())# 读数据进程执行的代码:def read(q):print('Process to read: %s' % os.getpid())#循环读取队列队头数据while True:value = q.get(True)print('Get %s from queue.' % value)if __name__ == '__main__':# 父进程创建Queue,并传给各个子进程:q = Queue()#写进程pw = Process(target=write, args=(q,))#读进程pr = Process(target=read, args=(q,))# 启动子进程pw,写入:pw.start()# 启动子进程pr,读取:pr.start()# 等待pw结束:pw.join()# pr进程里是死循环,无法等待其结束,只能强行终止:pr.terminate()

图片[9] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

7.小结

7.1.学习小结

  • 在Unix/Linux下,可以使用fork()调用实现多进程。

  • 要实现跨平台的多进程,可以使用multiprocessing模块。

    • 在Unix/Linux下,multiprocessing模块封装了fork()系统调用,使我们不需要关注fork()的细节
      • 由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork()的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessing在Windows下调用失败了,要先考虑是不是pickle失败了
  • 进程间通信是通过Queue、Pipes等实现的

7.2.Python分布式进程报错:pickle模块不能序列化lambda函数

图片[10] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL
原因:
图片[11] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

原来是windows操作系统问题造成的,因此稍需要我们自己定义函数,实现序列化。

  • 对代码稍加修改,定义两个函数return_task_queue和return_result_queue实现序列化
# task_master.pyimport random, time, queuefrom multiprocessing.managers import BaseManager# 发送任务的队列:task_queue = queue.Queue()# 接收结果的队列:result_queue = queue.Queue()#windows要我们自己定义函数,实现序列化,然后注册到 QueueManager.register,Unix/Linux不需要def return_task_queue():global task_queuereturn task_queuedef return_result_queue():global result_queuereturn result_queue# 从BaseManager继承的QueueManager:class QueueManager(BaseManager):passif __name__ == '__main__':# 把两个Queue都注册到网络上,callable参数关联了Queue对象QueueManager.register('get_task_queue', callable=return_task_queue)QueueManager.register('get_result_queue', callable=return_result_queue)# 绑定端口5000,设置验证码abcmanager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')# 启动queuemanager.start()# 获得通过网络访问的Queue对象task = manager.get_task_queue()result = manager.get_result_queue()# 放几个任务for i in range(10):n = random.randint(0, 1000)print('添加任务 %d' % n)task.put(n)# 从result队列读取结果print('尝试获取结果')for i in range(10):r = result.get(timeout=10)print('结果是:%s' % r)# 关闭manager.shutdown()print('master exit')

图片[12] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

二.分布式进程

1.Python的分布式进程是什么

Python在线程和进程汇总中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而线程最多只能分布到同一台机器的多个CPU上。

  • Python的multiprocessing模块不但支持多进程,其中**managers子模块**还支持把多进程分布到多台机器上。**一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。
      • **由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序

举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程处理任务的进程分布到2台机器上。怎么用分布式进程实现?

  • 原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。

2.如何实现分布式进程

编写服务进程

  • 服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务

    # task_master.pyimport random, time, queuefrom multiprocessing.managers import BaseManager# 发送任务的队列:task_queue = queue.Queue()# 接收结果的队列:result_queue = queue.Queue()# windows要我们自己定义函数,实现序列化,然后注册到 QueueManager.register,Unix/Linux不需要def return_task_queue():global task_queuereturn task_queuedef return_result_queue():global result_queuereturn result_queue# 从BaseManager继承的QueueManager:class QueueManager(BaseManager):passif __name__ == '__main__':# 把两个Queue都注册到网络上, callable参数关联了Queue对象:QueueManager.register('get_task_queue', callable=return_task_queue)QueueManager.register('get_result_queue', callable=return_result_queue)# 绑定端口5000, 设置验证码'abc':manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc')# 启动Queue:manager.start()# 获得通过网络访问的Queue对象:task = manager.get_task_queue()result = manager.get_result_queue()# 放几个任务进去:for i in range(10):n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n)# 从result队列读取结果:print('Try get results...')for i in range(10):r = result.get(timeout=10)print('Result: %s' % r)# 关闭:manager.shutdown()print('master exit.')

重要!!!!!

  • 当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用
    • 但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加

编写任务进程

  • 在另一台机器上启动任务进程(本机上启动也可以):
# task_worker.pyimport time, sys, queuefrom multiprocessing.managers import BaseManager# 创建类似的QueueManager:class QueueManager(BaseManager):passif __name__ == '__main__':# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:QueueManager.register('get_task_queue')QueueManager.register('get_result_queue')# 连接到服务器,也就是运行task_master.py的机器:server_addr = '127.0.0.1'print('Connect to server %s...' % server_addr)# 端口和验证码注意保持与task_master.py设置的完全一致:m = QueueManager(address=(server_addr, 5000), authkey=b'abc')# 从网络连接:m.connect()# 获取Queue的对象:task = m.get_task_queue()result = m.get_result_queue()# 从task队列取任务,并把结果写入result队列:for i in range(10):try:n = task.get(timeout=1)print('run task %d * %d...' % (n, n))r = '%d * %d = %d' % (n, n, n * n)time.sleep(1)result.put(r)except queue.Empty:print('task queue is empty.')# 处理结束:print('worker exit.')
  • 任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

先启动服务进程开始执行

Put task 0...Put task 1...Put task 2...Put task 3...Put task 4...Put task 5...Put task 6...Put task 7...Put task 8...Put task 9...Try get results...#等待任务线程写入队列
  • task_master.py进程发送完任务后,开始等待result队列的结果。

现在启动task_worker.py进程:

Connect to server 127.0.0.1...run task 0 * 0...run task 1 * 1...run task 2 * 2...run task 3 * 3...run task 4 * 4...run task 5 * 5...run task 6 * 6...run task 7 * 7...run task 8 * 8...run task 9 * 9...worker exit.
  • task_worker.py进程结束,在task_master.py进程中会继续打印出结果

图片[13] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

  • 这个简单的Master/Worker模型有什么用?
    • 其实这就是一个简单但真正的分布式计算把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送

Queue对象存储在哪?

  • 注意到task_worker.py中根本没有创建Queue的代码,所以,Queue对象存储在task_master.py进程中:

图片[14] - 【Python】从入门到上头— 多进程与分布式进程(10) - MaxSSL

  • 而Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManage管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue。

authkey有什么用?

  • 保证两台机器正常通信,不被其他机器恶意干扰。如果task_worker.py的authkey和task_master.py的authkey不一致,肯定连接不上。

3.小结

  • Python的分布式进程接口简单,封装良好,·适合需要把繁重任务分布到多台机器的环境下。·

  • 注意Queue的作用是用来传递任务和接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享