zl程序教程

您现在的位置是:首页 >  其他

当前栏目

8-2-1python语法基础-并发编程-线程-创建线程,线程冲突(锁),线程通信(condition,队列),线程池,定时器

2023-09-14 08:59:04 时间

线程

为什么要使用多线程

  • 多线程的目的为了并发,并发是为了性能,效率,这一点要提高到最核心最基础的认识里面,
  • 举例:比如一个6层楼,要找一个人,如果1个人搜索就要6层依次搜寻,如果6个人就是6倍的效率提高

如何使用python实现多线程?

  • 很多语言都有多线程,我们现在是关注的python如何来实现多线程
  • 我要有一个自己的例子,就是多线程验证ip可用性的例子
  • 先上代码:
import redis
import queue
import requests
import threading
import logging

logging.basicConfig(level=logging.INFO,
                    format=
                    # 日志的时间
                    '%(asctime)s'
                    # 日志级别名称 : 当前行号
                    ' %(levelname)s [%(filename)s : %(lineno)d ]'
                    # 日志信息
                    ' : %(message)s'
                    # 指定时间格式
                    , datefmt='[%Y/%m/%d %H:%M:%S]')
logging = logging.getLogger(__name__)

# 第一步,把数据取出来,在redis
conn = redis.Redis(host="127.0.0.1", port="6379")
# proxy_list = conn.hgetall("use_proxy")
# proxy_list = conn.hvals("use_proxy")
proxy_list = conn.hkeys("use_proxy")
# logging.info(proxy_list)

# 第二步,把数据存入队列
proxy_queue = queue.Queue()
for proxy in proxy_list:
    proxy_queue.put(str(proxy, encoding="utf-8"))
queue_size = proxy_queue.qsize()


# logging.info(queue_size)
# logging.info(proxy_queue.get())


# 第三步,多线程验证
# 1,先写一个类用来验证ip
class TreadCheck(threading.Thread):
    def __init__(self, target_queue, thread_name):
        threading.Thread.__init__(self, name=thread_name)
        self.target_queue = target_queue
        self.thread_name = thread_name

    def run(self):
        logging.info("{} 开始".format(self.name))
        while True:
            try:
                proxy = self.target_queue.get(block=False)
            except queue.Empty:
                logging.info("{} 结束".format(self.name))
                break

            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0',
                       'Accept': '*/*',
                       'Connection': 'keep-alive',
                       'Accept-Language': 'zh-CN,zh;q=0.8'}
            proxies = {
                "http": "http://" + str(proxy),
                "https": "https://" + str(proxy)
            }
            # 代理验证目标网站
            url_http = "http://httpbin.org"
            url_https = "https://www.qq.com"
            http_code = False
            https_code = False
            try:
                res_http = requests.head(url=url_http, headers=headers, proxies=proxies, timeout=10)
                if res_http.status_code == 200:
                    http_code = True
            except Exception as e:
                http_code = False

            try:
                res_https = requests.head(url=url_https, headers=headers, proxies=proxies, timeout=10)
                logging.info(res_https.status_code)
                if res_https.status_code == 200:
                    https_code = True
            except Exception as e:
                https_code = False

            logging.info(
                "{} http_status:{} https_status:{} 代理:{} ".format(self.name, str(http_code).ljust(6),
                                                             str(https_code).ljust(6), proxy.ljust(25)))


# 2,使用多线程
thread_list = []
for index in range(20):
    thread_list.append(TreadCheck(proxy_queue, "thread_" + str(index).zfill(2)))

for thread in thread_list:
    thread.setDaemon(True)
    thread.start()

for thread in thread_list:
    thread.join()

示例代码解析

  • 通过上面的示例代理,来学习一下python的多线程

创建线程

  • 认知:
  • 创建线程有两种方式,一种基于函数创建,一种基于类创建
  • 简单的多线程可以使用函数来实现,
  • 复杂的多线程,需要类的方式来实现,

第一种方式:基于构造函数创建

from threading import Thread
import time


def hi(msg, sec):
    print("enter hi() . {} @{}".format(msg, time.strftime("%H:%M:%S")))
    time.sleep(sec)
    print("{} @{}".format(msg, time.strftime("%H:%M:%S")))
    return sec


print("begin @{}".format(time.strftime("%H:%M:%S")))

for i in range(1, 5):
    t = Thread(target=hi, args=(i, i))  # 注册
    t.start()  # 这是启动了一个线程
    # hi(i, i)

