目录

  • 同步与异步
    • 阻塞与非阻塞
    • 综合使用
  • 创建进程的多种方式
    • 前言
    • windows系统创建进程的问题(重要)
    • multiprocessing模块之Process
      • 展现异步
      • 创建进程的方式(一):使用Process()创建进程对象
        • 基本使用
        • 给子进程运行的函数传参
      • 创建进程的方式(二):重写Process类的run方法
        • 还是如何传参
      • join方法
  • 进程间的数据隔离
  • IPC机制 消息队列
    • multiprocessing模块之Queue
      • get() put()
      • full() empty()
      • get_nowait()
      • 消息队列实现子进程消息传递
  • 消费者模型
  • 进程对象多种方法
    • 如何查看进程号
    • multiprocessing模块之Process其他方法
  • 守护进程
  • 僵尸进程
  • 孤儿进程
  • 多进程数据错乱问题
  • 练习

同步与异步

用来表达任务的提交方式

同步:
提交完任务之后原地等待任务的返回结果 期间不做任何事
异步:
提交完任务之后不愿地等待任务的返回结果 直接去做其他事 有结果自动通知

阻塞与非阻塞

用来表达任务的执行状态

阻塞
程序处于阻塞态
非阻塞
程序处于就绪态、运行态

综合使用

  1. 同步阻塞
    提交任务之后 cpu走了 进程不执行了
  2. 同步非阻塞
    在原地做一些事情
  3. 异步阻塞
    cpu没来做不了事情
  4. 异步非阻塞(效率最高)
    提交完任务之后 进程还继续 cpu也还在

异步非阻塞框架(写游戏、响应速度、效率高)

创建进程的多种方式前言

# 1.创建进程的步骤如何创建进程?用鼠标双击一个桌面图标,就创建了一个应用程序的进程。而我们可以用python代码创建进程,这其中至少有这样的过程:1.硬盘中存放python代码2.读取代码到内存3.cpu执行代码4.操作系统创建新进程# 2.不同操作系统的差异(重要)因为进程是由操作系统创建的,所以根据不同操作系统也会有差异:"""在不同的操作系统中创建进程底层原理不一样    windows        以导入模块的形式创建进程(需要使用if __name__ == 'main')    linux/mac        以拷贝代码的形式创建进程 (复制的时候不包含创建进程的代码)"""# 3.父进程和子进程比如一个浏览器,可能会有很多分页。那相对的来说浏览器就是主进程,一个页面就是一个子进程。一个py文件,里面写了创建进程的代码。那这个py文件运行之后,首先他自己是一个进程(父进程),被他创建出来的进程是他的子进程。父进程和子进程是相对的概念,比如刚刚所说的py文件,又是pycharm的子进程。

windows系统创建进程的问题(重要)

在windows系统下使用Python模块multiprocessing模块创建进程:

from multiprocessing import Processimport timedef task():    print('子进程开始执行')    time.sleep(3)    print('子进程执行结束')p = Process(target=task)  # 使用Process模块创建一个子进程 在子进程运行task函数p.start()  # 运行子进程print('主进程运行结束')# 执行后会产生如下报错信息:'''RuntimeError:         An attempt has been made to start a new process before the        current process has finished its bootstrapping phase.        This probably means that you are not using fork to start your        child processes and you have forgotten to use the proper idiom        in the main module:            if __name__ == '__main__':                freeze_support()                ...        The "freeze_support()" line can be omitted if the program        is not going to be frozen to produce an executable.Process finished with exit code 0'''

为什么会产生这样的报错呢?
windows以导入模块的形式创建进程。

所以产生报错的就是如下两行代码:

解决:

from multiprocessing import Processimport timedef task():    print('子进程开始执行')    time.sleep(3)    print('子进程执行结束')if __name__ == '__main__':    p = Process(target=task)    p.start()'''将这两行代码放入if __name__ == 'main':使主进程作为模块导入的时候,不再执行这两行代码,就不会出现问题了。'''

multiprocessing模块之Process展现异步

展现异步之前,先举一个同步的例子:

import timedef task():    print('子进程开始执行')    time.sleep(3)    print('子进程执行结束')if __name__ == '__main__':    task()   # 程序会跳回去执行task函数,task没执行完就不会执行下面的print()    print('主进程运行结束')'''输出结果:子进程开始执行子进程执行结束主进程运行结束'''

展现异步:

def task():    print('子进程开始执行')    time.sleep(3)    print('子进程执行结束')if __name__ == '__main__':    p = Process(target=task)    p.start()    print('主进程运行结束') # 这行代码执行最快 主进程真的结束了吗? =。='''输出结果:主进程运行结束子进程开始执行子进程执行结束'''

