Akka(39): Http:File streaming-文件交换详解编程语言
所谓文件交换指的是Http协议中服务端和客户端之间文件的上传和下载。Akka-http作为一种系统集成工具应该具备高效率的数据交换方式包括文件交换和数据库表行的上传下载。Akka-http的数据交换模式支持流式操作:代表交换数据可以是一种无限长度流的元素。这种模式首先解决了纯Http大数据通过Multipart传输所必须进行的数据分段操作和复杂的消息属性设定等需要的技术门槛,再者用户还可以很方便的使用Akka-stream对数据进行深度处理,免去了数据转换的麻烦。更重要的是:Akka-http还支持reactive-stream,可以避免由传输速率所产生的种种问题。在本篇我们讨论利用Akka-http进行文件的双向传递。
任何文件的内容储存格式无论在硬盘、内存或者数据线上都是一堆bytes。文件交换流程包括读取文件里的bytes,传送这些bytes,最终把这些bytes写入文件。我们看到这里每个环节操作目标都是bytes,所以可能在程序里是不需要任何数据转换过程的。Akka提供了一组文件读写函数,如下:
def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]] = fromPath(f, chunkSize, startPosition = 0) def fromPath(f: Path, chunkSize: Int, startPosition: Long): Source[ByteString, Future[IOResult]] = Source.fromGraph(new FileSource(f, chunkSize, startPosition, DefaultAttributes.fileSource, sourceShape("FileSource"))) def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]] = toPath(f, options, startPosition = 0) def toPath(f: Path, options: Set[OpenOption], startPosition: Long): Sink[ByteString, Future[IOResult]] = Sink.fromGraph(new FileSink(f, startPosition, options, DefaultAttributes.fileSink, sinkShape("FileSink")))
我们看到:fromPath类型是Source[ByteSgtring,_],toPath类型是Sink[ByteString,_],直接就是流型式,应该可以直接放入Http消息的Entity中,如下:
def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = { def loadFile = { // implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher") val file = Paths.get(filePath) FileIO.fromPath(file, chunkSize) .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher")) limitableByteSource(loadFile) }
fileStream是Source[ByteString,_]可以直接放进Entity:
val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/text") val textData = HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/Users/tiger-macpro/downloads/A4.TIF",256) )
我们把fileStream放入了HttpRequest中。对于HttpResponse可以用下面的方式:
val route = pathPrefix("file") { (get path("text" / Remaining)) { fp = withoutSizeLimit { complete( HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/users/tiger-macpro/" + fp, 256)) }
注意:complete进行了HttpResponse的构建。因为Entity.dataByes就是Source[ByteString,_],所以我们可以直接把它导入Sink:
entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))) .onComplete { case _ = println(s"Download file saved to: $destPath") }
上面我们提过FileIO.toPath就是一个Sink。由于我们的目的是大型的文件交换,所以无论上传下载都使用了withoutSizeLimit:
val route = pathPrefix("file") { (get path("exchange" / Remaining)) { fp = withoutSizeLimit { complete( HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/users/tiger-macpro/" + fp, 256)) } ~ (post path("exchange")) { withoutSizeLimit { extractDataBytes { bytes = val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath))) onComplete(fut) { _ = complete(s"Save upload file to: $destPath")
好了下面的示范代码里对字符型或二进制文件都进行了交换的示范操作:
服务端:
import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.model._ import akka.http.scaladsl.model.HttpEntity._ import java.nio.file._ object FileServer extends App { implicit val httpSys = ActorSystem("httpSystem") implicit val httpMat = ActorMaterializer() implicit val httpEC = httpSys.dispatcher def fileStream(filePath: String, chunkSize: Int) = { def loadFile = { // implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher") val file = Paths.get(filePath) FileIO.fromPath(file, chunkSize) .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher")) limitableByteSource(loadFile) val destPath = "/users/tiger-macpro/downloads/A4-1.TIF" val route = pathPrefix("file") { (get path("exchange" / Remaining)) { fp = withoutSizeLimit { complete( HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/users/tiger-macpro/" + fp, 256)) } ~ (post path("exchange")) { withoutSizeLimit { extractDataBytes { bytes = val fut = bytes.runWith(FileIO.toPath(Paths.get(destPath))) onComplete(fut) { _ = complete(s"Save upload file to: $destPath") val (port, host) = (8011,"localhost") val bindingFuture = Http().bindAndHandle(route,host,port) println(s"Server running at $host $port. Press any key to exit ...") scala.io.StdIn.readLine() bindingFuture.flatMap(_.unbind()) .onComplete(_ = httpSys.terminate()) }
客户端:
import akka.actor._ import akka.stream._ import akka.stream.scaladsl._ import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpEntity.limitableByteSource import akka.http.scaladsl.model._ import java.nio.file._ import akka.util.ByteString import scala.util._ object FileClient extends App { implicit val sys = ActorSystem("ClientSys") implicit val mat = ActorMaterializer() implicit val ec = sys.dispatcher def downloadFileTo(request: HttpRequest, destPath: String) = { val futResp = Http(sys).singleRequest(request) futResp .andThen { case Success([email protected](StatusCodes.OK, _, entity, _)) = entity.dataBytes.runWith(FileIO.toPath(Paths.get(destPath))) .onComplete { case _ = println(s"Download file saved to: $destPath") } case Success([email protected](code, _, _, _)) = println(s"Download request failed, response code: $code") r.discardEntityBytes() case Success(_) = println("Unable to download file!") case Failure(err) = println(s"Download failed: ${err.getMessage}") val dlFile = "Downloads/readme.txt" val downloadText = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile) downloadFileTo(downloadText, "/users/tiger-macpro/downloads/sample.txt") scala.io.StdIn.readLine() val dlFile2 = "Downloads/image.png" val downloadText2 = HttpRequest(uri = s"http://localhost:8011/file/exchange/" + dlFile2) downloadFileTo(downloadText2, "/users/tiger-macpro/downloads/sample.png") scala.io.StdIn.readLine() def uploadFile(request: HttpRequest, dataEntity: RequestEntity) = { val futResp = Http(sys).singleRequest( request.copy(entity = dataEntity) futResp .andThen { case Success([email protected](StatusCodes.OK, _, entity, _)) = entity.dataBytes.map(_.utf8String).runForeach(println) case Success([email protected](code, _, _, _)) = println(s"Upload request failed, response code: $code") r.discardEntityBytes() case Success(_) = println("Unable to Upload file!") case Failure(err) = println(s"Upload failed: ${err.getMessage}") def fileStream(filePath: String, chunkSize: Int): Source[ByteString,Any] = { def loadFile = { // implicit val ec = httpSys.dispatchers.lookup("akka.http.blocking-ops-dispatcher") val file = Paths.get(filePath) FileIO.fromPath(file, chunkSize) .withAttributes(ActorAttributes.dispatcher("akka.http.blocking-ops-dispatcher")) limitableByteSource(loadFile) val uploadText = HttpRequest(HttpMethods.POST,uri = s"http://localhost:8011/file/exchange") val textData = HttpEntity( ContentTypes.`application/octet-stream`, fileStream("/Users/tiger-macpro/downloads/readme.txt",256) uploadFile(uploadText,textData) scala.io.StdIn.readLine() sys.terminate()
相关文章
- Http通过header传递参数_http contenttype
- 使用WinHttp接口实现HTTP协议Get、Post和文件上传功能「建议收藏」
- 1-STM32F103+ESP8266+Air302远程升级篇(自建物联网平台)--STM32F103通过ESP8266使用http或https下载程序文件(支持外部flash备份),升级程序(单片机
- python库——h5py读取h5文件「建议收藏」
- 【经验】使用http访问一个链接提示400的错误,但是在浏览器访问没问题(server returned HTTP Response code :400 fro URL:),怎么解决
- 【Android】使用Android开发应用过程中遇到ViewGroup的简单效以及aw和assets文件夹下的文件(Http协议的底层工作)
- TP6.0 自定义命令创建类文件
- Spring Boot 实现万能文件在线预览,已开源,真香!!
- WordPress 技巧:使用 WP_Http 直接下载文件
- Linux文件INode:深入理解文件系统底层存储结构(linux文件inode)
- python通过http下载文件的方法详解编程语言
- 解析MySQL文件以调整端口配置(mysql端口配置文件)
- 问题解决Linux系统下Jar文件路径问题(linuxjar路径)
- 深入浅出:Linux 架构学习 HTTP 协议(http协议linux)
- Linux快速搭建超强HTTP服务器(linux搭建http服务器)
- JSP多个文件打包下载代码
- C#实现word文件下载的代码
- php读取excel文件示例分享(更新修改excel)
- java使用http实现文件下载学习示例