print("end at {}".format(time.strftime("%H:%M:%S")))

# 使用单线程的这个结果应该是,1+2+3+4=10秒
# 使用多线程的这个结果应该是,4秒

创建线程第二种方式:基于类创建

  • 注意两点:
  • 1,要继承Thread
  • 2,要重写run方法
from threading import Thread
import time, os


class MyTread(Thread):
    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):   # 这个run是重写 
        print("enter hi() . {} @{}".format(self.arg, time.strftime("%H:%M:%S")))
        time.sleep(self.arg)
        print("{} @{}".format(self.arg, time.strftime("%H:%M:%S")))
        # print(1, os.getpid())


print("begin @{}".format(time.strftime("%H:%M:%S")))

for i in range(1, 5):
    t = MyTread(i)  # 初始化线程,传递参数
    t.start()  # 开启线程,

# print("主线程", os.getpid())  # 打印子进程和主进程的进程号,都是一样的

print("end at {}".format(time.strftime("%H:%M:%S")))

通过一个例子再次体验多线程的加速

  • 认知;
  • 使用多线程加速的例子,龟速和秒速
  • 如果不是多线程,那就是一个一个排队,
  • 如果是多线程,就是并发,那就是成倍的提高速度,
from threading import Thread
import time


def hi(sec):
    time.sleep(1)
    print(sec)


print("begin @{}".format(time.strftime("%H:%M:%S")))

for i in range(256):
    t = Thread(target=hi, args=(i,))  # 注册
    t.start()  # 这是启动了一个线程
    # hi(i)

print("end at {}".format(time.strftime("%H:%M:%S")))

setDaemon方法

  • 当一个进程启动之后,会默认产生一个主线程,因为线程是程序执行流的最小单元,

  • 当设置多线程时,主线程会创建多个子线程,

  • 当主线程结束时根据子线程daemon(设置thread.setDaemon(True))属性值的不同

  • 可能会发生下面的两种情况之一:

  • False

  • 如果某个子线程的daemon属性为False,

  • 主线程结束时会检测该子线程是否结束,如果该子线程还在运行,则主线程会等待它完成后再退出;

  • 在python中,默认情况下(其实就是setDaemon(False)),

  • True

  • 如果某个子线程的daemon属性为True,

  • 设置为true,意思就是把主线程A设置为守护线程,

  • 这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出.

  • 这就是setDaemon方法的含义,

  • 这基本和join是相反的。

  • 可能出现的情况就是,子线程的任务还没有完全执行结束,就被迫停止,

  • 如果需要修改daemon值,必须在调用start()方法启动线程之前进行设置。

  • 示例

from threading import Thread
import time
def func1(name):
    while True:
        print(11111111)
        time.sleep(1)

def func2(name):
    print(2222222)
    time.sleep(5)


if __name__ == '__main__':
    t=Thread(target=func1,args=('andy',))
    t.daemon = True  # 主线程代码结束,子线程随之结束,
    # 不加守护线程,主线程就会等待子线程的结束,然后主线程才会结束,
    t.start()

    t2=Thread(target=func2,args=('lucy',))
    t2.start()
    # 主线程会等待子线程结束
    print('主线程')
    print(t.is_alive())

join方法

  • 必须要深刻理解join:不同地方起到不同的作用
  • 一定要写对地方,否则起不到并发的效果
  • 主线程A中,创建了子线程B,并且在主线程A中调用了B.join(),
  • 那么,主线程A会在调用的地方阻塞,直到子线程B完成操作后,才可以接着往下执行。
  • 一般是设置守护线程为true,然后设置join,这样保证子线程结束的时候,子线程是结束的
  • 而且也能保证主线程是在每一个子线程结束之后才会结束
from threading import Thread
import time


class MyTread(Thread):
    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):   # 这个run是重写
        print("enter hi() . {} @{}".format(self.arg, time.strftime("%H:%M:%S")))
        time.sleep(self.arg)
        print("{} @{}".format(self.arg, time.strftime("%H:%M:%S")))


print("begin @{}".format(time.strftime("%H:%M:%S")))

tread_list = []
for i in range(1, 5):
    t = MyTread(i)  # 初始化线程,传递参数
    t.start()  # 开启线程,
    tread_list.append(t)
   # t.join # 如果你把t.join 放到这里,就会导致线程是一个一个的顺序执行的,这就和普通的for循环一样了,起不到并发的意义了,所以不能放到这个地方,

