zl程序教程

您现在的位置是:首页 >  Python

当前栏目

事件驱动如何理解?什么场景下适合用?Python如何实现一个事件监听器?

2023-02-18 16:23:09 时间

“鸽王”阿巩

这两周属实忙到分身乏术,不过咕咕这么久也属实不应该。听取了某位不愿透露姓名的大佬级资深读者反馈,内容应该更具有实用性,将问题场景带入工作中再去分析怎么实现、目的是什么、这么做有什么优缺点,而不仅仅是干干巴巴的理论框架。感谢大佬的点拨。为了保质保量,为了读者大佬们,也为了我乌黑亮丽的秀发,公众号将改为周更的形式,输出一周的学习和思考,尽我所能产出高质量的文章!日拱一卒,我们开始吧!

产品小姐姐给了阿巩这样的业务需求:游戏服务器希望在玩家升级或者达成某项成就时触发多种效果。比如玩家升级后,全属性将会提高,解锁新的系统,可以学习其他技能且开启新的战斗模式……可以看出,玩家升级后,所有挂钩的业务都要集中在一起,依次被处理。首先我们使用if 玩家升级再逐条添加业务功能代码是完全可以实现的,但是这样写出来的代码耦合度极高,往往是“牵一发而动全身”,一旦后续新业务加入,还要继续插入代码。

为达到解耦效果,引入了事件驱动模型:我们把“玩家升级”包装成一个事件,任何对这个事件感兴趣的“观察者”只需要监听并执行对应逻辑。事件驱动模型也就是我们常说的发布-订阅模型。事件驱动主要包含这三类元素:事件源、事件监听器、事件对象;对应的操作函数中需要包括:监听动作、发送事件、调用监听器响应函数。下面通过一个发布订阅的事件监听器体会下事件驱动:

# -*- coding: utf-8 -*-
from queue import Queue, Empty
from threading import *


class EventManager:
    def __init__(self):
        """初始化事件管理器"""
        # 事件对象列表
        self.__eventQueue = Queue()
        # 事件管理器开关
        self.__active = False
        # 事件处理线程
        self.__thread = Thread(target=self.__run)
        self.count = 0
        # 保存对应事件的响应函数
        self.__handlers = {}

    def __run(self):
        print('{}_Run'.format(self.count))
        while self.__active == True:
            try:
                event = self.__eventQueue.get(block=True, timeout=1)
                self.__eventProcess(event)
            except Empty:
                pass
            self.count += 1

    def __eventProcess(self, event):
        print('{}_EventProcess'.format(self.count))
        if event.type_ in self.__handlers:
            for handler in self.__handlers[event.type_]:
                handler(event)
        self.count += 1

    def start(self):
        print('{}_Start'.format(self.count))
        self.__active = True
        self.__thread.start()
        self.count += 1

    def stop(self):
        print('{}_Stop'.format(self.count))
        self.__active = False
        self.__thread.join()
        self.count += 1

    def addEventListener(self, type_, handler):
        print('{}_addEventListener'.format(self.count))
        try:
            handlerList = self.__handlers[type_]
        except KeyError:
            handlerList = []
            self.__handlers[type_] = handlerList
        if handler not in handlerList:
            handlerList.append(handler)
        print(self.__handlers)
        self.count += 1

    def removeEventListener(self, type_, handler):
        print('{}_removeEventListener'.format(self.count))
        try:
            handlerList = self.handlers[type_]
            if handler in handlerList:
                handlerList.remove(handler)
            if not handlerList:
                del self.__handlers[type_]
        except KeyError:
            pass
        self.count += 1

    def sendEvent(self, event):
        print('{}_SendEvent'.format(self.count))
        self.__eventQueue.put(event)
        self.count += 1


class Event:
    def __init__(self, type_=None):
        self.type_ = type_
        self.dict = {}

以公众号的发布订阅为例,建立测试类

# -*- coding: utf-8 -*-
from eventManager import *

