8-2-1python语法基础-并发编程-线程-创建线程,线程冲突(锁),线程通信(condition,队列),线程池,定时器
2023-09-14 08:59:04 时间
线程
为什么要使用多线程
- 多线程的目的为了并发,并发是为了性能,效率,这一点要提高到最核心最基础的认识里面,
- 举例:比如一个6层楼,要找一个人,如果1个人搜索就要6层依次搜寻,如果6个人就是6倍的效率提高
如何使用python实现多线程?
- 很多语言都有多线程,我们现在是关注的python如何来实现多线程
- 我要有一个自己的例子,就是多线程验证ip可用性的例子
- 先上代码:
import redis
import queue
import requests
import threading
import logging
logging.basicConfig(level=logging.INFO,
format=
# 日志的时间
'%(asctime)s'
# 日志级别名称 : 当前行号
' %(levelname)s [%(filename)s : %(lineno)d ]'
# 日志信息
' : %(message)s'
# 指定时间格式
, datefmt='[%Y/%m/%d %H:%M:%S]')
logging = logging.getLogger(__name__)
# 第一步,把数据取出来,在redis
conn = redis.Redis(host="127.0.0.1", port="6379")
# proxy_list = conn.hgetall("use_proxy")
# proxy_list = conn.hvals("use_proxy")
proxy_list = conn.hkeys("use_proxy")
# logging.info(proxy_list)
# 第二步,把数据存入队列
proxy_queue = queue.Queue()
for proxy in proxy_list:
proxy_queue.put(str(proxy, encoding="utf-8"))
queue_size = proxy_queue.qsize()
# logging.info(queue_size)
# logging.info(proxy_queue.get())
# 第三步,多线程验证
# 1,先写一个类用来验证ip
class TreadCheck(threading.Thread):
def __init__(self, target_queue, thread_name):
threading.Thread.__init__(self, name=thread_name)
self.target_queue = target_queue
self.thread_name = thread_name
def run(self):
logging.info("{} 开始".format(self.name))
while True:
try:
proxy = self.target_queue.get(block=False)
except queue.Empty:
logging.info("{} 结束".format(self.name))
break
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0',
'Accept': '*/*',
'Connection': 'keep-alive',
'Accept-Language': 'zh-CN,zh;q=0.8'}
proxies = {
"http": "http://" + str(proxy),
"https": "https://" + str(proxy)
}
# 代理验证目标网站
url_http = "http://httpbin.org"
url_https = "https://www.qq.com"
http_code = False
https_code = False
try:
res_http = requests.head(url=url_http, headers=headers, proxies=proxies, timeout=10)
if res_http.status_code == 200:
http_code = True
except Exception as e:
http_code = False
try:
res_https = requests.head(url=url_https, headers=headers, proxies=proxies, timeout=10)
logging.info(res_https.status_code)
if res_https.status_code == 200:
https_code = True
except Exception as e:
https_code = False
logging.info(
"{} http_status:{} https_status:{} 代理:{} ".format(self.name, str(http_code).ljust(6),
str(https_code).ljust(6), proxy.ljust(25)))
# 2,使用多线程
thread_list = []
for index in range(20):
thread_list.append(TreadCheck(proxy_queue, "thread_" + str(index).zfill(2)))
for thread in thread_list:
thread.setDaemon(True)
thread.start()
for thread in thread_list:
thread.join()
示例代码解析
- 通过上面的示例代理,来学习一下python的多线程
创建线程
- 认知:
- 创建线程有两种方式,一种基于函数创建,一种基于类创建
- 简单的多线程可以使用函数来实现,
- 复杂的多线程,需要类的方式来实现,
第一种方式:基于构造函数创建
from threading import Thread
import time
def hi(msg, sec):
print("enter hi() . {} @{}".format(msg, time.strftime("%H:%M:%S")))
time.sleep(sec)
print("{} @{}".format(msg, time.strftime("%H:%M:%S")))
return sec
print("begin @{}".format(time.strftime("%H:%M:%S")))
for i in range(1, 5):
t = Thread(target=hi, args=(i, i)) # 注册
t.start() # 这是启动了一个线程
# hi(i, i)
print("end at {}".format(time.strftime("%H:%M:%S")))
# 使用单线程的这个结果应该是,1+2+3+4=10秒
# 使用多线程的这个结果应该是,4秒
创建线程第二种方式:基于类创建
- 注意两点:
- 1,要继承Thread
- 2,要重写run方法
from threading import Thread
import time, os
class MyTread(Thread):
def __init__(self, arg):
super().__init__()
self.arg = arg
def run(self): # 这个run是重写
print("enter hi() . {} @{}".format(self.arg, time.strftime("%H:%M:%S")))
time.sleep(self.arg)
print("{} @{}".format(self.arg, time.strftime("%H:%M:%S")))
# print(1, os.getpid())
print("begin @{}".format(time.strftime("%H:%M:%S")))
for i in range(1, 5):
t = MyTread(i) # 初始化线程,传递参数
t.start() # 开启线程,
# print("主线程", os.getpid()) # 打印子进程和主进程的进程号,都是一样的
print("end at {}".format(time.strftime("%H:%M:%S")))
通过一个例子再次体验多线程的加速
- 认知;
- 使用多线程加速的例子,龟速和秒速
- 如果不是多线程,那就是一个一个排队,
- 如果是多线程,就是并发,那就是成倍的提高速度,
from threading import Thread
import time
def hi(sec):
time.sleep(1)
print(sec)
print("begin @{}".format(time.strftime("%H:%M:%S")))
for i in range(256):
t = Thread(target=hi, args=(i,)) # 注册
t.start() # 这是启动了一个线程
# hi(i)
print("end at {}".format(time.strftime("%H:%M:%S")))
setDaemon方法
-
当一个进程启动之后,会默认产生一个主线程,因为线程是程序执行流的最小单元,
-
当设置多线程时,主线程会创建多个子线程,
-
当主线程结束时根据子线程daemon(设置thread.setDaemon(True))属性值的不同
-
可能会发生下面的两种情况之一:
-
False
-
如果某个子线程的daemon属性为False,
-
主线程结束时会检测该子线程是否结束,如果该子线程还在运行,则主线程会等待它完成后再退出;
-
在python中,默认情况下(其实就是setDaemon(False)),
-
True
-
如果某个子线程的daemon属性为True,
-
设置为true,意思就是把主线程A设置为守护线程,
-
这时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出.
-
这就是setDaemon方法的含义,
-
这基本和join是相反的。
-
可能出现的情况就是,子线程的任务还没有完全执行结束,就被迫停止,
-
如果需要修改daemon值,必须在调用start()方法启动线程之前进行设置。
-
示例
from threading import Thread
import time
def func1(name):
while True:
print(11111111)
time.sleep(1)
def func2(name):
print(2222222)
time.sleep(5)
if __name__ == '__main__':
t=Thread(target=func1,args=('andy',))
t.daemon = True # 主线程代码结束,子线程随之结束,
# 不加守护线程,主线程就会等待子线程的结束,然后主线程才会结束,
t.start()
t2=Thread(target=func2,args=('lucy',))
t2.start()
# 主线程会等待子线程结束
print('主线程')
print(t.is_alive())
join方法
- 必须要深刻理解join:不同地方起到不同的作用
- 一定要写对地方,否则起不到并发的效果
- 主线程A中,创建了子线程B,并且在主线程A中调用了B.join(),
- 那么,主线程A会在调用的地方阻塞,直到子线程B完成操作后,才可以接着往下执行。
- 一般是设置守护线程为true,然后设置join,这样保证子线程结束的时候,子线程是结束的
- 而且也能保证主线程是在每一个子线程结束之后才会结束
from threading import Thread
import time
class MyTread(Thread):
def __init__(self, arg):
super().__init__()
self.arg = arg
def run(self): # 这个run是重写
print("enter hi() . {} @{}".format(self.arg, time.strftime("%H:%M:%S")))
time.sleep(self.arg)
print("{} @{}".format(self.arg, time.strftime("%H:%M:%S")))
print("begin @{}".format(time.strftime("%H:%M:%S")))
tread_list = []
for i in range(1, 5):
t = MyTread(i) # 初始化线程,传递参数
t.start() # 开启线程,
tread_list.append(t)
# t.join # 如果你把t.join 放到这里,就会导致线程是一个一个的顺序执行的,这就和普通的for循环一样了,起不到并发的意义了,所以不能放到这个地方,
for t in tread_list:
t.join() # 主线程会等待子线程结束了之后才会结束,我们经常会用到这个功能,比如100个线程,我们需要在主线程汇总100个线程的结果就需要用到这个方法
print("end at {}".format(time.strftime("%H:%M:%S")))
为什么要使用queue队列?
- 线程通信-----通过队列实现
- 这是因为线程会出现冲突,当然可以加锁解决线程冲突
- 但是加锁会自己写代码,不方便,我们可以使用队列
- 队列内置了很多的锁,可以保证数据安全,
- queue队列 :使用import queue,用法与进程Queue一样
import queue
q = queue.Queue()
q.put()
q.put_nowait() # 这个会报错
q.get()
q.get_nowait() # 这个会报错,
# 队列的特点:先进先出,
queue.LifoQueue() # 栈,先进后出,
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
queue.PriorityQueue() # 优先级队列,
# put进入一个元组
# 元组的第一个元素是优先级,(通常也可以是数字,或者也可以是非数字之间的比较) 数字越小,优先级越高
q.put(20, 1)
q.put(10, 2) # 数字越小,优先级越高,优先级一样,就是按照和ASk码排,
q.put(30, 3)
print(q.get())
print(q.get())
print(q.get())
# 总结:
# 1,普通队列
# 2,栈
# 3,优先级队列,
# 这三种都不会出现多线程抢占资源,
线程总结
- 上面的是最为重要的,是必须要理解的,下面的可以慢慢理解,
为什么会有进程?
主要是能够同时处理多个任务,多个任务还要进行切换,时间片轮转
为什么会有线程?
进程并不是执行任务的最小单元,每一个进程里面有都一个线程,我们叫做主线程,
早期没有线程,一个进程只能干一个任务,如果有多个任务,只能多起进程,进程太多不行的,
进程内部的内存是共享的,所以需要线程,否则还要涉及到进程间的通信,这比较浪费资源
所以线程的出现解决了两个问题:
1,进程内部的通信
2,一个进程可以处理多个任务,
# 进程和线程的对比:
进程和线程的区别?
进程拥有一个完整的虚拟地址空间,不依赖于线程而独立存在;
反之,线程是进程的一部分,没有自己的地址空间,与进程内的其他线程一起共享分配给该进程的所有资源。
1,进程之间的内存是独立的,但是一个进程之内的线程是可以共享的,
2,进程之间切换是慢于线程之间的切换的,
关系问题,资源问题,分工问题,效率问题,数据问题,
第一个结论要记住:进程先有的,然后才有线程,线程是进程的一部分,线程是依赖于进程的,没有进程就没有线程,
第二个结论要记住:一个进程里面一定有一个主线程,进程是一个资源分配的单位,真正执行代码的时候还是线程,
第三个结论要记住:进程需要开辟新的资源空间,所以多线程需要很大的资源才可以实现多任务,但是线程只需要很少的资源就可以实现多任务,
第四个结论要记住:进程之间是数据独立的,所以涉及到进程间的通信问题,但是一个进程的线程之间是资源共享的,
比如:
一个网易云音乐,开启了之后就是一个进程,
使用这个网易云音乐既要下载音乐,又要播放音乐,这就是两个线程,
线程冲突和解决
- 上面只是提到了使用队列解决线程冲突,
- 还是要深度理解一下这个线程冲突,和其他的解决方案
体验线程冲突的例子
- 为什么会出现这种冲突的情况,就是因为多个线程同时操作一个资源的时候就会出现这个问题,
- 线程冲突还可以用来生成随机数的,
- 但是有GIL锁啊,为什么还会产生这种情况?
- 还是因为时间片轮转,你取到了数据,还没有来得及操作,这个值就被其他的线程拿走了,
from threading import Thread
num = 0
class MyTread(Thread):
def __init__(self, arg):
super().__init__()
self.arg = arg
def run(self): # 这个run是重写
global num
for i in range(1000000):
num += 1
print(num)
tread_list = []
for i in range(1, 5):
t = MyTread(i) # 初始化线程,传递参数
t.start() # 开启线程,
tread_list.append(t)
for t in tread_list:
t.join() # 主线程会等待子线程结束了之后才会结束,我们经常会用到这个功能,比如100个线程,我们需要在主线程汇总100个线程的结果就需要用到这个方法
互斥锁解决线程冲突
- 认识:
- 加锁降低了性能,但是保证了数据的安全性,
- 互斥锁和join的区别:在线程启动阶段就加入join实现串行也能避免线程冲突,
- 但是明显这种效率太低,很明显是加锁的效率更高.
- 这种很常见,比如同时操作一个文件,
from threading import Thread,Lock
num = 0
Lock = Lock()
class MyTread(Thread):
def __init__(self, arg):
super().__init__()
self.arg = arg
def run(self): # 这个run是重写
global num
if Lock.acquire(): # 锁住线程,
for i in range(1000000):
num += 1
Lock.release() # 释放锁,
print(num)
tread_list = []
for i in range(1, 5):
t = MyTread(i) # 初始化线程,传递参数
t.start() # 开启线程,
tread_list.append(t)
for t in tread_list:
t.join() # 主线程会等待子线程结束了之后才会结束,我们经常会用到这个功能,比如100个线程,我们需要在主线程汇总100个线程的结果就需要用到这个方法
体验死锁的现象:
- 认知:
- 1,这个死锁是因为你自己锁使用不当导致的死锁,就是你锁住了,但是你没有及时释放,
- 而别人又要用这个锁,就用不了了
2,所以你加锁和解锁,一定要成对出现,
from threading import Thread, Lock
import time
boy_lock = Lock()
girl_lock = Lock()
class Boy(Thread):
def run(self): # 这个run是重写
if boy_lock.acquire(): # 锁住线程,
print("boy say ...")
time.sleep(3)
# boy_lock.release() # 释放锁,
if girl_lock.acquire():
print("girl say sorry")
girl_lock.release()
boy_lock.release() # 释放锁,
class Girl(Thread):
def run(self): # 这个run是重写
if girl_lock.acquire(): # 锁住线程,
print("boy say ...")
time.sleep(3)
if boy_lock.acquire():
print("boy say sorry")
boy_lock.release()
girl_lock.release()
boy = Boy()
boy.start()
girl = Girl()
girl.start()
使用递归锁RLock解决单线程死锁现象
- 单线程解决死锁,是因为使用不当导致的,可以使用Rlock避免死锁,
- 递归锁,来解决死锁的问题,你可以看做是钥匙串上面的两把钥匙,
- 一旦你拿到了一把钥匙,就证明你拿到了整个钥匙串了,
from threading import Thread, Lock, RLock
import time
# boy_lock = Lock()
boy_lock = RLock()
num = 0
class Boy(Thread):
def run(self): # 这个run是重写
global num
if boy_lock.acquire(): # 锁住线程,
num += 1
print(num)
if boy_lock.acquire():
num += 100
print(num)
boy_lock.release()
boy_lock.release() # 释放锁,
boy = Boy()
boy.start()
其他线程的使用
Thread类的其他方法:查看当前线程
import threading
from threading import Thread
import time
def hi(msg, sec):
print("enter hi() . {} @{}".format(msg, time.strftime("%H:%M:%S")))
time.sleep(sec)
print(msg, threading.current_thread()) # 这个是子线程1 <Thread(Thread-1, started 123145483644928)>
print("{} @{}".format(msg, time.strftime("%H:%M:%S")))
return sec
print("begin @{}".format(time.strftime("%H:%M:%S")))
for i in range(1, 5):
t = Thread(target=hi, args=(i, i)) # 注册
t.start() # 这是启动了一个线程
print("查看活跃的线程数", threading.active_count()) # 查看活跃的线程数,这个就是5,因为是一个主线程,4个子线程
print(threading.current_thread()) # 这个是主线程<_MainThread(MainThread, started 4680646144)>
print("end at {}".format(time.strftime("%H:%M:%S")))
体验信号量限制线程数
from threading import Thread
import threading
import time
sem = threading.Semaphore(2)
def hi(sec):
with sem:
for i in range(10):
print(threading.current_thread().name, sec)
time.sleep(1)
for i in range(1, 5):
Thread(target=hi, args=(i,)).start()
线程池
1,可以使用第三方的模块threadpool,这个需要安装,pip install threadpool
2,可以使用内置模块的线程池,from concurrent.futures import ThreadPoolExecutor
"""
池 —— concurrent.futures
Python标准模块--concurrent.futures
concurrent.futures模块提供了高度封装的异步调用接口,其中:
ThreadPoolExecutor:线程池
ProcessPoolExecutor: 进程池
借助上面两个类,我们可以很方便地创建进程池对象和线程池对象。
p_pool = ProcessPoolExecutor(max_workers=5) # 创建一个最多5个woker的进程池
t_pool = ThreadPoolExecutor(max_workers=5) # 创建一个最多5个woker的线程池
可用方法介绍:
# 基本方法
# submit(fn, *args, **kwargs) 提交任务
# map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作
# shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
# result(timeout=None) 取得结果
# add_done_callback(fn) 回调函数
"""
线程池的使用:
from concurrent.futures import ThreadPoolExecutor
import time
def func(n):
time.sleep(2)
print(n)
return n*n
tpool = ThreadPoolExecutor(max_workers=5)
# 进程池,启动cpu核数+1.
# 而线程池的启动是cpu核数 * 5 不要超过这个,
t_list = []
for i in range(20):
t = tpool.submit(func,i) # 提交一个任务,传递一个参数,
t_list.append(t)
tpool.shutdown()
# shutdown做了两个事情:
# 1,colse 关闭这个池子,不让有任务进来,
# 2,join是阻塞,直到这个池子的任务执行完,
# 所以是一个shutdown做了两个事情,
print("主进程")
for t in t_list:print(t.result())
全局解释器锁GIL
- 最后说一说这个全局解释锁
首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。
Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。
像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。
所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。
所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。
简单来说,在Cpython解释器中,因为有GIL锁的存在同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势。
同一个数据,多个线程去操作,也会出现问题,所以也有线程锁的概念,
这种机制叫做全局解释器锁,英文GIL,
这种机制的结果就是:同一时间只有一个线程去访问cpu,
你想要访问数据,就必须拿到这个钥匙,
这个锁,锁的是线程,不是锁的某一个数据,
这样不好,因为同一时间cpu上面只有一个线程,这样不能充分的利用cpu
这不是Python语言的问题,这是cPython解释器的问题,如果你有一个jPython解释器就行
这的确是一个弊病,导致Python的多线程形同虚设,
那么为什么不解决呢?
java和c++都是编译性语言,Python是一个解释性语言,php也是,
目前解释性语言就是存在这个问题,这个矛盾还不可调和,
是否把数据加锁就可以了,也是不行的,数据太多了,这么大范围的加锁,最终导致的效率,还不如全局解释器锁的性能好,
常见问题1
我们有了GIL锁为什么还要自己在代码中加锁呢?
还是因为时间片轮转,你取到了数据,但是还没有来得及减1,这个值就被其他的线程拿走了,还是10,
常见问题2
有了GIL的存在,同一时刻同一进程中只有一个线程被执行,进程可以利用多核,但是开销大,
而Python的多线程开销小,但却无法利用多核优势,也就是说Python这语言难堪大用。
解答:
其实编程所解决的现实问题大致分为IO密集型和计算密集型。
对于IO密集型的场景,Python的多线程编程完全OK,而对于计算密集型的场景,
Python中有很多成熟的模块或框架如Pandas等能够提高计算效率。
在cPython解释器下的Python程序,在同一时间,多个线程只能有一个线程在cpu执行,这不意味着多线程就没有意义了,
解答:
因为只有涉及到计算的地方才会使用到CPU,
高CPU:所以在计算类的高cpu利用率的,Python不占优势,
高IO:我们写的代码很多都涉及到这种,
比如qq聊天,处理日志文件,爬取网页,处理web请求,读写数据库,都是高io的,都是有Python用武之地的,
所以Python不能处理计算类高的问题,这不影响他在web编程的作用,
如果你真的需要高并发呢,你可以使用多进程,就不会有GIL锁了,
看看python官方文档,对于python多线程的解释:
由于存在全局解释器锁,同一时刻只有一个线程可以执行 Python 代码(虽然某些性能导向的库可能会去除此限制)。
如果你想让你的应用更好地利用多核心计算机的计算资源,
推荐你使用 multiprocessing 或 concurrent.futures.ProcessPoolExecutor。
但是,如果你想要同时运行多个 I/O 密集型任务,则多线程仍然是一个合适的模型。
所以说,这个多线程不是没有用处的,而是要分你干的是什么事情,
注意两个概念,:
1,计算密集型任务,计算密集型就是计算、逻辑判断量非常大而且集中的类型
2,io密集型任务,IO密集型就是磁盘的读取数据和输出数据非常大的时候就是属于IO密集型
而io又分为,磁盘io和网络io,
这个多线程,还是适合io密集型的任务的,
相关文章
- 【小5聊】C#基础之Response.ContentType响应内容类型[通俗易懂]
- 零基础学Java(11)自定义类[通俗易懂]
- Google黑客基础语法学习与使用
- 大数据必学Java基础(六十二):ConcurrentMap并发容器对比
- 阿里内部“千万级”高并发进阶笔记,基础+实战+解决方案都有了
- MySQL基础之事务【事务操作,四大特性,并发事务问题,隔离级别】
- Go语言实战之映射的内部实现和基础功能
- 用CNN做基础模型,可变形卷积InternImage实现检测分割新纪录!
- Java并发编程基础
- Java-基础项目HelloServlet
- 【学习笔记】黑马程序员Node.js全套入门教程 | 基础篇
- Java基础语法1
- Linux基础:ubuntu如何开启22端口的实现
- PostgreSQL 数据库基础 之 Sequence序列的使用介绍
- spring的自动装配基础详解编程语言
- JAVA多线程和并发基础面试问答(翻译)