泛函编程(38)-泛函Stream IO:IO Process in action
在前面的几节讨论里我们终于得出了一个概括又通用的IO Process类型Process[F[_],O]。这个类型同时可以代表数据源(Source)和数据终端(Sink)。在这节讨论里我们将针对Process[F,O]的特性通过一些应用实例来示范它的组合性(composibility)和由数据源到接收终端IO全过程的功能完整性。
我们已经在前面的讨论中对IO Process的各种函数组合进行了调研和尝试,现在我们先探讨一下数据源设计方案:为了实现资源使用的安全性和IO程序的可组合性,我们必须保证无论在完成资源使用或出现异常时数据源都能得到释放,同时所有副作用的产生都必须延后直至Interpreter开始运算IO程序时。
我们先试试一个读取文件字符内容的组件:
1 import java.io.{BufferedReader, FileReader} 2 def readFile(fileName: String): Process[IO,String] = 3 await[IO,BufferedReader,String](IO{new BufferedReader(new FileReader(fileName))}){ 4 case Left(err) = Halt(err) 5 case Right(r) = { 6 lazy val next: Process[IO,String] = await(IO{r.readLine}) { 7 case Left(err2) = await(IO{r.close}){_ = Halt[IO,String](err2)}() 8 case Right(line) = emit(line, next) 9 }() 10 next 12 }()
注意以下几个问题:首先是所有的IO动作都是通过await函数来实现的,再就是所有产生副作用的语句都被包嵌在IO{}里,这是典型的延迟运算。我们先来看看这个await函数:
1 def await[F[_],A,O](req: F[A])( 2 rcvfn: Either[Throwable,A] = Process[F,O] = (a: Either[Throwable,A]) = Halt[F,O](End)) 3 (fallback: Process[F,O] = Halt[F,O](End), 4 onError: Process[F,O] = Halt[F,O](End)): Process[F,O] = Await(req,rcvfn,fallback,onError)
req是个F[A],在例子里就是IO[A]。把这个函数精简表述就是: await(IO(iorequest))(ioresult = Process)。从这个函数的款式可以看出:在调用await函数的时候并不会产生副作用,因为产生副作用的语句是包嵌在IO{}里面的。我们需要一个interpreter来运算这个IO{...}产生副作用。await的另一个输入参数是 iorequest = Process:iorequest是运算iorequest返回的结果:可以是A即String或者是个异常Exception。所以我们可以用PartialFunction来表达:
{ case Left(err) = Process[IO,???]; case Right(a) = Process[IO,???] }
我们用PartialFunction来描述运算iorequest后返回正常结果或者异常所对应的处理办法。
上面的例子readFile就是用这个await函数来打开文件: IO {new BufferedReader(new FileReader(fileName))}
读取一行:IO {r.readLine},如果读取成功则发送emit出去: case Right(line) = emit(line,next),
如果出现异常则关闭文件:case Left(err) = IO {r.close},注意我们会使用异常End来代表正常完成读取:
1 case object End extends Exception 2 case object Kill extends Exception
以上的Kill是强行终止信号。刚才说过,我们需要用一个interpreter来运算readFile才能正真产生期望的副作用如读写文件。现在我们就来了解一个interpreter:
1 def collect[O](src: Process[IO,O]): IndexedSeq[O] = { 2 val E = java.util.concurrent.Executors.newFixedThreadPool(4) 3 def go(cur: Process[IO,O], acc: IndexedSeq[O]): IndexedSeq[O] = cur match { 4 case Halt(e) = acc 5 case Emit(os,ns) = go(ns, acc ++ os) 6 case Halt(err) = throw err 7 case Await(rq,rf,fb,fl) = 8 val next = 9 try rf(Right(unsafePerformIO(rq)(E))) 10 catch { case err: Throwable = rf(Left(err)) } 11 go(next, acc) 14 try go(src, IndexedSeq()) 15 finally E.shutdown 16 }
首先要注意的是这句:unsafePerformIO(rq)(E)。它会真正产生副作用。当Process[IO,O] src当前状态是Await的时候就会进行IO运算。运算IO产生的结果作为Await的rf函数输入参数,正如我们上面描述的一样。所以,运算IO{iorequest}就是构建一个Await结构把iorequest和转换状态函数rf放进去就像这样:
await(iorequest)(rf)(fb,fl) = Await(ioreques,rf,fb,fl),
然后返回到collect,collect看到src状态是Await就会运算iorequest然后再运行rf。
我们的下一个问题是如何把文件里的内容一行一行读入而不是一次性预先全部搬进内存,这样我们可以读一行,处理一行,占用最少内存。我们再仔细看看readFile的这个部分:
1 def readFile(fileName: String): Process[IO,String] = 2 await[IO,BufferedReader,String](IO{new BufferedReader(new FileReader(fileName))}){ 3 case Left(err) = Halt(err) 4 case Right(r) = { 5 lazy val next: Process[IO,String] = await(IO{r.readLine}) { 6 case Left(err2) = Halt[IO,String](err2) //await(IO{r.close}){_ = Halt[IO,String](err2)}() 7 case Right(line) = emit(line, next) 8 }() 9 next 11 }()
如果成功创建BufferedReader,运算IO产生Right(r)结果;运算IO{r.readLine}后返回最终结果next。next可能是Halt(err)或者Emit(line,next)。如果这样分析那么整个readFile函数也就会读入文件的第一行然后emit输出。记着泛函编程特点除了递归算法之外还有状态机器(state machine)方式的程序运算。在以上例子里的运算结果除输出值line外还有下一个状态next。再看看以下这个组件:
1 //状态进位,输出Process[F,O2] 2 final def drain[O2]: Process[F,O2] = this match { 3 case Halt(e) = Halt(e) //终止 4 case Emit(os,ns) = ns.drain //运算下一状态ns,输出 5 case Await(rq,rf,fb,cl) = Await(rq, rf andThen (_.drain)) //仍旧输出Await 6 }
这个drain组件实际上起到了一个移动状态的作用。如果我们这样写:readFile("myfile.txt").drain 那么在我们上面的例子里readFile返回Emit(line,next);drain接着readFile输出状态就会运算next,这样程序又回到readFile next的IO{r.readline}运算中了。如果我们在drain组件前再增加一些组件:
1 readFile("farenheit.txt").filter(line = !line.startsWith("#").map(line = line.toUpperCase).drain
那么我们就会得到读取一行字符;过滤起始为#的行;转成大写字符;返回再读一行交替循环这样的效果了。
很明显readFile实在太有针对性了。函数类型款式变的复杂可读性低。我们需要一种更概括的形式来实现泛函编程语言的简练而流畅表达形式。
我们首先应该把IO运算方式重新定义一下。用await函数显得太复杂:
1 //await 的精简表达形式 2 def eval[F[_],A](fa: F[A]): Process[F,A] = //运算F[A] 3 await[F,A,A](fa){ 4 case Left(err) = Halt(err) 5 case Right(a) = emit(a, Halt(End)) 6 }() 7 def evalIO[A](ioa: IO[A]) = eval[IO,A](ioa) //运算IO[A] 8 //确定终结的运算 9 def eval_[F[_],A,B](fa: F[A]): Process[F,B] = eval[F,A](fa).drain[B] //运算F[A]直到终止
如此运算IO只需要这样写:eval(iorequest),是不是精简多了。
再来一个通用安全的IO资源使用组件函数:
1 def resource[R,O]( //通用IO程序运算函数 2 acquire: IO[R])( //获取IO资源。open file 3 use: R = Process[IO,O])( //IO运算函数 readLine 4 release: R = Process[IO,O]): Process[IO,O] = //释放资源函数 close file 5 eval(acquire) flatMap { r = use(r).onComplete(release(r)) } 7 def resource_[R,O]( //与resource一样,只是运算realease直至终止 8 acquire: IO[R])( //获取IO资源。open file 9 use: R = Process[IO,O])( //IO运算函数 readLine 10 release: R = IO[Unit]): Process[IO,O] = //释放资源函数 close file 11 resource(acquire)(use)(release andThen (eval_[IO,Unit,O]))
以下是个套用resource组件的例子:从一个文件里逐行读出,在完成读取或出现异常时主动释放资源:
1 def lines(fileName: String): Process[IO,String] = //从fileName里读取 2 resource 3 {IO {io.Source.fromFile(fileName)}} //占用资源 4 {src = //使用资源。逐行读取 5 lazy val iter = src.getLines 6 def nextLine = if (iter.hasNext) Some(iter.next) else None //下一行 7 lazy val getLines: Process[IO,String] = //读取 8 eval(IO{nextLine}) flatMap { //运算IO 9 case None = Halt(End) //无法继续读取:完成或者异常 10 case Some(line) = emit(line, getLines) //读取然后发送 12 getLines 14 {src = eval_ (IO{src.close})} //释放资源
现在我们应该可以很简练但又不失清楚详尽地描述一段IO程序:
打开文件fahrenheit.txt
读取一行字符
过滤空行或者以#开始的字行,可通过的字行代表亨氏温度数
把亨氏温度转换成摄氏温度数
这里面的温度转换函数如下:
1 def fahrenheitToCelsius(f: Double): Double = 2 (f - 32) * 5.0/9.0
那么整个程序就可以这样写了:
1 lines("fahrenheit.txt"). 2 filter(line = !line.startsWith("#") !line.trim.isEmpty). 3 map(line = fahrenheitToCelsius(line.toDouble).toString). 4 drain
这段代码是不是很清晰的描述了其所代表的功能,不错!
现在到了了解IO过程的另一端:Sink的时候了。我们如果需要通过Process来实现输出功能的话,也就是把Source[O]的这个O发送输出到一个Sink。实际上我们也可以用Process来表达Sink,先看一个简单版本的Sink如下:
1 type SimpleSink[F[_],O] = Process[F,O = F[Unit]]
SimpleSink就是一个IO Process,它的输出是一个 O = F[Unit]函数,用一个例子来解释:
1 def simpleWriteFile(fileName: String, append: Boolean = false) : SimpleSink[IO, String] = 2 resource[FileWriter, String = IO[Unit]] 3 {IO {new FileWriter(fileName,append)}} //acquire 4 {w = IO{(s:String) = IO{w.write(s)}}} //use 5 {w = IO{w.close}} //release
下面是个可使用的Sink:
1 type Sink[F[_],O] = Process[F, O = Process[F,Unit]] 3 import java.io.FileWriter 5 def fileW(file: String, append: Boolean = false): Sink[IO,String] = 6 resource[FileWriter, String = Process[IO,Unit]] 7 { IO { new FileWriter(file, append) }} 8 { w = stepWrite { (s: String) = eval[IO,Unit](IO(w.write(s))) }} //重复循环逐行写 9 { w = eval_(IO(w.close)) } 11 /* 一个无穷循环恒量stream. */ 12 def stepWrite[A](a: A): Process[IO,A] = 13 eval(IO(a)).flatMap { a = Emit(a, stepWrite(a)) } 通过Emit的下一状态重复运算IO(a)
我们需要实现逐行输出,所以用这个stepWrite来运算IO。stepWrite是通过返回Emit来实现无穷循环的。
下一步是把Sink和Process对接起来。我们可以用以下的to组件来连接:
1 def to[O2](sink: Sink[F,O]): Process[F,Unit] = 2 join { (this zipWith sink)((o,f) = f(o)) }
1 def join[F[_],A](p: Process[F,Process[F,A]]): Process[F,A] = 2 p.flatMap(pa = pa)
现在我们可以在前面例子里的Process过程中再增加一个写入celsius.txt的组件:
1 val converter: Process[IO,Unit] = 2 lines("fahrenheit.txt"). //读取 3 filter(line = !line.startsWith("#") !line.trim.isEmpty). //过滤 4 map(line = fahrenheitToCelsius(line.toDouble).toString). //温度转换 5 pipe(intersperse("\n")). //加end of line 6 to(fileW("celsius.txt")). //写入 7 drain //继续循环
上面的Sink类型运算IO后不返回任何结果(Unit)。但有时我们希望IO运算能返回一些东西,如运算数据库query之后返回结果集,那我们需要一个新的类型:
1 type Channel[F[_],I,O] = Process[F, I = Process[F,O]]
Channel和Sink非常相似,差别只在Process[F,O]和Process[F,Unit]。
我们用Channel来描述一个数据库查询:
1 import java.sql.{Connection, PreparedStatement, ResultSet} 3 def query(conn: IO[Connection]): 4 Channel[IO, Connection = PreparedStatement, Map[String,Any]] = //Map === Row 5 resource_ //I Connection = PreparedStatement 6 { conn } //打开connection 7 { conn = constant { (q: Connection = PreparedStatement) = //循环查询 8 resource_ 9 { IO { //运行query 10 val rs = q(conn).executeQuery 11 val ncols = rs.getMetaData.getColumnCount 12 val cols = (1 to ncols).map(rs.getMetaData.getColumnName) 13 (rs, cols) 14 }} 15 { case (rs, cols) = //读取纪录Row 16 def step = 17 if (!rs.next) None 18 else Some(cols.map(c = (c, rs.getObject(c): Any)).toMap) 19 lazy val rows: Process[IO,Map[String,Any]] = //循环读取 20 eval(IO(step)).flatMap { 21 case None = Halt(End) 22 case Some(row) = Emit(row, rows) //循环运算rows函数 24 rows 26 { p = IO { p._1.close } } // close the ResultSet 27 }} 28 { c = IO(c.close) }
以下提供更多的应用示范:
从一个文件里读取存放亨氏温度的文件名后进行温度转换并存放到celsius.txt中
1 val convertAll: Process[IO,Unit] = (for { 2 out - fileW("celsius.txt").once // out的类型是String = Process[IO,Unit] 3 file - lines("fahrenheits.txt") //fahrenheits.txt里保存了一串文件名 4 _ - lines(file). //动态打开文件读取温度记录 5 map(line = fahrenheitToCelsius(line.toDouble)). //温度系统转换 6 flatMap(celsius = out(celsius.toString)) //输出 7 } yield ()) drain //继续循环
输出到多个.celsius文件:
1 val convertMultisink: Process[IO,Unit] = (for { 2 file - lines("fahrenheits.txt") //读取文件名称 3 _ - lines(file). //打开文件读取温度数据 4 map(line = fahrenheitToCelsius(line.toDouble)). //温度系统转换 5 map(_ toString). 6 to(fileW(file + ".celsius")) //写入文件 7 } yield ()) drain
我们可以按需要在处理过程中增加处理组件:
1 val convertMultisink2: Process[IO,Unit] = (for { 2 file - lines("fahrenheits.txt") 3 _ - lines(file). 4 filter(!_.startsWith("#")). //过滤#开始字串 5 map(line = fahrenheitToCelsius(line.toDouble)). 6 filter(_ 0). // 过滤0度以下温度 7 map(_ toString). 8 to(fileW(file + ".celsius")) 9 } yield ()) drain 10
java面试题(十一)IO流篇 3.1 介绍一下Java中的IO流 IO(Input Output)用于实现对数据的输入与输出操作,Java把不同的输入/输出源(键盘、文件、网络等)抽象表述为流(Stream)。流是从起源到接收的有序数据,有了它程序就可以采用同一方式访问不同的输入/输出源。 ● 按照数据流向,可以将流分为输入流和输出流,其中输入流只能读取数据、不能写入数据,而输出流只能写入数据、不能读取数据。 ● 按照数据类型,可以将流分为字节流和字符流,其中字节流操作的数据单元是8位的字节,而字符流操作的数据单元是16位的字符。 ● 按照处理功能,可以将流分为节点流和处理流,其中节点流可以直接从/向一个特定的I
相关文章
- 【Linux系统编程】文件IO操作
- python异步IO编程(一)
- Go http编程
- 泛函编程(37)-泛函Stream IO:通用的IO处理过程-Free Process
- 泛函编程(36)-泛函Stream IO:IO数据源-IO Source & Sink
- 泛函编程(33)-泛函IO:Free Functor - Coyoneda
- 泛函编程(32)-泛函IO:IO Monad
- 泛函编程(31)-泛函IO:Free Monad-Running free
- C语言编程的环境以及架构
- Linux IO多路复用之epoll网络编程及源码(转)
- 8-2-1python语法基础-并发编程-线程-创建线程,线程冲突(锁),线程通信(condition,队列),线程池,定时器
- python后端面试第二部分:网络编程和并发编程--长期维护
- Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
- Java IO编程
- Atitit.互联网 软件编程 数据库方面 架构 大牛 牛人 attilax总结
- Error:Execution failed for task ‘:app:transformNative_libsWithStripDebugSymbolForDebug‘.> java.io.IO
- 编程,不止有代码,还有艺术
- python爬虫多线程编程
- Python编程:Flask-BasicAuth实现Authentication登录认证
- Python编程:设置Python解释器不生成字节码pyc文件
- Python编程:aiohttp和requests网络io性能比较
- Kotlin 函数式编程思想 FP in Kotlin
- 【Java 并发编程】Java 创建线程池的正确姿势: Executors 和 ThreadPoolExecutor 详解...
- 编程算法 - 有序双循环链表的插入 代码(C)
- 002-Unix网络编程-五种IO模型,阻塞IO、非阻塞IO、多路复用IO、信号驱动IO以及异步IO和高性能IO设计模式:Reactor和Proactor
- Unix网络编程 高级IO套接字设置超时
- LabVIEW编程LabVIEW控制研华PCI-1756例程与相关资料
- LabVIEW编程LabVIEW开发控制FLEX温湿度压力传感器例程与相关资料
- 学编程有80%的人不知正确方法:这两套妙招助你事半功倍!
- 扩展知识点----Linux中看门狗应用编程
- 系统编程(八)文件IO标准库API fgetc fputc feof fseek
- 系统编程(七)文件IO标准库API fopen fprintf fclose
- 系统编程(六)文件IO之常用API readv writev
- 系统编程(二)文件IO之常用API open write lseek read close
- 系统编程(一)初识文件IO