zl程序教程

您现在的位置是:首页 >  后端

当前栏目

泛函编程(12)-数据流-Stream详解编程语言

编程编程语言 详解 12 stream 数据流 泛函
2023-06-13 09:20:38 时间

   在前面的章节中我们介绍了List,也讨论了List的数据结构和操作函数。List这个东西从外表看上去挺美,但在现实中使用起来却可能很不实在。为什么?有两方面:其一,我们可以发现所有List的操作都是在内存中进行的,要求List中的所有元素都必须在操作时存在于内存里。如果必须针对大型数据集进行List操作的话就明显不切实际了。其二,List的抽象算法如折叠算法、map, flatMap等是无法中途跳出的,无论如何都一直进行到底;只有通过递归算法在才能在中途停止运算。但递归算法不够抽象,经常出现重复的代码。最要命的是递归算法会随着数据量增加堆栈内存占用(non-tail-recursive),处理大型数据集同样不实际。以上缺陷使List的应用被局限在小规模的数据集处理范围。

    矛盾的是,List由于内存占用问题不适合大数据集处理,但它的计算模式又是排列数据模式必须的选择。Stream数据类型具备了List的排列数据计算模式但有不需要将全部数据搬到内存里,可以解决以上提到的大数据集处理问题。Stream的特性是通过“延后计算”(lazy evaluation)来实现的。可以想象一下可能的原理:Stream内元素读取是在具体使用时才进行的。不用说,Stream是典型的只读数据类型。既然要继承List的计算模式,那么在结构设计上是否相同呢?我们先看看Stream的结构设计:

1 trait Stream[+A] 

2 case object Empty extends Stream[Nothing] 

3 case class Cons[+A](head: () = A, tail: () = Stream[A]) extends Stream[A]

天啊,简直是活脱脱的List结构嘛。不过Stream的头元素(head)和无头尾(tail)是延后计算的(non-strict)。由于Cons不是普通函数而是一个类,不容许延后计算类参数,所以传入的是一个函数 () = ???。

以上Stream结构设计与List相同;两种状态是用子类来表示的。以下我们探索以下另外一种设计方案:

 1 trait Stream[+A] { 

 2 def uncons: Option[(A, Stream[A])] 

 3 def isEmpty: Boolean = uncons.isEmpty 

 4 } 

 5 object Stream { 

 6 def empty[A]: Stream[A] = new Stream[A] { 

 7 def uncons = None 

 8 } 

 9 def cons[A](h: = A, t: = Stream[A]): Stream[A] = new Stream[A] { 

10 def uncons = Some((h,t)) 

11 } 

12 def apply[A](as: A*): Stream[A] = { 

13 if (as.isEmpty) empty 

14 else cons(as.head, apply(as.tail: _*)) 

15 } 

16 }

以上的设计方案采用了结构封装形式:数据结构uncons,两种状态empty, cons都被封装在类结构里。最起码我们现在可以直接使用= A 来表达延后计算参数了。

实际上Stream就是对一个List的描述,一个类型的声明。它的实例生成延后到了具体使用的时候,此时需要的元素已经搬入内存,成了货真价实的List了:

 

 1 //tail recursive 

 2 def toList_1: List[A] = { 

 3 @annotation.tailrec 

 4 def go(s: Stream[A], acc: List[A]): List[A] = { 

 5 s.uncons match { 

 6 case None = acc 

 7 case Some((h,t)) = go(t,h :: acc) 

 8 } 

 9 } 

10 go(this,Nil).reverse // h :: acc 产生相反顺序 

11 } 

12 //省去reverse 

13 def toListFast: List[A] = { 

14 val buf = new collection.mutable.ListBuffer[A] 

15 @annotation.tailrec 

16 def go(s: Stream[A]): List[A] ={ 

17 s.uncons match { 

18 case Some((h,t)) = { 

19 buf += h 

20 go(t) 

21 } 

22 case _ = buf.toList 

23 } 

24 } 

25 go(this) 

26 } 

27 Stream(1,2.3) // res0: ch5.stream.Stream[Double] = [email protected] 

28 Stream(1,2,3).toList // res1: List[Int] = List(1, 2, 3) 

29 Stream(1,2,3).toList_1 // res2: List[Int] = List(1, 2, 3) 

