zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Python学习笔记 day9 进程与线程

2023-09-27 14:26:26 时间

进程、与线程区别

进程

程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程。程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。

在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行。这是这样的设计,大大提高了CPU的利用率。进程的出现让每个用户感觉到自己独享CPU,因此,进程就是为了在CPU上实现多道编程而提出的。

有了进程为什么还要线程?

进程有很多优点,它提供了多道编程,让我们感觉我们每个人都拥有自己的CPU和其他资源,可以提高计算机的利用率。很多人就不理解了,既然进程这么优秀,为什么还要线程呢?其实,仔细观察就会发现进程还是有很多缺陷的,主要体现在两点上:

进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。

进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。

线程

线程是操作系统能够进行运算调度的最小单位。 它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
可以简单理解:线程就是一堆指令

 

 

区别

  • 线程共享内存空间,进程的内存是独立的
  • 线程启动速度快,进程启动速度慢。运行的时候两者没有可比性
  • 同一个进程的线程之间可以直接交流,两个进程想通信,必须通过一个中间代理来实现
  • 创建新线程很简单,创建新进程需要对父进程进行一次克隆
  • 一个线程可以控制和操作同一个进程里的其他线程,但是进程只能操作子进程
  • 一个进程一定至少有一个线程。进程执行要通过线程来执行,进程是一堆资源的集合。
  • 根本区别:进程是资源分配最小单位,线程是程序执行的最小单位。 计算机在执行程序时,会为程序创建相应的进程,进行资源分配时,是以进程为单位进行相应的分配。每个进程都有相应的线程,在执行程序时,实际上是执行相应的一系列线程。

线程

 

多线程例子
直接调用

普通情况等4import time
def run(n):
    print("task ",n)
    time.sleep(2)
run("t1")
run("t2")

多线程 并行,只需等2import threading
import time

def run(n):
    print("task ",n)
    time.sleep(2)
t1 = threading.Thread(target=run,args=("t1",))
t2 = threading.Thread(target=run,args=("t2",))
t1.start()
t2.start()


for循环利用,原本要sleep 100秒,通过多线程,只需要sleep 2import threading
import time

def run(n):
    print("task ",n)
    time.sleep(2)
    print("task done",n)

start_time = time.time()
for i in range(50):
    t = threading.Thread(target=run,args=("t-%s" %i ,))
    t.start()

# 正常情况下,如果打印出来就是程序执行完了
print("cost:",time.time() - start_time)  # 发觉时间不是2秒

但实际上,这个线程是没执行完毕。因为是多线程是并行的。
一个程序至少有一个线程,print("csot:")的是属于主线程,因为主线程启动了之后,子线程是独立的。 


如何测试所有线程执行完的时间呢?

继承式调用

import threading
import time
 
 
class MyThread(threading.Thread):
    def __init__(self,num):
        threading.Thread.__init__(self)  # super(MyThread).__init__(self)
        self.num = num
 
    def run(self):#定义每个线程要运行的函数,类里必须要run
        print("running on number:%s" %self.num)
        time.sleep(3)
 
if __name__ == '__main__':
    t1 = MyThread(1)
    t2 = MyThread(2)
    t1.start()
    t2.start()

Join&Daemon

import threading
import time

class MyThread(threading.Thread):
    def __init__(self, n,sleep_time):
        threading.Thread.__init__(self)
        self.n = n
        self.sleep_time = sleep_time

    def run(self):
        print("running task ",self.n)
        time.sleep(self.sleep_time)
        print("task done,",self.n)

if __name__ == '__main__':
    t1 = MyThread("t1",2)
    t2 = MyThread("t2",4)
    t1.start()
    t2.start()
    t1.join()  # 等待t1的结果
    t2.join()  # 等待t2的结果  #这样实际t2 的4秒,因为是并行就是等t1返回后的2秒
# 想要的效果是线程是并行的,等所有线程完毕后 然后在主线程往下走
    print("main thread")
# 程序完全退出前,最后会默认有join

如何一次性启动50个线程,然后统一算出等待的结果?
思路:循环创建50个线程,把线程实例放到空列表(这样不会卡住下一个线程的启动)。再循环线程实例,得到join结果

import threading
import time

def run(n):
    print("task ",n)
    time.sleep(2)
    print("task done",n)


start_time = time.time()
t_objs = []
for i in range(50):
    t = threading.Thread(target=run,args=("t-%s" %i ,))  # 创建线程
    t.start()
    t_objs.append(t) # 为了不阻塞后面线程的启动,不在这里join,先放到一个列表里

for t in t_objs: # 循环线程实例列表,等待所有线程执行完毕
    t.join()

# 想要的效果是线程是并行的,等所有线程完毕后 然后在主线程往下走
print("all thread has finished...")
print("cost:",time.time() - start_time)


