Storm starter - Overview
The storm-starter examples are genuinely useful: they are not toy demos, but code that can be used directly in real scenarios.
They also provide some very handy tools, such as SlidingWindowCounter and TimeCacheMap.
So storm-starter effectively serves as a framework for Storm-based programming, and is well worth careful study.
ExclamationTopology, a basic topology
Nothing special here; it is the standard example:
```java
/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

    public static class ExclamationBolt extends BaseRichBolt {
        OutputCollector _collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        @Override
        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
            _collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 10);
        builder.setBolt("exclaim1", new ExclamationBolt(), 3)
               .shuffleGrouping("word");
        builder.setBolt("exclaim2", new ExclamationBolt(), 2)
               .shuffleGrouping("exclaim1");

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
```
RollingTopWords
Implements Top-N and sliding-window counting.
The bolts in this example are quite instructive; see Storm starter - RollingTopWords.
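The core idea behind the SlidingWindowCounter tool used by RollingTopWords can be sketched standalone. This is a hypothetical simplified version (class name SlidingCounter and its methods are illustrative, not the storm-starter API): each object keeps one counter bucket per time slot, and advancing the window clears the oldest slot so old counts fall out.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of a per-object sliding-window counter:
// one bucket per time slot; advancing the window clears the oldest slot.
class SlidingCounter<T> {
    private final Map<T, long[]> slots = new HashMap<>();
    private final int numSlots;
    private int head = 0;

    SlidingCounter(int numSlots) { this.numSlots = numSlots; }

    void incrementCount(T obj) {
        slots.computeIfAbsent(obj, k -> new long[numSlots])[head]++;
    }

    // Total count for obj across all slots still inside the window.
    long getCount(T obj) {
        long[] buckets = slots.get(obj);
        if (buckets == null) return 0;
        long sum = 0;
        for (long b : buckets) sum += b;
        return sum;
    }

    // Advance the window one slot; counts in the reused slot fall out.
    void advanceWindow() {
        head = (head + 1) % numSlots;
        for (long[] buckets : slots.values()) buckets[head] = 0;
    }
}

public class SlidingDemo {
    public static void main(String[] args) {
        SlidingCounter<String> c = new SlidingCounter<>(3);
        c.incrementCount("storm");
        c.advanceWindow();
        c.incrementCount("storm");
        System.out.println(c.getCount("storm")); // both slots still in window: 2
        c.advanceWindow();
        c.advanceWindow(); // the first increment's slot has now been cleared
        System.out.println(c.getCount("storm")); // 1
    }
}
```

In the real bolt, advanceWindow is driven by a periodic tick, and a separate ranking bolt sorts the per-window counts to produce the Top-N.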
SingleJoinExample
Implements an in-memory join via TimeCacheMap; see Storm starter - SingleJoinExample.
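The join buffers partial tuples in a TimeCacheMap until the matching tuple from the other stream arrives; entries that never match expire. A simplified standalone sketch of the expiring-map mechanism (class name ExpiringMap and manual rotate() are illustrative assumptions; the real TimeCacheMap rotates generation buckets on a background timer thread):

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

// Sketch of the TimeCacheMap idea: entries live in generation buckets;
// each rotation drops the oldest bucket, expiring everything in it.
class ExpiringMap<K, V> {
    private final LinkedList<Map<K, V>> generations = new LinkedList<>();

    ExpiringMap(int numGenerations) {
        for (int i = 0; i < numGenerations; i++) generations.add(new HashMap<>());
    }

    // New entries always go into the newest generation.
    void put(K key, V value) { generations.getFirst().put(key, value); }

    // Look in every generation still alive, newest first.
    V get(K key) {
        for (Map<K, V> g : generations) {
            if (g.containsKey(key)) return g.get(key);
        }
        return null;
    }

    // In the real TimeCacheMap a timer thread calls this periodically.
    void rotate() {
        generations.removeLast();
        generations.addFirst(new HashMap<>());
    }
}

public class ExpireDemo {
    public static void main(String[] args) {
        ExpiringMap<Integer, String> pending = new ExpiringMap<>(2);
        pending.put(1, "gender=male");       // buffer one side of the join
        pending.rotate();
        System.out.println(pending.get(1));  // still alive: one rotation old
        pending.rotate();
        System.out.println(pending.get(1));  // expired: null
    }
}
```

In the join bolt, when the second half of a pair arrives before expiry, the buffered tuple is removed, the joined result is emitted, and both input tuples are acked; expired entries are failed so they can be replayed.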
BasicDRPCTopology, ReachTopology
Examples of DRPC; see Twitter Storm - DRPC.
TransactionalGlobalCount, TransactionalWords
Examples of transactional topologies; see Storm - Transactional-topologies.
TransactionalGlobalCount is fairly simple, so let's look at TransactionalWords.
On top of counting words, it also maintains distribution statistics over the word counts.
```java
public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
```
COUNT_DATABASE records the count of each word.
BUCKET_DATABASE records the distribution of the word counts, e.g. how many words appeared 0~9 times, how many 10~19 times, and so on.
```java
public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter
```
KeyedCountUpdater is not much different from the earlier simple examples: execute counts each word, and finishBatch commits the counts directly to COUNT_DATABASE.
It emits new Fields("id", "key", "count", "prev-count"). The only non-obvious field is prev-count: why is it needed? Because updating BUCKET_DATABASE requires knowing whether a word has migrated to a different bucket, which in turn requires its previous count.
Bucketize computes which bucket a word belongs to from count / BUCKET_SIZE:
For a new word, simply emit +1 for its bucket.
If the word's bucket changed, emit +1 for the new bucket and -1 for the old bucket.
If the bucket is unchanged, emit nothing.
```java
public static class Bucketize extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
        int curr = tuple.getInteger(2);
        Integer prev = tuple.getInteger(3);

        int currBucket = curr / BUCKET_SIZE;
        Integer prevBucket = null;
        if (prev != null) {
            prevBucket = prev / BUCKET_SIZE;
        }

        if (prevBucket == null) {
            collector.emit(new Values(attempt, currBucket, 1));
        } else if (currBucket != prevBucket) {
            collector.emit(new Values(attempt, currBucket, 1));
            collector.emit(new Values(attempt, prevBucket, -1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("attempt", "bucket", "delta"));
    }
}
```
BucketCountUpdater then applies these bucket deltas to BUCKET_DATABASE.
The topology is defined as follows:
```java
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
builder.setBolt("count", new KeyedCountUpdater(), 5)
       .fieldsGrouping("spout", new Fields("word"));
builder.setBolt("bucketize", new Bucketize())
       .noneGrouping("count");
builder.setBolt("buckets", new BucketCountUpdater(), 5)
       .fieldsGrouping("bucketize", new Fields("bucket"));
```
WordCountTopology, multi-language support
ShellBolt and BaseBasicBolt are used to declare bolts implemented in Python and Java respectively.
```java
public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

public static class WordCount extends BaseBasicBolt {
    Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) count = 0;
        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
```
When defining the topology, ShellBolt and BaseBasicBolt components can be mixed freely, which is very convenient:
```java
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
       .shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12)
       .fieldsGrouping("split", new Fields("word"));
```
This article was originally published on cnblogs (博客园), 2013-05-24.