Storm-源码分析- hook (backtype.storm.hooks)
2023-03-14 22:34:08 时间
task hook
在某些task事件发生时, 如果用户希望执行一些额外的逻辑, 就需要使用hook
当前定义如下事件, emit, cleanup, spoutAck……
用户只需要开发实现ITaskHook的类, 并将类名配置到(storm-conf TOPOLOGY-AUTO-TASK-HOOKS)
系统会在这些事件发生时, 自动调用所有注册的hook中的相应的functions
public interface ITaskHook { void prepare(Map conf, TopologyContext context); void cleanup(); void emit(EmitInfo info); void spoutAck(SpoutAckInfo info); void spoutFail(SpoutFailInfo info); void boltExecute(BoltExecuteInfo info); void boltAck(BoltAckInfo info); void boltFail(BoltFailInfo info); }
public class EmitInfo { public List<Object> values; public String stream; public int taskId; public Collection<Integer> outTasks; public EmitInfo(List<Object> values, String stream, int taskId, Collection<Integer> outTasks) { this.values = values; this.stream = stream; this.taskId = taskId; this.outTasks = outTasks; } }
1. add hook
在mk-task的时候, 会从storm-conf配置里面读出hooks的class names
创建hook对象, 加入到TopologyContext的_hooks中
(defn mk-task [executor-data task-id] (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)] (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance))) )
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext { private List<ITaskHook> _hooks = new ArrayList<ITaskHook>(); public void addTaskHook(ITaskHook hook) { hook.prepare(_stormConf, this); _hooks.add(hook); } public Collection<ITaskHook> getHooks() { return _hooks; } }
2. apply hook
当发生相应的事件时, 调用事先注册的hooks
下面的例子是在emit时, 调用相应的hooks
apply-hooks宏实现也很简单, 从topology context中取出hooks列表, 对每个hook调用emit(EmitInfo)
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(defmacro apply-hooks [topology-context method-sym info-form]
(let [hook-sym (with-meta (gensym "hook") {:tag 'backtype.storm.hooks.ITaskHook})]
`(let [hooks# (get-context-hooks ~topology-context)]
(when-not (hooks-empty? hooks#)
(let [info# ~info-form]
(fast-list-iter [~hook-sym hooks#]
(~method-sym ~hook-sym info#)
))))))
本文章摘自博客园,原文发布日期:2013-07-30
相关文章
- GuardDuty 可视化安全管理
- 06 python编程
- Amazon Lex 推出增强的控制台体验和全新的 V2 API
- 搭载 AWS Graviton 2 处理器的 Amazon EC2 M6g、C6g 以及 R6g 实例,现已在光环新网科技公司运营的 AWS 中国(北京)区域及西云数据运营的 AWS 中国(宁夏)区域正式上线
- 在AWS EKS上发布容器服务(下)
- 在 AWS EKS 平台上发布 K8S 服务(上)
- 探索和体验 Graviton2 高性价比
- 新增功能 – AWS Transfer Family 支持 Amazon Elastic File System
- 在中国区构建自动、弹性、安全的多账号体系——账号自动创建和初始化
- Python optparse模块
- Spring Cloud 的云原生迁移 – AWS 上的混合部署架构(下篇)
- Spring Cloud 的云原生迁移 – AWS 上的混合部署架构(上篇)
- 新功能 – AWS Systems Manager 整合了应用程序管理
- python rpc讲解
- AWS CloudShell – 命令行访问 AWS 资源
- AWS Systems Manager 变更管理器的简介
- Java--128陷阱
- Amazon Location – 为应用程序添加地图和位置感知功能
- 新功能 – AWS Systems Manager 队列管理器
- 宣布 Amazon Managed Service for Grafana(预览版)