An Introduction to Asynchronous Programming and Twisted (3)
Part 11: Your Poetry is Served
A Twisted Poetry Server
Now that we’ve learned so much about writing clients with Twisted, let’s turn around and re-implement our poetry server with Twisted too. And thanks to the generality of Twisted’s abstractions, it turns out we’ve already learned almost everything we need to know.
class PoetryProtocol(Protocol): def connectionMade(self): self.transport.write(self.factory.poem) self.transport.loseConnection() class PoetryFactory(ServerFactory): protocol = PoetryProtocol def __init__(self, poem): self.poem = poem def main(): options, poetry_file = parse_args() poem = open(poetry_file).read() factory = PoetryFactory(poem) from twisted.internet import reactor port = reactor.listenTCP(options.port or 0, factory, interface=options.iface) reactor.run()
可见server和client基本原理上是一致的, reactor loop侦听事件, 事件到达时使用protocol去处理, factory用于管理protocol, 继承自ServerFactory.
Part 12: A Poetry Transformation Server
这节中实现一个复杂些的server, 根据client发送不同的请求, 将poem做不同的转换并发回client, 这就需要一个协议使得client和server可以正常沟通.
Twisted includes support for several protocols we could use to solve this problem, including XML-RPC, Perspective Broker, and AMP.
但是为了是我们的例子足够简单以至于容易理解, 我们使用自己的一个简单的协议,
<transform-name>.<text of the poem>
当server接收到从客户端发出的这样的request后, 根据transform-name将text of the poem进行相应的transform, 并发送回client.
class TransformProtocol(NetstringReceiver): def stringReceived(self, request): if '.' not in request: # bad request self.transport.loseConnection() return xform_name, poem = request.split('.', 1) self.xformRequestReceived(xform_name, poem) def xformRequestReceived(self, xform_name, poem): new_poem = self.factory.transform(xform_name, poem) if new_poem is not None: self.sendString(new_poem) self.transport.loseConnection() class TransformFactory(ServerFactory): protocol = TransformProtocol def __init__(self, service): self.service = service def transform(self, xform_name, poem): thunk = getattr(self, 'xform_%s' % (xform_name,), None) if thunk is None: # no such transform return None try: return thunk(poem) except: return None # transform failed def xform_cummingsify(self, poem): return self.service.cummingsify(poem) class TransformService(object): def cummingsify(self, poem): return poem.lower() def main(): service = TransformService() factory = PoetryFactory(service) from twisted.internet import reactor port = reactor.listenTCP(options.port or 0, factory, interface=options.iface) reactor.run()
来看看这段代码,
首先, TransformProtocol继承自NetstringReceiver而非Protocol, NetstringReceiver是一种专门用来处理string的协议, 这儿可以使用和继承Twisted开发框架提供的各种协议来简化代码, 而不用每次从头开发, 这就是使用框架的好处.
在TransformProtocol中对于poem具体的transform逻辑上, 调用self.factory.transform, 把变数扔给factory, 而保持protocol的高度抽象, transform逻辑变化,添减, 都保持protocol不需要有任何改动.
其次, 在TransformFactory中, 使用python强大的getattr来避免使用大量的if…else.
但这儿只提供了cummingsify service, 如果要增加或删除service, TransformFactory和TransformService难免需要修改...
这段代码已经写的不错...不过缺少些Twisted的感觉...如果加上deferred的callback机制, 应该可以写出更highlevel的代码.
Part 13: Deferred All The Way Down
Introduction
Recall poetry client 5.1 from Part 10.The client used a Deferred to manage a callback chain that included a call to a poetry transformation engine. In client 5.1, the engine was implemented as a synchronous function call implemented in the client itself.
Client5.1中异步去获取poem, 然后调用callback函数cummingsify做transform, 现在我们在Part12中实现了TransformService, 即poem transform也要用异步的方式让服务器去完成.
这其实是个比较自然的想法, 由于reactor的特性, 任何callback都必须是unblock的, 但实际上, 很多callback处理是需要花费较长的时间的, 这个时候在callback内也必须异步处理, 来保证callback本身的unblock, 即callback本身也无法直接返回结果, 而只能返回deferred对象.
如下图, 当碰到这种inner deferred时,
The outer deferred needs to wait until the inner deferred is fired. Of course, the outer deferred can’t block either, so instead the outer deferred suspends the execution of the callback chain and returns control to the reactor
And how does the outer deferred know when to resume? Simple — by adding a callback/errback pair to the inner deferred. Thus, when the inner deferred is fired the outer deferred will resume executing its chain. If the inner deferred succeeds (i.e., it calls the callback added by the outer deferred), then the outer deferred calls its N+1 callback with the result. And if the inner deferred fails (calls the errback added by the outer deferred), the outer deferred calls the N+1errback with the failure.
下面这段代码给出了怎么样封装inner deferred来提供异步callback,
class TransformClientProtocol(NetstringReceiver): def connectionMade(self): self.sendRequest(self.factory.xform_name, self.factory.poem) def sendRequest(self, xform_name, poem): self.sendString(xform_name + '.' + poem) def stringReceived(self, s): self.transport.loseConnection() self.poemReceived(s) def poemReceived(self, poem): self.factory.handlePoem(poem) class TransformClientFactory(ClientFactory): protocol = TransformClientProtocol def __init__(self, xform_name, poem): self.xform_name = xform_name self.poem = poem self.deferred = defer.Deferred() def handlePoem(self, poem): d, self.deferred = self.deferred, None d.callback(poem) def clientConnectionLost(self, _, reason): if self.deferred is not None: d, self.deferred = self.deferred, None d.errback(reason) clientConnectionFailed = clientConnectionLost class TransformProxy(object): """ I proxy requests to a transformation service. """ def __init__(self, host, port): self.host = host self.port = port def xform(self, xform_name, poem): factory = TransformClientFactory(xform_name, poem) from twisted.internet import reactor reactor.connectTCP(self.host, self.port, factory) return factory.deferred def cummingsify(poem): d = proxy.xform('cummingsify', poem) def fail(err): print >>sys.stderr, 'Cummingsify failed!' return poem return d.addErrback(fail)
最后这个函数就是封装好的异步callback, 大家可以和之前part10的callback对比一下...
def cummingsify(poem): print 'First callback, cummingsify' poem = engine.cummingsify(poem) return poem def cummingsify_failed(err): if err.check(GibberishError): print 'Second errback, cummingsify_failed, use original poem' return err.value.args[0] #return original poem return err
再来看一下part10中的callback顺序图, 此时cummingsify为异步callback, cummingsify_failed被加到inner deferred中, 当这个inner deferred被fired时, outer deferred会根据inner deferred情况去调用, got_poem或poem_failed. 其中具体过程似乎是透明的...或者说我也不清楚
作者在这儿也没有讲清, 个人认为这儿如果能参照Part10给个完整的代码例子, 会更清晰一些...
Part 14: When a Deferred Isn’t
We’ll make a caching proxy server. When a client connects to the proxy, the proxy will either fetch the poem from the external server or return a cached copy of a previously retrieved poem.
这儿可见, 如果是直接从cache返回的话可以直接同步处理, 如需要去external server获取的话就需要异步处理.
这样的有时需要同步, 有时需要异步的情况, 怎么办?
如下代码中, get_poem可能返回的是poem, 也有可能是deferred对象, 对于调用者怎么处理...
class ProxyService(object): poem = None # the cached poem def __init__(self, host, port): self.host = host self.port = port def get_poem(self): if self.poem is not None: print 'Using cached poem.' return self.poem print 'Fetching poem from server.' factory = PoetryClientFactory() factory.deferred.addCallback(self.set_poem) from twisted.internet import reactor reactor.connectTCP(self.host, self.port, factory) return factory.deferred def set_poem(self, poem): self.poem = poem return poem class PoetryProxyProtocol(Protocol): def connectionMade(self): d = maybeDeferred(self.factory.service.get_poem) d.addCallback(self.transport.write) d.addBoth(lambda r: self.transport.loseConnection()) class PoetryProxyFactory(ServerFactory): protocol = PoetryProxyProtocol def __init__(self, service): self.service = service
使用maybeDeferred来解决这个问题, 这个函数会把poem也封装成一个already-fired deferred
- If the function returns a deferred,
maybeDeferred
returns that same deferred, or - If the function returns a Failure,
maybeDeferred
returns a new deferred that has been fired (via.errback
) with that Failure, or - If the function returns a regular value,
maybeDeferred
returns a deferred that has already been fired with that value as the result, or - If the function raises an exception,
maybeDeferred
returns a deferred that has already been fired (via.errback()
) with that exception wrapped in a Failure.
- If the function returns a deferred,
An already-fired deferred may fire the new callback (or errback, depending on the state of the deferred) immediately, i.e., right when you add it.
或者使用succeed函数, The defer.succeed
function is just a handy way to make an already-fired deferred given a result.
def get_poem(self): if self.poem is not None: print 'Using cached poem.' # return an already-fired deferred return succeed(self.poem) print 'Fetching poem from server.' factory = PoetryClientFactory() factory.deferred.addCallback(self.set_poem) from twisted.internet import reactor reactor.connectTCP(self.host, self.port, factory) return factory.deferred
本文章摘自博客园,原文发布日期:2011-09-15
相关文章
- 在 Go 里用 CGO?这 7 个问题你要关注!
- 9款优秀的去中心化通讯软件 Matrix 的客户端
- 求职数据分析,项目经验该怎么写
- 在OKR中,我看到了数据驱动业务的未来
- 火山引擎云原生大数据在金融行业的实践
- OpenHarmony富设备移植指南(二)—从postmarketOS获取移植资源
- 《数据成熟度指数》报告:64%的企业领袖认为大多数员工“不懂数据”
- OpenHarmony 小型系统兼容性测试指南
- 肯睿中国(Cloudera):2023年企业数字战略三大趋势预测
- 适用于 Linux 的十大命令行游戏
- GNOME 截图工具的新旧截图方式
- System76 即将推出的 COSMIC 桌面正在酝酿大变化
- 2GB 内存 8GB 存储即可流畅运行,Windows 11 极致精简版系统 Tiny11 发布
- 迎接 ecode:一个即将推出的具有全新图形用户界面框架的现代、轻量级代码编辑器
- loongarch架构介绍(三)—地址翻译
- Go 语言怎么解决编译器错误“err is shadowed during return”?
- 敏捷:可能被开发人员遗忘的部分
- Denodo预测2023年数据管理和分析的未来
- 利用数据推动可持续发展
- 在 Vue3 中实现 React 原生 Hooks(useState、useEffect),深入理解 React Hooks 的