zl程序教程

您现在的位置是:首页 >  Python

当前栏目

再讲Python不能做游戏后端开发我揍你嗷!​ Twisted——基于事件驱动的Python网络框架

2023-02-18 16:23:09 时间

阿巩

在大家知道阿巩做游戏后端开发后最常有的对话是:你转做C++了吗,我说是Python,然后对面意味深长的叹口气,哦~不过Python慢啊;性能不如静态语言;Python适合写写脚本巴拉巴拉……硬了,拳头硬了,于是就有了这个标题。

标题容易挨打,点进来的大哥大嫂先消消气,容我说下Python能做游戏后端的理由。首先后端包括客户端和服务器端,客户端和服务器端再分为引擎、基础框架和逻辑层,至于核心引擎那必然得是C++老大哥,不过毕竟老大哥,像是逻辑层这样的繁杂工作Python来做就可以了。(据说猪厂也是这样实践的。)

对于服务器,用纯Python实现优势有很多:首先是不停服更新,这保证了服务器稳定运行,游戏线上跑着同时我们修着bug,开发效率大大提升的同时成本和风险也大幅降低,热更上线也不需要用户客户端整包更新;还有对于配置文件,产品小姐姐只需要给张Excel表就好,根据表中数据写成json对应到Python的字典键值对,服务端和客户端只需用这一个配置类就行;Python中的GIL锁确实让Python失去了在多线程领域竞争的资格,不过游戏逻辑部分很少有计算密集型任务,而且多线程也会增加代码逻辑的复杂度(如果非要用也可以用协程或者多进程解决)。

那既然说到性能了,对于追求服务器程序性能的应用有什么适用的Python框架吗?那就是今天和大家分享的Twisted框架,它支持许多常见的传输及应用层协议,包括TCP、UDP、SSL/TLS、HTTP、FTP等,这也意味着能为客户端和服务器端提供自定义开发工具。那为什么就说它能保证高效能通信呢?

Twisted在不同的操作系统平台上利用了不同的底层技术:在Windows中,基于IO完成端口技术保证了底层高效地将I/O事件通知给框架及应用程序;在Linux中采用epoll技术,它能显著提高在大量并发连接中只有少量活跃的情况下CPU利用率。Twisted框架采用Reactor设计模式,它的核心是Reactor的事件循环,监听网络、文件系统以及定时器等事件,并提供统一处理接口,使得事件能被快速响应。

在上一篇事件驱动中介绍过:对于不需要同步处理的多任务,我们可以使用事件驱动。那么在Twisted中使得程序设计可以采用事件驱动机制得益于Deferred(延迟)对象,它是一个管理回调函数的对象,我们可以向该对象添加需要回调的函数,同时可以指定该组回调函数何时被调用。

from twisted.internet import defer
from twisted.python import failure
import sys

d = defer.Deferred()  # 定义defer实例
def printSquare(d):  # 正常处理函数
    print("Square of %d is %d" % (d, d * d))

def processError(f):  # 错误处理函数
    print("Error with process ")
d.addCallback(printSquare)  # 添加正常处理的回调函数
d.addErrback(processError)  # 添加错误处理回调函数
# 开始调用defer
if len(sys.argv) > 1 and sys.argv[1] == 'call_error':
    f = failure.Failure(Exception('my Exception'))
    d.errback(f)  # 调用错误处理函数processError
else:
    d.callback(4)  # 调用正常处理函数printSquare(4)

代码围绕twisted.internet.defer.Deffered对象展开。

Defer中可以管理两种回调函数:Deffered.addCallback()正常处理函数和Deffered.addErrback错误处理函数。两种回调函数可以通过Deffered.callback()和Deffered.errback()进行调用。

另外可以给一个Deffer对象赋予多个正常或错误处理的回调函数,这样在Defer对象内部形成正常处理函数链和错误处理函数链,示例代码如下。

from twisted.internet import defer

d = defer.Deferred()

def printSquare(d):
    print("Square of %d is %d", (d, d * d))
    return d

def processError(f):
    print("error with process")

def printTwice(d):
    print("Twice of %d is %d", (2 * d))
    return d

d.addCallback(printSquare)
d.addErrback(processError)
d.addCallback(printTwice)

d.callback(5)
# Square of %d is %d (5, 25)
# Twice of %d is %d 10

Deffered的主要成员函数包括:

addCallback(self, callback, *args, **kwargs)

给Defer对象添加正常处理回调函数,需要至少有一个输入参数

addErrback(self, errback, *args, **kwargs)

给Defer对象添加错误处理回调函数,errback为错误处理函数名,需要至少有一个输入参数

addBoth(self, callback, *args, **kwargs)

回调函数同时作为正常和错误处理回调函数添加到Defer对象中

chainDeffered(self, d)

将另一个Defer对象的正常和错误处理回调函数添加到本Defer对象中。本函数是单向的

callback(self, result)

调用正常处理函数链,result是传递给第一个正常处理回调函数的参数

errback(self, fail=None)

调用错误处理函数链,result是传递给第一个错误处理回调函数的参数。

pause(self)和unpause(self)

pause(self)和unpause(self) 用来暂停和继续调用链

Defer为什么要分别管理两条回调函数调用链?因为调用链函数之间除了简单的顺序调用关系,还存在交叉调用关系,两条为了对回调过程提供更好的可控性,调用流程图如下:

其中实线为回调函数正常返回时的继续调用路径,虚线为处理函数中产生异常时的后续调用路径。

