Storm-源码分析- hook (backtype.storm.hooks)
源码 分析 hook storm Hooks
2023-09-11 14:16:09 时间
task hook
在某些task事件发生时, 如果用户希望执行一些额外的逻辑, 就需要使用hook
当前定义如下事件, emit, cleanup, spoutAck……
用户只需要开发实现ITaskHook的类, 并将类名配置到(storm-conf TOPOLOGY-AUTO-TASK-HOOKS)
系统会在这些事件发生时, 自动调用所有注册的hook中的相应的functions
public interface ITaskHook { void prepare(Map conf, TopologyContext context); void cleanup(); void emit(EmitInfo info); void spoutAck(SpoutAckInfo info); void spoutFail(SpoutFailInfo info); void boltExecute(BoltExecuteInfo info); void boltAck(BoltAckInfo info); void boltFail(BoltFailInfo info); }
public class EmitInfo { public List Object values; public String stream; public int taskId; public Collection Integer outTasks; public EmitInfo(List Object values, String stream, int taskId, Collection Integer outTasks) { this.values = values; this.stream = stream; this.taskId = taskId; this.outTasks = outTasks; }
1. add hook
在mk-task的时候, 会从storm-conf配置里面读出hooks的class names
创建hook对象, 加入到TopologyContext的_hooks中
(defn mk-task [executor-data task-id] (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)] (.addTaskHook ^TopologyContext (:user-context task-data) (- klass Class/forName .newInstance))) )
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext { private List ITaskHook _hooks = new ArrayList ITaskHook public void addTaskHook(ITaskHook hook) { hook.prepare(_stormConf, this); _hooks.add(hook); public Collection ITaskHook getHooks() { return _hooks; }
2. apply hook
当发生相应的事件时, 调用事先注册的hooks
下面的例子是在emit时, 调用相应的hooks
apply-hooks宏实现也很简单, 从topology context中取出hooks列表, 对每个hook调用emit(EmitInfo)
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(defmacro apply-hooks [topology-context method-sym info-form] (let [hook-sym (with-meta (gensym "hook") {:tag backtype.storm.hooks.ITaskHook})] `(let [hooks# (get-context-hooks ~topology-context)] (when-not (hooks-empty? hooks#) (let [info# ~info-form] (fast-list-iter [~hook-sym hooks#] (~method-sym ~hook-sym info#) ))))))
本文章摘自博客园,原文发布日期:2013-07-30
相关文章
- SpringMVC -- 梗概--源码--贰--上传
- 84 爬虫 - scrapy-redis源码分析(dupefilter)
- uni-app - 九宫格老虎机抽奖机插件源码(支持服务端API接口控制最终中奖的奖品,自定义组件可随意配置和控制,带组件文档轻松 DIY 自己的营销页抽奖机)幸运抽奖圆形大转盘插件组件
- Storm-源码分析- Thrift的使用
- Storm-源码分析-Topology Submit-Worker
- Spark源码分析 -- TaskScheduler
- Spark源码分析 -- SchedulableBuilder
- Spark MLlib - Decision Tree源码分析
- 《Ceph源码分析》——第1章,第3节Ceph基本架构图
- Hadoop源码分析之读文件时NameNode和DataNode的处理过程
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- 使用扩展有效对齐 SwiftUI 内容,创建自定义 SwiftUI 方法以快速对齐项目并使您的代码看起来简洁明了(教程含源码)
- SwiftUI 2 redacted新功能超实用的占位符功能(教程含源码)
- SwiftUI 2 实战之组件行满自动换行(教程含源码)
- SwiftUI 阴影基础之 03 动画阴影制造月食效果 (教程含源码)
- NSClickGestureRecognizer (SwiftUI macOS中文文档手册 教程含源码)
- 用c#开发微信 (22) 微信商城 - 微信支付 (c#源码)
- cesium 之加载地形图 Terrain 篇(附源码下载)
- 深入理解Spring源码之剖析AOP(注解配置方式)
- petite-vue源码剖析-逐行解读@vue-reactivity之Map和Set的reactive
- 对FreeRTOS的task.c文件源码的分析笔记(一)
- soundtouch源码分析__based on csdn :
- spring源码分析之cache demo
- solr源码分析之solrclound