# 会发现退出的时候会等待所有的线程执行完毕才退出程序
# 程序完全退出前,最后会默认有join

主线程和子线程

  • threading.current_thread() 可以判断是主线程(MainThread)还是子线程(Thread)
  • threading.active_count() 查看线程个数,如果是上面那个例子,就会看到51个
  • 如果没有join,主线程和子线程是并行的。加了join,主线程依赖子线程执行完后再往下走

守护进程 Daemon
守护进程 相当于是主人,搞了几个仆人是为主人服务的。 如果主人没了,就没了。 现在要把子线程变为守护进程。 主线程还需要等待子线程的执行完毕吗?
这里其实就是守护线程

现在想达到的效果:只要主线程执行完毕,不管子线程有没执行完毕,即把所有的子线程变为守护线程。
t.setDaemon(True) # 把当前线程设置为守护线程,要在start之前

加了守护线程后,会发现没有 task done。因为主线程结束后,子线程会强制中断。

import threading
import time

def run(n):
    print("task ",n)
    time.sleep(2)
    print("task done",n)


start_time = time.time()
t_objs = []
for i in range(50):
    t = threading.Thread(target=run,args=("t-%s" %i ,))
    t.setDaemon(True)
    t.start()
    t_objs.append(t)


# 想要的效果是线程是并行的,等所有线程完毕后 然后在主线程往下走
print("all thread has finished...",threading.active_count())
print("cost:",time.time() - start_time)
# 程序完全退出前,最后会默认有join

 

线程锁之Lock\Rlock\信号量

一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,会出现什么状况?

import threading
import time

def run(n):
    global num
    time.sleep(1)
    num -=1

num = 100  # 主线程声明,想在子线程改
t_objs = []
for i in range(100):
    t = threading.Thread(target=run,args=("t-%s" %i ,))
    t.start()
    t_objs.append(t)  # 为了不阻塞后面线程的启动,不在这里join,先放到一个列表里

for t in t_objs:  # 循环线程实例列表,等待所有线程执行完毕
    t.join()

print("-----------all threads has finished....")
print("num:", num)

正常来讲,这个num结果应该是0, 但在python 2.7上多运行几次,会发现,最后打印出来的num结果不总是0,为什么每次运行的结果不一样呢? 哈,很简单,假设你有A,B两个线程,此时都 要对num 进行减1操作, 由于2个线程是并发同时运行的,所以2个线程很有可能同时拿走了num=100这个初始变量交给cpu去运算,当A线程去处完的结果是99,但此时B线程运算完的结果也是99,两个线程同时CPU运算的结果再赋值给num变量后,结果就都是99。那怎么办呢? 很简单,每个线程在要修改公共数据时,为了避免自己在还没改完的时候别人也来修改此数据,可以给这个数据加一把锁, 这样其它线程想修改此数据时就必须等待你修改完毕并把锁释放掉后才能再访问此数据。

*注:不要在3.x上运行,不知为什么,3.x上的结果总是正确的,可能是自动加了锁

加锁版本
Lock
GIL VS Lock
Python已经有一个GIL来保证同一时间只能有一个线程来执行了,为什么这里还需要lock? 注意啦,这里的lock是用户级的lock,跟那个GIL没关系。 图从左下,上右
在这里插入图片描述全局解释器锁:保证同一时间只有一个线程执行
执行时间到了,被要求释放GIL,释放了就不执行,解锁,让其他线程执行。但是那个一线程还没执行完,就到二线程了,这时count还是0,线程2执行完后是1。 然后线程1重新执行之前的,从寄存器拿之前存的临时缓存(这时的count还是0),加完了之后直接赋值给count,在线程二执行后本来已经等于1了。 所以又赋值等于1。

如果想让他加的没问题,百分百执行正确? 锁定。 在加减的时候进行串行操作
Lock(互斥锁)
用户态的锁:保证同一时间只有一个线程修改数据 lock = threading.Lock()
此时只能我去修改这个数据,修改完后释放。 Python 3.x 可以不加锁,似乎是默认了

import threading
import time

def run(n):
    lock.acquire()
    global num
    num +=1
    # time.sleep(1)  通过sleep可以看出是串行。等待50秒
    lock.release()


lock = threading.Lock()
num = 0  # 主线程声明,想在子线程改
t_objs = []
for i in range(50):
    t = threading.Thread(target=run,args=("t-%s" %i ,))
    t.start()
    t_objs.append(t)  # 为了不阻塞后面线程的启动,不在这里join,先放到一个列表里

for t in t_objs:  # 循环线程实例列表,等待所有线程执行完毕
    t.join()

