zl程序教程

您现在的位置是:首页 >  Python

当前栏目

python 使用 asyncio 包处理并发

2023-04-18 16:53:52 时间

文章目录

learn from 《流畅的python》

1. 线程与协程对比

threading

import threading
import itertools
import time
import sys


class Signal:
    """Mutable flag object used to tell the spinner thread when to stop."""

    # True while the spinner should keep running; set to False to end its loop.
    go = True


def spin(msg, signal):
    """Animate a text spinner followed by *msg* until ``signal.go`` is false.

    Args:
        msg: Text displayed to the right of the spinning character.
        signal: Object with a boolean ``go`` attribute. The loop ends after
            the first frame that is drawn while ``go`` is false.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    # The backslash frame must be escaped; an unescaped one swallows the
    # closing quote and makes the literal unterminated.
    for char in itertools.cycle("|/-\\"):  # endless cycle of spinner frames
        status = char + ' ' + msg
        write(status)
        flush()
        # "\x08" is the backspace character: move the cursor back to the
        # start of the frame so the next one overwrites it.
        write("\x08" * len(status))
        time.sleep(0.1)
        if not signal.go:
            break
    # Blank out the last frame with spaces, then move the cursor back again.
    write(' ' * len(status) + "\x08" * len(status))


def slow_function():
    """Stand in for a long-running computation and return its result."""
    # time.sleep blocks the calling thread but releases the GIL, so the
    # secondary (spinner) thread keeps running in the meantime.
    time.sleep(10)
    answer = 42
    return answer


def supervisor():
    """Run ``slow_function`` while a spinner thread animates; return its result.

    The spinner thread is always signalled to stop and joined, even when
    ``slow_function`` raises, so a failure cannot leave the non-daemon
    thread spinning forever and keeping the process alive.

    Returns:
        Whatever ``slow_function`` returns.
    """
    signal = Signal()
    spinner = threading.Thread(target=spin, args=("thinking!", signal))
    print("spinner object:", spinner)  # show the secondary thread object
    spinner.start()  # start the secondary thread
    try:
        # Blocks the main thread; the spinner thread animates meanwhile.
        result = slow_function()
    finally:
        signal.go = False  # ends the for loop inside spin
        spinner.join()  # wait for the spinner thread to finish
    return result


def main():
    """Run the supervisor and print the value it produced."""
    print("Answer:", supervisor())


if __name__ == "__main__":
    main()

适合 asyncio 的协程要由调用方驱动:要么由调用方通过 yield from 调用(该语法已过时,新版本使用 async / await),要么把协程传给 asyncio 包中的某个函数。

一篇博文参考:https://www.cnblogs.com/dhcn/p/9032461.html

import asyncio
import itertools
import sys


# https://docs.python.org/3.8/library/asyncio.html
async def spin(msg):  # coroutine function (async / await syntax, Python 3.5+)
    """Animate a text spinner followed by *msg* until the task is cancelled.

    Args:
        msg: Text displayed to the right of the spinning character.
    """
    write, flush = sys.stdout.write, sys.stdout.flush
    # The backslash frame must be escaped; an unescaped one swallows the
    # closing quote and makes the literal unterminated.
    for char in itertools.cycle("|/-\\"):  # endless cycle of spinner frames
        status = char + ' ' + msg
        write(status)
        flush()
        # "\x08" is the backspace character: move the cursor back to the
        # start of the frame so the next one overwrites it.
        write("\x08" * len(status))
        try:
            await asyncio.sleep(0.1)
        except asyncio.CancelledError:  # cancellation requested: leave the loop
            print("cancel")
            break
    # Blank out the last frame with spaces, then move the cursor back again.
    write(' ' * len(status) + "\x08" * len(status))
    print("end spin")


async def slow_function():  # coroutine function
    """Pretend to perform a slow I/O operation, then return the result."""
    print("start IO")
    # Awaiting the sleep hands control back to the event loop while the
    # simulated I/O is pending.
    await asyncio.sleep(3)
    print("end IO  ")
    answer = 42
    return answer


async def supervisor():  # coroutine function
    """Run ``slow_function`` while a spinner task animates; return its result.

    The spinner task is cancelled and awaited in a ``finally`` clause, so it
    is stopped even when ``slow_function`` raises, and its clean-up output is
    finished before this coroutine returns.

    Returns:
        Whatever ``slow_function`` returns.
    """
    spinner = asyncio.ensure_future(spin("thinking!"))  # schedule the spinner task
    print("spinner object:", spinner)  # show the Task object (not a thread)
    # e.g. spinner object: <Task pending coro=<spin() running at ...>>
    print("start slow")
    try:
        result = await slow_function()
        print("end slow")
    finally:
        # Cancelling a Task raises CancelledError inside the wrapped coroutine.
        spinner.cancel()
        # Wait for the spinner to finish winding down; return_exceptions=True
        # keeps a CancelledError from the spinner from propagating here.
        await asyncio.gather(spinner, return_exceptions=True)
    return result


def main():
    """Drive the ``supervisor`` coroutine to completion and print its result."""
    # asyncio.run creates a fresh event loop, runs the coroutine until it is
    # done, and closes the loop afterwards (also on error). It replaces the
    # deprecated get_event_loop() / run_until_complete() / close() sequence.
    result = asyncio.run(supervisor())
    print("answer:", result)


if __name__ == '__main__':
    main()

输出:

spinner object: <Task pending coro=<spin() running at D:gitcode >
start slow
start IO
end IO  ng!(期间thinking!在输出,后来被覆盖)
end slow
cancel
end spin
answer: 42

请按任意键继续. . .

2. 使用 asyncio 和 aiohttp 下载

import time
import sys
import os
import asyncio
import aiohttp

POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'


def save_flag(img, filename):
    """Write the image bytes *img* to *filename* inside ``DEST_DIR``."""
    target = os.path.join(DEST_DIR, filename)
    with open(target, mode='wb') as out_file:
        out_file.write(img)


def show(text):
    """Print *text* followed by a space (no newline) and flush stdout."""
    # flush=True flushes the output stream right after writing.
    print(text, end=' ', flush=True)


async def get_flag(cc):
    """Download the flag image for country code *cc* and return its bytes."""
    code = cc.lower()
    url = f'{BASE_URL}/{code}/{code}.gif'
    async with aiohttp.request("GET", url) as resp:
        # Read the whole response body before the response is released.
        return await resp.read()


async def download_one(cc):
    """Download the flag for *cc*, save it as ``<cc>.gif`` and return *cc*."""
    flag_bytes = await get_flag(cc)
    show(cc)  # progress output: the country code that just finished
    gif_name = cc.lower() + '.gif'
    save_flag(flag_bytes, gif_name)
    return cc


def download_many_(cc_list):
    """Download the flags for every code in *cc_list* concurrently.

    Args:
        cc_list: Iterable of country codes; processed in sorted order.

    Returns:
        The number of download tasks that finished (successfully or not).
        An empty *cc_list* yields 0.
    """
    async def _download_all():
        # asyncio.wait requires Task/Future objects; passing bare coroutine
        # objects is rejected since Python 3.11, so wrap each one explicitly.
        tasks = [asyncio.ensure_future(download_one(cc)) for cc in sorted(cc_list)]
        if not tasks:
            return 0  # asyncio.wait raises ValueError on an empty collection
        # wait() returns two sets: the finished tasks and the pending ones.
        done, _pending = await asyncio.wait(tasks)
        return len(done)

    # asyncio.run creates the event loop, drives the coroutine to completion
    # and closes the loop afterwards.
    return asyncio.run(_download_all())


def main(download_many):
    """Time ``download_many(POP20_CC)`` and print a one-line summary.

    Args:
        download_many: Callable taking the list of country codes and
            returning the value reported as the number of downloaded flags.
    """
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    # The leading "\n" moves the summary onto its own line; it has to be an
    # escape sequence because a single-quoted literal cannot span two lines.
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))  # timing information


if __name__ == '__main__':
    main(download_many_)

# US RU ID ET BR FR CN PH BD NG DE JP EG TR MX IN PK IR CD VN 
# 20 flags downloaded in 3.88s

3. 避免阻塞型调用

执行硬盘或网络 I/O 操作的函数定义为阻塞型函数

有两种方法能避免阻塞型调用中止整个应用程序的进程:

  • 单独的线程中运行各个阻塞型操作
  • 把每个阻塞型操作转换成非阻塞的异步调用来使用

4. 使用 asyncio.as_completed

import collections
import time
import sys
import os
import asyncio
from http import HTTPStatus

import aiohttp
from aiohttp import web
import tqdm

POP20_CC = ('CN IN US ID BR PK NG BD RU JP ' 'MX PH VN ET EG DE IR TR CD FR').split()
BASE_URL = 'http://flupy.org/data/flags'
DEST_DIR = './'
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000


class FetchError(Exception):
    """Raised when downloading the flag for a country code fails."""

    def __init__(self, country_code):
        # Forward to Exception so that args, str() and repr() carry the
        # country code instead of being empty.
        super().__init__(country_code)
        # Country code whose download failed.
        self.country_code = country_code


def save_flag(img, filename):
    """Store the raw image bytes *img* as *filename* under ``DEST_DIR``."""
    destination = os.path.join(DEST_DIR, filename)
    with open(destination, mode='wb') as image_file:
        image_file.write(img)


def show(text):
    """Write *text* plus a trailing space to stdout and flush immediately."""
    # No newline is emitted, so successive calls stay on one line.
    print(text, end=' ', flush=True)


async def get_flag(cc):
    """Download the flag image for country code *cc* and return its bytes.

    Raises:
        web.HTTPNotFound: The server answered 404.
        aiohttp.WebSocketError: The server answered with any status other
            than 200 or 404.
    """
    code = cc.lower()
    url = f'{BASE_URL}/{code}/{code}.gif'
    async with aiohttp.request("GET", url) as resp:
        http_status = resp.status
        if http_status == 404:
            raise web.HTTPNotFound()
        if http_status != 200:
            # NOTE(review): WebSocketError is an unusual choice for a plain
            # HTTP failure; confirm whether an HTTP client error was intended.
            raise aiohttp.WebSocketError(code=http_status, message=resp.reason)
        return await resp.read()


async def download_one(cc, semaphore, verbose):
    """Fetch and save one flag while holding *semaphore*.

    Args:
        cc: Country code of the flag to download.
        semaphore: Async context manager guarding the download.
        verbose: When true, print the country code and outcome.

    Returns:
        A ``(status, cc)`` tuple, where status is ``HTTPStatus.OK`` or
        ``HTTPStatus.NOT_FOUND``.

    Raises:
        FetchError: Any failure other than ``web.HTTPNotFound``; the original
            exception is chained as its cause.
    """
    try:
        async with semaphore:  # only the download itself holds the semaphore
            flag_bytes = await get_flag(cc)
    except web.HTTPNotFound:
        status, msg = HTTPStatus.NOT_FOUND, "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        save_flag(flag_bytes, cc.lower() + '.gif')
        status, msg = HTTPStatus.OK, "OK"
    if verbose and msg:
        print(cc, msg)
    return (status, cc)


async def downloader_coro(cc_list, verbose, concur_req):  # coroutine function
    """Download all flags in *cc_list* concurrently and tally the outcomes.

    Args:
        cc_list: Sized collection of country codes; processed in sorted order.
        verbose: When true, print per-download messages; when false, show a
            tqdm progress bar instead.
        concur_req: Initial value of the semaphore shared by the downloads.

    Returns:
        A ``collections.Counter`` keyed by outcome: the status returned by
        ``download_one`` for finished downloads, or the string ``'error'``
        for downloads that raised ``FetchError``.
    """
    counter = collections.Counter()
    semaphore = asyncio.Semaphore(value=concur_req)
    # Pass the caller's verbose flag through instead of forcing it to True.
    todo = [download_one(cc, semaphore, verbose=verbose) for cc in sorted(cc_list)]
    todo_iter = asyncio.as_completed(todo)  # yields awaitables as downloads finish
    if not verbose:
        todo_iter = tqdm.tqdm(todo_iter, total=len(cc_list))  # progress bar
    for future in todo_iter:
        try:
            res = await future  # result of one finished download
        except FetchError as exc:
            country_code = exc.country_code
            try:
                error_msg = exc.__cause__.args[0]
            except IndexError:
                # The cause carried no arguments; fall back to its class name.
                error_msg = exc.__cause__.__class__.__name__
            if verbose and error_msg:
                msg = '*** Error for {}: {}'
                print(msg.format(country_code, error_msg))
            # http.HTTPStatus has no generic "error" member, so failed
            # downloads are tallied under a plain string key.
            status = 'error'
        else:
            status = res[0]
        counter[status] += 1  # record the outcome
    return counter


def download_many_(cc_list, verbose, concur_req):
    """Run ``downloader_coro`` to completion and return its result.

    Args:
        cc_list: Country codes to download.
        verbose: Forwarded to ``downloader_coro``.
        concur_req: Forwarded to ``downloader_coro``.

    Returns:
        Whatever ``downloader_coro`` returns.
    """
    coro = downloader_coro(cc_list, verbose=verbose, concur_req=concur_req)
    # asyncio.run creates the event loop, drives the coroutine until it is
    # done and closes the loop afterwards; it replaces the deprecated
    # get_event_loop() / run_until_complete() pattern.
    return asyncio.run(coro)


def main(download_many):
    """Time ``download_many`` over ``POP20_CC`` and print a one-line summary.

    Args:
        download_many: Callable invoked as
            ``download_many(POP20_CC, True, MAX_CONCUR_REQ)``.
    """
    t0 = time.time()
    # NOTE(review): download_many_ in this file returns a Counter, so the
    # summary shows the whole tally rather than a single number.
    count = download_many(POP20_CC, True, MAX_CONCUR_REQ)
    elapsed = time.time() - t0
    # The leading "\n" moves the summary onto its own line; it has to be an
    # escape sequence because a single-quoted literal cannot span two lines.
    msg = '\n{} flags downloaded in {:.2f}s'
    print(msg.format(count, elapsed))  # timing information


if __name__ == '__main__':
    main(download_many_)

5. 使用Executor对象,防止阻塞事件循环

  • loop.run_in_executor 方法把阻塞的作业(例如保存文件)委托给线程池做
async def download_one(cc, semaphore, verbose):
    """Fetch one flag and save it in a worker thread; return ``(status, cc)``.

    Args:
        cc: Country code of the flag to download.
        semaphore: Async context manager guarding the download.
        verbose: When true, print the country code and outcome.

    Returns:
        A ``(status, cc)`` tuple, where status is ``HTTPStatus.OK`` or
        ``HTTPStatus.NOT_FOUND``.

    Raises:
        FetchError: Any download failure other than ``web.HTTPNotFound``; the
            original exception is chained as its cause.
    """
    try:
        async with semaphore:
            image = await get_flag(cc)
    except web.HTTPNotFound:
        status = HTTPStatus.NOT_FOUND
        msg = "not found"
    except Exception as exc:
        raise FetchError(cc) from exc
    else:
        # Saving the file is a blocking call that would freeze the whole
        # application, so hand it to a thread pool instead.
        loop = asyncio.get_running_loop()
        # First argument is the Executor; None selects the event loop's
        # default executor. The remaining arguments are the callable and its
        # positional arguments.
        # Await the returned future so that "OK" is only reported after the
        # file is written and errors raised by save_flag are not lost.
        await loop.run_in_executor(None, save_flag, image, cc.lower() + ".gif")
        status = HTTPStatus.OK
        msg = "OK"
    if verbose and msg:
        print(cc, msg)
    return (status, cc)

6. 从回调到期物和协程

  • 如果一个操作需要依赖之前操作的结果,那就得嵌套回调
def stage1(response1):
    """First callback: build the second request and chain on to ``stage2``."""
    api_call2(step1(response1), stage2)


def stage2(response2):
    """Second callback: build the third request and chain on to ``stage3``."""
    api_call3(step2(response2), stage3)


def stage3(response3):
    """Final callback: hand the third response to ``step3``."""
    # Was misspelled "tep3"; the sibling callbacks use step1/step2 and the
    # coroutine version of this example calls step3.
    step3(response3)


# Start the callback chain; presumably api_call1 invokes stage1 with its
# response once the call completes (api_call1 is not defined in this snippet).
api_call1(request1, stage1)

好的写法:

async def three_stages(request1):
    """Run the three dependent API calls in sequence, awaiting each response."""
    # Stage 1
    first_response = await api_call1(request1)
    # Stage 2
    second_response = await api_call2(step1(first_response))
    # Stage 3
    third_response = await api_call3(step2(second_response))
    step3(third_response)


loop.create_task(three_stages(request1))  # must be scheduled explicitly

协程 必须使用 事件循环 显式排定 协程的执行时间

异步系统避免用户级线程的开销,这是它能比多线程系统管理更多并发连接的主要原因