EVENT_ARTICAL = "Event_Artical"


class PublicAccounts:
    def __init__(self, eventManager):
        self.__eventManager = eventManager

    def WriteNewArtical(self):
        event = Event(type_=EVENT_ARTICAL)
        event.dict["artical"] = u'Python实现一个事件监听器\n'

        self.__eventManager.sendEvent(event)
        print(u'"才浅的每日Python"公众号发送新文章\n')


class Listener:
    def __init__(self, username):
        self.__username = username

    def ReadArtical(self, event):
        print(u'%s 收到文章' % self.__username)
        print(u'正在阅读新文章内容:%s' % event.dict['artical'])


def test():
    listener1 = Listener("Antonia")
    listener2 = Listener("Steve")
    listener3 = Listener("JOJO")
    eventManager = EventManager()
    eventManager.addEventListener(EVENT_ARTICAL, listener1.ReadArtical)
    eventManager.addEventListener(EVENT_ARTICAL, listener2.ReadArtical)
    eventManager.addEventListener(EVENT_ARTICAL, listener3.ReadArtical)
    eventManager.start()

    publicAcc = PublicAccounts(eventManager)
    timer = Timer(2, publicAcc.WriteNewArtical)
    timer.start()


if __name__ == '__main__':
    test()

代码运行结果如下

0_addEventListener
{'Event_Artical': [<bound method Listener.ReadArtical of <__main__.Listener instance at 0x1075f5758>>]}
1_addEventListener
{'Event_Artical': [<bound method Listener.ReadArtical of <__main__.Listener instance at 0x1075f5758>>, <bound method Listener.ReadArtical of <__main__.Listener instance at 0x1075f5830>>]}
2_addEventListener
{'Event_Artical': [<bound method Listener.ReadArtical of <__main__.Listener instance at 0x1075f5758>>, <bound method Listener.ReadArtical of <__main__.Listener instance at 0x1075f5830>>, <bound method Listener.ReadArtical of <__main__.Listener instance at 0x1075f5878>>]}
3_Start
4_Run
5_SendEvent
"才浅的每日Python"公众号发送新文章

6_EventProcess
Antonia 收到文章
正在阅读新文章内容:Python实现一个事件监听器

Steve 收到文章
正在阅读新文章内容:Python实现一个事件监听器

JOJO 收到文章
正在阅读新文章内容:Python实现一个事件监听器

addEventListener用来将事件和监听器进行绑定,注册事件的处理器到处理器列表中;Start启动事件管理器,启动事件处理线程;Run运行引擎,设置事件阻塞时间为1秒;SendEvent发送事件,向事件队列中存入事件;EventProcess用于处理事件,检查是否存在该事件进行监听的处理函数,若存在则按顺序将事件传递给处理函数执行。

什么场景下适合用事件驱动呢?概括来讲“不需要同步处理的多任务处理就可以使用事件驱动”。那么事件驱动与单线程和多线程之间有什么关系呢?

  • 单线程:同步模型中任务按顺序执行,如果其中某个任务因I/O而阻塞,其他所有任务都需要等待,直到它完成后才能依次执行,即需要被串行化处理。
  • 多线程:多个线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。但是弊端是需要写额外代码去保护共享资源,防止被多个线程同时访问,需要引入线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题。
  • 事件驱动:在一个单独的线程控制中,当处理I/O操作时,注册一个回调到事件循环中,回调中描述了如何处理某个事件,然后当I/O操作完成时继续执行。事件循环轮询所有事件,并当事件到来时分配给待处理事件以回调函数。不需要额外线程,不必担心线程安全问题。

参考:

https://blog.csdn.net/brucewong0516/article/details/84031715

https://criss.blog.csdn.net/article/details/85000570

Twisted是用Python实现的基于事件驱动的网络引擎框架,下一章将和大家分享下Twisted框架相关内容。日拱一卒,我们下期见!