为什么是这样的输出结果?
p.start()执行完后,相当于向操作系统发送了一个信息,要创建一个子进程。操作系统要处理信息是需要一定时间的。主进程只是发了一个信息就完了,代码继续往下走,自然会执行print。等操作系统反应过来了,子进程才会开始执行,而这个速度是慢于主进程代码执行速度的。也就是说: 操作系统开辟内存空间的时候 主进程不停着,直接运行下一行。
请解释输出结果:

请解释输出结果:

创建进程的方式(一):使用Process()创建进程对象基本使用

import timefrom multiprocessing import Processdef task():    print('子进程开始执行')    time.sleep(3)    print('子进程执行结束')if __name__ == '__main__':    p = Process(target=task)    print(p, type(p))  #      p.start()# 用Process创建一个子进程对象 然后调用子进程对象的start()方法运行子进程。

查看Process源码发现他继承BaseProcess类:

BaseProcess类:看红色框就好。

给子进程运行的函数传参

import timefrom multiprocessing import Processdef task1(a,b):    time.sleep(3)    print(a,b)def task2(a,b):    time.sleep(3)    print(a,b)if __name__ == '__main__':    p1 = Process(target=task1, args=('cloud','alice'))  # 位置参数    p2 = Process(target=task2, kwargs={'a':'cloud','b':'alice'})  # 关键字参数    p1.start()    p2.start()

创建进程的方式(二):重写Process类的run方法

import timefrom multiprocessing import Processclass MyProcess(Process):    def run(self):        print('run is running')        time.sleep(3)        print('run is over')if __name__ == '__main__':    obj = MyProcess()  # 产生进程对象    obj.start()  # 此时就会开一个子进程 子进程会去运行类中的run方法    print('主')

为什么可以这么做呢?我们知道Process继承BaseProcess类。于是我们去看BaseProcess类的run方法:

可以得知这个run方法就是留给我们重写的。

还是如何传参

需要重写BaseProcess类的双下init:

from multiprocessing import Processimport timeclass MyProcess(Process):    def __init__(self, name, age):  # 重写__init__方法        super().__init__()  # 这里init不传参是因为 父类init里面的形参全是默认参数        self.name = name  # 这里的self是进程对象        self.age = age  # 给新产生的进程对象新增两个对象独有的属性name、age                        # 备忘:__new__产生新对象    def run(self):        print('run is running', self.name, self.age)  # 使用对象中的属性        time.sleep(3)        print('run is over', self.name, self.age)if __name__ == '__main__':    obj = MyProcess('cloud', 18)    obj.start()    print('主')

join方法

join方法的作用是:让主进程代码等待子进程代码结束之后,再执行。
主进程会卡在join方法处,主进程join方法下面的代码,不会执行。直到子进程运行结束。

# 1.基本使用import timefrom multiprocessing import Processdef task():    print('子进程开始执行')    time.sleep(3)    print('子进程执行结束')if __name__ == '__main__':    p = Process(target=task)    p.start()    p.join()    print('主')'''输出结果:子进程开始执行子进程执行结束主'''使用join方法会让主程序等待子进程结束。不加join时,应该是'主'最先被输出。
# 2.非阻塞例子from multiprocessing import Processimport timedef task(name, n):    print('%s is running' % name)    time.sleep(n)    print('%s is over' % name)if __name__ == '__main__':    p1 = Process(target=task, args=('jason1', 1))  # 子进程p1 执行时间为1s    p2 = Process(target=task, args=('jason2', 2))  # 子进程p2 执行时间为2s    p3 = Process(target=task, args=('jason3', 3))  # 子进程p3 执行时间为3s    start_time = time.time()    p1.start()    p2.start()    p3.start()    p1.join()    p2.join()    p3.join()    print(time.time() - start_time)  # 请问运行时间是?    '''jason1 is runningjason2 is runningjason3 is runningjason1 is overjason2 is overjason3 is over3.2216908931732178'''
# 3.阻塞例子from multiprocessing import Processimport timedef task(name, n):    print('%s is running' % name)    time.sleep(n)    print('%s is over' % name)if __name__ == '__main__':    p1 = Process(target=task, args=('jason1', 1))  # 子进程p1 执行时间为1s    p2 = Process(target=task, args=('jason2', 2))  # 子进程p2 执行时间为2s    p3 = Process(target=task, args=('jason3', 3))  # 子进程p3 执行时间为3s    start_time = time.time()    p1.start()    p1.join()    p2.start()    p2.join()    p3.start()    p3.join()        print(time.time() - start_time)  # 请问运行时间是?'''jason1 is runningjason1 is overjason2 is runningjason2 is overjason3 is runningjason3 is over6.449279308319092'''
# 4.谁的速度快?import timefrom multiprocessing import Processdef task(name):    start_time = time.time()    print(f'{name}子进程开始执行')    time.sleep(3)    print(f'{name}子进程执行结束')    print(f'{name}进程耗时:{time.time()-start_time}')if __name__ == '__main__':    p1 = Process(target=task,args = ('p1',))    p2 = Process(target=task,args = ('p2',))    p1.start()    p2.start()    p1.join()    print('主')  # 这段print代码有时可以跑的比第二个子进程还快!
# 5.无法阻挡子进程import timefrom multiprocessing import Processdef task(name):    start_time = time.time()    print(f'{name}子进程开始执行')    time.sleep(3)    print(f'{name}子进程执行结束')    print(f'{name}进程耗时:{time.time()-start_time}')def task2(name):    start_time = time.time()    print(f'{name}子进程开始执行')    time.sleep(1)    print(f'{name}子进程执行结束')    print(f'{name}进程耗时:{time.time()-start_time}')if __name__ == '__main__':    p1 = Process(target=task,args = ('p1',))    p2 = Process(target=task2,args = ('p2',))    p1.start()    p2.start()    p1.join()  # join只能卡住主进程的代码运行 不能阻挡子进程    print('主')'''p1子进程开始执行p2子进程开始执行p2子进程执行结束p2进程耗时:1.0048680305480957p1子进程执行结束p1进程耗时:3.0130884647369385主'''