for t in tread_list:
    t.join()  # 主线程会等待子线程结束了之后才会结束,我们经常会用到这个功能,比如100个线程,我们需要在主线程汇总100个线程的结果就需要用到这个方法

print("end at {}".format(time.strftime("%H:%M:%S")))

为什么要使用queue队列?

  • 线程通信-----通过队列实现
  • 这是因为线程会出现冲突,当然可以加锁解决线程冲突
  • 但是加锁会自己写代码,不方便,我们可以使用队列
  • 队列内置了很多的锁,可以保证数据安全,
  • queue队列 :使用import queue,用法与进程Queue一样
import queue

q = queue.Queue()

q.put()
q.put_nowait()  # 这个会报错
q.get()
q.get_nowait()  # 这个会报错,

# 队列的特点:先进先出,


queue.LifoQueue()  # 栈,先进后出,
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())

queue.PriorityQueue()  # 优先级队列,
# put进入一个元组
# 元组的第一个元素是优先级,(通常也可以是数字,或者也可以是非数字之间的比较)  数字越小,优先级越高
q.put(20, 1)
q.put(10, 2)  # 数字越小,优先级越高,优先级一样,就是按照和ASk码排,
q.put(30, 3)
print(q.get())
print(q.get())
print(q.get())

# 总结:
# 1,普通队列
# 2,栈
# 3,优先级队列,
# 这三种都不会出现多线程抢占资源,

线程总结

  • 上面的是最为重要的,是必须要理解的,下面的可以慢慢理解,
为什么会有进程?
主要是能够同时处理多个任务,多个任务还要进行切换,时间片轮转
为什么会有线程?
进程并不是执行任务的最小单元,每一个进程里面有都一个线程,我们叫做主线程,
早期没有线程,一个进程只能干一个任务,如果有多个任务,只能多起进程,进程太多不行的,
进程内部的内存是共享的,所以需要线程,否则还要涉及到进程间的通信,这比较浪费资源

所以线程的出现解决了两个问题:
1,进程内部的通信
2,一个进程可以处理多个任务,


# 进程和线程的对比:
进程和线程的区别?

进程拥有一个完整的虚拟地址空间,不依赖于线程而独立存在;
反之,线程是进程的一部分,没有自己的地址空间,与进程内的其他线程一起共享分配给该进程的所有资源。
1,进程之间的内存是独立的,但是一个进程之内的线程是可以共享的,
2,进程之间切换是慢于线程之间的切换的,

关系问题,资源问题,分工问题,效率问题,数据问题,

第一个结论要记住:进程先有的,然后才有线程,线程是进程的一部分,线程是依赖于进程的,没有进程就没有线程,
第二个结论要记住:一个进程里面一定有一个主线程,进程是一个资源分配的单位,真正执行代码的时候还是线程,
第三个结论要记住:进程需要开辟新的资源空间,所以多线程需要很大的资源才可以实现多任务,但是线程只需要很少的资源就可以实现多任务,
第四个结论要记住:进程之间是数据独立的,所以涉及到进程间的通信问题,但是一个进程的线程之间是资源共享的,

比如:
一个网易云音乐,开启了之后就是一个进程,
使用这个网易云音乐既要下载音乐,又要播放音乐,这就是两个线程,


线程冲突和解决

  • 上面只是提到了使用队列解决线程冲突,
  • 还是要深度理解一下这个线程冲突,和其他的解决方案

体验线程冲突的例子

  • 为什么会出现这种冲突的情况,就是因为多个线程同时操作一个资源的时候就会出现这个问题,
  • 线程冲突还可以用来生成随机数的,
  • 但是有GIL锁啊,为什么还会产生这种情况?
  • 还是因为时间片轮转,你取到了数据,还没有来得及操作,这个值就被其他的线程拿走了,
from threading import Thread

num = 0


class MyTread(Thread):

    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):  # 这个run是重写
        global num
        for i in range(1000000):
            num += 1
        print(num)


tread_list = []
for i in range(1, 5):
    t = MyTread(i)  # 初始化线程,传递参数
    t.start()  # 开启线程,
    tread_list.append(t)

for t in tread_list:
    t.join()  # 主线程会等待子线程结束了之后才会结束,我们经常会用到这个功能,比如100个线程,我们需要在主线程汇总100个线程的结果就需要用到这个方法

