zl程序教程

您现在的位置是:首页 >  后端

当前栏目

8-1-1python语法基础-并发编程-进程-进程创建,进程冲突,进程之间通信,进程池,

进程基础并发编程通信 创建 语法 之间
2023-09-14 09:00:33 时间

进程

并发编程的相关概念:

进程

  • 1,运行中的程序,就是进程,程序是没有生命的实体,运行起来了就有生命了,
  • 操作系统可以管理进程,进程是操作系统基本的执行单元,
  • 2,每一个进程都有它自己的地址空间,进程之间是不会混的,比如qq不能访问微信的地址空间,
  • 操作系统替你隔离开了,这也是操作系统引入进程这个概念的原因,

进程的调度

  • 1,先来先服务,有一个不好的,就是不利于短作业
  • 2,短作业优先算法,但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。
  • 3,时间片轮转算法,就是轮流执行,已经很科学了,
  • 4,多级反馈队列算法,有多个队列,有一个新任务来了放入第一个队列,这是优先级加上时间片轮转,第二个任务来了放入下一级,

并发和并行:

  • 进程的并行:这种只有在多核cpu才可以实现,
  • 进程的并发:这是轮流执行,由于速度很快,看起来像是一起执行的,比如一遍听音乐,一遍写代码,

进程的三状态转换图:非常重要

  • 1,进程一开始运行的时候,是就绪的状态,这是第一个状态,就是告诉cpu,我已经准备好可以运行了,进入排队了,
  • 2,时间片轮转,轮到你了之后,你就运行了,这是第二个状态,
  • 3,发生阻塞,这是第三个状态,比如你的程序让你输入内容,input方法, 这时候是阻塞的,你输入完毕了之后,就又畅通了,
  • 这是等待I/O完成,input,sleep,文件的输入和输出,
  • 事件处理之后,你还要进入就绪状态了,
  • 全部处理完了,就结束了,

同步和异步

  • 1,同步,需要等待,需要排队,你什么也不能干,
  • 2,异步,不需要等待,你可以去做其他事情,

阻塞和非阻塞

  • 1,阻塞,就是input,sleep这些,需要等待,这是阻塞,
  • 2,非阻塞,就是跳过这些阻塞,但是程序中不可避免的需要阻塞,因为需要等待内容处理,

同步异步和阻塞非阻塞:

  • 同步阻塞
  • 同步非阻塞
  • 异步阻塞
  • 异步非阻塞,效率更高,

Python中使用多进程

  • 运行中的程序就是一个进程。所有的进程都是通过它的父进程来创建的。
  • 因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。
  • 多个进程可以实现并发效果,就会让程序的执行速度变快。
  • 多进程有一个内置的模块,我们需要借助这个模块:
  • from multiprocessing import Process

multiprocessing模块

  • 在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,
  • 我将这部分大致分为四个部分:
  • 1,创建进程部分,
  • 2,进程同步部分,
  • 3,进程池部分,
  • 4,进程之间数据共享。

创建进程

创建进程的方式1,通过构造函数的方式

  • 认知这个多进程:
  • 1,python里面的多进程是使用的标准库,import multiprocessing,实现的
  • 2,p.start():启动进程,并调用该子进程中的p.run()
  • 3,p.join(),这个join就是在感知一个子进程的一个结束,将异步改成同步,主进程必须要等待子进程结束之后,才会结束,如果不加,主进程就会先结束,
  • 4,注意参数的传递,args是传递一个元组,一个元素的时候要有逗号,kargs是传递一个字典,

创建单个进程 
import multiprocessing, os


def hi(arg):
    print(arg)
    print("父进程号", os.getppid(), "子进程号", os.getpid())


if __name__ == '__main__':
    p = multiprocessing.Process(target=hi, args=("xiaoxiao",))
    p.start()
    p.join()
    print("end")


 

创建多个进程:
from multiprocessing import Process
import time


def task(name):
    print('{} is running!'.format(name))
    time.sleep(3)
    print('{} is done!'.format(name))


if __name__ == '__main__':
    # 开启10个子进程
    p_list = []
    for i in range(10):
        p = Process(target=task, args=(i, ))
        p_list.append(p)
        p.start()
    # p.join()
    [p.join() for p in p_list]  # 保证前面的10个进程全部结束了,才会执行下面的代码,
    print('--- 主进程 ----')
    # 这种开启了多进程,可以读多个进程去存文件,取文件内容,

创建进程的方式2:通过类的方式

  • 注意两点
  • 第一点,创建一个类,继承process
  • 第二点,类中必须实现run方法,这个run方法里面就是子进程要执行的内容,
import os
from multiprocessing import Process

class MyProcess(Process):  # 继承导入的process,
    def __init__(self,name):  # 为了进程能传递参数,
        super().__init__()  # 这是继承了父类所有的参数,
        self.name=name
    def run(self):
        # print(os.getpid())
        print("子进程号",self.pid)
        print("参数",self.name)  # print(os.getpid()) 这两句是一样的,