进程间的数据隔离

同一台计算机上的多个进程数据是严格意义上的物理隔离(默认情况下)from multiprocessing import Processimport timemoney = 1000def task():    global money    money = 666    print('子进程的task函数查看money', money)if __name__ == '__main__':    p1 = Process(target=task)    p1.start()  # 创建子进程    time.sleep(3)  # 主进程代码等待3秒    print(money)  # 主进程代码打印money

IPC机制 消息队列

IPC的概念:实现进程间通信
消息队列:消息队列可以理解成一个公共存数据的地方 所有进程都可以存 也可以取。

multiprocessing模块之Queue

from multiprocessing import Queue# 1.产生消息队列qq = Queue(3)  # 括号内可以指定存储数据的个数

不给Queue传值的情况下,会自动使用最大值:(SEM_VALUE_MAX)

此时队列可以容纳值的数量为:

get() put()

使用get从队列中获取值 使用put往队列添加值 符合先进先出

from multiprocessing import Queueq = Queue(3)q.put(111)q.put(222)q.put(333)print(q.get()) # 111print(q.get()) # 222print(q.get()) # 333

当队列已满时,你使用put添加值
或者队列已空,你使用get获取值
都会导致当前进程阻塞,等待有别的进程往队列里添加/获取值

from multiprocessing import Queueq = Queue(3)q.put(111)q.put(222)q.put(333)q.put(444)print('阻塞')  # 无法输出这行代码

full() empty()

full 用于判断队列是否为满。
但有些需要注意:
1.判断的是当前进程的队列
2.判断的是执行full()这行代码这个瞬时时间下,队列的状态是否为满
empty 与 full 相反 判断的是队列是否为空

from multiprocessing import Queueq = Queue(3)q.put(111)print(q.full())  # 判断队列是否已满  Falseq.put(222)q.put(333)print(q.full())  # 判断队列是否已满  Trueprint(q.get())print(q.get())print(q.empty())  # 判断队列是否为空  Falseprint(q.get())print(q.empty())  # 判断队列是否为空  True

注意:
full() empty() 在多进程中都不能使用!!!
可能当前进程你执行完q.empty 之后 马上又另外的进程塞一个数据进来 q.empty只能判断当前进程一个瞬时时间管道是否空。

get_nowait()

这个方法如他的名字,如果获取不到队列的值,就马上抛出异常。

from multiprocessing import Queueq = Queue(3)q.put(111)print(q.get())print(q.get_nowait())  # queue.Empty

消息队列实现子进程消息传递

from multiprocessing import Process, Queuedef product(q):    q.put('子进程p添加的数据')def consumer(q):    print('子进程获取队列中的数据', q.get())if __name__ == '__main__':    q = Queue()    # 主进程往队列中添加数据    # q.put('我是主进程添加的数据')    p1 = Process(target=consumer, args=(q,))    p2 = Process(target=product, args=(q,))    p1.start()    p2.start()    print('主')'''consumer进程在消息队列没有数据的时候 这个进程会等待product进程往Queue放东西'''

消费者模型