互斥锁解决线程冲突

  • 认识:
  • 加锁降低了性能,但是保证了数据的安全性,
  • 互斥锁和join的区别:在线程启动阶段就加入join实现串行也能避免线程冲突,
  • 但是明显这种效率太低,很明显是加锁的效率更高.
  • 这种很常见,比如同时操作一个文件,
from threading import Thread,Lock


num = 0

Lock = Lock()
class MyTread(Thread):

    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):  # 这个run是重写
        global num
        if Lock.acquire():  # 锁住线程,
            for i in range(1000000):
                num += 1
            Lock.release()  # 释放锁,
        print(num)


tread_list = []
for i in range(1, 5):
    t = MyTread(i)  # 初始化线程,传递参数
    t.start()  # 开启线程,
    tread_list.append(t)

for t in tread_list:
    t.join()  # 主线程会等待子线程结束了之后才会结束,我们经常会用到这个功能,比如100个线程,我们需要在主线程汇总100个线程的结果就需要用到这个方法

体验死锁的现象:

  • 认知:
  • 1,这个死锁是因为你自己锁使用不当导致的死锁,就是你锁住了,但是你没有及时释放,
  • 而别人又要用这个锁,就用不了了
    2,所以你加锁和解锁,一定要成对出现,
from threading import Thread, Lock
import time

boy_lock = Lock()
girl_lock = Lock()


class Boy(Thread):

    def run(self):  # 这个run是重写
        if boy_lock.acquire():  # 锁住线程,
            print("boy say ...")
            time.sleep(3)
            # boy_lock.release()  # 释放锁,
            if girl_lock.acquire():
                print("girl say sorry")
                girl_lock.release()

            boy_lock.release()  # 释放锁,


class Girl(Thread):

    def run(self):  # 这个run是重写
        if girl_lock.acquire():  # 锁住线程,
            print("boy say ...")
            time.sleep(3)
            if boy_lock.acquire():
                print("boy say sorry")
                boy_lock.release()

            girl_lock.release()


boy = Boy()
boy.start()
girl = Girl()
girl.start()

使用递归锁RLock解决单线程死锁现象

  • 单线程解决死锁,是因为使用不当导致的,可以使用Rlock避免死锁,
  • 递归锁,来解决死锁的问题,你可以看做是钥匙串上面的两把钥匙,
  • 一旦你拿到了一把钥匙,就证明你拿到了整个钥匙串了,
from threading import Thread, Lock, RLock
import time

# boy_lock = Lock()
boy_lock = RLock()


num = 0
class Boy(Thread):

    def run(self):  # 这个run是重写
        global num
        if boy_lock.acquire():  # 锁住线程,
            num += 1
            print(num)
            if boy_lock.acquire():
                num += 100
                print(num)
                boy_lock.release()

            boy_lock.release()  # 释放锁,


boy = Boy()
boy.start()

其他线程的使用

Thread类的其他方法:查看当前线程

import threading
from threading import Thread
import time


def hi(msg, sec):
    print("enter hi() . {} @{}".format(msg, time.strftime("%H:%M:%S")))
    time.sleep(sec)
    print(msg, threading.current_thread())  # 这个是子线程1 <Thread(Thread-1, started 123145483644928)>
    print("{} @{}".format(msg, time.strftime("%H:%M:%S")))
    return sec


print("begin @{}".format(time.strftime("%H:%M:%S")))

for i in range(1, 5):
    t = Thread(target=hi, args=(i, i))  # 注册
    t.start()  # 这是启动了一个线程

print("查看活跃的线程数", threading.active_count())  # 查看活跃的线程数,这个就是5,因为是一个主线程,4个子线程
print(threading.current_thread())  # 这个是主线程<_MainThread(MainThread, started 4680646144)>

print("end at {}".format(time.strftime("%H:%M:%S")))

体验信号量限制线程数

from threading import Thread
import threading
import time


sem = threading.Semaphore(2)


def hi(sec):
    with sem:
        for i in range(10):
            print(threading.current_thread().name, sec)
            time.sleep(1)


for i in range(1, 5):
    Thread(target=hi, args=(i,)).start()

线程池

1,可以使用第三方的模块threadpool,这个需要安装,pip install threadpool
2,可以使用内置模块的线程池,from concurrent.futures import ThreadPoolExecutor

