zl程序教程

您现在的位置是:首页 >  Python

当前栏目

Python响应式类库RxPy简介

2023-02-18 15:29:20 时间

RxPy是非常流行的响应式框架Reactive X的Python版本,其实这些版本都是一样的,只不过是各个语言的实现不同而已。因此,如果学会了其中一种,那么使用其他的响应式版本也是轻而易举的。之前我就听说过这个框架,最近决定好好研究一下。


基本概念

Reactive X中有几个核心的概念,先来简单介绍一下。


Observable和Observer(可观察对象和观察者)

首先是Observable和Observer,它们分别是可观察对象和观察者。Observable可以理解为一个异步的数据源,会发送一系列的值。Observer则类似于消费者,需要先订阅Observable,然后才可以接收到其发射的值。可以说这组概念是设计模式中的观察者模式和生产者-消费者模式的综合体。


Operator(操作符)

另外一个非常重要的概念就是操作符了。操作符作用于Observable的数据流上,可以对其施加各种各样的操作。更重要的是,操作符还可以链式组合起来。这样的链式函数调用不仅将数据和操作分隔开来,而且代码更加清晰可读。一旦熟练掌握之后,你就会爱上这种感觉的。


Single(单例)

在RxJava和其变体中,还有一个比较特殊的概念叫做Single,它是一种只会发射同一个值的Observable,说白了就是单例。当然如果你对Java等语言比较熟悉,那么单例想必也很熟悉。


Subject(主体)

主体这个概念非常特殊,它既是Observable又是Observer。正是因为这个特点,所以Subject可以订阅其他Observable,也可以将发射对象给其他Observer。在某些场景中,Subject会有很大的作用。


Scheduler(调度器)

默认情况下Reactive X只运行在当前线程下,但是如果有需要的话,也可以用调度器来让Reactive X运行在多线程环境下。有很多调度器和对应的操作符,可以处理多线程场景下的各种要求。


Observer和Observable

先来看看一个最简单的例子,运行的结果会依次打印这些数字。这里的of是一个操作符,可以根据给定的参数创建一个新的Observable。创建之后,就可以订阅Observable,三个回调方法在对应的时机执行。一旦Observer订阅了Observable,就会接收到后续Observable发射的各项值。


from rx import of


ob = of(1, 2, 34, 5, 6, 7, 7)

ob.subscribe(

    on_next=lambda i: print(f'Received: {i}'),

    on_error=lambda e: print(f'Error: {e}'),

    on_completed=lambda: print('Completed')


)

1

2

3

4

5

6

7

8

9

这个例子看起来好像很简单,并且看起来没什么用。但是当你了解了Rx的一些核心概念,就会理解到这是一个多么强大的工具。更重要的是,Observable生成数据和订阅的过程是异步的,如果你熟悉的话,就可以利用这个特性做很多事情。


操作符

在RxPy中另一个非常重要的概念就是操作符了,甚至可以说操作符就是最重要的一个概念了。几乎所有的功能都可以通过组合各个操作符来实现。熟练掌握操作符就是学好RxPy的关键了。操作符之间也可以用pipe函数连接起来,构成复杂的操作链。


from rx import of, operators as op

import rx


ob = of(1, 2, 34, 5, 6, 7, 7)

ob.pipe(

    op.map(lambda i: i ** 2),

    op.filter(lambda i: i >= 10)

).subscribe(lambda i: print(f'Received: {i}'))

1

2

3

4

5

6

7

8

在RxPy中有大量操作符,可以完成各种各样的功能。我们来简单看看其中一些常用的操作符。如果你熟悉Java8的流类库或者其他函数式编程类库的话,应该对这些操作符感到非常亲切。


创建型操作符

首先是创建Observable的操作符,列举了一些比较常用的创建型操作符。


操作符作用

just(n)只包含1个值的Observable

repeated_value(v,n)重复n次值为v的Observable

of(a,b,c,d)包含所有参数的Observable

empty()一个空的Observable

from_iterable(iter)用iterable创建一个Observable

generate(0, lambda x: x < 10, lambda x: x + 1)用初始值和循环条件生成Observable

interval(n)以n秒为间隔定时发送整数序列的Observable

过滤型操作符

过滤型操作符的主要作用是对Observable进行筛选和过滤。


操作符作用

debounce按时间间隔过滤,在范围内的值会被忽略

distinct忽略重复的值

elementAt只发射第n位的值

filter按条件过滤值

first/last发射首/尾值

skip跳过前n个值

take只取前n个值

转换型操作符

操作符作用

flatMap转换多个Observable的值并将它们合并为一个Observable

groupBy对值进行分组,返回多个Observable

map将Observable映射为另一个Observable

scan将函数应用到Observable的每个值上,然后返回后面的值