Kafka+Storm+HDFS整合实践
在基于Hadoop平台的很多应用场景中,我们需要对数据进行离线和实时分析,离线分析可以很容易地借助于Hive来实现统计分析,但是对于实时的需求Hive就不合适了。实时应用场景可以使用Storm,它是一个实时处理系统,它为实时处理类应用提供了一个计算模型,可以很容易地进行编程处理。为了统一离线和实时计算,一般情况下,我们都希望将离线和实时计算的数据源的集合统一起来作为输入,然后将数据的流向分别经由实时系统和离线分析系统,分别进行分析处理,这时我们可以考虑将数据源(如使用Flume收集日志)直接连接一个消息中间件,如Kafka,可以整合Flume+Kafka,Flume作为消息的Producer,生产的消息数据(日志数据、业务请求数据等等)发布到Kafka中,然后通过订阅的方式,使用Storm的Topology作为消息的Consumer,在Storm集群中分别进行如下两个需求场景的处理:
直接使用Storm的Topology对数据进行实时分析处理 整合Storm+HDFS,将消息处理后写入HDFS进行离线分析处理实时处理,只要开发满足业务需要的Topology即可,不做过多说明。这里,我们主要从安装配置Kafka、Storm,以及整合Kafka+Storm、整合Storm+HDFS、整合Kafka+Storm+HDFS这几点来配置实践,满足上面提出的一些需求。配置实践使用的软件包如下所示:
zookeeper-3.4.5.tar.gz kafka_2.9.2-0.8.1.1.tgz apache-storm-0.9.2-incubating.tar.gz hadoop-2.2.0.tar.gz程序配置运行所基于的操作系统为CentOS 5.11。
Kafka安装配置
我们使用3台机器搭建Kafka集群:
192.168.4.142 h1这里需要说明的是,默认Kafka会使用ZooKeeper默认的/路径,这样有关Kafka的ZooKeeper配置就会散落在根路径下面,如果你有其他的应用也在使用ZooKeeper集群,查看ZooKeeper中数据可能会不直观,所以强烈建议指定一个chroot路径,直接在zookeeper.connect配置项中指定:
zookeeper.connect=h1:2181,h2:2181,h3:2181/kafka而且,需要手动在ZooKeeper中创建路径/kafka,使用如下命令连接到任意一台ZooKeeper服务器:
cd /usr/local/zookeeper可以通过查看日志,或者检查进程状态,保证Kafka集群启动成功。
我们创建一个名称为my-replicated-topic5的Topic,5个分区,并且复制因子为3,执行如下命令:
查看创建的Topic,执行如下命令:
bin/kafka-topics.sh --describe --zookeeper h1:2181,h2:2181,h3:2181/kafka --topic my-replicated-topic5结果信息如下所示:
Topic:my-replicated-topic5 PartitionCount:5 ReplicationFactor:3 Configs:我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
在一个终端,启动Producer,并向我们上面创建的名称为my-replicated-topic5的Topic中生产消息,执行如下脚本:
在另一个终端,启动Consumer,并订阅我们上面创建的名称为my-replicated-topic5的Topic中生产的消息,执行如下脚本:
bin/kafka-console-consumer.sh --zookeeper h1:2181,h2:2181,h3:2181/kafka --from-beginning --topic my-replicated-topic5可以在Producer终端上输入字符串消息行,然后回车,就可以在Consumer终端上看到消费者消费的消息内容。
也可以参考Kafka的Producer和Consumer的Java API,通过API编码的方式来实现消息生产和消费的处理逻辑。
Storm安装配置
Storm集群也依赖Zookeeper集群,要保证Zookeeper集群正常运行。Storm的安装配置比较简单,我们仍然使用下面3台机器搭建:
192.168.4.142 h1Storm集群的主节点为Nimbus,从节点为Supervisor,我们需要在h1上启动Nimbus服务,在从节点h2、h3上启动Supervisor服务:
bin/storm nimbus为了方便监控,可以启动Storm UI,可以从Web页面上监控Storm Topology的运行状态,例如在h2上启动:
bin/storm ui这样可以通过访问http://h2:8080/来查看Topology的运行状况。
整合Kafka+Storm
消息通过各种方式进入到Kafka消息中间件,比如可以通过使用Flume来收集日志数据,然后在Kafka中路由暂存,然后再由实时计算程序Storm做实时分析,这时我们就需要将在Storm的Spout中读取Kafka中的消息,然后交由具体的Spot组件去分析处理。实际上,apache-storm-0.9.2-incubating这个版本的Storm已经自带了一个集成Kafka的外部插件程序storm-kafka,可以直接使用,例如我使用的Maven依赖配置,如下所示:
dependency下面,我们开发了一个简单WordCount示例程序,从Kafka读取订阅的消息行,通过空格拆分出单个单词,然后再做词频统计计算,实现的Topology的代码,如下所示:
package org.shirdrn.storm.examples;public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5
builder.setBolt("word-splitter", new KafkaWordSplitter(),2).shuffleGrouping("kafka-reader");
builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));
上面程序,在本地调试(使用LocalCluster)不需要输入任何参数,提交到实际集群中运行时,需要传递一个参数,该参数为Nimbus的主机名称。
通过Maven构建,生成一个包含依赖的single jar文件(不要把Storm的依赖包添加进去),例如storm-examples-0.0.1-SNAPSHOT.jar,在提交Topology程序到Storm集群之前,因为用到了Kafka,需要拷贝一下依赖jar文件到Storm集群中的lib目录下面:
bin/storm jar /home/storm/storm-examples-0.0.1-SNAPSHOT.jar org.shirdrn.storm.examples.MyKafkaTopology h1
可以通过查看日志文件(logs/目录下)或者Storm UI来监控Topology的运行状况。如果程序没有错误,可以使用前面我们使用的Kafka Producer来生成消息,就能看到我们开发的Storm Topology能够实时接收到并进行处理。
上面Topology实现代码中,有一个很关键的配置对象SpoutConfig,配置属性如下所示:
该配置是指,如果该Topology因故障停止处理,下次正常运行时是否从Spout对应数据源Kafka中的该订阅Topic的起始位置开始读取,如果forceFromStart=true,则之前处理过的Tuple还要重新处理一遍,否则会从上次处理的位置继续处理,保证Kafka中的Topic数据不被重复处理,是在数据源的位置进行状态记录。
整合Storm+HDFS
Storm实时计算集群从Kafka消息中间件中消费消息,有实时处理需求的可以走实时处理程序,还有需要进行离线分析的需求,如写入到HDFS进行分析。下面实现了一个Topology,代码如下所示:
package org.shirdrn.storm.examples;"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35",
"10001 ffb52739a29348a67952e47c12da54ef 4.3 GT-I9300 samsung 2 50:CC:F8:E4:22:E2 2014-10-13 12:36:02",
"10001 ef2da82d4c8b49c44199655dc14f39f6 4.2.1 HUAWEI G610-U00 HUAWEI 2 70:72:3c:73:8b:22 2014-10-13 12:36:35"
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
builder.setBolt("hdfs-bolt", hdfsBolt, 2).fieldsGrouping("event-spout", newFields("minute"));
上面的处理逻辑,可以对HdfsBolt进行更加详细的配置,如FileNameFormat、SyncPolicy、FileRotationPolicy(可以设置在满足什么条件下,切出一个新的日志,如可以指定多长时间切出一个新的日志文件,可以指定一个日志文件大小达到设置值后,再写一个新日志文件),更多设置可以参考storm-hdfs,。
上面代码在打包的时候,需要注意,使用storm-starter自带的Maven打包配置,可能在将Topology部署运行的时候,会报错,可以使用maven-shade-plugin这个插件,如下配置所示:
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/
整合Kafka+Storm+HDFS
上面分别对整合Kafka+Storm和Storm+HDFS做了实践,可以将后者的Spout改成前者的Spout,从Kafka中消费消息,在Storm中可以做简单处理,然后将数据写入HDFS,最后可以在Hadoop平台上对数据进行离线分析处理。下面,写了一个简单的例子,从Kafka消费消息,然后经由Storm处理,写入到HDFS存储,代码如下所示:
package org.shirdrn.storm.examples;public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {
SyncPolicy syncPolicy = new CountSyncPolicy(1000); // sync the filesystem after every 1k tuples
FileRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimeUnit.MINUTES); // rotate files
.withPath("/storm/").withPrefix("app_").withExtension(".log"); // set file name format
builder.setBolt("to-upper", new KafkaWordToUpperCase(),3).shuffleGrouping("kafka-reader");
上面代码中,名称为to-upper的Bolt将接收到的字符串行转换成大写以后,会将处理过的数据向后面的hdfs-bolt、realtime这两个Bolt各发一份拷贝,然后由这两个Bolt分别根据实际需要(实时/离线)单独处理。
打包后,在Storm集群上部署并运行这个Topology:
可以通过Storm UI查看Topology运行情况,可以查看HDFS上生成的数据。
使用EMR-Flume同步Kafka数据到HDFS Flume是一个分布式、可靠和高效的数据汇聚系统,其source、channel和sink的结构设计,不仅实现了数据生产者与消费者的解耦,还提供了数据缓冲的功能。一个比较通用的使用场景是使用Flume将Kafka的数据按照时间分区同步至HDFS,进行实时的流式分析或离线统计。
Mysql 流增量写入 Hdfs(二) --Storm + hdfs 的流式处理 一. 概述 上一篇我们介绍了如何将数据从 mysql 抛到 kafka,这次我们就专注于利用 storm 将数据写入到 hdfs 的过程,由于 storm 写入 hdfs 的可定制东西有些多,我们先不从 kafka 读取,而先自己定义一个 Spout 数据充当数据源,下章再进行整合。
大数据分布式架构单点故障详解(Hdfs+Yarn+HBase+Spark+Storm)构建HA高可用架构 本文梳理了常见的hadoop生态圈中的组件:Hdfs+Yarn+HBase+Spark+Storm的单点故障问题,出现原因以及单点故障的原理和解决方案(构建HA(High Available)高可用架构)。阅读本文之前,最好了解清楚各组件的架构原理。
https://www.confluent.io/product/confluent-platform/ http://blog.csdn.net/amghost/article/details/44258841 http://blog.
相关文章
- kafka概念
- 试述Hadoop的HDFS及其组成_hadoop命令和hdfs命令区别
- 安装一个开发用的kafka(不带zookeeper)
- Kafka入门实战教程(5):吞吐量与可靠性的实践
- kafka学习之Kafka 的简介(一)
- Linux 搭建 Kafka教程[通俗易懂]
- Kafka 删除topic_kafka自动创建topic
- Kafka教程_图解kafka
- 2-网站日志分析案例-日志采集:Flume-Kafka-Flume-HDFS
- 【Kafka】使用Java实现数据的生产和消费
- 消息中间件 Kafka
- 面试系列-kafka exactly once语义
- HBase数据迁移到Kafka实战详解大数据
- HBase2实战:HBase Flink和Kafka整合详解大数据
- Kafka Eagle 源码解读详解大数据
- Kafka 生产者消费者java示例代码详解编程语言
- kafka源码解析之十六生产者流程(客户端如何向topic发送数据)详解编程语言
- kafka源码解析之三Broker的启动详解编程语言
- kafka原理解析-《Learning Apache Kafka, 2nd Edition.pdf》详解编程语言
- 使用Linux命令查看HDFS文件系统详细信息(linux查看hdfs)
- Linux安装Kafka:一步一步操作指南(linux安装kafka)
- 利用HDFS实现MySQL数据的快速导入(hdfs 导入mysql)
- Linux下搭建Kafka Stream架构的实践(linux kafka)
- 利用Kafka与Oracle实现实时数据交换(kafka oracle)
- 比较分析HDFS与Oracle数据库的异同(hdfs oracle)