30 Stream(1,2,3).toListFast // res3: List[Int] = List(1, 2, 3)

 

看看,Stream(1,2,3)就是一个声明。我们通过List转换才真正产生了实例。

再看看Stream最基本的一些操作函数:

 

 1 def take(n: Int): Stream[A] = { 

 2 if ( n == 0 ) empty 

 3 else 

 4 uncons match { 

 5 case None = empty 

 6 case Some((h,t)) = cons(h,t.take(n-1)) 

 7 } 

 8 } 

 9 def drop(n: Int): Stream[A] = { 

10 if (n == 0) this 

11 else { 

12 uncons match { 

13 case Some((h,t)) = t.drop(n-1) 

14 case _ = this 

15 } 

16 } 

17 } 

18 Stream(1,2,3) take 2 // res4: ch5.stream.Stream[Int] = [email protected] 

19 (Stream(1,2,3) take 2).toList // res5: List[Int] = List(1, 2) 

20 Stream(1,2,3) drop 2 // res6: ch5.stream.Stream[Int] = [email protected] 

21 (Stream(1,2,3) drop 2).toList // res7: List[Int] = List(3)

 

从操作结果可以确定:Stream的操作也都是对操作的描述,是延后计算的。当元素被搬到List时系统才回真正计算这些Stream元素的值。

不过这些操作函数的实现方式与List基本相像:

 

 1 def takeWhile(f: A = Boolean): Stream[A] = { 

 2 uncons match { 

 3 case None = empty 

 4 case Some((h,t)) = if ( f(h) ) cons(h,t.takeWhile(f)) else empty 

 5 } 

 6 } 

 7 def dropWhile(f: A = Boolean): Stream[A] = { 

 8 uncons match { 

 9 case None = empty 

10 case Some((h,t)) = if ( f(h) ) t.dropWhile(f) else t 

11 } 

12 } 

13 def headOption: Option[A] = uncons match { 

14 case Some((h,t)) = Some(h) 

15 case _ = None 

16 } 

17 def tail: Stream[A] = uncons match { 

18 case Some((h,t)) = t 

19 case _ = empty 

20 } 

22 (Stream(1,2,3,4,5) takeWhile {_ 3}).toList // res8: List[Int] = List(1, 2) 

23 (Stream(1,2,3,4,5) dropWhile {_ 3}).toList // res9: List[Int] = List(4, 5) 

24 Stream(1,2,3,4,5).tail // res10: ch5.stream.Stream[Int] = [email protected] 

25 (Stream(1,2,3,4,5).tail).toList // res11: List[Int] = List(2, 3, 4, 5) 

26 Stream(1,2,3,4,5).headOption // res12: Option[Int] = Some(1)

前面提到过List的折叠算法无法着中途跳出,而Stream通过“延后计算”(lazy evaluation)是可以实现提早终结计算的。我们先看看Stream的右折叠(foldRight)算法:

1 def foldRight[B](z: B)(op: (A, = B) = B): B = { 

2 uncons match { 

3 case None = z 

4 case Some((h,t)) = op(h,t.foldRight(z)(op)) 

6 }

这个与List的foldRight简直一模样嘛,不同的只有op函数的第二个参数是延后计算的 = B。秘密就在这个延后计算的B上。看看下面图示:

 泛函编程(12)-数据流-Stream详解编程语言

由于op的第二个参数B是延后计算的,那么t.foldRight(z)(op)这个表达式的计算就是延后的,系统可以决定先不计算这个表达式从而得到了一个中间停顿的结果。

函数exists是在碰到第一个符合条件的元素时马上终止的。我们通常使用递归算法来实现exists的这个特性。现在我们也可以用右折叠算法达到同样效果:

1 def exists(p: A = Boolean): Boolean = { 

2 foldRight(false){(a,b) = p(a) || b } 

3 }

注意:当p(a)=true时系统不再运算b,所以整个运算停了下来。

同样,用foldRight来实现forAll:

1 def forAll(p: A = Boolean): Boolean = { 

2 foldRight(true){(a,b) = p(a) b} 

3 }

当我们遇到数据结构只能存一个元素如Option,Either时我们用map2来对接两个结构。当我们遇到能存多个元素的数据结构如List,Tree时我们就会用append来对接。Stream是一个多元素的数据结构,我们需要实现append:

1 //把两个Stream连接起来 

2 def append[B : A](b: Stream[B]): Stream[B] = { 

3 uncons match { 

4 case None = b 

5 case Some((h,t)) = cons(h, t.append(b)) 

8 //append简写 

9 def #++[B : A](b: Stream[B]): Stream[B] = append(b)

1 (Stream(1,2) #++ Stream(3,4,5)).toList // res14: List[Int] = List(1, 2, 3, 4, 5)

标准装备函数实现:

 1 //用递归算法 

 2 def flatMap[B](f: A = Stream[B]): Stream[B] = { 

 3 uncons match { 

 4 case None = empty 

 5 case Some((h,t)) = f(h) #++ t.flatMap(f) 

 6 } 

 7 } 

 8 //用foldRight实现 

 9 def flatMap_1[B](f: A = Stream[B]): Stream[B] = { 

10 foldRight(empty[B]){(h,t) = f(h) #++ t} 

11 } 

12 //用递归算法 

13 def filter(p: A = Boolean): Stream[A] = { 

14 uncons match { 

15 case None = empty 

16 case Some((h,t)) = if(p(h)) cons(h,t.filter(p)) else t.filter(p) 

17 } 

18 } 

19 //用foldRight实现 

20 def filter_1(p: A = Boolean): Stream[A] = { 

21 foldRight(empty[A]){(h,t) = if(p(h)) cons(h,t) else t} 

22 } 

23 (Stream(1,2,3,4,5) map {_ + 10}).toList // res15: List[Int] = List(11, 12, 13, 14, 15) 

24 (Stream(1,2,3,4,5) flatMap {x = Stream(x+10)}).toList 

25 // res16: List[Int] = List(11, 12, 13, 14, 15) 

26 (Stream(1,2,3,4,5) flatMap_1 {x = Stream(x+10)}).toList 

27 // res17: List[Int] = List(11, 12, 13, 14, 15) 

28 (Stream(1,2,3,4,5) filter {_ 3}).toList // res18: List[Int] = List(1, 2) 

29 (Stream(1,2,3,4,5) filter_1 {_ 3}).toList // res19: List[Int] = List(1, 2)

看来都备齐了。

我们再看看List与Stream还有什么别的值得关注的区别。先从一个List操作的例子开始:

1 scala List(1,2,3,4) map (_ + 10) filter (_ % 2 == 0) map (_ * 3) 

2 List(36,42)

根据List的特性,每个操作都会立即完成,产生一个结果List,然后接着下一个操作。我们试着约化:

1 List(1,2,3,4) map (_ + 10) filter (_ % 2 == 0) map (_ * 3) 

2 List(11,12,13,14) filter (_ % 2 == 0) map (_ * 3) 

3 List(12,14) map (_ * 3) 

4 List(36,42)

实际上这个运算遍历(traverse)了List三次。一次map操作产生了中间List(11,12,13,14),二次操作filter产生了List(12,14),三次操作map产生最终结果List(36,42)。实际上我们如果把遍历这个List的方式变一下:变成每次走一个元素,连续对这个元素进行三次操作,直到走完整个List。这样我们在一个遍历过程就可以完成全部三个操作。Stream恰好是一个元素一个元素走的,因为下面的元素处于延后计算状态。我们试着用Stream来证明:

 1 Stream(1,2,3,4).map(_ + 10).filter(_ % 2 == 0) 

 2 (11 #:: Stream(2,3,4).map(_ + 10)).filter(_ % 2 == 0) 

 3 Stream(2,3,4).map(_ + 10).filter(_ % 2 == 0) 

 4 (12 #:: Stream(3,4).map(_ + 10)).filter(_ % 2 == 0) 

 5 12 #:: Stream(3,4).map(_ + 10).filter(_ % 2 == 0) 

 6 12 #:: (13 #:: Stream(4).map(_ + 10)).filter(_ % 2 == 0) 

 7 12 #:: Stream(4).map(_ + 10).filter(_ % 2 == 0) 

 8 12 #:: (14 #:: Stream().map(_ + 10)).filter(_ % 2 == 0) 

 9 12 #:: 14 #:: Stream().map(_ + 10).filter(_ % 2 == 0) 

10 12 #:: 14 #:: Stream()

以上的#::是cons的操作符号。

 

原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/12971.html

cgojava