zl程序教程

您现在的位置是:首页 >  工具

当前栏目

Storm-源码分析- hook (backtype.storm.hooks)

源码 分析 hook storm Hooks
2023-09-11 14:16:09 时间

task hook

在某些task事件发生时, 如果用户希望执行一些额外的逻辑, 就需要使用hook

当前定义如下事件, emit, cleanup, spoutAck……

用户只需要开发实现ITaskHook的类, 并将类名配置到(storm-conf TOPOLOGY-AUTO-TASK-HOOKS)

系统会在这些事件发生时, 自动调用所有注册的hook中的相应的functions

 

public interface ITaskHook {

 void prepare(Map conf, TopologyContext context);

 void cleanup();

 void emit(EmitInfo info);

 void spoutAck(SpoutAckInfo info);

 void spoutFail(SpoutFailInfo info);

 void boltExecute(BoltExecuteInfo info);

 void boltAck(BoltAckInfo info);

 void boltFail(BoltFailInfo info);

}

public class EmitInfo {

 public List Object values;

 public String stream;

 public int taskId;

 public Collection Integer outTasks;

 public EmitInfo(List Object values, String stream, int taskId, Collection Integer outTasks) {

 this.values = values;

 this.stream = stream;

 this.taskId = taskId;

 this.outTasks = outTasks;

}

 

1. add hook

在mk-task的时候, 会从storm-conf配置里面读出hooks的class names 
创建hook对象, 加入到TopologyContext的_hooks中

(defn mk-task [executor-data task-id]

 (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]

 (.addTaskHook ^TopologyContext (:user-context task-data) (- klass Class/forName .newInstance)))

)
public class TopologyContext extends WorkerTopologyContext implements IMetricsContext {

 private List ITaskHook _hooks = new ArrayList ITaskHook 

 public void addTaskHook(ITaskHook hook) {

 hook.prepare(_stormConf, this);

 _hooks.add(hook);

 public Collection ITaskHook getHooks() {

 return _hooks;

}

 

2. apply hook

当发生相应的事件时, 调用事先注册的hooks

下面的例子是在emit时, 调用相应的hooks 
apply-hooks宏实现也很简单, 从topology context中取出hooks列表, 对每个hook调用emit(EmitInfo)

(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(defmacro apply-hooks [topology-context method-sym info-form]

 (let [hook-sym (with-meta (gensym "hook") {:tag backtype.storm.hooks.ITaskHook})]

 `(let [hooks# (get-context-hooks ~topology-context)]

 (when-not (hooks-empty? hooks#)

 (let [info# ~info-form]

 (fast-list-iter [~hook-sym hooks#]

 (~method-sym ~hook-sym info#)

 ))))))

本文章摘自博客园,原文发布日期:2013-07-30