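FunDA (2) - Streaming Data Operation
In the previous installment we introduced and implemented strongly typed result rows. The main purpose of strong typing is that, when we move SQL batch operations from the back-end database into memory and turn them into row-by-row stream operations, fields can be selected more conveniently, accurately, and efficiently. In that discussion we used a collection's foreach to simulate the simplest possible data stream, turning a data set read from the database in one batch into a sequence of rows consumed one at a time. In general, a complete streaming data process reads data from the database and then, depending on the state of each row read, updates the back-end database: inserting new data, updating, deleting, and so on. So adding one more row type, the action row, on top of the streaming operations implemented last time completes the whole data processing flow, as in the following diagram: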
Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database
If we still take Slick as the target FRM, then the type of this ActionRow is simply Slick's DBIO[T]:
package com.bayakala.funda.rowtypes
import slick.dbio._
object ActionType {
  // An action row is a Slick database action that has not been run yet.
  type FDAAction[T] = DBIO[T]
}
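To make the role of an action row concrete, here is a minimal sketch that models the flow above with plain Scala collections instead of Slick. The names `DataRow`, `ActionRow`, `toAction`, and `execAction`, and the `mutable.Map` standing in for a database table, are assumptions made for illustration only; they are not part of FunDA or Slick. What it tries to show is that an action row is just a value describing an update, and nothing changes until that value is executed.

```scala
import scala.collection.mutable

object ActionRowSketch {
  // A data row read from the "database" (stand-in for a strongly typed row).
  final case class DataRow(id: Int, newStatus: Int)

  // An action row: a description of an update that has not been executed.
  final case class ActionRow(id: Int, status: Int)

  // DataRow => ActionRow: builds the description only, with no side effects.
  def toAction(row: DataRow): ActionRow = ActionRow(row.id, row.newStatus)

  // ActionRow => Database: applies the update to the given table (id -> status)
  // and returns the number of rows affected.
  def execAction(table: mutable.Map[Int, Int])(act: ActionRow): Int =
    if (table.contains(act.id)) { table(act.id) = act.status; 1 } else 0

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for table TA.
    val tableA = mutable.Map(1 -> 0, 2 -> 3)
    val rows = Seq(DataRow(1, 1), DataRow(2, 2))

    val actions = rows.map(toAction)             // nothing has been written yet
    println(tableA.toSeq.sorted)

    val counts = actions.map(execAction(tableA)) // the updates are applied here
    println(tableA.toSeq.sorted)
    println(counts)
  }
}
```

Slick's `DBIO[T]` plays the part of `ActionRow` in the real code: building it is cheap and side-effect free, and only running it touches the database.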
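I remember once coming across the following question in a Scala discussion forum: how do you update the status field of table a with the status field of table b? Expressed as an SQL statement: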
update a,b set a.status=b.status where a.id=b.id
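The poster wanted to know how to implement this update of table a with Slick without calling the SQL statement directly through the sql string interpolator, presumably because compile-time syntax checking was required. This is genuinely hard to solve with a Slick query (whether it can be solved at all I did not bother to work out), because SQL batch processing is a weak spot of FRM. With FunDA's streaming approach it becomes easy: use a join query to read b.status, then update a.status row by row where a.id = b.id. Conveniently, that is exactly the problem we will solve below with ActionRow. First, the following code sets up the test data: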
import slick.dbio.DBIO
import slick.driver.H2Driver.api._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
import slick.jdbc.meta.MTable
object ActionRowTest extends App {

  class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA") {
    def id = column[Int]("id",O.PrimaryKey)
    def flds = column[String]("aflds")
    def status = column[Int]("status")
    def * = (id,flds,status)
  }
  val tableA = TableQuery[ATable]

  class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB") {
    def id = column[Int]("id",O.PrimaryKey)
    def flds = column[String]("bflds")
    def status = column[Int]("status")
    def * = (id,flds,status)
  }
  val tableB = TableQuery[BTable]

  val insertAAction =
    tableA ++= Seq (
      (1,"aaa",0),
      (2,"bbb",3),
      (3,"ccc",1),
      (4,"ddd",0),
      (16,"kkk",16)
    )
  val insertBAction =
    tableB ++= Seq (
      (1,"aaa",1),
      (2,"bbb",2),
      (3,"ccc",3),
      (4,"ddd",4),
      (5,"kkk",5)
    )

  val db = Database.forConfig("h2db")

  def tableExists(tables: Vector[MTable], tblname: String) =
    tables.exists {t => t.name.toString.contains(tblname)}

  // Create whichever of the two tables is still missing.
  def createSchemaIfNotExists(): Future[Unit] = {
    db.run(MTable.getTables).flatMap {
      case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>
        println("Creating schemas for TA and TB...")
        db.run((tableA.schema ++ tableB.schema).create)
      case tables if !tableExists(tables,".TA") =>
        println("Creating schema for TA ...")
        db.run(tableA.schema.create)
      case tables if !tableExists(tables,".TB") =>
        println("Creating schema for TB ...")
        db.run(tableB.schema.create)
      case _ =>
        println("Schema for TA, TB already created.")
        Future.successful(())
    }
  }

  // Empty both tables, then load the test rows.
  def insertInitialData(): Future[Unit] = {
    val cleanInsert = DBIO.seq(
      tableA.delete, tableB.delete,
      insertAAction,
      insertBAction)
    db.run(cleanInsert).andThen {
      case Success(_) => println("Data insert completed.")
      case Failure(e) => println(s"Data insert failed [${e.getMessage}]")
    }
  }

  // Drop tables left over from an earlier run. Await.ready does not rethrow a failed Future.
  Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)

  val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}
  Await.ready(initResult,Duration.Inf)

}
First, use a join query to bring the relevant fields of the two tables into memory and convert them into strongly typed FDADataRow rows:
val selectAB = for {
  a <- tableA
  b <- tableB
  if (a.id === b.id)
} yield (a.id,b.id,a.status,b.status)

case class ABRow (id: Int, asts: Int, bsts: Int)
// a.id and b.id are equal after the join, so only the first one is kept.
def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)

import com.bayakala.funda.rowtypes.DataRowType.FDADataRow

val loader = FDADataRow(slick.driver.H2Driver, toABRow _)
loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
  println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
}
The initial result is:
ID:1 Status A = 0, B = 1
ID:2 Status A = 3, B = 2
ID:3 Status A = 1, B = 3
ID:4 Status A = 0, B = 4
Now we convert every DataRow into an ActionRow. Each action replaces the status value in TA (asts) with the corresponding value from TB (bsts):
import com.bayakala.funda.rowtypes.ActionType.FDAAction
// Build (but do not run) an update that sets TA.status to the TB value for this id.
def updateAStatus(row: ABRow): FDAAction[Int] = {
  tableA.filter{r => r.id === row.id}
    .map(_.status)
    .update(row.bsts)
}

loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {
  actionRow =>
    println(s"${actionRow.toString}")
}
This prints:
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@492691d7
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@27216cd
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@558bdf1f
slick.driver.JdbcActionComponent$UpdateActionExtensionMethodsImpl$$anon$7@8576fa0
Every DataRow has now been turned into a JDBC action.
As the next step, all we need to do is run these ActionRows to finish the task:
// db.run returns a Future, so each update runs asynchronously.
def execAction(act: FDAAction[Int]) = db.run(act)

loader.getTypedRows(selectAB.result)(db)
  .map(updateAStatus(_))
  .map(execAction(_))
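Because `db.run` returns a `Future`, the `map(execAction(_))` step yields a collection of futures that may still be running when the next statement executes. Here is a minimal sketch of one way to wait for all of them, using only the standard library. `fakeRun` is a hypothetical stand-in for `db.run`, not a Slick API, and the 10-second timeout is an arbitrary choice.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object AwaitActionsSketch {
  // Hypothetical stand-in for db.run(action): pretends that an update for a
  // positive id affects exactly one row.
  def fakeRun(id: Int): Future[Int] = Future { if (id > 0) 1 else 0 }

  // Start every action, combine the futures into one, and block until all finish.
  def runAll(ids: Seq[Int]): Seq[Int] = {
    val futures: Seq[Future[Int]] = ids.map(fakeRun)
    Await.result(Future.sequence(futures), 10.seconds)
  }

  def main(args: Array[String]): Unit = {
    val affected = runAll(Seq(1, 2, 3, 4))
    println(s"rows affected: ${affected.sum}")
  }
}
```

With Slick the same shape would apply to the futures returned by `execAction`. Another option is to combine the actions with `DBIO.sequence` and run the combined action once.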
Now let's look at the state of table TA in the database again:
loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
  println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
}

ID:1 Status A = 1, B = 1
ID:2 Status A = 2, B = 2
ID:3 Status A = 3, B = 3
ID:4 Status A = 4, B = 4
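We can see that the status field of TA has been updated correctly.
This demonstration clearly has shortcomings. For example, when a.status = b.status the update step should be skipped. The cause is that foreach can only simulate the most basic flow of data. With a full-featured stream library such as scalaz-stream-fs2 we could control the flow of data elements much better. More importantly, scalaz-stream-fs2 supports parallel computation, so in the flow described above: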
Database => Query -> Collection => Streaming -> DataRow => QueryAction(DataRow) -> ActionRow => execAction(ActionRow) -> Database
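the => stages, namely Query, Streaming, QueryAction, and execAction, could run in parallel, making full use of multi-core CPUs and improving throughput. Below is the source code used in this discussion: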
package com.bayakala.funda.rowtypes

import scala.concurrent.duration._
import scala.concurrent.Await
import slick.driver.JdbcProfile

object DataRowType {
  class FDADataRow[SOURCE, TARGET](slickProfile: JdbcProfile,convert: SOURCE => TARGET){
    import slickProfile.api._

    // Run the query, block until it completes, and convert every raw row to TARGET.
    def getTypedRows(slickAction: DBIO[Iterable[SOURCE]])(slickDB: Database): Iterable[TARGET] =
      Await.result(slickDB.run(slickAction), Duration.Inf).map(raw => convert(raw))
  }

  object FDADataRow {
    def apply[SOURCE, TARGET](slickProfile: JdbcProfile, converter: SOURCE => TARGET): FDADataRow[SOURCE, TARGET] =
      new FDADataRow[SOURCE, TARGET](slickProfile, converter)
  }

}
import slick.dbio.DBIO
import slick.driver.H2Driver.api._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
import scala.concurrent.ExecutionContext.Implicits.global
import slick.jdbc.meta.MTable
object ActionRowTest extends App {

  class ATable(tag: Tag) extends Table[(Int,String,Int)](tag,"TA") {
    def id = column[Int]("id",O.PrimaryKey)
    def flds = column[String]("aflds")
    def status = column[Int]("status")
    def * = (id,flds,status)
  }
  val tableA = TableQuery[ATable]

  class BTable(tag: Tag) extends Table[(Int,String,Int)](tag,"TB") {
    def id = column[Int]("id",O.PrimaryKey)
    def flds = column[String]("bflds")
    def status = column[Int]("status")
    def * = (id,flds,status)
  }
  val tableB = TableQuery[BTable]

  val insertAAction =
    tableA ++= Seq (
      (1,"aaa",0),
      (2,"bbb",3),
      (3,"ccc",1),
      (4,"ddd",0),
      (16,"kkk",16)
    )
  val insertBAction =
    tableB ++= Seq (
      (1,"aaa",1),
      (2,"bbb",2),
      (3,"ccc",3),
      (4,"ddd",4),
      (5,"kkk",5)
    )

  val db = Database.forConfig("h2db")

  def tableExists(tables: Vector[MTable], tblname: String) =
    tables.exists {t => t.name.toString.contains(tblname)}

  // Create whichever of the two tables is still missing.
  def createSchemaIfNotExists(): Future[Unit] = {
    db.run(MTable.getTables).flatMap {
      case tables if !tableExists(tables,".TA") && !tableExists(tables,".TB") =>
        println("Creating schemas for TA and TB...")
        db.run((tableA.schema ++ tableB.schema).create)
      case tables if !tableExists(tables,".TA") =>
        println("Creating schema for TA ...")
        db.run(tableA.schema.create)
      case tables if !tableExists(tables,".TB") =>
        println("Creating schema for TB ...")
        db.run(tableB.schema.create)
      case _ =>
        println("Schema for TA, TB already created.")
        Future.successful(())
    }
  }

  // Empty both tables, then load the test rows.
  def insertInitialData(): Future[Unit] = {
    val cleanInsert = DBIO.seq(
      tableA.delete, tableB.delete,
      insertAAction,
      insertBAction)
    db.run(cleanInsert).andThen {
      case Success(_) => println("Data insert completed.")
      case Failure(e) => println(s"Data insert failed [${e.getMessage}]")
    }
  }

  // Drop tables left over from an earlier run. Await.ready does not rethrow a failed Future.
  Await.ready(db.run(sql"DROP TABLE TA; DROP TABLE TB".as[String]),Duration.Inf)

  val initResult = createSchemaIfNotExists().flatMap {_ => insertInitialData()}
  Await.ready(initResult,Duration.Inf)

  // Join TA and TB on id and read both status values.
  val selectAB = for {
    a <- tableA
    b <- tableB
    if (a.id === b.id)
  } yield (a.id,b.id,a.status,b.status)

  case class ABRow (id: Int, asts: Int, bsts: Int)
  def toABRow(raw: (Int,Int,Int,Int)) = ABRow(raw._1,raw._3,raw._4)

  import com.bayakala.funda.rowtypes.DataRowType.FDADataRow

  val loader = FDADataRow(slick.driver.H2Driver, toABRow _)
  loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
    println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
  }

  import com.bayakala.funda.rowtypes.ActionType.FDAAction
  // Build (but do not run) an update that sets TA.status to the TB value for this id.
  def updateAStatus(row: ABRow): FDAAction[Int] = {
    tableA.filter{r => r.id === row.id}
      .map(_.status)
      .update(row.bsts)
  }

  loader.getTypedRows(selectAB.result)(db).map(updateAStatus(_)).foreach {
    actionRow =>
      println(s"${actionRow.toString}")
  }

  // db.run returns a Future, so each update runs asynchronously.
  def execAction(act: FDAAction[Int]) = db.run(act)

  loader.getTypedRows(selectAB.result)(db)
    .map(updateAStatus(_))
    .map(execAction(_))

  loader.getTypedRows(selectAB.result)(db).foreach {dataRow =>
    println(s"ID:${dataRow.id} Status A = ${dataRow.asts}, B = ${dataRow.bsts}")
  }

}
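The redundant updates mentioned above (rows where a.status already equals b.status) can be avoided with ordinary collection operations, before any stream library is involved. The following is a minimal sketch, assuming the `ABRow` shape from the listing. `needsUpdate` and `rowsToUpdate` are hypothetical helpers, and the sample rows are made up rather than read from the database.

```scala
object SkipUnchangedSketch {
  // Same shape as the ABRow case class in the listing above.
  final case class ABRow(id: Int, asts: Int, bsts: Int)

  // Hypothetical helper: an update is only needed when the two statuses differ.
  def needsUpdate(row: ABRow): Boolean = row.asts != row.bsts

  // Keep only the rows whose update would actually change TA.
  def rowsToUpdate(rows: Iterable[ABRow]): Iterable[ABRow] =
    rows.filter(needsUpdate)

  def main(args: Array[String]): Unit = {
    // Made-up sample rows; in the real program they would come from
    // loader.getTypedRows(selectAB.result)(db).
    val rows = Seq(ABRow(1, 0, 1), ABRow(2, 2, 2), ABRow(3, 1, 3), ABRow(4, 4, 4))
    rowsToUpdate(rows).foreach { r =>
      println(s"would update TA: id=${r.id}, status ${r.asts} -> ${r.bsts}")
    }
  }
}
```

In the real pipeline the filter would sit between `getTypedRows` and `map(updateAStatus(_))`, so that no action row is built for a row that is already in sync.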
Original article by ItWorker. If you repost it, please credit the source: https://blog.ytso.com/12877.html