Storm ack和fail机制再论
之前对这个的理解有些问题,今天用到有仔细梳理了一遍,记录一下
首先开启storm tracker机制的前提是,
1. 在spout emit tuple的时候,要加上第3个参数messageid
2. 在配置中acker数目至少为1
3. 在bolt emit的时候,要加上第二个参数anchor tuple,以保持tracker链路
流程,
1. 当tuple具有messageid时,spout会把该tuple加到pending list里面
并发消息给acker,通知acker开始tracker这条tuple
2. 然后再后续的bolt的处理逻辑中,你必须显式的ack或fail所有处理的tuple
如果这条tuple在整个DAG图上都成功执行了,那么acker会发现该tuple的track异或值为0
于是acker会发ack_message给spout
当然如果在DAG图上任意一个节点bolt上fail,那么acker会认为该tuple fail
于是acker会发fail_message给spout
3. 当spout收到ack或fail message如何处理,
首先是从pending list里面删掉这条tuple,因为无论ack或fail,只要得到结果,这条tuple就没有继续被cache的必要了
然后做的事是调用spout.ack或spout.fail
所以系统默认是不会做任何事的,甚至是fail后的重发,你也需要在fail里面自己实现
如何实现后面看
4. 如果一条tuple没有被ack或fail,最终是会超时的
Spout会根据system tick去rotate pending list,对于每个过时的tuple,都调用spout.fail
下面的问题就是如何做fail重发,
这个必须用户通过自己处理fail来做,系统是不会自己做的,
public void fail(Object msgId)
看看系统提供的接口,只有msgId这个参数,这里的设计不合理,其实在系统里是有cache整个msg的,只给用户一个messageid,用户如何取得原来的msg
貌似需要自己cache,然后用这个msgId去查询,太坑爹了
阿里自己的Jstorm会提供
public interface IFailValueSpout { void fail(Object msgId, List<object>values); }
这样更合理一些, 可以直接取得系统cache的msg values
相关文章
- MariaDB 10.3首推系统版本表,误删数据不用跑路了!
- 七年一剑 华丽蜕变:WOT2018揭秘技术背后的真相
- 区块链真相如何?这篇文章说透了!
- 利用DB实现分布式锁的思路
- 区块链技术如果融合到各个行业,将如何改变我们的生活?
- 区块链创新离不开一流的工程技术能力
- Shiro整合springboot,freemaker,redis(含权限系统完整源码)
- 区块链如何提升食品安全,这有一份详细报告
- 教你玩转MyRocks/RocksDB—STATISTICS与后台线程篇
- 公开,公正,公平,区块链的试金石
- 循序渐进学习如何在MariaDB中配置主从复制?
- 区块链难理解?这里有一篇初学者指南
- 物联网设备安全导读
- 实录 | 苹果库克对话清华经管院长钱颖一:硅谷该多向中国学习
- ROS机器人程序设计(原书第2版)2.4.13 启动文件
- Rabbitmq的网络层浅析
- Clojure的transient集合
- Bash Shellshock事件:CVE-2014-6271资料汇总
- 详解Clojure的递归(下)——相互递归和trampoline
- Programming Cljr – working with Java