"""
池 —— concurrent.futures
Python标准模块--concurrent.futures

concurrent.futures模块提供了高度封装的异步调用接口,其中:
ThreadPoolExecutor:线程池
ProcessPoolExecutor: 进程池

借助上面两个类,我们可以很方便地创建进程池对象和线程池对象。
p_pool = ProcessPoolExecutor(max_workers=5)  # 创建一个最多5个woker的进程池
t_pool = ThreadPoolExecutor(max_workers=5)  # 创建一个最多5个woker的线程池

可用方法介绍:
# 基本方法
# submit(fn, *args, **kwargs)  提交任务
# map(func, *iterables, timeout=None, chunksize=1)  取代for循环submit的操作

# shutdown(wait=True)  相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

# result(timeout=None)  取得结果
# add_done_callback(fn)  回调函数

"""

线程池的使用:


from concurrent.futures import ThreadPoolExecutor
import time
def func(n):
    time.sleep(2)
    print(n)
    return n*n


tpool = ThreadPoolExecutor(max_workers=5)
# 进程池,启动cpu核数+1.
# 而线程池的启动是cpu核数 * 5 不要超过这个,

t_list = []
for i in range(20):
    t = tpool.submit(func,i)  # 提交一个任务,传递一个参数,
    t_list.append(t)

tpool.shutdown()
# shutdown做了两个事情:
# 1,colse 关闭这个池子,不让有任务进来,
# 2,join是阻塞,直到这个池子的任务执行完,
# 所以是一个shutdown做了两个事情,
print("主进程")
for t in t_list:print(t.result())

全局解释器锁GIL

  • 最后说一说这个全局解释锁
首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。
Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。
像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。
所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。

简单来说,在Cpython解释器中,因为有GIL锁的存在同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势。

同一个数据,多个线程去操作,也会出现问题,所以也有线程锁的概念,
这种机制叫做全局解释器锁,英文GIL,
这种机制的结果就是:同一时间只有一个线程去访问cpu,
你想要访问数据,就必须拿到这个钥匙,
这个锁,锁的是线程,不是锁的某一个数据,
这样不好,因为同一时间cpu上面只有一个线程,这样不能充分的利用cpu
这不是Python语言的问题,这是cPython解释器的问题,如果你有一个jPython解释器就行

这的确是一个弊病,导致Python的多线程形同虚设,
那么为什么不解决呢?
java和c++都是编译性语言,Python是一个解释性语言,php也是,
目前解释性语言就是存在这个问题,这个矛盾还不可调和,

是否把数据加锁就可以了,也是不行的,数据太多了,这么大范围的加锁,最终导致的效率,还不如全局解释器锁的性能好,

常见问题1
我们有了GIL锁为什么还要自己在代码中加锁呢?
还是因为时间片轮转,你取到了数据,但是还没有来得及减1,这个值就被其他的线程拿走了,还是10,

常见问题2
有了GIL的存在,同一时刻同一进程中只有一个线程被执行,进程可以利用多核,但是开销大,
而Python的多线程开销小,但却无法利用多核优势,也就是说Python这语言难堪大用。
解答:
其实编程所解决的现实问题大致分为IO密集型和计算密集型。
对于IO密集型的场景,Python的多线程编程完全OK,而对于计算密集型的场景,
Python中有很多成熟的模块或框架如Pandas等能够提高计算效率。


在cPython解释器下的Python程序,在同一时间,多个线程只能有一个线程在cpu执行,这不意味着多线程就没有意义了,
解答:
因为只有涉及到计算的地方才会使用到CPU,
高CPU:所以在计算类的高cpu利用率的,Python不占优势,
高IO:我们写的代码很多都涉及到这种,
比如qq聊天,处理日志文件,爬取网页,处理web请求,读写数据库,都是高io的,都是有Python用武之地的,
所以Python不能处理计算类高的问题,这不影响他在web编程的作用,
如果你真的需要高并发呢,你可以使用多进程,就不会有GIL锁了,


看看python官方文档,对于python多线程的解释:
由于存在全局解释器锁,同一时刻只有一个线程可以执行 Python 代码(虽然某些性能导向的库可能会去除此限制)。 
如果你想让你的应用更好地利用多核心计算机的计算资源,
推荐你使用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。 
但是,如果你想要同时运行多个 I/O 密集型任务,则多线程仍然是一个合适的模型。

所以说,这个多线程不是没有用处的,而是要分你干的是什么事情,
注意两个概念,:
1,计算密集型任务,计算密集型就是计算、逻辑判断量非常大而且集中的类型
2,io密集型任务,IO密集型就是磁盘的读取数据和输出数据非常大的时候就是属于IO密集型
而io又分为,磁盘io和网络io,
这个多线程,还是适合io密集型的任务的,