zl程序教程

您现在的位置是:首页 >  其他

当前栏目

FunDA(5)- Reactive Streams:Play with Iteratees详解编程语言

编程语言 详解 with Play Streams reactive FunDA
2023-06-13 09:20:38 时间

  FunDA的设计目标就是把后台数据库中的数据搬到内存里,然后进行包括并行运算的数据处理,最后可能再对后台数据库进行更新。如果需要把数据搬到内存的话,那我们就必须考虑内存是否能一次性容纳所有的数据,有必要配合数据处理分部逐步读入,这就是Reactive Stream规范主要目的之一。所以在设计FunDA的数据源(Source)之前必须要考虑实现reacive-data-stream。Slick 3.x版在功能上的突破之一就是实现了对Reactive-Stream API的支持。遗憾的是新版的Slick并没有提供针对data-stream的具体操作函数,官方文档提到可以通过akka-stream或者Play-Iteratee-Reactive-Stream来实现对data-stream的处理操作。Slick是通过db.stream构建一个DatabasePublisher类型来实现Reactive-Stream接口的。Play则提供了stream.IterateeStreams.publisherToEnumerator(SlickDatabasePubliser)转换函数,能够把DatabasePublisher转成Reactive-Stream的数据源(Source)。Play是通过Iteratee来实现对Reactive-Stream的处理操作。我们就在这节讨论一下有关Iteratee的一些原理。在示范前我们必须在build.sbt中增加依赖: com.typesafe.play % play-iteratees-reactive-streams_2.11 % 2.6.0 。所谓Reactive从字面来解释就是互动。Reacive-Stream是指数据产生方(producer)和数据使用方(consumer)之间的互动。大体上是producer通知consumer数据准备完毕可以读取、consumer通知producer读取数据的具体状态,提示是否可以发送数据。下面我们就把Reactive-Stream的基础原理给大家介绍一下:一般我们需要从一个Stream里获取数据时,可以用下面这个界面的read:

trait InputStream { 

 def read(): Byte 

}

这是一种典型的同步操作:read会占用线程直到获取这个Byte。我们可以用callback函数形式来解决这个问题:把一个读取函数传给目标Stream,以一种被动形式来获取这个Byte: 

trait InputStreamHandler { 

 def onByte(byte: Byte) 

}

我们想办法把onByte传给Stream作为一种callback函数。当Stream有了Byte后调用这个onByte函数,在这个onByte函数里是收到Byte后应该进行的运算。不过收到这个Byte代表我们程序状态的一个转变,所以我们可以把上面这个界面写成函数式的:

trait InputStreamHandler { 

 def onByte(byte: Byte): InputStreamHandler 

}

由于状态可能转变,所以我们把当前这个有变化的对象传出来。下面是一个界面实现的例子:

class consume(data: Seq[Byte]) extends InputStreamHandler { 

 def onByte(byte: Byte) = new consume(data :+ byte) 

}

这个例子里我们把读取的Byte汇集到一个Seq里。但是假如Stream准备好了数据后调用我们的callback函数onByte,而我们无法立即完成函数内的运算,导致调用方线程阻塞,影响整个Stream的运转。我们可以用Future来解决这个问题:

trait InputStreamHandle { 

 def onByte(byte: Byte): Future[InputStreamHandle] 

}

这样调用方可以立即返回了。不过,调用方如何把数据发送状态通知数据读取方呢?比如已经完成所有数据发送。我们需要把调用方返回的数据再细化点:

trait Input[+E] 

case class EL[E](e: E) extends Input[E] 

case object EOF extends Input[Nothing] 

case object Empty extends Input[Nothing]

现在这个返回数据是个Input[E]了,是带状态的。返回数据具体类型EL,EOF,Empty从字面就可以理解它们代表的状态了。我们的界面变成了这样:

trait InputStreamHandler[E] { 

 def onInput(input: Input[E]): Future[InputStreamHandler[E]] 

}