print("-----------all threads has finished....")
print("num:", num)

 
RLock(递归锁)
说白了就是在一个大锁中还要再包含子锁。即程序连续用好几次锁的时候,要RLock,实际情况中比较少用,了解。

先进入run1,然后再进入run2。run3大锁一直都在。 所以同一时刻有两把锁着

import threading,time
 
def run1():
    print("grab the first part data")
    lock.acquire()
    global num
    num +=1
    lock.release()
    return num
def run2():
    print("grab the second part data")
    lock.acquire()
    global  num2
    num2+=1
    lock.release()
    return num2
def run3():
    lock.acquire()
    res = run1()
    print('--------between run1 and run2-----')
    res2 = run2()
    lock.release()
    print(res,res2)
 
 
if __name__ == '__main__':
 
    num,num2 = 0,0
    lock = threading.RLock()  # lock = threading.Lock()就可以看出一直死循环,出不来,锁住了
    for i in range(10):
        t = threading.Thread(target=run3)
        t.start()
 
while threading.active_count() != 1:  # 不用join的方法。判断 线程执行完毕没有
    print(threading.active_count())
else:
    print('----all threads done---')
    print(num,num2)

 
Semaphore(信号量)
互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

第一次一次性执行5个线程,接着后面是每出来一个就放一个进去

import threading,time
 
def run(n):
    semaphore.acquire()
    time.sleep(1)
    print("run the thread: %s\n" %n)
    semaphore.release()
 
if __name__ == '__main__':
 
    num= 0
    semaphore  = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
    for i in range(20):
        t = threading.Thread(target=run,args=(i,))
        t.start()
 
while threading.active_count() != 1: # 和join效果差不多
    pass #print threading.active_count()
else:
    print('----all threads done---')
    print(num)

 

Event事件

通过Event来实现两个或多个线程间的交互,线程1执行ace,线程2执行bdf。
If the flag is set, the wait method doesn’t do anything.
标志位设定了,代表绿灯,直接通行
If the flag is cleared, wait will block until it becomes set again.
标志位被清空,代表红灯,wait等待变绿灯
Any number of threads may wait for the same event.
每个线程(车)可以同时等待同一个事件

下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。

import threading,time
import random
def light():
    if not event.isSet():
        event.set() #先设置一个全局变量,绿灯  #wait就不阻塞 态  
    count = 0
    while True:
        if count < 10:
            print('\033[42;1m--green light on---\033[0m')
        elif count <13:
            print('\033[43;1m--yellow light on---\033[0m')
        elif count <20:
            if event.isSet():
                event.clear() # 清空
            print('\033[41;1m--red light on---\033[0m')
        else:
            count = 0
            event.set() #打开绿灯
        time.sleep(1)
        count +=1
def car(n):
    while 1:
        time.sleep(random.randrange(10))
        if  event.isSet(): #绿灯
            print("car [%s] is running.." % n)
        else:
            print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
    event = threading.Event()
    Light = threading.Thread(target=light)
    Light.start()
    for i in range(3):
        t = threading.Thread(target=car,args=(i,))
        t.start()

这里还有一个event使用的例子,员工进公司门要刷卡, 我们这里设置一个线程是“门”, 再设置几个线程为“员工”,员工看到门没打开,就刷卡,刷完卡,门开了,员工就可以通过。

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
import threading
import time
import random

def door():
    door_open_time_counter = 0
    while True:
        if door_swiping_event.is_set():
            print("\033[32;1mdoor opening....\033[0m")
            door_open_time_counter +=1

        else:
            print("\033[31;1mdoor closed...., swipe to open.\033[0m")
            door_open_time_counter = 0 #清空计时器
            door_swiping_event.wait()


        if door_open_time_counter > 3:#门开了已经3s了,该关了
            door_swiping_event.clear()

        time.sleep(0.5)


def staff(n):

    print("staff [%s] is comming..." % n )
    while True:
        if door_swiping_event.is_set():
            print("\033[34;1mdoor is opened, passing.....\033[0m")
            break
        else:
            print("staff [%s] sees door got closed, swipping the card....." % n)
            print(door_swiping_event.set())
            door_swiping_event.set()
            print("after set ",door_swiping_event.set())
        time.sleep(0.5)
door_swiping_event  = threading.Event() #设置事件


door_thread = threading.Thread(target=door)
door_thread.start()



for i in range(5):
    p = threading.Thread(target=staff,args=(i,))
    time.sleep(random.randrange(3))
    p.start()

 

queue队列

class queue.Queue(maxsize=0) #先入先出
class queue.LifoQueue(maxsize=0) #last in fisrt out 先入后出
class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列

把所有硬盘按顺序放到桌子,想用的时候就取一块
为什么要队列?双方提高效率,完成程序的解耦(没有直接依赖关系)

如果没有队列,大家都干等着,而且桌子会着急,这么多咋整
 
