
Storm starter - Overview


The Storm starter examples are put together with real care: they are not just demos, but can be used directly in real-world scenarios.
They also provide some very useful tools, such as SlidingWindowCounter and TimeCacheMap.
The starter project thus amounts to a small framework for programming on top of Storm, and it is well worth studying carefully.

 

ExclamationTopology, a basic topology

Nothing special here; this is the standard example.

import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * This is a basic example of a Storm topology.
 */
public class ExclamationTopology {

  public static class ExclamationBolt extends BaseRichBolt {
    OutputCollector _collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      _collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
      // Append "!!!" to the input word; anchor the emitted tuple to the input, then ack it
      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
      _collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
    }
  }

  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3)
           .shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2)
           .shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
      // Cluster mode: submit under the name given on the command line
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
      // Local mode: run for 10 seconds, then tear down
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }
}

 

RollingTopWords

Implements Top-N over a sliding window.
The bolt implementations in this example are very instructive; see Storm starter - RollingTopWords.
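To give a feel for the sliding-window mechanics, here is a minimal sketch of a slot-based sliding window counter. The class name and interface are simplifications of the SlidingWindowCounter idea, not storm-starter's actual API:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Sketch of a slot-based sliding window counter: the window is split into
// numSlots slots; increments land in the current slot, and advancing the
// window reuses (and wipes) the oldest slot.
public class SimpleSlidingCounter<T> {
  private final Map<T, long[]> counts = new HashMap<T, long[]>();
  private final int numSlots;
  private int head = 0; // slot currently receiving increments

  public SimpleSlidingCounter(int numSlots) {
    this.numSlots = numSlots;
  }

  public void incrementCount(T obj) {
    long[] slots = counts.get(obj);
    if (slots == null) {
      slots = new long[numSlots];
      counts.put(obj, slots);
    }
    slots[head]++;
  }

  // Total counts over the whole window, then advance the window by one slot.
  public Map<T, Long> getCountsThenAdvanceWindow() {
    Map<T, Long> result = new HashMap<T, Long>();
    int nextHead = (head + 1) % numSlots;
    Iterator<Map.Entry<T, long[]>> it = counts.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<T, long[]> e = it.next();
      long total = 0;
      for (long c : e.getValue()) total += c;
      if (total == 0) { it.remove(); continue; } // fell out of the window entirely
      result.put(e.getKey(), total);
      e.getValue()[nextHead] = 0; // wipe the slot about to be reused
    }
    head = nextHead;
    return result;
  }
}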

 

SingleJoinExample

Uses TimeCacheMap to implement an in-memory join; see Storm starter - SingleJoinExample.
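To make the mechanics concrete, here is a minimal sketch of buffering one side of a join in a TimeCacheMap; the key and value are made-up placeholders, and the real SingleJoinBolt handles multiple input sides and field selection:

import backtype.storm.utils.TimeCacheMap;

// Sketch: buffer tuples that are waiting for their join partner. Entries
// that never meet a partner within ~30 seconds are expired and reported
// through the callback instead of leaking memory.
public class TimeCacheJoinSketch {
  public static void main(String[] args) {
    TimeCacheMap.ExpiredCallback<Object, String> onExpire =
        new TimeCacheMap.ExpiredCallback<Object, String>() {
          public void expire(Object id, String pending) {
            System.out.println("No partner arrived for " + id + ": " + pending);
          }
        };
    TimeCacheMap<Object, String> pending = new TimeCacheMap<Object, String>(30, onExpire);

    Object id = "user-42"; // hypothetical join key
    if (pending.containsKey(id)) {
      // The other side already arrived: emit the joined result and clear it
      System.out.println("Joined " + id + " with " + pending.get(id));
      pending.remove(id);
    } else {
      pending.put(id, "gender=male"); // hypothetical payload: wait for the partner
    }
  }
}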

 

BasicDRPCTopology, ReachTopology

The DRPC examples; see Twitter Storm – DRPC.
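As a reminder of the shape of a DRPC topology, here is a minimal local-mode sketch in the spirit of BasicDRPCTopology (class names are mine; the real example differs in detail). Each DRPC tuple carries (request-id, argument), and the last bolt must emit (request-id, result) so the result can be routed back to the caller:

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.drpc.LinearDRPCTopologyBuilder;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class DrpcSketch {
  public static class ExclaimBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
      // Field 0 is the DRPC request id; it must be passed through unchanged
      collector.emit(new Values(tuple.getValue(0), tuple.getString(1) + "!"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("id", "result"));
    }
  }

  public static void main(String[] args) {
    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
    builder.addBolt(new ExclaimBolt(), 3);

    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("drpc-demo", new Config(), builder.createLocalTopology(drpc));

    // Blocking call: returns once the topology has processed the request
    System.out.println("hello -> " + drpc.execute("exclamation", "hello"));

    drpc.shutdown();
    cluster.shutdown();
  }
}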

 

TransactionalGlobalCount, TransactionalWords

The transactional topology examples; see Storm - Transactional-topologies.

TransactionalGlobalCount is fairly simple, so let's look at TransactionalWords, which adds distribution statistics over the word counts on top of basic per-word counting.

public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();

COUNT_DATABASE records the count of each word.
BUCKET_DATABASE records the distribution of those counts: for example, how many words have appeared 0-9 times, how many 10-19 times, and so on (with BUCKET_SIZE = 10, a word counted 12 times sits in bucket 1, and migrates to bucket 2 once its count reaches 20).


public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter

KeyedCountUpdater is not much different from the earlier simple examples: it counts words in execute, and in finishBatch it commits the results directly to COUNT_DATABASE.
It emits new Fields("id", "key", "count", "prev-count"). Everything here is easy to follow except one thing: why is prev-count needed? Because when updating BUCKET_DATABASE we have to know whether the word's bucket has migrated, and for that we must know its previous count.
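Pieced together from that description, KeyedCountUpdater looks roughly like the sketch below. The CountValue fields (count, prev_count, txid) and the transaction-id check are assumptions reconstructed from the description; consult the storm-starter source for the exact code:

public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
  Map<String, Integer> _counts = new HashMap<String, Integer>();
  BatchOutputCollector _collector;
  TransactionAttempt _id;

  @Override
  public void prepare(Map conf, TopologyContext context,
                      BatchOutputCollector collector, TransactionAttempt id) {
    _collector = collector;
    _id = id;
  }

  @Override
  public void execute(Tuple tuple) {
    // Accumulate counts for this batch in memory
    String key = tuple.getString(1);
    Integer curr = _counts.get(key);
    if (curr == null) curr = 0;
    _counts.put(key, curr + 1);
  }

  @Override
  public void finishBatch() {
    for (String key : _counts.keySet()) {
      CountValue val = COUNT_DATABASE.get(key);
      CountValue newVal;
      if (val == null || !val.txid.equals(_id.getTransactionId())) {
        // First commit of this transaction for the key: roll counts forward
        newVal = new CountValue();
        newVal.txid = _id.getTransactionId();
        if (val != null) {
          newVal.prev_count = val.count;
          newVal.count = val.count;
        }
        newVal.count += _counts.get(key);
        COUNT_DATABASE.put(key, newVal);
      } else {
        // Replayed batch: the database already reflects this transaction
        newVal = val;
      }
      _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("id", "key", "count", "prev-count"));
  }
}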

 

Bucketize works out which bucket a count belongs to, as count / BUCKET_SIZE:
for a brand-new word, it emits +1 for the word's current bucket;
if the word's bucket has changed, it emits +1 for the new bucket and -1 for the old one;
if the bucket is unchanged, it emits nothing.

public static class Bucketize extends BaseBasicBolt {
  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
    int curr = tuple.getInteger(2);
    Integer prev = tuple.getInteger(3);

    int currBucket = curr / BUCKET_SIZE;
    Integer prevBucket = null;
    if (prev != null) {
      prevBucket = prev / BUCKET_SIZE;
    }

    if (prevBucket == null) {
      // New word: +1 to its current bucket
      collector.emit(new Values(attempt, currBucket, 1));
    } else if (currBucket != prevBucket) {
      // The word migrated: +1 to the new bucket, -1 to the old one
      collector.emit(new Values(attempt, currBucket, 1));
      collector.emit(new Values(attempt, prevBucket, -1));
    }
    // Same bucket as before: emit nothing
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("attempt", "bucket", "delta"));
  }
}
BucketCountUpdater then applies these bucket deltas to BUCKET_DATABASE.

The topology is defined as follows:

MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
builder.setBolt("count", new KeyedCountUpdater(), 5)
       .fieldsGrouping("spout", new Fields("word"));
builder.setBolt("bucketize", new Bucketize())
       .noneGrouping("count");
builder.setBolt("buckets", new BucketCountUpdater(), 5)
       .fieldsGrouping("bucketize", new Fields("bucket"));

 

WordCountTopology, multi-language support

See Storm multi-language support.

ShellBolt and BaseBasicBolt are used to declare bolts implemented in Python and Java, respectively:

public static class SplitSentence extends ShellBolt implements IRichBolt {
  public SplitSentence() {
    // Delegate the bolt logic to the Python script splitsentence.py
    super("python", "splitsentence.py");
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
  }

  @Override
  public Map<String, Object> getComponentConfiguration() {
    return null;
  }
}

public static class WordCount extends BaseBasicBolt {
  Map<String, Integer> counts = new HashMap<String, Integer>();

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    String word = tuple.getString(0);
    Integer count = counts.get(word);
    if (count == null) count = 0;
    count++;
    counts.put(word, count);
    collector.emit(new Values(word, count));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word", "count"));
  }
}

When defining the topology, ShellBolt-based and BaseBasicBolt-based bolts can be mixed freely, which is very convenient:

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8)
       .shuffleGrouping("spout");
builder.setBolt("count", new WordCount(), 12)
       .fieldsGrouping("split", new Fields("word"));

This article is excerpted from 博客园 (cnblogs); originally published 2013-05-24.