我们再将Deffer对象和reactor的延时调用机制结合在一起,来实现异步调用函数。

from twisted.internet import reactor, defer

def printSquare(d):
    print("Square of %d is %d" % (d, d * d))
    return d

def printTwice(d):
    print("Twice of %d is %d", (2 * d))
    return d

def makeDefer():
    d = defer.Deferred()
    d.addCallback(printSquare)
    d.addCallback(printTwice)
    reactor.callLater(2, d.callback, 5)  # 配置延时2s调用

makeDefer()
reactor.run()
# Square of 5 is 25
# Twice of %d is %d 10
# 挂起运行

makeDefer函数内定义了调用链执行的逻辑关系,其中 reactor.callLater(2, d.callback, 5)表示在reactor.run()运行后的2后,twisted框架才去调用callback对应的两个函数(printSquare,printTwice)。

callLater()函数原型如下

def callLater(delay, callable, *args, **kw):
  pass

delay定义延时调用秒数,如果为0则是立即调用;callable为被调用的函数名及其参数。

通过reactor.callLater(4, reactor.stop)定义4秒后调用函数reactor.stop(),还可以实现定时退出Twisted消息循环。


下面我们通过一个实时通信的广播系统模型介绍下用Twisted框架开发基于TCP的网络应用的方法:

首先Twisted提供了基本的通信编程封装,这里先介绍下Transports。它代表网络中两个通信结点之间的连接。Transports负责描述连接的细节,比如连接是面向流式的还是面向数据报的,流控以及可靠性,比如TCP、UDP和Unix套接字。对应方法如下:

write

以非阻塞的方式按顺序依次将数据写到物理连接上

writeSequence

将一个字符串列表写到物理连接上

loseConnection

将所有挂起的数据写入,然后关闭连接

getPeer

取得连接中对端的地址信息

getHost

取得连接中本端的地址信息

Protocols描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中的例子。Protocols实现了IProtocol接口,它包含如下的方法:

makeConnection

在transport对象和服务器之间建立一条连接

connectionMade

连接建立起来后调用

dataReceived

接收数据时调用

connectionLost

关闭连接时调用

广播系统服务器

针对Twisted的Protocol、Factory等类进行编程,定义它们的子类并重写connectionMade和dataReceived进行事件化处理。

from twisted.internet.protocol import Protocol
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import TCP4ServerEndpoint
from twisted.internet import reactor

clients = []  # 保存所有客户端连接
# Protocol的子类
class Spreader(Protocol):
    def __init__(self, factory):
        self.factory = factory

    def connectionMade(self):
        self.factory.numProtocols = self.factory.numProtocols + 1  # 对连接的客户端进行计数
        self.transport.write(
            (u"欢迎来到Spread Site,您是第%d个客户端用户!\n" %
             (self.factory.numProtocols,).encode('utf8')))
        print(f"new connect {self.factory.numProtocols}")
        clients.append(self)  # 将self保存到clients列表中

    def connectionLost(self, reason):
        clients.remove()
        print(f"lost connect: {self.connect_id}")

    def dataReceived(self, data):
        if data == "close":
            self.transport.loseConnection()
            print(f"{self.connect_id} closed")
        else:
            print(f"spreading message from {self.connect_id, self.data}")
            for client in clients:
                if client != self:
                    # 将收到的数据通过Protocol.transport.write()函数分发给除自己以外的所有客户端
                    client.transport.write(data)
# Factory的子类
class SpreadFactory(Factory):
    def __init__(self):
        self.numProtocols = 0  # 将客户端计数器置0

    def buildProtocol(self, addr):
        return Spreader(self)  # 建立Protocol子类的实例

endpoint = TCP4ServerEndpoint(reactor, 8007)  # 定义服务器监听端口
endpoint.listen(SpreadFactory())  # 指定子类实例
reactor.run()  # 挂起运行

广播客户端

Twisted同样提供了基于Protocol类的TCP客户端编程方法。

from twisted.internet.protocol import Protocol, ClientFactory
from twisted.internet import reactor
import sys
import datetime

class Echo(Protocol):
    def connectionMade(self):
        print("connect to server!")

    def dataReceived(self, data):
        print("got message: ", data.decode('utf8'))
        reactor.callLater(5, self.say_hello)

    def connectionLost(self, reason):
        print("Disconnected from server!")

    def say_hello(self):
        if self.transport.connected:
            self.transport.write(
                (u"hello, I'm %s %s" % (sys.argv[1], datetime.datetime.now())).encode('utf-8'))

class EchoClientFactory(ClientFactory):
    def __init__(self):
        self.protocol = None

    def startedConnecting(self, connector):
        print("Start to connect.")

    def buildProtocol(self, addr):
        self.protocol = Echo()
        return self.protocol

    def clientConnectionLost(self, connector, reason):
        print("Lost Connection. Reason:", reason)

    def clientConnectionFailed(self, connector, reason):
        print("Connection failed. Reason:", reason)

host = "127.0.0.1"
port =8007
factory = EchoClientFactory()
reactor.connectTCP(host, port, factory)
reactor.run()

执行顺序如下:

  • 建立连接

ClientFactory.startedConnecting()

Protocol.connectionMade()

  • 已连接

用Protocol.dataReceived()接受消息

用Protocol.transport.write()发送消息

  • 连接断开

Protocol.connectionLost()

ClientFactory. clientConnectionLost()

即建立连接时先执行ClientFactory中回调,然后执行Protocol中回调,连接断开时正好相反。

END