if __name__ == '__main__':
    # p1=MyProcess()  # 这是不传参数的
    p1=MyProcess("name1")  # 这是传参数的,这就是面向对象的实例化,
    p2=MyProcess('name2')
    p3=MyProcess('name3')

    p1.start() #start会自动调用run
    p2.start()
    # p2.run()
    p3.start()
    # 三个进程之间是异步的,

    p1.join()
    p2.join()
    p3.join()
    # 三个进程都结束了才会执行下面的内容,这是把异步,变成异步,
    print('主线程')

进程冲突和进程锁

  • 这个和线程冲突和线程锁是一样的,
  • 本质就是同时操作同一个资源导致的冲突,
  • 解决方法就是加锁,

体验进程之间的数据隔离问题

  • 进程之间的数据隔离问题,也就是不能共享全局变量
  • 进程和进程之间的数据肯定是隔离的,比如qq和微信,不隔离就坏事了
  • 几个进程之间,如果不通过特殊的手段,是不可能共享一个数据的,这个记住,没有什么可理解的,
from multiprocessing import Process

def work():
    global n  # 声明了一个全局变量,
    n=0
    print('子进程内: ',n)


if __name__ == '__main__':
    n = 100
    p=Process(target=work)
    p.start()
    print('主进程内: ',n)

进程之间通信

进程之间通信1,---管道

  • 上面我们看到了,进程之间是不能通信的,那怎么解决进程之间的通信问题呢,可以使用管道,
    from multiprocessing import Pipe

进程之间通信2,---队列

  • 认知队列的操作
  • 1,主要是put和get,
  • 2,使用这个队列,可以实现复杂的生产者和消费者模型,这个将是主要的进程之间通信的方式,
from multiprocessing import Process, Queue

def work(q):
    q.put("xiaoxiao")


if __name__ == '__main__':
    q = Queue()  # 创建一个Queue对象
    p=Process(target=work,args=(q,)) #创建一个进程
    p.start()
    print(q.get())
    p.join()

进程之间通信3,---数据共享

from multiprocessing import Value

总结

  • 进程间的三种通信(IPC)方式:
  • 方式一:队列(推荐使用)
  • 方式二:管道(不推荐使用,了解即可)
  • 管道相当于队列,但是管道不自动加锁
  • 方式三:共享数据(不推荐使用,了解即可)
  • 共享数据也没有自动加锁的功能,所以还是推荐用队列的。感兴趣的可以研究研究管道和共享数据

进程池

为什么会有进程池?

  • 1,因为每次开启一个进程,都需要创建一个内存空间,这是耗时的

  • 2,进程过多,操作调度也会耗时,

  • 所以会有非常大的性能问题,

  • 所以我们不会让进程太大,我们会设计一个进程池,

  • 进程池:

  • 1,Python中先创建一个进程的池子,

  • 2,这个进程池能存放多少个进程,比如有5个进程,

  • 3,先把这些进程创建好,

  • 4,比如有50个任务他们到进程池里面去找进程,找到的就执行,找不到的就等待,

  • 5,进程执行结束之后,不会结束,而是返回进程池,等待下一个任务,

  • 所以进程池,可以节省进程创建的时间,节省了操作系统的调度,而且进程不会过多的创建,

进程池的使用:进程池的同步调用:

import os,time
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(1)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
                                    # 但不管该任务是否存在阻塞,同步调用都会在原地等着
        res_l.append(res)

    print(res_l)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

进程池的使用:进程池的异步调用:

import os
import time
import random
from multiprocessing import Pool

def work(n):
    print('%s run' %os.getpid())
    time.sleep(random.random())
    return n**2

if __name__ == '__main__':
    p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
                                          # 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
                                          # 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
                                          # 而是执行完一个就释放一个进程,这个进程就去接收新的任务。  
        res_l.append(res)

    # 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果
    # 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

进程池的返回值

from multiprocessing import Pool, Process
def func(i):
    return i
# if __name__ == '__main__':
#     pool = Pool(5)
#     res_list = []
#     for i in range(10):
#         # res = pool.apply(func,args=(i,))  # 所以这个结果接收,就是返回值,
#         res = pool.apply_async(func,args=(i,))  # 所以这个结果接收,就是返回值,
#         res_list.append(res)
#     for res in res_list:
#         print(res.get())  # get会阻塞等待结果

# 上面讲了apply和apply_async 的返回值的问题,
# 下面讲讲map的返回值的问题,比较简单

if __name__ == '__main__':
    pool = Pool(5)
    ret = pool.map(func,range(10))
    print(ret)  # 这是返回了一个列表,

# 使用的时候想用map,map搞不定就使用,apply_async

进程池的回调函数

from multiprocessing import Pool

def func1(n):
    print(111)
    return n

def func2(n):
    print(222)
    print(n*2)

if __name__ == '__main__':
    p = Pool(5)
    p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()
    # 回调函数都是在主进程中执行的,

