Storm Source Code Analysis: spout (backtype.storm.spout)
1. ISpout
ISpout is the core interface for implementing a spout. A spout is responsible for feeding messages into the topology and for tracking those messages.
If you want Storm to track a message emitted by the spout, you must supply a message-id. The message-id can be of any type, but if it is omitted or set to null, Storm will not track that message.
Note that ack, fail, and nextTuple are all invoked on the same spout thread, so there is no need for mutual exclusion between them; however, each of these methods must avoid blocking.
/**
 * ISpout is the core interface for implementing spouts. A Spout is responsible
 * for feeding messages into the topology for processing. For every tuple emitted by
 * a spout, Storm will track the (potentially very large) DAG of tuples generated
 * based on a tuple emitted by the spout. When Storm detects that every tuple in
 * that DAG has been successfully processed, it will send an ack message to the Spout.
 *
 * <p>If a tuple fails to be fully processed within the configured timeout for the
 * topology (see {@link backtype.storm.Config}), Storm will send a fail message to the spout
 * for the message.</p>
 *
 * <p>When a Spout emits a tuple, it can tag the tuple with a message id. The message id
 * can be any type. When Storm acks or fails a message, it will pass back to the
 * spout the same message id to identify which tuple it's referring to. If the spout leaves out
 * the message id, or sets it to null, then Storm will not track the message and the spout
 * will not receive any ack or fail callbacks for the message.</p>
 *
 * <p>Storm executes ack, fail, and nextTuple all on the same thread. This means that an implementor
 * of an ISpout does not need to worry about concurrency issues between those methods. However, it
 * also means that an implementor must ensure that nextTuple is non-blocking: otherwise
 * the method could block acks and fails that are pending to be processed.</p>
 */
public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * <p>This includes the:</p>
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within the topology, including the task id and component id of this task, input and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be emitted at any time, including the open and close methods. The collector is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when an ISpout is going to be shutdown. There is no guarantee that close
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * <p>The one context where close is guaranteed to be called is when a topology is
     * killed when running Storm in local mode.</p>
     */
    void close();

    /**
     * Called when a spout has been activated out of a deactivated mode.
     * nextTuple will be called on this spout soon. A spout can become activated
     * after having been deactivated when the topology is manipulated using the
     * `storm` client.
     */
    void activate();

    /**
     * Called when a spout has been deactivated. nextTuple will not be called while
     * a spout is deactivated. The spout may or may not be reactivated in the future.
     */
    void deactivate();

    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the
     * output collector. This method should be non-blocking, so if the Spout has no tuples
     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
     * to have nextTuple sleep for a short amount of time (like a single millisecond)
     * so as not to waste too much CPU.
     */
    void nextTuple();

    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier
     * has been fully processed. Typically, an implementation of this method will take that
     * message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);

    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be
     * fully processed. Typically, an implementation of this method will put that
     * message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}
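The ack/fail contract above can be sketched in plain Java (this is not Storm code; the class and method names are hypothetical stand-ins): a spout keeps every tracked message in a "pending" map keyed by message-id, nextTuple tags and records each emitted message, ack discards it for good, and fail re-queues it for replay.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Sketch of the tracking contract described above (not Storm code).
public class QueueSpoutSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final Map<Object, String> pending = new HashMap<>();
    private long nextId = 0;

    public void feed(String msg) { queue.add(msg); }

    // Analogue of nextTuple(): non-blocking; returns null when there is
    // nothing to emit (a real spout would sleep briefly instead of blocking).
    public Object nextTuple() {
        String msg = queue.poll();
        if (msg == null) return null;
        Object msgId = nextId++;       // tag the message so it can be tracked
        pending.put(msgId, msg);
        return msgId;
    }

    // Analogue of ack(msgId): the tuple tree completed, forget the message.
    public void ack(Object msgId) { pending.remove(msgId); }

    // Analogue of fail(msgId): put the message back for replay later.
    public void fail(Object msgId) {
        String msg = pending.remove(msgId);
        if (msg != null) queue.add(msg);
    }

    public int pendingCount() { return pending.size(); }
    public int queuedCount() { return queue.size(); }
}
```

A failed message thus cycles back through the queue and is re-emitted with a fresh id, which is the typical "replay at a later time" behavior the fail javadoc describes.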
2. SpoutOutputCollector
Exposes the API a spout uses to emit tuples.
Compared with a bolt's output collector, a spout's output collector can additionally attach a message-id, which lets the spout track that message.
emit
List<Integer> emit(String streamId, List<Object> tuple, Object messageId)
emit takes three parameters: the target stream id, the tuple, and the message-id.
If streamId is omitted, the tuple is sent to the default stream, Utils.DEFAULT_STREAM_ID.
If messageId is null, the spout will not track this message.
It returns one value: the list of task ids the tuple was ultimately sent to.
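These rules can be sketched with a hypothetical stand-in for the collector (not the real SpoutOutputCollector; DEFAULT_STREAM_ID and the returned task-id list are assumed values here): the convenience overloads all funnel into the three-argument emit, and a null messageId marks the tuple as untracked.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (not Storm code) of the emit overload chain and null-id semantics.
public class EmitSketch {
    static final String DEFAULT_STREAM_ID = "default"; // stands in for Utils.DEFAULT_STREAM_ID
    final List<String> log = new ArrayList<>();

    // Three-argument form: everything ends up here.
    List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        log.add(streamId + ":" + tuple + ":" + (messageId == null ? "untracked" : "tracked"));
        return List.of(1);   // pretend the tuple was routed to task 1
    }

    // No stream id -> default stream.
    List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(DEFAULT_STREAM_ID, tuple, messageId);
    }

    // No message id -> null id, so the message is not tracked.
    List<Integer> emit(List<Object> tuple) {
        return emit(tuple, null);
    }
}
```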
emitDirect
void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId)
This corresponds to direct grouping: the sending task specifies the receiving task directly via taskId.
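The contrast with ordinary emit can be sketched as follows (hypothetical names, not Storm code): with a normal emit, the stream's grouping chooses the consuming task (round-robin here, as a stand-in for shuffle grouping), whereas emitDirect lets the emitter name the task itself.

```java
import java.util.List;

// Sketch (not Storm code): grouping-chosen task vs. directly addressed task.
public class DirectEmitSketch {
    final List<Integer> consumerTasks = List.of(3, 4, 5); // assumed task ids of the consumer
    int rr = 0;

    // Normal emit: the grouping picks the task (round-robin stand-in).
    int emit(List<Object> tuple) {
        return consumerTasks.get(rr++ % consumerTasks.size());
    }

    // emitDirect: the caller supplies the task id explicitly; in Storm this
    // requires a direct stream and a direct grouping on the consumer side.
    int emitDirect(int taskId, List<Object> tuple) {
        return taskId;
    }
}
```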
/**
 * This output collector exposes the API for emitting tuples from an {@link backtype.storm.topology.IRichSpout}.
 * The main difference between this output collector and {@link OutputCollector}
 * for {@link backtype.storm.topology.IRichBolt} is that spouts can tag messages with ids so that they can be
 * acked or failed later on. This is the Spout portion of Storm's API to
 * guarantee that each message is fully processed at least once.
 */
public class SpoutOutputCollector implements ISpoutOutputCollector {
    ISpoutOutputCollector _delegate;

    public SpoutOutputCollector(ISpoutOutputCollector delegate) {
        _delegate = delegate;
    }

    /**
     * Emits a new tuple to the specified output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has failed
     * to be fully processed, the spout will receive an ack or fail callback respectively
     * with the messageId as long as the messageId was not null. If the messageId was null,
     * Storm will not track the tuple and no callback will be received. The emitted values must be
     * immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return _delegate.emit(streamId, tuple, messageId);
    }

    /**
     * Emits a new tuple to the default output stream with the given message ID.
     * When Storm detects that this tuple has been fully processed, or has failed
     * to be fully processed, the spout will receive an ack or fail callback respectively
     * with the messageId as long as the messageId was not null. If the messageId was null,
     * Storm will not track the tuple and no callback will be received. The emitted values must be
     * immutable.
     *
     * @return the list of task ids that this tuple was sent to
     */
    public List<Integer> emit(List<Object> tuple, Object messageId) {
        return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

    /**
     * Emits a tuple to the default output stream with a null message id. Storm will
     * not track this message so ack and fail will never be called for this tuple. The
     * emitted values must be immutable.
     */
    public List<Integer> emit(List<Object> tuple) {
        return emit(tuple, null);
    }

    /**
     * Emits a tuple to the specified output stream with a null message id. Storm will
     * not track this message so ack and fail will never be called for this tuple. The
     * emitted values must be immutable.
     */
    public List<Integer> emit(String streamId, List<Object> tuple) {
        return emit(streamId, tuple, null);
    }

    /**
     * Emits a tuple to the specified task on the specified output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be
     * immutable.
     */
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        _delegate.emitDirect(taskId, streamId, tuple, messageId);
    }

    /**
     * Emits a tuple to the specified task on the default output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be
     * immutable.
     */
    public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
    }

    /**
     * Emits a tuple to the specified task on the specified output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be
     * immutable.
     *
     * <p>Because no message id is specified, Storm will not track this message
     * so ack and fail will never be called for this tuple.</p>
     */
    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
        emitDirect(taskId, streamId, tuple, null);
    }

    /**
     * Emits a tuple to the specified task on the default output stream. This output
     * stream must have been declared as a direct stream, and the specified task must
     * use a direct grouping on this stream to receive the message. The emitted values must be
     * immutable.
     *
     * <p>Because no message id is specified, Storm will not track this message
     * so ack and fail will never be called for this tuple.</p>
     */
    public void emitDirect(int taskId, List<Object> tuple) {
        emitDirect(taskId, tuple, null);
    }

    @Override
    public void reportError(Throwable error) {
        _delegate.reportError(error);
    }
}
This article is excerpted from the 博客园 (cnblogs) blog; original publication date: 2013-08-01.