界面实现例子变成下面这样:

class consume(data: Seq[Byte]) extends InputStreamHandler[Byte] { 

 def onInput(input: Input[Byte]) = input match { 

 case EL(byte) = Future.successful(new consume(data :+ byte)) 

 case _ = Future.successful(this) 

}

上面这个例子中返回Future很是别扭,我们可以这样改善界面InputStreamHandler定义:

trait InputStreamHandler[E] { 

 def onByte[B](cont: (Input[E] = InputStreamHandler[E]) = Future[B]): Future[B] 

}

现在我们可以这样实现那个例子:

class consume(data: Seq[Byte]) extends InputStreamHandler[Byte] { 

 def onByte[B](cont: (Input[Byte] = InputStreamHandler[Byte]) = Future[B]) = cont { 

 case EL(byte) = new consume(data :+ byte) 

 case _ = this 

}

现在用起来顺手多了吧。从上面这些例子中我们可以得出一种“推式”流模式(push-model-stream): 由目标stream向读取方推送数据。但Reactive-Stream应该还具备反向通告机制,比如读取方如何通知目标stream已经完成读取操作或者暂时无法再接受数据、又或者可以接受数据了。

现在我们对Reactive-Streams有了个大概的印象:这个模式由两方组成,分别是:数据源(在push-model中就是数据发送方)以及数据消耗方,分别对应了Iteratee模式的Enumerator和Iteratee。也就是说:Enumerator负责发送,Iteratee负责接收。用Iteratee实现Reactive-Streams时必须实现Enumerator和Iteratee之间的双向通告机制。实际上Iteratee描述了如何消耗Enumerator传过来的数据:比如把数据串接起来(concat)或者相加汇总等。在消耗数据的过程中Iteratee也必须负责与Enumerator沟通以保证数据传输的顺利进行。那么Iteratee又应该如何与Enumerator沟通呢?为了实现这种沟通功能,我们再设计一个trait:

trait Step[E,+A] 

case class Done[+A,E](a: A, remain: Input[E]) extends Step[E,A] 

case class Cont[E,+A](k: Input[E] = InputStreamHandler[E,A]) extends Step[E,A] 

case class Error[E](msg: String, loc:Input[E]) extends Step[E,Nothing]

Step代表Iteratee的操作状态:Done代表完成,返回运算结果A,remain是剩余的输入、Cont代表可以用k来获取数据、Error返回错误信息msg以及出错地点loc。现在我们可以重新定义InputStreamHandler:

trait InputStreamHandler[E,A] { 

 def onInput[A](step: Step[E,A] = Future[A]): Future[A] 

}

界面实现例子Consume如下:

class Consume(data: Seq[Byte]) extends InputStreamHandler[Byte,Seq[Byte]] { 