"""回想爬虫"""生产者负责产生数据的'人'消费者负责处理数据的'人'    该模型除了有生产者和消费者之外还必须有消息队列(只要是能够提供数据保存服务和提取服务的理论上都可以)生产者消费者模型可以实现程序结耦合。两个程序(生产者、消费者)基于消息队列都可以独立运行。用包子铺举例:老板提前做好包子放在蒸笼(消息队列)。没有客人的时候可以一直做包子。客人也不需要等待,随时可以来拿包子。(程序结耦合)

进程对象多种方法如何查看进程号

# 1.current_process查看from multiprocessing import Process, current_processdef task():    print('子进程')    print(current_process())  # 查看进程    print(current_process().pid)  # 查看进程号if __name__ == '__main__':    p = Process(target=task)    p.start()    print('主进程:')    print(current_process())    print(current_process().pid)'''主进程:504子进程23260'''
# 2.os模块import osprint(os.getpid())  # 获取当前进程的进程号  # 17672print(os.getppid())  # 获取当前进程的父进程的进程号  # 28600  # 我用的是pycharm应该获取的是pycharm的进程号

去cmd里输入tasklist查看,果然如此:

multiprocessing模块之Process其他方法

1.终止进程p1.terminate()ps:计算机操作系统都有对应的命令可以直接杀死进程windows:taskkill /F /PID 进程号2.判断进程是否存活p1.is_alive()3.start()4.join()
# 例子import timefrom multiprocessing import Processdef task():    print('子进程开始执行')    time.sleep(3)    print('子进程执行结束')if __name__ == '__main__':    p = Process(target=task)    p.start()    p.terminate()    print(p.is_alive())  # True  # 不是已经终止进程了吗?为什么还是True# 1.前脚刚开 后脚就关了 这时候进程都起不来 如果加中间sleep可能就子进程运行完了# 2.执行terminate相当于让操作系统关掉刚刚创建的子进程,而这是需要时间的。可能执行is_alive的速度比操作系统关进程的速度快,所以结果是True

守护进程

守护进程会随着守护的进程结束而立刻结束使用场景:一键关闭所有子进程eg: 吴勇是张红的守护进程 一旦张红嗝屁了 吴勇立刻嗝屁      from multiprocessing import Processimport timedef task(name):    print('德邦总管:%s' % name)    time.sleep(3)    print('德邦总管:%s' % name)if __name__ == '__main__':    p1 = Process(target=task, args=('大张红',))    p1.daemon = True    p1.start()    time.sleep(1)    print('恕瑞玛皇帝:小吴勇嗝屁了')

僵尸进程

僵尸进程进程执行完毕后并不会立刻销毁所有的数据 会有一些信息短暂保留下来 比如进程号、进程执行时间、进程消耗功率等给父进程查看 ps:所有的进程都会变成僵尸进程

孤儿进程

孤儿进程子进程正常运行 父进程意外死亡 操作系统针对孤儿进程会派遣福利院管理

多进程数据错乱问题

火车票抢票时经常出现这种问题:
本来有3张票 点进去之后一张票都没有
是因为:你看到的这三张票基于 进入app的时间
你需要刷新 才能获取当前时间的信息

模拟抢票软件from multiprocessing import Processimport timeimport jsonimport random# 查票def search(name):    with open(r'data.json', 'r', encoding='utf8') as f:        data = json.load(f)    print('%s在查票 当前余票为:%s' % (name, data.get('ticket_num')))# 买票def buy(name):    # 再次确认票    with open(r'data.json', 'r', encoding='utf8') as f:        data = json.load(f)    # 模拟网络延迟    time.sleep(random.randint(1, 3))    # 判断是否有票 有就买    if data.get('ticket_num') > 0:        data['ticket_num'] -= 1        with open(r'data.json', 'w', encoding='utf8') as f:            json.dump(data, f)        print('%s买票成功' % name)    else:        print('%s很倒霉 没有抢到票' % name)def run(name):    search(name)    buy(name)if __name__ == '__main__':    tick_dict = {'ticket_num': 1}  # 创建车票字典 里面就只有一张票    with open('data.json', 'w', encoding='utf8') as f:        json.dump(tick_dict, f)    for i in range(10):        p = Process(target=run, args=('用户%s'%i, ))        p.start()   """多进程操作数据很可能会造成数据错乱>>>:互斥锁互斥锁将并发变成串行 牺牲了效率但是保障了数据的安全也就是让进程排队"""

输出结果:

练习

1.将TCP服务端使用多进程实现并发效果聊天全部采用自动发送 不要用input手动输2.整理今日内容及博客3.查询IT行业可能出现的锁名称及概念4.整理理论内容 尝试编写cs架构的软件 实现数据的上传与下载