zl程序教程

您现在的位置是:首页 >  其他

当前栏目

泛函编程(36)-泛函Stream IO:IO数据源-IO Source & Sink

amp编程IO stream source 数据源 36 泛函
2023-09-14 08:57:17 时间

 上期我们讨论了IO处理过程:Process[I,O]。我们说Process就像电视信号盒子一样有输入端和输出端两头。Process之间可以用一个Process的输出端与另一个Process的输入端连接起来形成一串具备多项数据处理功能的完整IO过程。但合成的IO过程两头输入端则需要接到一个数据源,而另外一端则可能会接到一个数据接收设备如文件、显示屏等。我们在这篇简单地先介绍一下IO数据源Source和IO数据接收端Sink。

我们先用一个独立的数据类型来代表数据源Source进行简单的示范说明,这个类型与Process类型没有任何关系:


 1 import ProcessLib._

 2 object SourceSink {

 3 trait Source[O] { //以下helper function都是把Source当作O类的List处理

 4 def | [O2](p: Process[O,O2]): Source[O2] //粘接一个Process p. 向其输入O

 5 def filter(f: O = Boolean): Source[O] = this | Process.filter(f) //向p输入O

 6 def map[O2](f: O = O2): Source[O2] = this | Process.lift(f)

 7 def take(n: Int): Source[O] = this | Process.take(n) //截取前n个O

 8 def takeWhile(f: O = Boolean): Source[O] = this | Process.takeWhile(f)

 9 def drop(n: Int): Source[O] = this | Process.drop(n) //跳过前n个O

10 def dropWhile(f: O = Boolean): Source[O] = this | Process.dropWhile(f) 

11 }

从以上trait可以看到:Source的工作原理就是把一个Process的输入黏贴到Source的输出端。我们可以用这个 | 把一串Process粘到Source的输出,如:Src.proc1.proc2.proc3。不过我们得先把proc1,proc2,proc3定义成Source组件函数,因为Source是一个完全独立的类型。

我们再来看看一个Source特殊案例:


1 case class ResourceR[R,I,O]( //Source的一个只读资源案例

2 acquire: IO[R], //资源使用门户 resource handle

3 release: R = IO[Unit], //完成使用资源后的清理函数

4 step: R = IO[Option[I]], //资源内容读取函数

5 trans: Process[I,O] //输出方式

6 ) extends Source[O] {

7 def | [O2](p: Process[O,O2]): Source[O2] = //实现抽象函数

8 ResourceR(acquire,release,step,trans | p) //每次输入都产生一个ResourceR.它的trans与p进行管道对接

9 }

这是个只读的数据源。我们看到所有的动作都被包嵌在IO类型里,这样可以把副作用的产生延后到一些Source Interpreter来运算。这里我们只要用最简单的IO来说明就可以了:


 1 trait IO[A] { self = 

 2 def run: A

 3 def map[B](f: A = B): IO[B] =

 4 new IO[B] { def run = f(self.run) }

 5 def flatMap[B](f: A = IO[B]): IO[B] =

 6 new IO[B] { def run = f(self.run).run }

 8 object IO {

 9 def unit[A](a: = A): IO[A] = new IO[A] { def run = a }

10 def flatMap[A,B](fa: IO[A])(f: A = IO[B]) = fa flatMap f

11 def apply[A](a: = A): IO[A] = unit(a) // syntax for IO { .. }

12 }

这个IO类型我们在前面的讨论里曾经练习过。

现在我们来看看一个文件读取的ResourceR例子:


 1 object Source {

 2 import java.io._

 3 def lines(fileName: String): Source[String] = //从文件fileName里读取String

 4 ResourceR( //创建一个Source的实例

 5 IO {io.Source.fromFile(fileName) }, //资源

 6 (src: io.Source) = IO { src.close }, //清理

 7 (src: io.Source) = IO { //读取

 8 lazy val iterator = src.getLines

 9 if (iterator.hasNext) Some(iterator.next) else None //读完返回None

10 },

11 Process.passUnchanged) //Process[I,I],读什么输入什么

12 }

现在我们可以这样写一段程序了:


1 Source.lines("input.txt").count.exists{_ = 40000 }

2 // res0: ch15.SourceSink.Source[Boolean] = ResourceR(ch15.SourceSink$IO$$anon$

3 //| 3@762efe5d, function1 , function1 ,Await( function1 ))

噢,记住把count和exists放到Source trait里:


1 def exists(f: O = Boolean): Source[Boolean] = this | Process.exists(f)

2 def count: Source[Int] = this | Process.count

上面的表达式可以说还只是IO过程的描述。实际副作用产生是在interpreter里:


1 def collect: IO[IndexedSeq[O]] = { //读取数据源返回IO[IndexedSeq[O]], 用IO.run来实际运算

 2 def tryOr[A](a: = A)(cleanup: IO[Unit]): A = //运算表达式a, 出现异常立即清理现场

 3 try a catch {case e: Exception = cleanup.run; throw e}

 4 @annotation.tailrec //这是个尾递归算法,根据trans状态

 5 def go(acc: IndexedSeq[O], cleanup: IO[Unit], step: IO[Option[I]], trans: Process[I,O]): IndexedSeq[O] =

 6 trans match {

 7 case Halt() = cleanup.run; acc //停止状态,清理现场

 8 case Emit(out,next) = go(tryOr(out +: acc)(cleanup), cleanup, step, next) //积累acc

 9 case Await(iproc) = tryOr(step.run)(cleanup) match {

10 case None = cleanup.run; acc //读完了清理现场

11 case si = go(acc,cleanup,step,iproc(si)) //读入元素作为Process输入来改变Process状态

14 acquire map {res = go(IndexedSeq(),release(res),step(res),trans)} //开始读取

15 }

注意:无论读取完成或中途失败退出都会导致现场清理以防止资源漏出。可以推断这个interpreter还是很安全的。

与Source同样,我们还是用一个独立的类型Sink来代表数据接收端进行简单说明:


1 trait Sink[I] {

2 def |[I2](p: Process[I2,I]): Sink[I2] //p的输出接到Sink的输入

3 def filter(f: I = Boolean): Sink[I] = this | Process.filter(f) //从p接收I

4 def map[I2](f: I2 = I): Sink[I2] = this | Process.lift(f) //将接收的I2变成I

5 def take(n: Int): Sink[I] = this | Process.take(n) //从p接收前n个I

6 def takeWhile(f: I = Boolean): Sink[I] = this | Process.takeWhile(f)

7 def drop(n: Int): Sink[I] = this | Process.drop(n) //过滤掉首n个I

8 def dropWhile(f: I = Boolean): Sink[I] = this | Process.dropWhile(f)

9 }

这和Source trait及其相似。注意和Process连接是反向的:由p指向Sink。

同样,一个只写的资源实例如下:


1 case class ResourceW[R,I,I2]( //只写资源

2 acquire: IO[R], //资源使用门户, resource handle

3 release: R = IO[Unit], //清理函数

4 rcvf: R = (I2 = IO[Unit]), //接收方式

5 trans: Process[I,I2] //处理过程

6 ) extends Sink[I] {

7 def |[I2](p: Process[I2,I]): Sink[I2] =

8 ResourceW(acquire,release,rcvf,p | trans) //制造一个ResourceW实例,由p到trans

9 }

这个也和ResourceR相似。还是与Process连接方式是反方向的:由p到trans。

以下是一个向文件写入的Sink组件:


 1 object Sink {

 2 import java.io._

 3 def file(fileName: String, append: Boolean = false): Sink[String] = //结果是Sink[String]。必须用interpreter来运算

 4 ResourceW( //是一个ResourceW实例

 5 IO {new FileWriter(fileName,append) }, //创建FileWriter

 6 (w: FileWriter) = IO {w.close}, //释放FileWriter

 7 (w: FileWriter) = (s: String) = IO {w.write(s)}, //写入

 8 Process.passUnchanged //不处理写入数据

10 }

在学习过程中发现,独立于Process类型的Source,Sink类型使IO算法的表达式类型的集成很困难。这也限制了组件的功能。我们无法实现泛函编程简洁高雅的表达形式。在下面的讨论中我们会集中精力分析具备数据源功能的Process,希望在表达方式上能有所进步。




java面试题(十一)IO流篇 3.1 介绍一下Java中的IO流 IO(Input Output)用于实现对数据的输入与输出操作,Java把不同的输入/输出源(键盘、文件、网络等)抽象表述为流(Stream)。流是从起源到接收的有序数据,有了它程序就可以采用同一方式访问不同的输入/输出源。 ● 按照数据流向,可以将流分为输入流和输出流,其中输入流只能读取数据、不能写入数据,而输出流只能写入数据、不能读取数据。 ● 按照数据类型,可以将流分为字节流和字符流,其中字节流操作的数据单元是8位的字节,而字符流操作的数据单元是16位的字符。 ● 按照处理功能,可以将流分为节点流和处理流,其中节点流可以直接从/向一个特定的I