[Flume][Kafka] Flume and Kafka integration example (Kafka as a Flume sink, writing to a Kafka topic)
Preparation: create the spooling directory and make it writable:
$ sudo mkdir -p /flume/web_spooldir
$ sudo chmod a+w -R /flume
Edit the Flume configuration file:
$ cat /home/tester/flafka/spooldir_kafka.conf
# Name the components on this agent
agent1.sources = weblogsrc
agent1.sinks = kafka-sink
agent1.channels = memchannel
# Configure the source
agent1.sources.weblogsrc.type = spooldir
agent1.sources.weblogsrc.spoolDir = /flume/web_spooldir
agent1.sources.weblogsrc.channels = memchannel
# Configure the sink
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.topic = weblogs
agent1.sinks.kafka-sink.brokerList = localhost:9092
agent1.sinks.kafka-sink.batchSize = 20
agent1.sinks.kafka-sink.channel = memchannel
# Use a channel which buffers events in memory
agent1.channels.memchannel.type = memory
agent1.channels.memchannel.capacity = 100000
agent1.channels.memchannel.transactionCapacity = 1000
$
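In this configuration the source names its channel under `channels` (plural) while the sink uses `channel` (singular), and both must refer to a channel declared in `agent1.channels`. The following is a minimal shell sketch of a sanity check for that wiring. The function name `check_channels` is hypothetical and not part of Flume, and the check only covers the simple `key = value` layout used above.

```shell
# check_channels CONF AGENT
# Exit non-zero if any source or sink of AGENT refers to a channel
# that is not listed in "AGENT.channels". Hypothetical helper, not a Flume tool.
check_channels() {
  conf=$1 agent=$2
  awk -F'=' -v a="$agent" '
    /^[ \t]*(#|$)/ { next }                      # skip comments and blank lines
    {
      key = $1; val = $2
      gsub(/^[ \t]+|[ \t]+$/, "", key)           # trim whitespace around key
      gsub(/^[ \t]+|[ \t]+$/, "", val)           # trim whitespace around value
      if (key == a ".channels") {
        n = split(val, names, /[ \t]+/)
        for (i = 1; i <= n; i++) declared[names[i]] = 1
      } else if (key ~ ("^" a "\\.sources\\.[^.]+\\.channels$") ||
                 key ~ ("^" a "\\.sinks\\.[^.]+\\.channel$")) {
        n = split(val, names, /[ \t]+/)
        for (i = 1; i <= n; i++) used[names[i]] = key
      }
    }
    END {
      bad = 0
      for (c in used) {
        if (!(c in declared)) {
          printf "undeclared channel %s (referenced by %s)\n", c, used[c]
          bad = 1
        }
      }
      exit bad
    }
  ' "$conf"
}
```

For the file above, the call would be `check_channels /home/tester/flafka/spooldir_kafka.conf agent1`, which prints nothing and returns 0 when every referenced channel is declared.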
Run flume-ng (from the directory that contains spooldir_kafka.conf, here /home/tester/flafka):
$ flume-ng agent --conf /etc/flume-ng/conf \
> --conf-file spooldir_kafka.conf \
> --name agent1 -Dflume.root.logger=INFO,console
The output looks similar to the following:
Info: Sourcing environment configuration script /etc/flume-ng/conf/flume-env.sh
Info: Including Hadoop libraries found via (/usr/bin/hadoop) for HDFS access
Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-log4j12.jar from classpath
Info: Including HBASE libraries found via (/usr/bin/hbase) for HBASE access
Info: Excluding /usr/lib/hbase/bin/../lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/hbase/bin/../lib/slf4j-log4j12.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-log4j12.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/hadoop/lib/slf4j-log4j12.jar from classpath
Info: Excluding /usr/lib/zookeeper/lib/slf4j-api-1.7.5.jar from classpath
Info: Excluding /usr/lib/zookeeper/lib/slf4j-log4j12-1.7.5.jar from classpath
Info: Excluding /usr/lib/zookeeper/lib/slf4j-log4j12.jar from classpath
Info: Including Hive libraries found via () for Hive access
+ exec /usr/java/default/bin/java -Xmx500m -Dflume.root.logger=INFO,console -cp '/etc/flume-ng/conf:/usr/lib/flume-ng/lib/*:/etc/hadoop/conf:/usr/lib/hadoop/lib/activation-1.1.jar:/usr/lib/hadoop/lib/apacheds-i18n-2.0.0-M15.jar
...
-Djava.library.path=:/usr/lib/hadoop/lib/native:/usr/lib/hadoop/lib/native:/usr/lib/hbase/bin/../lib/native/Linux-amd64-64
org.apache.flume.node.Application --conf-file spooldir_kafka.conf --name agent1
2017-10-23 01:15:11,209 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start
(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2017-10-23 01:15:11,223 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:spooldir_kafka.conf
2017-10-23 01:15:11,256 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty
(FlumeConfiguration.java:1017)] Processing:kafka-sink
...
2017-10-23 01:15:11,933 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start
(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: weblogsrc started
2017-10-23 01:15:13,003 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Verifying properties
2017-10-23 01:15:13,271 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property
key.serializer.class is overridden to kafka.serializer.StringEncoder
2017-10-23 01:15:13,271 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property
metadata.broker.list is overridden to localhost:9092
2017-10-23 01:15:13,277 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property
request.required.acks is overridden to 1
2017-10-23 01:15:13,277 (lifecycleSupervisor-1-1) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Property serializer.class
is overridden to kafka.serializer.DefaultEncoder
2017-10-23 01:15:13,718 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register
(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: kafka-sink: Successfully registered new MBean.
2017-10-23 01:15:13,719 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start
(MonitoredCounterGroup.java:96)] Component type: SINK, name: kafka-sink started
...
2017-10-23 01:15:13,720 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents
(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2017-10-23 01:15:13,720 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile
(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-01-13.log to
/flume/web_spooldir/2014-01-13.log.COMPLETED
...
2017-10-23 01:16:11,441 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents
(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2017-10-23 01:16:11,451 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile
(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-01-24.log to
/flume/web_spooldir/2014-01-24.log.COMPLETED
2017-10-23 01:16:11,818 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents
(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2017-10-23 01:16:11,819 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile
(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-02-15.log to
/flume/web_spooldir/2014-02-15.log.COMPLETED
Start the Kafka console consumer:
$ kafka-console-consumer --zookeeper localhost:2181 --topic weblogs
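In another terminal window, feed the web logs into the /flume/web_spooldir directory. The files are first copied to a temporary directory and then moved into place, so the spooling source does not pick up files that are still being written:
$ cp -rf /home/tester/weblogs /tmp/tmp_weblogs
$ mv /tmp/tmp_weblogs/* /flume/web_spooldir
$ rm -rf /tmp/tmp_weblogs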
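The copy-then-move sequence matters because the spooling directory source expects a file to be complete and unchanged once it appears in the spool directory, and it does not accept a file name that was used before. The following shell sketch wraps the same idea in a reusable function. The name `spool_drop` and its staging-directory argument are hypothetical, and the sketch assumes the staging directory is on the same filesystem as the spool directory, so that `mv` is a rename rather than a copy.

```shell
# spool_drop SRC_DIR STAGING_DIR SPOOL_DIR
# Copy every regular file from SRC_DIR into STAGING_DIR, then move the finished
# copy into SPOOL_DIR, so the Flume source never sees a half-written file.
# Hypothetical helper; STAGING_DIR is assumed to share a filesystem with SPOOL_DIR.
spool_drop() {
  src=$1 staging=$2 spool=$3
  mkdir -p "$staging" || return 1
  for f in "$src"/*; do
    [ -f "$f" ] || continue                  # skip subdirectories and an unmatched glob
    name=$(basename "$f")
    # Do not reuse a name that is already spooled or already marked .COMPLETED
    if [ -e "$spool/$name" ] || [ -e "$spool/$name.COMPLETED" ]; then
      echo "skipping $name: name already used in $spool" >&2
      continue
    fi
    cp "$f" "$staging/$name" || return 1     # the slow copy happens outside the spool dir
    mv "$staging/$name" "$spool/$name" || return 1
  done
}
```

A call matching the walkthrough might look like `spool_drop /home/tester/weblogs /flume/staging /flume/web_spooldir`, where `/flume/staging` is an assumed directory. If the staging directory were on a different filesystem (as /tmp may be), `mv` would fall back to copying and the file could be visible before it is complete.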
The flume-ng window shows the following (the log files are being delivered to the Kafka topic weblogs):
2017-10-23 01:36:28,436 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents
(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2017-10-23 01:36:28,449 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile
(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2013-09-22.log to
/flume/web_spooldir/2013-09-22.log.COMPLETED
2017-10-23 01:36:28,971 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents
(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
...
2017-10-23 01:37:39,011 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile
(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-02-19.log to
/flume/web_spooldir/2014-02-19.log.COMPLETED
2017-10-23 01:37:39,386 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents
(ReliableSpoolingFileEventReader.java:258)] Last read took us just up to a file boundary. Rolling to the next file, if there is one.
2017-10-23 01:37:39,386 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.rollCurrentFile
(ReliableSpoolingFileEventReader.java:348)] Preparing to move file /flume/web_spooldir/2014-03-09.log to
/flume/web_spooldir/2014-03-09.log.COMPLETED
The consumer window prints the contents of all the web log files (it consumes the topic weblogs, so it receives every web log line):
...
213.125.211.10 - 66543 [09/Mar/2014:00:00:14 +0100] "GET /KBDOC-00131.html HTTP/1.0" 200 9807 "http://www.tester.com" "tester test 001"
213.125.211.10 - 66543 [09/Mar/2014:00:00:14 +0100] "GET /theme.css HTTP/1.0" 200 6448 "http://www.tester.com" "tester test 002"