有列表,为什么还要队列?列表取出来,数据还在列表。 队列取出来,数据只有一份,取出来就不在桌子上

Python2.x是大写的Q,Python3.x是小写的q
Queue.qsize()
Queue.empty() #return True if empty
Queue.full() # return True if full
Queue.put(item, block=True, timeout=None)
Queue.get(block=True, timeout=None)

简单例子

class queue.Queue(maxsize=0) #先入先出
import queue

q = queue.Queue()  # queue.Queue(maxsize=3) 设置最大为3
q.put(1)
q.put(2)

print(q.get())  # 默认取不到数据就会阻塞,q.get(block=False) 会报出异常。 q.get(timeout=1)
print(q.get())
# q.get_nowait()  # 如果抛出异常就不取
# q.qsize()  # 如果等于0就不取
结果:
1
2


class queue.LifoQueue(maxsize=0) #last in fisrt out   先入后出
import queue

q = queue.LifoQueue()  # queue.LifoQueue(maxsize=3) 设置最大为3
q.put(1)
q.put(2)

print(q.get())  # 默认取不到数据就会阻塞,q.get(block=False) 会报出异常。 q.get(timeout=1)
print(q.get())
结果:
2
1 


class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
import queue

q = queue.PriorityQueue()
q.put((10,"guozi"))
q.put((20,"xiaobin"))
q.put((-1,"he"))
q.put((5,"xixi"))

print(q.get())
print(q.get())
print(q.get())
print(q.get())
结果:(-1, 'he')
(5, 'xixi')
(10, 'guozi')
(20, 'xiaobin')

 

生产者消费者模型

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
 

最基本的生产者消费者模型的例子(队列,Queue)

import threading
import queue
 
def producer():
    for i in range(10):
        q.put("骨头 %s" % i )
    print("开始等待所有的骨头被取走...")
    q.join()
    print("所有的骨头被取完了...")
 
 
def consumer(n):
    while q.qsize() >0:
        print("%s 取到" %n  , q.get())
        q.task_done() #告知这个任务执行完了
 
 
q = queue.Queue()
p = threading.Thread(target=producer,) # 并发
p.start()
c1 = consumer("果子哥")

边吃边产的效果:

import threading
import queue
import time

q = queue.Queue()
def producer(name):
    count =1
    while True:
        q.put("骨头 %s" % count)
        print("[%s]生产了骨头[%s]" %(name,count))
        count +=1
        time.sleep(2)

def consumer(name):
    while True:
        print("[%s]取到[%s] 并且吃了它...." %(name,q.get()))
        time.sleep(1)



p = threading.Thread(target=producer,args=("果子哥",))
c = threading.Thread(target=consumer,args=("小西",))
c1 = threading.Thread(target=consumer,args=("小东",))

p.start()
c.start()
c1.start()
# 解决了解耦和排队的问题


结果:
[果子哥]生产了骨头[1]
[小西]取到[骨头 1] 并且吃了它....
[果子哥]生产了骨头[2]
[小东]取到[骨头 2] 并且吃了它....
[果子哥]生产了骨头[3]
[小西]取到[骨头 3] 并且吃了它....
[果子哥]生产了骨头[4]
[小东]取到[骨头 4] 并且吃了它....

吃包子

import time,random
import queue,threading
q = queue.Queue()
def Producer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(3))
    q.put(count)
    print('Producer %s has produced %s baozi..' %(name, count))
    count +=1
def Consumer(name):
  count = 0
  while count <20:
    time.sleep(random.randrange(4))
    if not q.empty():
        data = q.get()
        print(data)
        print('\033[32;1mConsumer %s has eat %s baozi...\033[0m' %(name, data))
    else:
        print("-----no baozi anymore----")
    count +=1
p1 = threading.Thread(target=Producer, args=('A',))
c1 = threading.Thread(target=Consumer, args=('B',))
p1.start()
c1.start()

 

python GIL全局解释器锁

无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行,但为什么看上去是并发的?因为是来回切换。

首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL

 

题目:简单主机批量管理工具

需求:
1.主机分组
2.主机信息配置文件用configparser解析
3.可批量执行命令、发送文件,结果实时返回,执行格式如下
batch_run -h h1,h2,h3 -g web_clusters,db_servers -cmd “df -h” 
batch_scp -h h1,h2,h3 -g web_clusters,db_servers -action put -local test.py -remote /tmp/ 
4.主机用户名密码、端口可以不同 (账户文件)
5. 执行远程命令使用paramiko模块
6.批量命令需使用multiprocessing并发

 

进程

进程间通讯

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queues
使用方法跟threading里的queue差不多

from multiprocessing import Process, Queue
 
def f(q):
    q.put([42, None, 'hello'])
 
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

 

进程池