Akka(19): Stream:组合数据流,组合共用-Graph modular composition详解编程语言
akka-stream的Graph是一种运算方案,它可能代表某种简单的线性数据流图如:Source/Flow/Sink,也可能是由更基础的流图组合而成相对复杂点的某种复合流图,而这个复合流图本身又可以被当作组件来组合更大的Graph。因为Graph只是对数据流运算的描述,所以它是可以被重复利用的。所以我们应该尽量地按照业务流程需要来设计构建Graph。在更高的功能层面上实现Graph的模块化(modular)。按上回讨论,Graph又可以被描述成一种黑盒子,它的入口和出口就是Shape,而内部的作用即处理步骤Stage则是用GraphStage来形容的。下面是akka-stream预设的一些基础数据流图:
上面Source,Sink,Flow代表具备线性步骤linear-stage的流图,属于最基础的组件,可以用来构建数据处理链条。而Fan-In合并型,Fan-Out扩散型则具备多个输入或输出端口,可以用来构建更复杂的数据流图。我们可以用以上这些基础Graph来构建更复杂的复合流图,而这些复合流图又可以被重复利用去构建更复杂的复合流图。下面就是一些常见的复合流图:
注意上面的Composite Flow(from Sink and Source)可以用Flow.fromSinkAndSource函数构建:
def fromSinkAndSource[I, O](sink: Graph[SinkShape[I], _], source: Graph[SourceShape[O], _]): Flow[I, O, NotUsed] = fromSinkAndSourceMat(sink, source)(Keep.none)
这个Flow从流向来说先Sink再Source是反的,形成的Flow上下游间无法协调,即Source端终结信号无法到达Sink端,因为这两端是相互独立的。我们必须用CoupledTermination对象中的fromSinkAndSource函数构建的Flow来解决这个问题:
/** * Allows coupling termination (cancellation, completion, erroring) of Sinks and Sources while creating a Flow them them. * Similar to `Flow.fromSinkAndSource` however that API does not connect the completion signals of the wrapped stages. object CoupledTerminationFlow { @deprecated("Use `Flow.fromSinkAndSourceCoupledMat(..., ...)(Keep.both)` instead", "2.5.2") def fromSinkAndSource[I, O, M1, M2](in: Sink[I, M1], out: Source[O, M2]): Flow[I, O, (M1, M2)] = Flow.fromSinkAndSourceCoupledMat(in, out)(Keep.both)
从上面图列里的Composite BidiFlow可以看出:一个复合Graph的内部可以是很复杂的,但从外面看到的只是简单的几个输入输出端口。不过Graph内部构件之间的端口必须按照功能逻辑进行正确的连接,剩下的就变成直接向外公开的界面端口了。这种机制支持了层级式的模块化组合方式,如下面的图示:
最后变成:
在DSL里我们可以用name( ??? )来分割模块:
val nestedFlow = Flow[Int].filter(_ != 0) // an atomic processing stage .map(_ - 2) // another atomic processing stage .named("nestedFlow") // wraps up the Flow, and gives it a name val nestedSink = nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow .named("nestedSink") // wrap it up // Create a RunnableGraph val runnableGraph = nestedSource.to(nestedSink)
在下面这个示范里我们自定义一个某种功能的流图模块:它有2个输入和3个输出。然后我们再使用这个自定义流图模块组建一个完整的闭合流图:
import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import scala.collection.immutable object GraphModules { def someProcess[I, O]: I = O = i = i.asInstanceOf[O] case class TwoThreeShape[I, I2, O, O2, O3]( in1: Inlet[I], in2: Inlet[I2], out1: Outlet[O], out2: Outlet[O2], out3: Outlet[O3]) extends Shape { override def inlets: immutable.Seq[Inlet[_]] = in1 :: in2 :: Nil override def outlets: immutable.Seq[Outlet[_]] = out1 :: out2 :: out3 :: Nil override def deepCopy(): Shape = TwoThreeShape( in1.carbonCopy(), in2.carbonCopy(), out1.carbonCopy(), out2.carbonCopy(), out3.carbonCopy() //a functional module with 2 input 3 output def TwoThreeGraph[I, I2, O, O2, O3] = GraphDSL.create() { implicit builder = val balancer = builder.add(Balance[I](2)) val flow = builder.add(Flow[I2].map(someProcess[I2, O2])) TwoThreeShape(balancer.in, flow.in, balancer.out(0), balancer.out(1), flow.out) val closedGraph = GraphDSL.create() {implicit builder = import GraphDSL.Implicits._ val inp1 = builder.add(Source(List(1,2,3))).out val inp2 = builder.add(Source(List(10,20,30))).out val merge = builder.add(Merge[Int](2)) val mod23 = builder.add(TwoThreeGraph[Int,Int,Int,Int,Int]) inp1 ~ mod23.in1 inp2 ~ mod23.in2 mod23.out1 ~ merge.in(0) mod23.out2 ~ merge.in(1) mod23.out3 ~ Sink.foreach(println) merge ~ Sink.foreach(println) ClosedShape object TailorGraph extends App { import GraphModules._ implicit val sys = ActorSystem("streamSys") implicit val ec = sys.dispatcher implicit val mat = ActorMaterializer() RunnableGraph.fromGraph(closedGraph).run() scala.io.StdIn.readLine() sys.terminate()这个自定义的TwoThreeGraph是一个复合的流图模块,是可以重复使用的。注意这个~ 符合的使用:akka-stream只提供了对预设定Shape作为连接对象的支持如:
def ~ [Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...} def ~ [Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...} def ~ [Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out] = {...} def ~ (to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit = b.addEdge(importAndGetPort(b), b.add(to).in) def ~ (to: SinkShape[T])(implicit b: Builder[_]): Unit = b.addEdge(importAndGetPort(b), to.in) ...所以对于我们自定义的TwoThreeShape就只能使用直接的端口连接了:
def ~ [U : T](to: Inlet[U])(implicit b: Builder[_]): Unit = b.addEdge(importAndGetPort(b), to)以上的过程显示:通过akka的GraphDSL,对复合型Graph的构建可以实现形象化,大部分工作都在如何对组件之间的端口进行连接。我们再来看个较复杂复合流图的构建过程,下面是这个流图的图示:
可以说这是一个相对复杂的数据处理方案,里面甚至包括了数据流回路(feedback)。无法想象如果用纯函数数据流如scalaz-stream应该怎样去实现这么复杂的流程,也可能根本是没有解决方案的。但用akka GraphDSL可以很形象的组合这个数据流图;
import GraphDSL.Implicits._ RunnableGraph.fromGraph(GraphDSL.create() { implicit builder = val A: Outlet[Int] = builder.add(Source.single(0)).out val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2)) val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1)) val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2)) val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2)) val G: Inlet[Any] = builder.add(Sink.foreach(println)).in C ~ F A ~ B ~ C ~ F B ~ D ~ E ~ F E ~ G ClosedShape })另一个端口连接方式的版本如下:
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder = val B = builder.add(Broadcast[Int](2)) val C = builder.add(Merge[Int](2)) val E = builder.add(Balance[Int](2)) val F = builder.add(Merge[Int](2)) Source.single(0) ~ B.in; B.out(0) ~ C.in(1); C.out ~ F.in(0) C.in(0) ~ F.out B.out(1).map(_ + 1) ~ E.in; E.out(0) ~ F.in(1) E.out(1) ~ Sink.foreach(println) ClosedShape })如果把上面这个复杂的Graph切分成模块的话,其中一部分是这样的:
这个开放数据流复合图可以用GraphDSL这样构建:
val partial = GraphDSL.create() { implicit builder = val B = builder.add(Broadcast[Int](2)) val C = builder.add(Merge[Int](2)) val E = builder.add(Balance[Int](2)) val F = builder.add(Merge[Int](2)) C ~ F B ~ C ~ F B ~ Flow[Int].map(_ + 1) ~ E ~ F FlowShape(B.in, E.out(1)) }.named("partial")// Convert the partial graph of FlowShape to a Flow to get // access to the fluid DSL (for example to be able to call .filter()) val flow = Flow.fromGraph(partial) // Simple way to create a graph backed Source val source = Source.fromGraph( GraphDSL.create() { implicit builder = val merge = builder.add(Merge[Int](2)) Source.single(0) ~ merge Source(List(2, 3, 4)) ~ merge // Exposing exactly one output port SourceShape(merge.out) // Building a Sink with a nested Flow, using the fluid DSL val sink = { val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow") nestedFlow.to(Sink.head) // Putting all together val closed = source.via(flow.filter(_ 1)).to(sink)和scalaz-stream不同的还有akka-stream的运算是在actor上进行的,除了大家都能对数据流元素进行处理之外,akka-stream还可以通过actor的内部状态来维护和返回运算结果。这个运算结果在复合流图中传播的过程是可控的,如下图示:返回运算结果是通过viaMat, toMat来实现的。简写的via,to默认选择流图左边运算产生的结果。
12842.html
cjava
相关文章
- python开发的简单窗口界面的倒计时界面详解编程语言
- 如何用jQuery禁用浏览器的前进后退按钮详解编程语言
- JavaMail 发送邮件类详解编程语言
- android 异步加载图片详解编程语言
- java Struts2和Freemarker整合应用静态页面详解编程语言
- java使用itext按页码拆分pdf文件详解编程语言
- js子窗体调用父窗体函数的问题详解编程语言
- java基础学习总结——static关键字详解编程语言
- .NET Core 2.0 Preview 2为开发人员带来改进详解编程语言
- [linux] C语言Linux系统编程-做成守护进程详解编程语言
- Python条件控制详解编程语言
- Spring Aop 源码实现原理分析详解编程语言
- Android AIDL 一探究竟详解编程语言
- Java 动态代理及 RPC 框架介绍详解编程语言
- WeakHashMap实现原理及源码分析详解编程语言
- java三大框架项目和Redis组合使用详解编程语言
- Struts标签的组合使用小例详解shasha 2018年12月11日 编程语言 592 0 编程语言
- Python参数组合详解编程语言
- 图片在DIV中垂直居中的显示方法详解编程语言
- JVM内存结构详解编程语言
- Java NIO(2):NIO概述详解编程语言
- Java 数组的三种创建方法详解编程语言
- python-set详解编程语言
- RH_STRUC_GET获取组织数据注意事项详解编程语言
- C++类的继承中构造函数和析构函数调用顺序例子详解编程语言
- javascript jssdk微信上传一张图片的方法详解编程语言
- css滚动条占空间的解决方法详解编程语言
- php转java 系列1 Spring boot安装教程详解编程语言
- 【ACM】组合数 – 全排列详解编程语言
- django的数据库操作回顾详解编程语言