8-1-1python语法基础-并发编程-进程-进程创建,进程冲突,进程之间通信,进程池,
2023-09-14 09:00:33 时间
进程
并发编程的相关概念:
进程
- 1,运行中的程序,就是进程,程序是没有生命的实体,运行起来了就有生命了,
- 操作系统可以管理进程,进程是操作系统基本的执行单元,
- 2,每一个进程都有它自己的地址空间,进程之间是不会混的,比如qq不能访问微信的地址空间,
- 操作系统替你隔离开了,这也是操作系统引入进程这个概念的原因,
进程的调度
- 1,先来先服务,有一个不好的,就是不利于短作业
- 2,短作业优先算法,但其对长作业不利;不能保证紧迫性作业(进程)被及时处理;作业的长短只是被估算出来的。
- 3,时间片轮转算法,就是轮流执行,已经很科学了,
- 4,多级反馈队列算法,有多个队列,有一个新任务来了放入第一个队列,这是优先级加上时间片轮转,第二个任务来了放入下一级,
并发和并行:
- 进程的并行:这种只有在多核cpu才可以实现,
- 进程的并发:这是轮流执行,由于速度很快,看起来像是一起执行的,比如一遍听音乐,一遍写代码,
进程的三状态转换图:非常重要
- 1,进程一开始运行的时候,是就绪的状态,这是第一个状态,就是告诉cpu,我已经准备好可以运行了,进入排队了,
- 2,时间片轮转,轮到你了之后,你就运行了,这是第二个状态,
- 3,发生阻塞,这是第三个状态,比如你的程序让你输入内容,input方法, 这时候是阻塞的,你输入完毕了之后,就又畅通了,
- 这是等待I/O完成,input,sleep,文件的输入和输出,
- 事件处理之后,你还要进入就绪状态了,
- 全部处理完了,就结束了,
同步和异步
- 1,同步,需要等待,需要排队,你什么也不能干,
- 2,异步,不需要等待,你可以去做其他事情,
阻塞和非阻塞
- 1,阻塞,就是input,sleep这些,需要等待,这是阻塞,
- 2,非阻塞,就是跳过这些阻塞,但是程序中不可避免的需要阻塞,因为需要等待内容处理,
同步异步和阻塞非阻塞:
- 同步阻塞
- 同步非阻塞
- 异步阻塞
- 异步非阻塞,效率更高,
Python中使用多进程
- 运行中的程序就是一个进程。所有的进程都是通过它的父进程来创建的。
- 因此,运行起来的python程序也是一个进程,那么我们也可以在程序中再创建进程。
- 多个进程可以实现并发效果,就会让程序的执行速度变快。
- 多进程有一个内置的模块,我们需要借助这个模块:
- from multiprocessing import Process
multiprocessing模块
- 在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,
- 我将这部分大致分为四个部分:
- 1,创建进程部分,
- 2,进程同步部分,
- 3,进程池部分,
- 4,进程之间数据共享。
创建进程
创建进程的方式1,通过构造函数的方式
- 认知这个多进程:
- 1,python里面的多进程是使用的标准库,import multiprocessing,实现的
- 2,p.start():启动进程,并调用该子进程中的p.run()
- 3,p.join(),这个join就是在感知一个子进程的一个结束,将异步改成同步,主进程必须要等待子进程结束之后,才会结束,如果不加,主进程就会先结束,
- 4,注意参数的传递,args是传递一个元组,一个元素的时候要有逗号,kargs是传递一个字典,
创建单个进程
import multiprocessing, os
def hi(arg):
print(arg)
print("父进程号", os.getppid(), "子进程号", os.getpid())
if __name__ == '__main__':
p = multiprocessing.Process(target=hi, args=("xiaoxiao",))
p.start()
p.join()
print("end")
创建多个进程:
from multiprocessing import Process
import time
def task(name):
print('{} is running!'.format(name))
time.sleep(3)
print('{} is done!'.format(name))
if __name__ == '__main__':
# 开启10个子进程
p_list = []
for i in range(10):
p = Process(target=task, args=(i, ))
p_list.append(p)
p.start()
# p.join()
[p.join() for p in p_list] # 保证前面的10个进程全部结束了,才会执行下面的代码,
print('--- 主进程 ----')
# 这种开启了多进程,可以读多个进程去存文件,取文件内容,
创建进程的方式2:通过类的方式
- 注意两点
- 第一点,创建一个类,继承process
- 第二点,类中必须实现run方法,这个run方法里面就是子进程要执行的内容,
import os
from multiprocessing import Process
class MyProcess(Process): # 继承导入的process,
def __init__(self,name): # 为了进程能传递参数,
super().__init__() # 这是继承了父类所有的参数,
self.name=name
def run(self):
# print(os.getpid())
print("子进程号",self.pid)
print("参数",self.name) # print(os.getpid()) 这两句是一样的,
if __name__ == '__main__':
# p1=MyProcess() # 这是不传参数的
p1=MyProcess("name1") # 这是传参数的,这就是面向对象的实例化,
p2=MyProcess('name2')
p3=MyProcess('name3')
p1.start() #start会自动调用run
p2.start()
# p2.run()
p3.start()
# 三个进程之间是异步的,
p1.join()
p2.join()
p3.join()
# 三个进程都结束了才会执行下面的内容,这是把异步,变成异步,
print('主线程')
进程冲突和进程锁
- 这个和线程冲突和线程锁是一样的,
- 本质就是同时操作同一个资源导致的冲突,
- 解决方法就是加锁,
体验进程之间的数据隔离问题
- 进程之间的数据隔离问题,也就是不能共享全局变量
- 进程和进程之间的数据肯定是隔离的,比如qq和微信,不隔离就坏事了
- 几个进程之间,如果不通过特殊的手段,是不可能共享一个数据的,这个记住,没有什么可理解的,
from multiprocessing import Process
def work():
global n # 声明了一个全局变量,
n=0
print('子进程内: ',n)
if __name__ == '__main__':
n = 100
p=Process(target=work)
p.start()
print('主进程内: ',n)
进程之间通信
进程之间通信1,---管道
- 上面我们看到了,进程之间是不能通信的,那怎么解决进程之间的通信问题呢,可以使用管道,
from multiprocessing import Pipe
进程之间通信2,---队列
- 认知队列的操作
- 1,主要是put和get,
- 2,使用这个队列,可以实现复杂的生产者和消费者模型,这个将是主要的进程之间通信的方式,
from multiprocessing import Process, Queue
def work(q):
q.put("xiaoxiao")
if __name__ == '__main__':
q = Queue() # 创建一个Queue对象
p=Process(target=work,args=(q,)) #创建一个进程
p.start()
print(q.get())
p.join()
进程之间通信3,---数据共享
from multiprocessing import Value
总结
- 进程间的三种通信(IPC)方式:
- 方式一:队列(推荐使用)
- 方式二:管道(不推荐使用,了解即可)
- 管道相当于队列,但是管道不自动加锁
- 方式三:共享数据(不推荐使用,了解即可)
- 共享数据也没有自动加锁的功能,所以还是推荐用队列的。感兴趣的可以研究研究管道和共享数据
进程池
为什么会有进程池?
-
1,因为每次开启一个进程,都需要创建一个内存空间,这是耗时的
-
2,进程过多,操作调度也会耗时,
-
所以会有非常大的性能问题,
-
所以我们不会让进程太大,我们会设计一个进程池,
-
进程池:
-
1,Python中先创建一个进程的池子,
-
2,这个进程池能存放多少个进程,比如有5个进程,
-
3,先把这些进程创建好,
-
4,比如有50个任务他们到进程池里面去找进程,找到的就执行,找不到的就等待,
-
5,进程执行结束之后,不会结束,而是返回进程池,等待下一个任务,
-
所以进程池,可以节省进程创建的时间,节省了操作系统的调度,而且进程不会过多的创建,
进程池的使用:进程池的同步调用:
import os,time
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(1)
return n**2
if __name__ == '__main__':
p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
res=p.apply(work,args=(i,)) # 同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞
# 但不管该任务是否存在阻塞,同步调用都会在原地等着
res_l.append(res)
print(res_l) # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
进程池的使用:进程池的异步调用:
import os
import time
import random
from multiprocessing import Pool
def work(n):
print('%s run' %os.getpid())
time.sleep(random.random())
return n**2
if __name__ == '__main__':
p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l=[]
for i in range(10):
res=p.apply_async(work,args=(i,)) # 异步运行,根据进程池中有的进程数,每次最多3个子进程在异步执行
# 返回结果之后,将结果放入列表,归还进程,之后再执行新的任务
# 需要注意的是,进程池中的三个进程不会同时开启或者同时结束
# 而是执行完一个就释放一个进程,这个进程就去接收新的任务。
res_l.append(res)
# 异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果
# 否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
p.close()
p.join()
for res in res_l:
print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
进程池的返回值
from multiprocessing import Pool, Process
def func(i):
return i
# if __name__ == '__main__':
# pool = Pool(5)
# res_list = []
# for i in range(10):
# # res = pool.apply(func,args=(i,)) # 所以这个结果接收,就是返回值,
# res = pool.apply_async(func,args=(i,)) # 所以这个结果接收,就是返回值,
# res_list.append(res)
# for res in res_list:
# print(res.get()) # get会阻塞等待结果
# 上面讲了apply和apply_async 的返回值的问题,
# 下面讲讲map的返回值的问题,比较简单
if __name__ == '__main__':
pool = Pool(5)
ret = pool.map(func,range(10))
print(ret) # 这是返回了一个列表,
# 使用的时候想用map,map搞不定就使用,apply_async
进程池的回调函数
from multiprocessing import Pool
def func1(n):
print(111)
return n
def func2(n):
print(222)
print(n*2)
if __name__ == '__main__':
p = Pool(5)
p.apply_async(func1,args=(10,),callback=func2)
p.close()
p.join()
# 回调函数都是在主进程中执行的,
总结
- 更高级的进程池是比较智能的,
- 比如现在进程池有5个进程,就可以处理过来了,就不需要增加
- 但是如果处理等待的任务太多了,急需要往进程池里面加进程,一直到设置的进程池上限
- 如果任务减少了,就进程池里面减少,
- 这是比较智能的,
- Python中没有高级的进程池,只有一个固定的进程数的进程池,没有弹性的那种,
多进程的信号量
进程池和信号量有什么关系?
- 使用了信号量,并没有改变进程的个数,
- 比如有200个任务,
- 依然200个进程在排队,只是控制了同一时间有几个进程在执行,
- 如果信号量是5
- 也就是只允许5个进程让操作系统调度,节省了操作系统的调度时间,但是并没有节省进程的创建时间,
- 而进程池,是有200个任务去拿进程,所以进程池既是节省了操作系统的调度时间,也节省进程的创建时间,
from multiprocessing import Process
import time,random
from multiprocessing import Semaphore
# ktv只有1个房间,1个房间只能装4个人,但是这样写就是20个人都进入到房间了,
# 假设ket门口有4把钥匙,一个进程来了那一把钥匙,然后关门,这样只有4个进程能拿到,剩下的之后1个进程出来了才可以继续其他的进程,
# 这个概念就叫做信号量,同一时间就只有四个人,
def ktv(i,sem):
sem.acquire() # 获取钥匙
print("%d进入ktv"%i)
time.sleep(random.randint(60,180)) # 这是每一个人唱歌1-3分钟
print("%d走出ktv"%i)
sem.release() # 还钥匙
if __name__ == "__main__":
sem = Semaphore(4) # 这就是设置有多少把钥匙, 信号量的英文就是:Semaphore
for i in range(20):
p=Process(target=ktv,args=(i,sem))
p.start()
进程的事件
# 事件
import time
from multiprocessing import Event, Process
# 一个信号,可以使所有的进程都进入阻塞状态,也可以控制所有信号都解除阻塞,
# 一个事件创建之后,默认是阻塞状态,
# e = Event() # 创建一个事件
# print(e.is_set()) # 查看一个事件是否是阻塞状态,
# print(123445)
# e.set() # 这是把阻塞的状态改为true,
# print(e.is_set())
# e.wait() # 根据e.is_set()的结果,如果是false,就会阻塞,如果是true就会不阻塞
# print(12344)
# e.clear() # 这是把阻塞的状态改为false
# print(e.is_set())
# e.wait() # 虽然阻塞了,但是一定要有这个wait,才会阻塞后面的代码,
# print(444444)
# 举一个例子,红绿灯
# 每一个进程表示一辆车,
def car(e,i):
#e.is_set() 默认返回False 代表的是绿灯
if not e.is_set():
print("car%s在等待"%i)
e.wait()
print("car%s通行了"%i)
def light(e):
while True:
if e.is_set():
e.clear()
print('\033[31m红灯亮了\033[0m')
else:
e.set()
print('\033[32m绿灯亮了\033[0m')
time.sleep(2)
if __name__ == '__main__':
e=Event()
# 模拟启动交通灯
p1=Process(target=light,args=(e,))
p1.daemon=True
p1.start()
#模拟20辆小车
for i in range(20):
import random
time.sleep(random.uniform(0,2))
p2=Process(target=car,args=(e,i))
p2.start()
print("程序彻底结束!")
守护进程
"""
守护进程
父进程中将一个子进程设置为守护进程,那么这个子进程会随着主进程的结束而结束。
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
"""
# 第一版:主进程结束了,子进程还没有结束,
# import time
# from multiprocessing import Process
#
# def func():
# while True:
# time.sleep(1)
# print("我还活着")
#
#
# if __name__ == '__main__':
# p=Process(target=func)
# p.start()
# i = 0
# while i<10:
# time.sleep(1)
# i+=1
# print("主进程结束")
# 守护进程,就是主进程代码结束了而结束,记住不是主进程彻底结束,而是代码结束,
import time
from multiprocessing import Process
def func():
while True:
time.sleep(1)
print("我还活着")
if __name__ == '__main__':
p = Process(target=func)
p.daemon = True # 设置子进程为守护进程, #一定要在p.start()前设置,设置p为守护进程
p.start()
i = 0
while i < 5:
time.sleep(1)
i += 1
print("主进程代码结束")
进程的其他方法
from multiprocessing import Process
import time
def func(name):
print("%s在test...."%name)
if __name__ == "__main__":
p = Process(target=func,args=("andy",))
p.start()
print(p.is_alive()) # # 判断一个进程是否活着
p.terminate() # 结束一个进程,
time.sleep(1)
print(p.is_alive())
相关文章
- 【NLP基础】英文关键词抽取RAKE算法
- 02_Linux基础-文件系统层次结构-提示符-进程-命令格式-隐藏文件-通配符-绝对相对路径-{1..100}-ls-mkdir-其他基础命令
- Supervisor进程管理工具快速入门与使用
- 探寻Linux系统中运行的进程(查看linux运行进程)
- PHP多进程开发1详解编程语言
- Linux进程管理:从基础到全方位(linux进程详解)
- 掌握Oracle查询进程的方法(oracle查询进程)
- 如何在 Linux 系统中结束进程或是中止程序
- 深入理解Linux进程切换原理与机制(linux进程切换)
- 如何获取Linux进程ID?(linux进程id获取)
- 如何查看 Linux 系统中的进程信息?(查看linux进程)
- Linux系统寻找特定进程的方法(linux查找某个进程)
- Linux进程挂起,如何恢复?(linux进程挂起)
- Linux下查看进程内存使用情况(linux查看进程内存占用)
- 如何快速终止Redis进程(怎么杀死redis进程)
- Oracle中断解锁表解决杀死进程难题(oracle中锁表杀进程)
- 让Redis享受多进程多携程的优越体验(多进程多携程写redis)