Scalaz(50)- scalaz-stream: 安全的无穷运算-running infinite stream freely详解编程语言
scalaz-stream支持无穷数据流(infinite stream),这本身是它强大的功能之一,试想有多少系统需要通过无穷运算才能得以实现。这是因为外界的输入是不可预料的,对于系统本身就是无穷的,比如键盘鼠标输入什么时候终止、网站上有多少网页、数据库中还有多少条记录等等。但对无穷数据流的运算又引发了新的挑战。我们知道,fp程序的主要运算方式是递归算法,这是个问题产生的源泉:极容易掉入StackOverflowError陷阱。相信许多人对scalaz-stream如何实现无穷数据的运算安全都充满了好奇和疑问,那我们就在本篇讨论中分析一下scalaz-stream的具体运算方式。
scalaz-stream是由Process类型组件链接而成。Process是个状态机器(state machine)由Emit、Await、Append、Halt几个状态组成。值得注意的是这几个状态都是结构化的:
case class Emit[+O](seq: Seq[O]) extends HaltEmitOrAwait[Nothing, O] with EmitOrAwait[Nothing, O] case class Await[+F[_], A, +O]( req: F[A] , rcv: (EarlyCause // A) = Trampoline[Process[F, O]] @uncheckedVariance , preempt : A = Trampoline[Process[F,Nothing]] @uncheckedVariance = (_:A) = Trampoline.delay(halt:Process[F,Nothing]) ) extends HaltEmitOrAwait[F, O] with EmitOrAwait[F, O] { case class Halt(cause: Cause) extends HaltEmitOrAwait[Nothing, Nothing] with HaltOrStep[Nothing, Nothing] case class Append[+F[_], +O]( head: HaltEmitOrAwait[F, O] , stack: Vector[Cause = Trampoline[Process[F, O]]] @uncheckedVariance ) extends Process[F, O] { }
首先这些结构代表了Process类型其中的某种状态,而且要注意Await和Append的连接函数运算结果是Trampoline类型的,说明运算这两个连接函数可以避免StackOverflowError,实现安全运行。同时仔细观察可以发现用这些状态结构是可以实现point和flatMap函数的:
def point(o: O): Process[Nothing,O] = Emit(o) * Generate a `Process` dynamically for each output of this `Process`, and * sequence these processes using `append`. final def flatMap[F2[x] : F[x], O2](f: O = Process[F2, O2]): Process[F2, O2] = { // Util.debug(s"FMAP $this") this match { case Halt(_) = this.asInstanceOf[Process[F2, O2]] case Emit(os) if os.isEmpty = this.asInstanceOf[Process[F2, O2]] case Emit(os) = os.tail.foldLeft(Try(f(os.head)))((p, n) = p ++ Try(f(n))) case [email protected](_, _, _) = aw.extend(_ flatMap f) case [email protected](p, n) = ap.extend(_ flatMap f)
以上证实了Process就是Free Monad。Free Monad可以实现函数结构化,通过heap置换stack,可以在固定的堆栈空间内运行任何规模的程序,有效解决运行递归算法造成的StackOverflowError问题。值得注意的是不但Await和Append这两个状态转换方式是结构化的,它们的连接函数(continuation)运算结果也是包嵌在Trampoline里的。也就是说这样的设计保证了无论在翻译多层的Process状态组合或者运算超长Process链接的stream都可以避免StackOverflowError。
我们来详细了解一下具体的scalaz-stream程序实现方式:在之前的讨论里介绍了通过Free Monad编程的特点是算式/算法关注分离。我们可以说用Process组合成stream就是所谓的算式:对程序功能的描述。而算法具体来说应该由两部分组成:程序翻译和运算,把程序功能描述翻译成Free Monad结构然后运算这些结构里的函数。连续的算法会被翻译成多层的结构。那么翻译和运算就可能会同时进行:翻译一层即运算一层。所以我称算法(interpreter)为译算器:代表翻译和运算。对于无穷运算程序,compiler只能用Process类型的构建器(constructor)把程序翻译成Process的初始状态,然后译算器(interpreter)会一边继续进一步翻译一边运算结果。我们先从分析Process的运算器(runner)Process.runLog作业模式开始:
/** * Collect the outputs of this `Process[F,O]`, given a `Monad[F]` in * which we can catch exceptions. This function is not tail recursive and * relies on the `Monad[F]` to ensure stack safety. final def runLog[F2[x] : F[x], O2 : O](implicit F: Monad[F2], C: Catchable[F2]): F2[Vector[O2]] = { runFoldMap[F2, Vector[O2]](Vector(_))( F, C, // workaround for performance bug in Vector ++ Monoid.instance[Vector[O2]]((a, b) = a fast_++ b, Vector()) }
runLog是runFoldMap函数的一个特殊施用:
/** * Collect the outputs of this `Process[F,O]` into a Monoid `B`, given a `Monad[F]` in * which we can catch exceptions. This function is not tail recursive and * relies on the `Monad[F]` to ensure stack safety. final def runFoldMap[F2[x] : F[x], B](f: O = B)(implicit F: Monad[F2], C: Catchable[F2], B: Monoid[B]): F2[B] = { def go(cur: Process[F2, O], acc: B): F2[B] = { cur.step match { case s: Step[F2,O]@unchecked = (s.head, s.next) match { case (Emit(os), cont) = F.bind(F.point(os.foldLeft(acc)((b, o) = B.append(b, f(o))))) { nacc = go(cont.continue.asInstanceOf[Process[F2,O]], nacc) case (awt:Await[F2,Any,O]@unchecked, cont) = awt.evaluate.flatMap(p = go(p +: cont, acc)) case Halt(End) = F.point(acc) case Halt(Kill) = F.point(acc) case Halt(Error(rsn)) = C.fail(rsn) go(this, B.zero) }
这里面又引用了step函数和Step类型:
/** * Run one step of an incremental traversal of this `Process`. * This function is mostly intended for internal use. As it allows * a `Process` to be observed and captured during its execution, * users are responsible for ensuring resource safety. final def step: HaltOrStep[F, O] = { val empty: Emit[Nothing] = Emit(Nil) @tailrec def go(cur: Process[F,O], stack: Vector[Cause = Trampoline[Process[F,O]]], cnt: Int) : HaltOrStep[F,O] = { if (stack.nonEmpty) cur match { case Halt(End) if cnt = 0 = Step(empty,Cont(stack)) case Halt(cause) = go(Try(stack.head(cause).run), stack.tail, cnt - 1) case Emit(os) if os.isEmpty = Step(empty,Cont(stack)) case emt@(Emit(os)) = Step(emt,Cont(stack)) case [email protected](_,_,_) = Step(awt,Cont(stack)) case Append(h,st) = go(h, st fast_++ stack, cnt - 1) } else cur match { case [email protected](cause) = hlt case [email protected](os) if os.isEmpty = halt0 case [email protected](os) = Step(emt,Cont(Vector.empty)) case [email protected](_,_,_) = Step(awt,Cont(Vector.empty)) case Append(h,st) = go(h,st, cnt - 1) go(this,Vector.empty, 10) // *any* value = 1 works here. higher values improve throughput but reduce concurrency and fairness. 10 is a totally wild guess * Intermediate step of process. * Used to step within the process to define complex combinators. case class Step[+F[_], +O](head: EmitOrAwait[F, O], next: Cont[F, O]) extends HaltOrStep[F, O] { def toProcess : Process[F,O] = Append(head.asInstanceOf[HaltEmitOrAwait[F,O]],next.stack) /** * Continuation of the process. Represents process _stack_. Used in conjunction with `Step`. case class Cont[+F[_], +O](stack: Vector[Cause = Trampoline[Process[F, O]]] @uncheckedVariance) { /** * Prepends supplied process to this stack def +:[F2[x] : F[x], O2 : O](p: Process[F2, O2]): Process[F2, O2] = prepend(p) /** alias for +: */ def prepend[F2[x] : F[x], O2 : O](p: Process[F2, O2]): Process[F2, O2] = { if (stack.isEmpty) p else p match { case app: Append[[email protected], [email protected]] = Append[F2, O2](app.head, app.stack fast_++ stack) case emt: Emit[[email protected]] = Append(emt, stack) case awt: Await[[email protected], _, [email protected]] = Append(awt, stack) case [email protected](_) = Append(hlt, stack) }
Step也是一个结构(case class),代表了一个完整连接的运算步骤:head为当前Emit或Await状态;next是另一种结构Cont,能引导下一个状态。stack代表一串状态连接函数:由当前状态终结原因推导到下一个状态。step函数的作用是判断当前Process状态是否符合构建Step结构条件,返回HaltOrStep类型结果,即:如当前Process状态不符合构建Step条件即进入Halt状态。从step函数中go函数的流程可以得出:当前状态为Emit或者Await时直接转成单步Step(没有下一个状态,next为空)。当前Process状态为Append时才会产生next不为空的Step(意思是完成当前状态运算后产生的结果会引导下一个状态)。很明显,这个step包含了翻译的作用:当前状态为Append时把它翻译成一个连续的Step:next这个Cont结构不为空,而Cont可以被转换成Process[F,O]:
/** * Converts this stack to process, that is used * when following process with normal termination. def continue: Process[F, O] = prepend(halt)
我们再看看runFoldMap里这段代码:
def go(cur: Process[F2, O], acc: B): F2[B] = { cur.step match { case s: Step[F2,O]@unchecked = (s.head, s.next) match { case (Emit(os), cont) = F.bind(F.point(os.foldLeft(acc)((b, o) = B.append(b, f(o))))) { nacc = go(cont.continue.asInstanceOf[Process[F2,O]], nacc) case (awt:Await[F2,Any,O]@unchecked, cont) = awt.evaluate.flatMap(p = go(p +: cont, acc)) case Halt(End) = F.point(acc) case Halt(Kill) = F.point(acc) case Halt(Error(rsn)) = C.fail(rsn)
如果当前状态是个多步的Step(next不为空):运算当前步骤后递归式重复对下面的步骤进行翻译,即重复 - go- step,同时对翻译的步骤进行运算。
下面我们再看看compiler是如何产生Process初始状态的:
1 emit(3) // res3: scalaz.stream.Process0[Int] = Emit(Vector(3)) 2 emitAll(Seq(1,2,3)) // res4: scalaz.stream.Process0[Int] = Emit(List(1, 2, 3)) 3 Process(1,2,3) // res5: scalaz.stream.Process0[Int] = Emit(WrappedArray(1, 2, 3)) 4 emitAll(Seq(1,2,3)).toSource // res6: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Emit(List(1, 2, 3))
compiler对这几个简单的Process描述都产生了所谓的单步结构。runLog可以直接运算Emit结构内的元素然后终止。再看看需要从外部获取数据的Source:
1 await(Task.delay(3))(emit) // res8: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected], function1 , function1 ) 2 eval(Task.delay {3}) // res9: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected], function1 , function1 )
compiler产生的是Await结构。Await结构内确切的内容是:
// Await{task, {o = Emit(o)}, {o = Halt(End)}}
对这个结构runLog会先运算task得出3,然后Emit(3),之后正常终止Halt(End)。
我们跟着再观察一个连续运算的例子:
1 emit(1) ++ emit(2) // res7: scalaz.stream.Process[Nothing,Int] = Append(Emit(Vector(1)),Vector( function1 )) 2 //Append{Emit(Vector(1)), Vector({case End = Emit(Vector(2)) case c = Halt(c)})} 4 emit(1) ++ emit(2) ++ emit(3) // res8: scalaz.stream.Process[Nothing,Int] = Append(Emit(Vector(1)),Vector( function1 , function1 )) 5 //Append{Emit(Vector(1)), Vector({case End = Emit(Vector(2)) case c = Halt(c)}, 6 // {case End = Emit(Vector(3)) case c = Halt(c)}}
对于 ++ 操作,compile产生了Append结构。结构内容如上所述。
用递归运算产生了下面的Await结构:
1 await(Task.delay(3))(emit) // res9: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected], function1 , function1 ) 2 eval(Task.delay {3}) // res10: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected], function1 , function1 ) 3 // Await{task, {o = Emit(o)}, {o = Halt(End)}} 5 await(Task.delay(1))(i = await(Task.delay(2))(j = emit(i+j))) 6 // res11: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Await([email protected], function1 , function1 ) 7 // Await{task, {o = Await {task1, {o1 = emit(o+o1)}, {o1 = Halt(End)}} 8 // , {o = Halt(End)}
以上这几个例子里的stream都是明确声明(declarative stream)的,属于有限规模数据。下面我们正式来介绍一下无穷数据流(infinite stream)的具体实现方式。之前我们认识到repeat是个无穷函数:p.repeat表示无限复制Process p。我们看看compiler是如何处理它的:
1 emit(2).repeat // res12: scalaz.stream.Process[Nothing,Int] = Append(Emit(Vector(2)),Vector( function1 ))
repeat可以用Append结构表示:
case class Append[+F[_], +O]( head: HaltEmitOrAwait[F, O] , stack: Vector[Cause = Trampoline[Process[F, O]]] @uncheckedVariance ) extends Process[F, O] {
再看看repeat函数的实现方法:
/** * Run this process until it halts, then run it again and again, as * long as no errors or `Kill` occur. final def repeat: Process[F, O] = this.append(this.repeat) * If this process halts due to `Cause.End`, runs `p2` after `this`. * Otherwise halts with whatever caused `this` to `Halt`. final def append[F2[x] : F[x], O2 : O](p2: = Process[F2, O2]): Process[F2, O2] = { onHalt { case End = p2 case cause = Halt(cause) }
repeat通过append函数产生了个Append结构。append函数的作用是在上一个Process正常结束时继续运算p2,否则终止Halt。在repeat函数里这个p2就是repeat运算自己。用普通话解释:完成上一个运算后继续不断重复地再运算它。这就是一个典型的无穷数据源了。同时我们可以预测到Append结构里的内容:
1 emit(2).repeat // res12: scalaz.stream.Process[Nothing,Int] = Append(Emit(Vector(2)),Vector( function1 )) 2 // [email protected]{Emit(Vector(2)),Vector({case End = app case c = Halt(c)})}
app代表当前Append。按这样的原理我们可以编写一下无穷数据产生函数:
1 def dup(i: Int): Process[Task,Int] = await(Task.delay(i))(j = emit(j) ++ dup(j)) 2 // dup: (i: Int)scalaz.stream.Process[scalaz.concurrent.Task,Int] 3 dup(5).take(5).runLog.run // res13: Vector[Int] = Vector(5, 5, 5, 5, 5) 5 def inc(start: Int): Process[Task,Int] = await(Task.delay(start))(i = emit(i) ++ inc(i+1)) 6 // inc: (start: Int)scalaz.stream.Process[scalaz.concurrent.Task,Int] 7 inc(5).take(5).runLog.run // res14: Vector[Int] = Vector(5, 6, 7, 8, 9)
我们知道最终这两个函数会产生Append结构,所以确定能够在固定的堆栈空间内运算这些Append结构内的连接函数(continuation),实现安全无穷运算。
原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/12898.html
cgojavamac相关文章
- 域控安全基础.md
- 从Black Hat Speaker到国内外研究者:强化学习的安全应用
- Ninjutsu_v3_08_2020-安全渗透系统安装
- 如何使用IBCS虚拟专线和haproxy搭建高性能、高可用、高安全的本地数据中心?
- 安全沙箱技术的原理、应用程序的安全性和稳定性
- 安全人员发现基于.NET平台且使用开源项目的勒索软件详解编程语言
- Scalaz(56)- scalaz-stream: fs2-安全运算,fs2 resource safety详解编程语言
- shiro安全框架异常退出清除缓存信息详解编程语言
- 作用域安全的构造函数详解编程语言
- SimpleDateFormat 线程不安全及解决方案详解编程语言
- java的多线程安全,ReentrantLock与synchronized锁详解编程语言
- Java 单例模式的线程安全实现详解编程语言
- 智能合约的安全详解编程语言
- MySQL数据库备份:建立安全备份目录(mysql数据库备份目录)
- 利用 Linux 秘钥认证,实现安全登录(linux使用秘钥认证)
- 修改Oracle主机名IP修改实现安全运行(oracle主机名ip)
- 安全工具Linux ATK安全保障:强大的安全防护神器(linuxatk)
- MySQL安全设置:限制外网访问(mysql限制外网访问)
- MSSQL 管理:用户权限实现安全管理(mssql 用户权限设置)
- Oracle安全关闭轻松安装RPM(oracle关闭rpm)
- Oracle支付版权费合理又安全(Oracle使用版权费)
- 人工智能助力锂电池技术,美科学家找到了更安全的固体电解质
- 服务器安全设置之系统服务篇