转载本文请注明 CSDN 链接处: https://blog.csdn.net/captain5339/article/details/129099833
由浅入深掌握各种 Python 多进程间通信方式
- 1、为什么要掌握进程间通信
- 2、进程间各类通信方式简介
- 3、消息机制通信
- 1) 管道 Pipe 通信方式
- 2) 消息队列Queue通信方式
- 4、同步机制通信
- (1) 同步锁 – Lock
- (2) 子进程间协调机制 — Event
- 5、共享变量
- 6、共享内存 Shared_memory
- 1) SharedMemory对象编程步骤
- 2) ShareableList 共享列表
- 7、管理器Manager
- 1) Manager的主要数据结构
- 2) 使用步骤
- 3) 销毁共享内存变量
- 4) 向管理器注册自定义类型
- 7、总结
1、为什么要掌握进程间通信
Python代码效率由于受制于GIL全局锁限制,多线程不能利用多核CPU来加速,而多进程方式却可以绕过GIL限制, 发挥多CPU加速的优势,达到提高程序的性能的目的。
然而进程间通信却是不得不考虑的问题。 进程不同于线程,进程有自己的独立内存空间,不能使用全局变量在进程间传递数据。
实际项目需求中,常常存在密集计算、或实时性任务,进程之间有时需要传递大量数据,如图片、大对象等,传递数据如果通过文件序列化、或网络接口来进行,难以满足实时性要求,采用redis,或者kaffka, rabbitMQ 之第3方消息队列包,又使系统复杂化了,而且多次重复编码解码数据,效率并不高。
Python multiprocessing 模块本身就提供了丰富高效的进程间通信机制,包括消息机制、同步机制、共享内存等。
了解并掌握 python 各种进程间通信的特点、使用方式,以及安全机制,可以帮助软件工程师在实际项目选用最合适的通信方式,从而获得程序运行的最佳性能。
2、进程间各类通信方式简介
下面列出了python支持的各种进程间通信方式
各类通信方式的适用场景
主要是由于不同应用场景,以及对性能的不同需求,使用通信方式。
- 多进程同时访问数据库,读写文件等场景,需要添加
同步机制
,可采用Lock, Event, Semaphore等机制。 - 而
Queue
队列、Pipe
适合大多数场景,如 生产者 – 消息者,订阅 – 发布,Director – Worker等场景,在大多数情况下,Queue, Pipe即可满足进程间通信的需求。 - 由于Queue对入列、出列对象需要进行序列化操作等,影响了效率。如果对性能要求高,可采用
共享变量
方式交换数据。 - 如果希望python性能飞起来,
共享内存
方式是不二之选,非常适合进程之间交换图片、视频等大块数据,只是对开发者要求更高,必须熟悉bytes, bytearray, memoryview类型的使用,以及pickle, json序列化、字符串编解码等知识。
关于进程间通信的内存安全
同1个应用程序的多进程间通信,可能会因同抢,意外销毁等原因造成共享变量异常,或者使用不当造成内存占用过高,也可能被其它进程非授权访问。
Multiprocessing 模块提供的Queue, Pipe, Lock, Event 对象,都已实现了进程间通信安全机制。
需要注意,采用共享内存方式通信,需要在代码中开发者自己来跟踪、销毁这些共享内存变量。除非开发者很清楚共享内存使用特点,否则不建议直接使用此共享内存,而是通过Manager管理器来使用共享内存。
进程管理器Manager
Multiprocessing提供了管理器Manager类,SharedMemoryManager类,可解决进程通信的内存安全问题,可以将各种共享数据加入管理器,包括 list, dict, Queue, Lock, Event, SharedMemory 等,由其统一跟踪与销毁。
下面按类别介绍种通信方式的特点以及代码开发步骤。
3、消息机制通信
1) 管道 Pipe 通信方式
类似于1上简单的socket通道,双端均可收发消息。
Pipe 对象的构建方法:
parent_conn, child_conn = Pipe(duplex=True/False)
参数说明
- duplex=True, 管道为双向通信
- duplex=False, 管道为单向通信,只有child_conn可以发消息,parent_conn只能接收。
示例代码:
from multiprocessing import Process, Pipe def myfunction(conn):conn.send(['hi!! I am Python'])conn.close()if __name__ == '__main__':parent_conn, child_conn = Pipe()p = Process(target=myfunction, args=(child_conn,))p.start()print (parent_conn.recv() )p.join()
2) 消息队列Queue通信方式
Multiprocessing 的Queue 类,是在python queue 3.0版本上修改的, 可以很容易实现生产者 – 消息者间传递数据,而且Multiprocessing的Queue 模块实现了lock安全机制。
Queue模块共提供了3种类型的队列。
(1) FIFO queue , 先进先出,
class queue.Queue(maxsize=0)
(2) LIFO queue, 后进先出, 实际上就是堆栈
class queue.LifoQueue(maxsize=0)
(3) 带优先级队列, 优先级最低entry value lowest 先了列
class queue.PriorityQueue(maxsize=0)
Multiprocessing.Queue类的主要方法:
method | Description |
---|---|
queue.qsize() | 返回队列长度 |
queue.full() | 队列满,返回 True, 否则返回False |
queue.empty() | 队列空,返回 True, 否则返回False |
queue.put(item) | 将数据写入队列 |
queue.get() | 将数据抛出队列 , |
queue.put_nowait(item), queue.get_nowait() | 无等待写入或抛出 |
说明:
- put(), get() 是阻塞方法, 而put_notwait(), get_nowait()是非阻塞方法。
- Multiprocessing 的Queue类没有提供Task_done, join方法
Queue模块的其它队列类:
(1) SimpleQueue
简洁版的FIFO队列, 适事简单场景使用
(2) JoinableQueue子类
Python 3.5 后新增的 Queue的子类,拥有 task_done(), join() 方法
- task_done()表示,最近读出的1个任务已经完成。
- join()阻塞队列,直到queue中的所有任务都已完成。
producer – consumer 场景,使用Queue的示例
import multiprocessingdef producer(numbers, q):for x in numbers:if x % 2 == 0:if q.full():print("queue is full")breakq.put(x)print(f"put {x} in queue by producer")return Nonedef consumer(q):while not q.empty():print(f"take data {q.get()} from queue by consumer")return Noneif __name__ == "__main__":# 设置1个queue对象,最大长度为5qu = multiprocessing.Queue(maxsize=5,) # 创建producer子进程,把queue做为其中1个参数传给它,该进程负责写p5 = multiprocessing.Process(name="producer-1",target=producer,args=([random.randint(1, 100) for i in range(0, 10)], qu))p5.start()p5.join()#创建consumer子进程,把queue做为1个参数传给它,该进程中队列中读p6 = multiprocessing.Process(name="consumer-1",target=consumer,args=(qu,))p6.start()p6.join()print(qu.qsize())
4、同步机制通信
(1) 同步锁 – Lock
Multiprocessing也提供了与threading 类似的同步锁机制,确保某个时刻只有1个子进程可以访问某个资源或执行某项任务, 以避免同抢。
例如:多个子进程同时访问数据库表时,如果没有同步锁,用户A修改1条数据后,还未提交,此时,用户B也进行了修改,可以预见,用户A提交的将是B个修改的数据。
添加了同步锁,可以确保同时只有1个子进程能够进行写入数据库与提交操作。
如下面的示例,同时只有1个进程可以执行打印操作。
from multiprocessing import Process, Lockdef f(l, i):l.acquire()try:print('hello world', i)finally:l.release()if __name__ == '__main__':lock = Lock()for num in range(10):Process(target=f, args=(lock, num)).start()
不过,acquire() 未能获取锁的话,会阻塞当前线程,直到其它线程将锁release掉,才会继续执行。若有两个以上的锁对象,容易造成死锁,多人项目运用Lock通信时应慎重。
(2) 子进程间协调机制 – Event
Event 机制的工作原理:
1个event 对象实例管理着1个 flag标记, 可以用set()方法将其置为true, 用clear()方法将其置为false, 使用wait()将阻塞当前子进程,直至flag被置为true.
这样由1个进程通过event flag 就可以控制、协调各子进程运行。
Event 是一种进程间通信的信号机制,以异步方式运行。此对象与pytho signal模块的中断信号工作机制是不同的。
Event 对象的使用方法:
1 主进程: 创建1个event 对象,如: flag = multiprocessing.Event()
, 做为参数传给各子进程
2) 子进程A: 不受event影响,通过event 控制其它进程的运行
- 先clear(),将event 置为False, 占用运行权.
- 完成工作后,用set()把flag置为True。
- 子进程 B, C: 受 event 影响
- event对象flag设置为 wait() 状态,暂停运行
- 直到flag重新变为True,恢复运行
Event对象主要方法:
set(), clear()设置 True/False,
wait() 使进程等待,直到flag被改为true.
is_set(), Return True if and only if the internal flag is true.
详细代码,如下
import multiprocessingimport timeimport randomdef joo_a(q, ev):print("subprocess joo_a start")if not ev.is_set():ev.wait()q.put(random.randint(1, 100))print("subprocess joo_a ended")def joo_b(q, ev):print("subprocess joo_b start")ev.clear()time.sleep(2)q.put(random.randint(200, 300))ev.set()print("subprocess joo_b ended")def main_event():qu = multiprocessing.Queue()ev = multiprocessing.Event()sub_a = multiprocessing.Process(target=joo_a, args=(qu, ev))sub_b = multiprocessing.Process(target=joo_b, args=(qu, ev,))sub_a.start()sub_b.start()# ev.set()sub_a.join()sub_b.join()while not qu.empty():print(qu.get())if __name__ == "__main__":main_event()
5、共享变量
进程间通信不能使用全局变量
由于每个子进程运行在自己独立内存空间,只是创建时全局变量复制过来了。因此在子进程A中修改了全局变量的值,子进程B是无法看到的。
原理: 在主进程中创建ctype共享变量对象,再被子进程继承 。
优点: 速度比queue快很多
Python相关支持模块:
• multiprocessing.Value, multiprocessing.Array. 是Sharedctypes的派生类
• multiprocessing.sharedctypes, 则支持更全面ctypes数据类型.
Value()语法:
Value(typecode_or_type, *args, lock=True)
参数说明:
- typecode: ctypes 类型缩写,或 ctype类型名
如’i’ 表示 c_init, ‘d’ 表示 c_double, ‘c’ 表示 c_char等。 - *args 传入值
- Lock lock=true, 表示添加1个锁
返回值
返回一个从共享内存上创建的 ctypes 对象
示例
camera_unit = multiprocessing.Value(ctypes.c_int,0)servo_on_flag = multiprocessing.Value(ctypes.c_bool,False)target_angle = multiprocessing.Value(‘d’,30.0)
注意 使用 share memory 要考虑同抢等问题,释放等问题,需要手工实现。因此在使用共享变量时,建议使用Manager管程来管理这些共享变量。
deffunc(num):num.value=10.78 #子进程改变数值的值,主进程跟着改变 if__name__=="__main__":num = multiprocessing.Value("d", 10.0) # d表示数值,主进程与子进程可共享这个变量。p=multiprocessing.Process(target=func,args=(num,))p.start()p.join() print(num.value)
进程之间共享数据(数组型):
multiprocessing.Array()
返回1个array, 元素为ctypes类型
import multiprocessing deffunc(num):num[2]=9999 #子进程改变数组,主进程跟着改变 if__name__=="__main__":num=multiprocessing.Array("i",[1,2,3,4,5]) p=multiprocessing.Process(target=func,args=(num,))p.start() p.join() print(num[:])
sharedctypes.RawValue() 使用方式与 Value() 是类似的,只是typecode必须是ctypes类型全称,本文不再介绍。
6、共享内存 Shared_memory
Python 提供了一个 SharedMemory 类,用于分配和管理多核或对称多处理器(SMP)机器上进程间的共享内存,而非直接共享变量,
共享内存
方式与 ctypes共享变量
是不同的。前者共享的是二进制内存,后者共享对象为ctype变量,更友好。
ShareMemory处理对象为binary类型,因此对编程者并不友好,但优点是处理速度快。
注意:直接使用SharedMemory 也存在着同抢、泄露隐患,应通过SharedMemory Manager 管理器来使用, 以确保内存安全。
1) SharedMemory对象编程步骤
创建共享内存区:
multiprocessing.shared_memory.SharedMemory(name=none, create=False, size=0)
参数说明:
- name: 全局唯一的名称
- create 指定创建一个新的共享内存块 (True) 还是连接到已存在的共享内存块(False)
使用方法:
父进程创建shared_memory 后,子进程可以使用它,当不再需要后,使用close(), 删除使用unlink()方法
相关属性:
获取内存区内容: shm.buf,为 memoryview 类型
获取内存区名称: shm.name
获取内存区字节数: shm.size
SharedMemory 编程步骤:
1) 在主进程中创建新的SharedMemory 块,
shm = shared_memory.SharedMemory(name="shmArray",create=True, size=10)
shmArray为共享内存的名字,其它进程中可以使用它来attach
初始化:
shm.buf[0:5] = bytearray([10,3,8,1,5])
2)将共享内存对象做为参数传给子进程任务函数
p1 = mp.Process(target=task_a,args=(shm,))
3) 子进程读取修改共享内存对象
buff = shm.bufbuff[5] = 200 # modify the shared memoryprint(f"shared_memeory: {buff[0:9].tolist()}")
4) 使用attach 方式共享内存对象。
这种方式工,不需将共享内存对象做为参数传给子进程中。子进程通过attach方式连接到主进程已创建好的共享内存对象。
具体使用:初始化SharedMemory对象时,name为已存在共享内存对象名,create参数设置为False, 如下:
shm_b = shared_memory.SharedMemory(name='shmArray',create=False)
这样,shm_b指向了主进程中创建的shm.
5) 释放共享内存对象
子进程通过attach方式访问,使用用close()方法关闭。所有子进程都不再使用后,调用unlink()方法释放共享内存对象。
示例: 创建两个进程,使用共享内存通信. 进程p1接受传入SharedMemory 对象,进程p2使用attach 方式访问主进程创建的SharedMemory 对象。
from multiprocessing import shared_memoryimport multiprocessing as mpimport osdef task_a(shm):"""Task function for multiprocessingInputs: shm: shared_memory object"""# create a shared memory object, attached to exsisting one.buff = shm.buf# modify the shared memorybuff[5] = 200print(f"current process: { os.getpid()}, shared_memeory: {buff[0:9].tolist()}")return Nonedef task_b():"""Task function for multiprocessingno input, access existing sharedMemory object. """# create shared memory object, attached to exsisting one.shm_b = shared_memory.SharedMemory(name='shmArray',create=False)buff = shm_b.bufbuff[6] = 100# Pytho SharedMemory class may have a bug:the length of attached SharedMemory object# is not equal to original one.print(f"current process: { os.getpid()}, shared_memeory: {buff[0:9].tolist()}")print(f"buff size: {shm_b.size}")shm_b.close()return Noneif__name__=="__main__":# 创建两个进程,使用共享内存通信# create a shared memory objectshm = shared_memory.SharedMemory(name="shmArray",create=True, size=10)shm.buf[0:5] = bytearray([10,3,8,1,5])# initialize the shared memoryp1 = mp.Process(target=task_a,args=(shm,))# 将sharedMmemory对象作为参数传递给子进程p2 = mp.Process(target=task_b,) # 无参数p1.start()p1.join()p2.start()p2.join()print("in main process, shared_memory: ",shm.buf.tolist() )shm.close()shm.unlink()
多个子进程对同1个SharedMemory对象同时修改操作,如何避免同抢?
参考第4章,子进程中在操作之前,使用 Lock同步锁来避免同抢。
2) ShareableList 共享列表
sharedMemory类还提供了1个共享列表类型,这样就更方便了,进程间可以直接共享python强大的列表
构建方法:
multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)
from multiprocessing import shared_memory>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])>>> [ type(entry) for entry in a ][<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]>>> a[2]-273.154>>> a[2] = -78.5>>> a[2]-78.5>>> a[2] = 'dry ice'# Changing data types is supported as well>>> a[2]'dry ice'>>> a[2] = 'larger than previously allocated storage space'Traceback (most recent call last):...ValueError: exceeds available storage for existing str>>> a[2]'dry ice'>>> len(a)7>>> a.index(42)6>>> a.count(b'howdy')0>>> a.count(b'HoWdY')1>>> a.shm.close()>>> a.shm.unlink()>>> del a# Use of a ShareableList after call to unlink() is unsupportedb = shared_memory.ShareableList(range(5)) # In a first process>>> c = shared_memory.ShareableList(name=b.shm.name)# In a second process>>> cShareableList([0, 1, 2, 3, 4], name='...')>>> c[-1] = -999>>> b[-1]-999>>> b.shm.close()>>> c.shm.close()>>> c.shm.unlink()
7、管理器Manager
Multiprocessing 提供了 Manager 管理器类,当调用1个Manager实例对象的start()方法时,会创建1个manager进程,其唯一目的就是管理共享变量、共享内存, 避免出现进程间共享数据不同步,内存泄露等现象。
其原理如下:
Manager管理器相当于提供了进间程共享内存对象的服务,不仅可以被主进程创建的多个子进程使用,还可以被其它进程访问,甚至跨网络访问。本文仅聚焦于由单一主进程创建的各子进程之间的通信。
1) Manager的主要数据结构
相关类:multiprocessing.Manager
还有两个子类:
- multiprocessing.managers.SharedMemoryManager
- multiprocessing.managers.BaseManager
支持共享变量类型:
Manger类支持的共享变量类型有: dic, list,Queue, Lock, Event, Condition, Semaphore, Barrier等类型
通过代理可支持:python各种类型, 以及ctypes 类型
SharedMemoryManager子类,额外支持 SharedMemory,ShareableList类型。
2) 使用步骤
1)创建管理器对象
snm = Manager() 或者
snm = SharedMemoryManager()
2)创建共享内存变量
新建list, dict
sl = snm.list()
sd = snm.dict()
新建1块bytes共享内存变量,需要指定大小
sx = snm.SharedMemory(size)
新建1个共享列表变量,可用列表来初始化
sl = snm.ShareableList(sequence) 如
sl = smm.ShareableList([‘howdy’, b’HoWdY’, -273.154, 100, True])
新建1个queue, 使用multiprocessing 的Queue类型
q = snm.Queue()
传入子进程
def foo(s, d):.....p1 = Process( target=foo, args=(sl, sd, ) )p1.start()p1.join()
示例 :
from multiprocessing import Process, Managerdef f(d, l):d[1] = '1'd['2'] = 2d[0.25] = Nonel.reverse()if __name__ == '__main__':with Manager() as manager:d = manager.dict()l = manager.list(range(10))p = Process(target=f, args=(d, l))p.start()p.join()print(d)print(l)
将打印
{0.25: None, 1: '1', '2': 2}[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
3) 销毁共享内存变量
方法一:
调用snm.shutdown()方法,会自动调用每个内存块的unlink()方法释放内存。或者 snm.close()
方法二:
使用with语句,结束后会自动释放所有manager变量
>>> with SharedMemoryManager() as smm:... sl = smm.ShareableList(range(2000))... # Divide the work among two processes, storing partial results in sl... p1 = Process(target=do_work, args=(sl, 0, 1000))... p2 = Process(target=do_work, args=(sl, 1000, 2000))... p1.start()... p2.start()# A multiprocessing.Pool might be more efficient... p1.join()... p2.join() # Wait for all work to complete in both processes... total_result = sum(sl)# Consolidate the partial results now in sl
4) 向管理器注册自定义类型
managers的子类BaseManager提供register()方法,支持注册自定义数据类型。如下例,注册1个自定义MathsClass类,并生成实例。
from multiprocessing.managers import BaseManagerclass MathsClass:def add(self, x, y):return x + ydef mul(self, x, y):return x * yclass MyManager(BaseManager):passMyManager.register('Maths', MathsClass)if __name__ == '__main__':with MyManager() as manager:maths = manager.Maths()print(maths.add(4, 3)) # prints 7print(maths.mul(7, 8))
7、总结
Python多进程(multiprocessing
) 编程是绕开GIL提升程序性能的重要方式,进程间通信方式包括消息机制(pipe, queue
)、同步机制( Lock, Event
) 、Shared Memory(Value, Array, Shared_Memory, etc)
等。
直接使用Shared Memory共享内存是不安全的,Multiprocessing.Manager
模块提供了安全管理共享内存变量的管理器功能。
在实际编程时,根据主进程与子进程,子进程之间所要交换数据的类型、大小,频度、实时性等需求,来选择适合的通信方式。