Redis+Hbase+RocketMQ 实际使用问题案例分享
2023-03-20 15:28:20 时间
需求
- 将Hbase数据,解析后推送到RocketMQ。
- redis使用list数据类型,存储了需要推送的数据的RowKey及表名。
简单画个流程图就是:
分析及确定方案
Redis
- 明确list中元素结构
{"rowkey":rowkey,"table":table}
解析出rowkey; - 一次取多个元素加快效率;
- 取了之后放入重试队列,并删除原来的元素;
- 处理数据永远是重试队列里的,成功之后删除,失败就加上重试次数并重新放回;
- 明确从list中取值所使用的redis命令;范围获取
LRANGE
;范围删除(留下指定范围的数据)LTRIM
;判断list长度LLEN
;加入listRPUSH
;删除LREM
等等; - 从Hbase获取数据失败和发送到mq失败都令重试次数加一;
- 每次碰到重试次数不为0的数据都休眠1s;
- 设置最大重试次数,达到限制后丢弃;
- 考虑客户redis部署方式,单机、主从、集群、哨兵等;
- 选择合适的客户端,Jedis、Redisson、Lettuce等;
- 编写不同的操作代码,也可以利用配置文件、环境变量、工厂模式等适配各种部署模式;
Hbase
- 基本理论知识学习(原来没接触过),rowkey是没条数据的主键,限定符是字段名,列族是多个限定名的集合等;当时看这个觉得不错https://juejin.cn/post/6844903797655863309
- 因为是不停读取数据、链接、Table不用close,可以缓存起来,没必要每次都创建;
- 确定批量获取数据方式为批量
Get
,没用scan
; - 了解解析方式,一些网上的解析试了之后会乱码,这边用的是它自带的
CellUtil.clone
相关方法; - 考虑所有都没数据时休眠10s;
RocketMQ
- 有现成的发送代码,公司封装好的;
- 调整发送的速度、太快了服务端会吃不消(获取Hbase数据速度太快了,最开始没限制一会儿就入了百万数据),设置超时时间(默认3s);
- 调整服务端的内存、线程数等参数;
实现
配置
#server configuration
server.port=8896
#log config
logging.file.path=./logs
#redis-standalone
redis.standalone.host=
redis.standalone.port=6379
redis.standalone.password=
redis.standalone.enable=true
#redis-cluster
redis.cluster.nodes=
redis.cluster.password=
redis.cluster.timeout=30000
redis.cluster.enable=false
# Zookeeper 集群地址,逗号分隔
hbase.zookeeper.quorum=
# Zookeeper 端口
hbase.zookeeper.property.clientPort=2181
# 消息目的rocketmq地址
rocketmq.server.host=
# 发送消息间隔时间,防止发送过快mq受不了
rocketmq.send.interval.millisec=10
# 每次从redis读取数据量限制。
data.access.redisDataSize=100
# 失败数据重试次数,超过的直接丢弃
data.access.retryNum=10
# 需要接入的表,需要发送到rocketmq的topic和在redis中的key的映射。xxx.xxx.xxx[topic]=redisKey
data.access.topicKeyMap[weibo_hbase]=data:sync:notice:suanzi:weibo:back
data.access.topicKeyMap[wechat_hbase]=data:sync:notice:suanzi:wechat:back
部分代码
获取配置,其余的直接@Value("${}")
:
@Setter
@Getter
@Configuration
@ConfigurationProperties(prefix = "data.access")
public class AccessRedisMqConfig {
/**
* key:topic; value:redis的key
*/
private Map<String, String> topicKeyMap = new HashMap<>();
/**
* 一次从redis中读取数据量限制
*/
private long redisDataSize = 50;
/**
* 失败数据重试次数
*/
private int retryNum = 10;
}
开启接入:
@Component
public class AdapterRunner implements ApplicationRunner {
@Resource
private DataAccessService dataAccessService;
@Override
public void run(ApplicationArguments args) {
System.out.println("项目已启动,开始接入数据到RocketMQ……");
dataAccessService.accessData2Mq();
}
}
其他代码其实也在分析里了。
踩坑
- mq发送问题
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: invokeAsync call timeout
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeAsync(NettyRemotingClient.java:525)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.sendMessageAsync(MQClientAPIImpl.java:523)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.onExceptionImpl(MQClientAPIImpl.java:610)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.access$100(MQClientAPIImpl.java:167)
at org.apache.rocketmq.client.impl.MQClientAPIImpl$1.operationComplete(MQClientAPIImpl.java:572)
at org.apache.rocketmq.remoting.netty.ResponseFuture.executeInvokeCallback(ResponseFuture.java:54)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$2.run(NettyRemotingAbstract.java:319)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
上面分析也说了,注意发送速度,有多少资源就接入多快。还有注意相关三个端口是否开放。
总结
程序很简单,主要涉及方案的是,获取redis的list数据时,是考虑效率,及加入重试策略,保证数据不丢失等。
相关文章
- 从本体论开始说起——运营商关系图谱的构建及应用
- 如何成为一名数据科学家?
- 从未见过的堂兄杀了人,你的DNA是关键证据
- 20个安全可靠的免费数据源,各领域数据任你挑
- 20个安全可靠的免费数据源,各领域数据任你挑
- 阿里云李飞飞:All in Cloud时代,云原生数据库优势明显
- 基于Hadoop生态系统的一高性能数据存储格式CarbonData(性能篇)
- 大数据告诉你:10年漫威,到底有多少角色
- TigerGraph:实时图数据库助力金融风控升级
- Splunk利用Splunk Connected Experiences和Splunk Business Flow 扩大数据访问
- 大数据开发常见的9种数据分析手段
- 以免在景区看人,我爬了5W条全国景点门票数据...
- 【实战解析】基于HBase的大数据存储在京东的应用场景
- 数据科学家告诉你哪些计算机科学书籍是你应该看的
- Kafka作为大数据的核心技术,你了解多少?
- Spring Boot 整合 Redis 实现缓存操作
- 大数据学习必须掌握的五大核心技术有哪些?
- 基于Antlr在Apache Flink中实现监控规则DSL化的探索实践
- 甲骨文再次被Gartner评为分析型数据管理解决方案魔力象限领导者
- 爬取吴亦凡微博102118条转发数据,扒一扒流量的真假