总结

  • 更高级的进程池是比较智能的,
  • 比如现在进程池有5个进程,就可以处理过来了,就不需要增加
  • 但是如果处理等待的任务太多了,急需要往进程池里面加进程,一直到设置的进程池上限
  • 如果任务减少了,就进程池里面减少,
  • 这是比较智能的,
  • Python中没有高级的进程池,只有一个固定的进程数的进程池,没有弹性的那种,

多进程的信号量

进程池和信号量有什么关系?

  • 使用了信号量,并没有改变进程的个数,
  • 比如有200个任务,
  • 依然200个进程在排队,只是控制了同一时间有几个进程在执行,
  • 如果信号量是5
  • 也就是只允许5个进程让操作系统调度,节省了操作系统的调度时间,但是并没有节省进程的创建时间,
  • 而进程池,是有200个任务去拿进程,所以进程池既是节省了操作系统的调度时间,也节省进程的创建时间,
from multiprocessing import Process
import time,random
from multiprocessing import Semaphore
# ktv只有1个房间,1个房间只能装4个人,但是这样写就是20个人都进入到房间了,
# 假设ket门口有4把钥匙,一个进程来了那一把钥匙,然后关门,这样只有4个进程能拿到,剩下的之后1个进程出来了才可以继续其他的进程,
# 这个概念就叫做信号量,同一时间就只有四个人,
def ktv(i,sem):
    sem.acquire()  # 获取钥匙
    print("%d进入ktv"%i)
    time.sleep(random.randint(60,180))  # 这是每一个人唱歌1-3分钟
    print("%d走出ktv"%i)
    sem.release()  # 还钥匙

if __name__ == "__main__":
    sem = Semaphore(4)  # 这就是设置有多少把钥匙,  信号量的英文就是:Semaphore
    for i in range(20):
        p=Process(target=ktv,args=(i,sem))
        p.start()

进程的事件

# 事件
import time
from multiprocessing import Event, Process


# 一个信号,可以使所有的进程都进入阻塞状态,也可以控制所有信号都解除阻塞,
# 一个事件创建之后,默认是阻塞状态,
# e = Event()  # 创建一个事件
# print(e.is_set())  # 查看一个事件是否是阻塞状态,
# print(123445)
# e.set()  # 这是把阻塞的状态改为true,
# print(e.is_set())
# e.wait()  # 根据e.is_set()的结果,如果是false,就会阻塞,如果是true就会不阻塞
# print(12344)
# e.clear()  # 这是把阻塞的状态改为false
# print(e.is_set())
# e.wait()  # 虽然阻塞了,但是一定要有这个wait,才会阻塞后面的代码,
# print(444444)


# 举一个例子,红绿灯

# 每一个进程表示一辆车,
def car(e,i):
    #e.is_set() 默认返回False 代表的是绿灯
    if not e.is_set():
        print("car%s在等待"%i)
        e.wait()
    print("car%s通行了"%i)

def light(e):
    while True:
        if e.is_set():
            e.clear()
            print('\033[31m红灯亮了\033[0m')
        else:
            e.set()
            print('\033[32m绿灯亮了\033[0m')
        time.sleep(2)


if __name__ == '__main__':
    e=Event()
    # 模拟启动交通灯
    p1=Process(target=light,args=(e,))
    p1.daemon=True
    p1.start()
    #模拟20辆小车
    for i in range(20):
        import random
        time.sleep(random.uniform(0,2))
        p2=Process(target=car,args=(e,i))
        p2.start()
    print("程序彻底结束!")

守护进程

"""
守护进程

父进程中将一个子进程设置为守护进程,那么这个子进程会随着主进程的结束而结束。
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

"""
# 第一版:主进程结束了,子进程还没有结束,
# import time
# from multiprocessing import Process
#
# def func():
#     while True:
#         time.sleep(1)
#         print("我还活着")
#
#
# if __name__ == '__main__':
#     p=Process(target=func)
#     p.start()
#     i = 0
#     while i<10:
#         time.sleep(1)
#         i+=1
#     print("主进程结束")


# 守护进程,就是主进程代码结束了而结束,记住不是主进程彻底结束,而是代码结束,
import time
from multiprocessing import Process


def func():
    while True:
        time.sleep(1)
        print("我还活着")


if __name__ == '__main__':
    p = Process(target=func)
    p.daemon = True  # 设置子进程为守护进程, #一定要在p.start()前设置,设置p为守护进程
    p.start()
    i = 0
    while i < 5:
        time.sleep(1)
        i += 1
    print("主进程代码结束")

进程的其他方法

from multiprocessing import Process
import time
def func(name):
    print("%s在test...."%name)

if __name__ == "__main__":
    p = Process(target=func,args=("andy",))
    p.start()
    print(p.is_alive())  # # 判断一个进程是否活着
    p.terminate()  # 结束一个进程,
    time.sleep(1)
    print(p.is_alive())