 def onInput(step: Step[Byte,Seq[Byte]] = Future[Seq[Byte]]) = step(Cont { 

 case EL(byte) = new Consume(data :+ byte) 

 case EOF = new InputStreamHandler[Byte,Seq[Byte]] { 

 def onInput(step: Step[Byte,Seq[Byte]] = Future[Seq[Byte]]) = step(Done(data,Empty)) 

 case Empty = this 

}

这个版本最大的区别在于当收到Stream发送的EOF信号后返回Done通知完成操作,可以使用运算结果data了。这个InputStreamHandler就是个Iteratee,它描述了如何使用(消耗)接收到的数据。我们可以把界面定义命名为下面这样:

trait Iteratee[E,+A] { 

 def onInput[B](folder: Step[E,A] = Future[B]): Future[B] 

}

实际上Iteratee模式与下面这个函数很相像:

def foldLeft[F[_],A,B](ax: F[A])(z: B)(f: (B,A) = B): B 

F[A]是个数据源,我们不需要理会它是如何产生及发送数据的,我们只关注如何去处理收到的数据。在这个函数里(B,A)= B就是具体的数据消耗方式。foldLeft代表了一种推式流模式(push-model-stream)。至于如何产生数据源,那就是Enumerator要考虑的了。

 好了,我们先看看Iteratee正式的类型款式:Iteratee[E,A],E是数据元素类型,A是运算结果类型。trait Iteratee 有一个抽象函数:

/** 

 * Computes a promised value B from the state of the Iteratee. 

 * The folder function will be run in the supplied ExecutionContext. 

 * Exceptions thrown by the folder function will be stored in the 

 * returned Promise. 

 * If the folder function itself is synchronous, its better to 

 * use `pureFold()` instead of `fold()`. 

 * @param folder a function that will be called on the current state of the iteratee 

 * @param ec the ExecutionContext to run folder within 

 * @return the result returned when folder is called 

 def fold[B](folder: Step[E, A] = Future[B])(implicit ec: ExecutionContext): Future[B]

不同功能的Iteratee就是通过定义不同的fold函数构成的。fold是个callback函数提供给Enumerator。folder的输入参数Step[E,A]代表了当前Iteratee的三种可能状态: 

object Step { 

 case class Done[+A, E](a: A, remaining: Input[E]) extends Step[E, A] 

 case class Cont[E, +A](k: Input[E] = Iteratee[E, A]) extends Step[E, A] 

 case class Error[E](msg: String, input: Input[E]) extends Step[E, Nothing] 

}

当状态为Cont[E,A]时,Enumerator就会用这个k: Input[E]= Iteratee[E,A]函数把Input[E]推送给Iteratee。我们从一个简单的Enumerator就可以看出:

 

 /** 

 * Creates an enumerator which produces the one supplied 

 * input and nothing else. This enumerator will NOT 

 * automatically produce Input.EOF after the given input. 

 def enumInput[E](e: Input[E]) = new Enumerator[E] { 

 def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = 

 i.fold { 

 case Step.Cont(k) = eagerFuture(k(e)) 

 case _ = Future.successful(i) 

 }(dec) 

 }

 

或者:

 

/** 

 * Create an Enumerator from a set of values 

 * Example: 

 * {{{ 

 * val enumerator: Enumerator[String] = Enumerator("kiki", "foo", "bar") 

 * }}} 

 def apply[E](in: E*): Enumerator[E] = in.length match { 

 case 0 = Enumerator.empty 

 case 1 = new Enumerator[E] { 

 def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = i.pureFoldNoEC { 

 case Step.Cont(k) = k(Input.El(in.head)) 

 case _ = i 

 case _ = new Enumerator[E] { 

 def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = enumerateSeq(in, i) 

----- 

private def enumerateSeq[E, A]: (Seq[E], Iteratee[E, A]) = Future[Iteratee[E, A]] = { (l, i) = 

 l.foldLeft(Future.successful(i))((i, e) = 

 i.flatMap(it = it.pureFold { 

 case Step.Cont(k) = k(Input.El(e)) 

 case _ = it 

 }(dec))(dec)) 

 }

我们可以通过定义fold函数来获取不同功能的Iteratee。下面就是一个直接返回恒量值Iteratee的定义过程:

val doneIteratee = new Iteratee[String,Int] { 

 def fold[B](folder: Step[String,Int] = Future[B])(implicit ec: ExecutionContext): Future[B] = { 

 folder(Step.Done(21,Input.EOF)) 

}

这个Iteratee不会消耗任何输入,直接就返回21。实际上我们可以直接用Done.apply来构建这个doneIteratee:

val doneIteratee = Done[String,Int](21,Input.Empty)

我们也可以定义一个只消耗一个输入元素的Iteratee:

val consumeOne = new Iteratee[String,String] { 

 def fold[B](folder: Step[String,String] = Future[B])(implicit ec: ExecutionContext): Future[B] = { 

 folder(Step.Cont { 

 case Input.EOF = Done("OK",Input.EOF) 

 case Input.Empty = this 

 case Input.El(e) = Done(e,Input.EOF) 

}

同样,我们也可以用Cont构建器来构建这个consumeOne:

val consumeOne1 = Cont[String,String](in = Done("OK",Input.EOF))

从上面这些例子里我们可以推敲folder函数应该是在Enumerator里定义的,看看下面这个Enumerator例子:

val enumerator = new Enumerator[String] { 

 // some messages 

 val items = 1 to 10 map (i = i.toString) 

 var index = 0 

 override def apply[A](i: Iteratee[String, A]): 

 Future[Iteratee[String, A]] = { 

 i.fold( 

 // the folder 

 step = { 

 step match { 

 // iteratee is done, so no more messages 

 // to send 

 case Step.Done(result, remaining) = { 

 println("Step.Done") 

 Future(i) 

 // iteratee can consume more 

 case Step.Cont(k: (Input[String] = Iteratee[String, A])) 

 = { 

 println("Step.Cont") 

 // does enumerator have more messages ? 

 if (index items.size) { 

 val item = items(index) 

 println(s"El($item)") 

 index += 1 

 // get new state of iteratee 

 val newIteratee = k(Input.El(item)) 

 // recursive apply 

 apply(newIteratee) 

 } else { 

 println("EOF") 

 Future(k(Input.EOF)) 

 // iteratee is in error state 

 case Step.Error(message, input: Input[String]) = { 

 println("Step.Error") 

 Future(i) 

 }

下面我们示范一个完整的例子: 

val userIteratee = new Iteratee[String, Unit] { 

 override def fold[B](folder: (Step[String, Unit]) = Future[B]) 

 (implicit ec: ExecutionContext): Future[B] = { 

 // accumulator 

 val buffer: ListBuffer[String] = ListBuffer() 

 // the step function 

 def stepFn(in: Input[String]): Iteratee[String, Unit] = { 

 in match { 

 case Input.Empty = this 

 case Input.EOF = Done({ 

 println(s"Result ${buffer.mkString("--")}") 

 }, Input.Empty) 

 case Input.El(el) = { 

 buffer += el 

 Cont(stepFn) 

 // initial state - iteratee ready to accept input 

 folder(Step.Cont(stepFn)) 

} // userIteratee : play.api.libs.iteratee.Iteratee[String,Unit] = [email protected] 

val usersEnum = Enumerator("Tiger","John","Jimmy","Kate","Chris") 

 // usersEnum : play.api.libs.iteratee.Enumerator[String] = [email protected] 

(usersEnum | userIteratee) // Result Tiger--John--Jimmy--Kate--Chris res0: scala.concurrent.Future[Unit] = Success(())

Enumerator usersEnum把输入推送给userIteratee、userIteratee在完成时直接把它们印了出来。在play-iterate库Iteratee对象里有个fold函数(Iteratee.fold)。这是个通用的函数,可以轻松实现上面这个userIteratee和其它的汇总功能Iteratee。Iteratee.fold函数款式如下: 

def fold[E, A](state: A)(f: (A, E) = A): Iteratee[E, A]

我们可以用这个fold函数来构建一个相似的Iteratee:

val userIteratee2 = Iteratee.fold(List[String]())((st, el:String) = st :+ el) 

 // userIteratee2 : play.api.libs.iteratee.Iteratee[String,List[String]] = Cont( function1 ) 

(usersEnum | userIteratee2).foreach {x = println(x)} 

 //| List(Tiger, John, Jimmy, Kate, Chris)

下面是另外两个用fold函数的例子:

val inputLength: Iteratee[String,Int] = { 

 Iteratee.fold[String,Int](0) { (length, chars) = length + chars.length } 

 // inputLength : play.api.libs.iteratee.Iteratee[String,Int] = Cont( function1 ) 

Await.result((usersEnum | inputLength),Duration.Inf) 

 // res1: Int = 23 

val consume: Iteratee[String,String] = { 

 Iteratee.fold[String,String]("") { (result, chunk) = result ++ chunk } 

 // consume : play.api.libs.iteratee.Iteratee[String,String] = Cont( function1 ) 

Await.result((usersEnum | consume),Duration.Inf) 

 // res2: String = TigerJohnJimmyKateChris

从以上的练习里我们基本摸清了定义Iteratee的两种主要模式:

1、构建新的Iteratee,重新定义fold函数,如上面的userIteratee及下面这个上传大型json文件的例子:

object ReactiveFileUpload extends Controller { 

 def upload = Action(BodyParser(rh = new CsvIteratee(isFirst = true))) { 

 request = 

 Ok("File Processed") 

case class CsvIteratee(state: Symbol = Cont, input: Input[Array[Byte]] = Empty, lastChunk: String = "", isFirst: Boolean = false) extends Iteratee[Array[Byte], Either[Result, String]] { 

 def fold[B]( 

 done: (Either[Result, String], Input[Array[Byte]]) = Promise[B], 

 cont: (Input[Array[Byte]] = Iteratee[Array[Byte], Either[Result, String]]) = Promise[B], 

 error: (String, Input[Array[Byte]]) = Promise[B] 

 ): Promise[B] = state match { 

 case Done = 

 done(Right(lastChunk), Input.Empty) 

 case Cont = cont(in = in match { 

 case in: El[Array[Byte]] = { 

 // Retrieve the part that has not been processed in the previous chunk and copy it in front of the current chunk 

 val content = lastChunk + new String(in.e) 

 val csvBody = 

 if (isFirst) 

 // Skip http header if it is the first chunk 

 content.drop(content.indexOf("/r/n/r/n") + 4) 

 else content 

 val csv = new CSVReader(new StringReader(csvBody), ;) 

 val lines = csv.readAll 

 // Process all lines excepted the last one since it is cut by the chunk 

 for (line - lines.init) 

 processLine(line) 

 // Put forward the part that has not been processed 

 val last = lines.last.toList.mkString(";") 

 copy(input = in, lastChunk = last, isFirst = false) 

 case Empty = copy(input = in, isFirst = false) 

 case EOF = copy(state = Done, input = in, isFirst = false) 

 case _ = copy(state = Error, input = in, isFirst = false) 

 case _ = 

 error("Unexpected state", input) 

 def processLine(line: Array[String]) = WS.url("http://localhost:9200/affa/na/").post( 

 toJson( 

 Map( 

 "date" - toJson(line(0)), 

 "trig" - toJson(line(1)), 

 "code" - toJson(line(2)), 

 "nbjours" - toJson(line(3).toDouble) 

}

二、直接定义Cont:

/** 

 * Create an iteratee that takes the first element of the stream, if one occurs before EOF 

 def head[E]: Iteratee[E, Option[E]] = { 

 def step: K[E, Option[E]] = { 

 case Input.Empty = Cont(step) 

 case Input.EOF = Done(None, Input.EOF) 

 case Input.El(e) = Done(Some(e), Input.Empty) 

 Cont(step) 

 }

及:

def fileIteratee(file: File): Iteratee[String, Long] = { 

 val helper = new FileNIOHelper(file) 

 def step(totalLines: Long)(in: Input[String]): Iteratee[String, Long] = in match { 

 case Input.EOF | Input.Empty = 

 if(debug) println("CLOSING CHANNEL") 

 helper.close() 

 Done(totalLines, Input.EOF) 

 case Input.El(line) = 

 if(debug) println(line) 

 helper.write(line) 

 Cont[String, Long](i = step(totalLines+1)(i)) 

 // initiates iteration by initialize context and first state (Cont) and launching iteration 

 Cont[String, Long](i = step(0L)(i)) 

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

12874.html

cjava