如何应对Spark-Redis行海量数据插入、查询作业时碰到的问题
摘要:由于redis是基于内存的数据库,稳定性并不是很高,尤其是standalone模式下的redis。于是工作中在使用Spark-Redis时也会碰到很多问题,尤其是执行海量数据插入与查询的场景中。
海量数据查询
Redis是基于内存读取的数据库,相比其它的数据库,Redis的读取速度会更快。但是当我们要查询上千万条的海量数据时,即使是Redis也需要花费较长时间。这时候如果我们想要终止select作业的执行,我们希望的是所有的running task立即killed。
Spark是有作业调度机制的。SparkContext是Spark的入口,相当于应用程序的main函数。SparkContext中的cancelJobGroup函数可以取消正在运行的job。
/** * Cancel active jobs for the specified group. See `org.apache.spark.SparkContext.setJobGroup` * for more information. */ def cancelJobGroup(groupId: String) { assertNotStopped() dagScheduler.cancelJobGroup(groupId) }
按理说取消job之后,job下的所有task应该也终止。而且当我们取消select作业时,executor会throw TaskKilledException,而这个时候负责task作业的TaskContext在捕获到该异常之后,会执行killTaskIfInterrupted。
// If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. val killReason = reasonIfKilled if (killReason.isDefined) { // Throw an exception rather than returning, because returning within a try{} block // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl // exception will be caught by the catch block, leading to an incorrect ExceptionFailure // for the task. throw new TaskKilledException(killReason.get) }
/** * If the task is interrupted, throws TaskKilledException with the reason for the interrupt. */ private[spark] def killTaskIfInterrupted(): Unit
但是Spark-Redis中还是会出现终止作业但是task仍然running。因为task的计算逻辑最终是在RedisRDD中实现的,RedisRDD的compute会从Jedis中取获取keys。所以说要解决这个问题,应该在RedisRDD中取消正在running的task。这里有两种方法:
方法一:参考Spark的JDBCRDD,定义close(),结合InterruptibleIterator。
def close() { if (closed) return try { if (null != rs) { rs.close() } } catch { case e: Exception => logWarning("Exception closing resultset", e) } try { if (null != stmt) { stmt.close() } } catch { case e: Exception => logWarning("Exception closing statement", e) } try { if (null != conn) { if (!conn.isClosed && !conn.getAutoCommit) { try { conn.commit() } catch { case NonFatal(e) => logWarning("Exception committing transaction", e) } } conn.close() } logInfo("closed connection") } catch { case e: Exception => logWarning("Exception closing connection", e) } closed = true } context.addTaskCompletionListener{ context => close() } CompletionIterator[InternalRow, Iterator[InternalRow]]( new InterruptibleIterator(context, rowsIterator), close())
方法二:异步线程执行compute,主线程中判断task isInterrupted
try{ val thread = new Thread() { override def run(): Unit = { try { keys = doCall } catch { case e => logWarning(s"execute http require failed.") } isRequestFinished = true } } // control the http request for quite if user interrupt the job thread.start() while (!context.isInterrupted() && !isRequestFinished) { Thread.sleep(GetKeysWaitInterval) } if (context.isInterrupted() && !isRequestFinished) { logInfo(s"try to kill task ${context.getKillReason()}") context.killTaskIfInterrupted() } thread.join() CompletionIterator[T, Iterator[T]]( new InterruptibleIterator(context, keys), close)
我们可以异步线程来执行compute,然后在另外的线程中判断是否task isInterrupted,如果是的话就执行TaskContext的killTaskIfInterrupted。防止killTaskIfInterrupted无法杀掉task,再结合InterruptibleIterator:一种迭代器,以提供任务终止功能。通过检查[TaskContext]中的中断标志来工作。
海量数据插入
我们都已经redis的数据是保存在内存中的。当然Redis也支持持久化,可以将数据备份到硬盘中。当插入海量数据时,如果Redis的内存不够的话,很显然会丢失部分数据。这里让使用者困惑的点在于: 当Redis已使用内存大于最大可用内存时,Redis会报错:command not allowed when used memory > ‘maxmemory’。但是当insert job的数据大于Redis的可用内存时,部分数据丢失了,并且还没有任何报错。
因为不管是Jedis客户端还是Redis服务器,当插入数据时内存不够,不会插入成功,但也不会返回任何response。所以目前能想到的解决办法就是当insert数据丢失时,扩大Redis内存。
总结
Spark-Redis是一个应用还不是很广泛的开源项目,不像Spark JDBC那样已经商业化。所以Spark-Redis还是存在很多问题。相信随着commiter的努力,Spark-Redis也会越来越强大。
相关文章
- Redis脑裂为何会导致数据丢失?
- Redis【13】-修改数据库后,如何保证Redis与数据库的数据一致性
- 图解 Redis丨这就是 RDB 快照,能记录实际数据的
- 【Docker 基础教程】容器数据持久化(三)------ Redis的基础配置
- 【数据库开发】Redis key-value内存数据库介绍
- 关于Redis的数据清理
- redis数据删除策略和逐出策略
- redis应用于基于时间顺序的数据操作 ,而不关注具体时间
- redis应用于各种结构型和非结构型高热度数据访问加速
- Redis学习之路(001)- Redis介绍以及安装(Linux)
- Redis日常操作命令小结
- 一个Redis dump文件的简要分析过程
- OpenEuler2203 基于容器和本地文件部署Redis Cluster的过程以及简单性能测试
- 2022-04-14 redis-operator业界方案分析
- 2021-12-15 [redis] 使用Prometheus收集性能数据
- redis——数据结构和对象的使用介绍
- 通过redis协议构建脏字过滤微服务
- 大数据Spark “蘑菇云”行动第76课: Kafka+Spark Streaming+Redis项目实战
- ssm redis 数据字典在J2EE中的多种应用与实现
- flink将数据写入redis
- 认识 MySQL 和 Redis 的数据一致性问题
- 大数据Flink(十六):流批一体API Connectors Redis
- Laravel Redis操作大全
- Redis和Memcached比较
- 因在缓存对象中增加字段,而导致Redis中取出缓存转化成Java对象时出现反序列化失败的问题
- 在SpringBoot中存放session到Redis
- ELKStack入门篇(三)之logstash收集日志写入redis
- centos7搭建redis集群