Flume-ng启动过程分析
分析 启动 过程 NG flume
2023-09-14 08:58:00 时间
//加载flume的配置文件,初始化Sink,Source,Channel的工厂类
PropertiesFileConfigurationProvider configurationProvider =
new PropertiesFileConfigurationProvider(agentName,
configurationFile);
application = new Application();
//configurationProvider.getConfiguration()中实例化Sink,Source,Channel
application.handleConfigurationEvent(configurationProvider.getConfiguration());-------getConfiguration------
//Map用于存储所有Sink,Source,Channel
Map String, ChannelComponent channelComponentMap = Maps.newHashMap();
Map String, SourceRunner sourceRunnerMap = Maps.newHashMap();
Map String, SinkRunner sinkRunnerMap = Maps.newHashMap();
//先实例化channel
loadChannels(agentConf, channelComponentMap);
//将Source对应的channel注册到ChannelSelector,Source通过ChannelSelector获取Channel
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
//向Sink注册Channel
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
conf.addChannel(channelName, channelComponent.channel);
for(Map.Entry String, SourceRunner entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
for(Map.Entry String, SinkRunner entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
return conf
//application.handleConfigurationEvent(conf)----
stopAllComponents();
startAllComponents(conf);
final Application appReference = application;
//关闭程序时,调用的钩子
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
@Override
public void run() {
appReference.stop();
});
这个文章是flume源码系列的开门篇,也是本人第一次阅读flume相关的代码,文章会参考一些已有的文章并结合个人的理解,当然所有参考的文章我会在文章末尾给出链接以表示尊重原创。
Flume作为Hadoop中的日志采集工具,非常的好用,但是在安装Flume的时候,查阅很多资料,发现形形色色,有的说安装Flume很简单,有的说安装Flume很复杂,需要依赖zookeeper,所以一方面说直接安装Flume,解压即可用,还有一方面说需要先装了Zookeeper才可以安装Flume。
1 .背景 flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。
startAllComponents(conf):
//通过LifecycleSupervisor类启动组件//启动MonitorRunnable,监控Channelfor (Entry String, Channel entry : materializedConfiguration.getChannels().entrySet()) { try{ logger.info("Starting Channel " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e){ logger.error("Error while starting {}", entry.getValue(), e); //等待启动 for(Channel ch: materializedConfiguration.getChannels().values()){ while(ch.getLifecycleState() != LifecycleState.START !supervisor.isComponentInErrorState(ch)){ try { logger.info("Waiting for channel: " + ch.getName() + " to start. Sleeping for 500 ms"); Thread.sleep(500); } catch (InterruptedException e) { logger.error("Interrupted while waiting for channel to start.", e); Throwables.propagate(e); //启动MonitorRunnable,监控sink for (Entry String, SinkRunner entry : materializedConfiguration.getSinkRunners() .entrySet()) { try{ logger.info("Starting Sink " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); //启动MonitorRunnable,监控source for (Entry String, SourceRunner entry : materializedConfiguration .getSourceRunners().entrySet()) { try{ logger.info("Starting Source " + entry.getKey()); supervisor.supervise(entry.getValue(), new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START); } catch (Exception e) { logger.error("Error while starting {}", entry.getValue(), e); this.loadMonitoring();LifecycleSupervisor.java
负责启动和监控Flume组件的类,功能如:失败重启组件
LifecycleSupervisor内部比较重要的几个变量:
//监控进程的线程池 ScheduledThreadPoolExecutor monitorService Map LifecycleAware, ScheduledFuture ? monitorFutures Map LifecycleAware, Supervisoree supervisedProcesses
//启动监控 public synchronized void supervise(LifecycleAware lifecycleAware, SupervisorPolicy policy, LifecycleState desiredState) { //组件状态 Supervisoree process = new Supervisoree(); process.status = new Status(); process.policy = policy; process.status.desiredState = desiredState; process.status.error = false; //监控线程,调用启动组件的线程。比如 MonitorRunnable monitorRunnable = new MonitorRunnable();---- lifecycleAware.start();--- //如果是sink sinRunner.start()---- runnerThread = new Thread(runner); runnerThread.setName("SinkRunner-PollingRunner-" + policy.getClass().getSimpleName()); runnerThread.start();---- //在新线程里循环调用 DefaultSinkProcessor.process();---- //sink从channel中取数据,进行处理 sink.process();
monitorRunnable.lifecycleAware = lifecycleAware; monitorRunnable.supervisoree = process; monitorRunnable.monitorService = monitorService; supervisedProcesses.put(lifecycleAware, process); //每隔三秒监控组件运行状况 ScheduledFuture ? future = monitorService.scheduleWithFixedDelay( monitorRunnable, 0, 3, TimeUnit.SECONDS); monitorFutures.put(lifecycleAware, future); }
这个文章是flume源码系列的开门篇,也是本人第一次阅读flume相关的代码,文章会参考一些已有的文章并结合个人的理解,当然所有参考的文章我会在文章末尾给出链接以表示尊重原创。
Flume作为Hadoop中的日志采集工具,非常的好用,但是在安装Flume的时候,查阅很多资料,发现形形色色,有的说安装Flume很简单,有的说安装Flume很复杂,需要依赖zookeeper,所以一方面说直接安装Flume,解压即可用,还有一方面说需要先装了Zookeeper才可以安装Flume。
1 .背景 flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。
相关文章
- 【Redis源码】Redis 启动过程分析
- 记一次SpringBoot启动异常,jar问题的排查分析
- EasyDSS v3.2.4内核无法启动的原因分析及解决办法
- 数量生态学冗余分析(RDA)分析植物多样性物种数据结果可视化|附代码数据
- Spring启动过程中实例化前部分的分析
- SpringBoot启动原理源码分析-第一篇
- 学校AI视频行为分析监测系统
- 【Kafka】Kafka-Server-start.sh 启动脚本分析(Ver 2.7.2)
- SAP UI5 Fiori 应用在启动时向 ABAP 后台发起的 OData 请求序列的顺序和作用分析
- 无法分析响应内容,因为 Internet Explorer 引擎不可用,或者 Internet Explorer 的首次启动配置不完整
- 【RocketMq】NameServ启动脚本分析(Ver4.9.4)
- ChIP-seq 分析:Call Peak(8)
- 【Netty源码分析】02 Netty Server 启动流程 下
- 【Android 安全】DEX 加密 ( Application 替换 | Android 应用启动原理 | ActivityThread 后续分析 | Application 替换位置 )
- 【Android 插件化】Hook 插件化框架 ( 从 Hook 应用角度分析 Activity 启动流程 一 | Activity 进程相关源码 )
- 【Java 并发编程】线程池机制 ( 测试线程开销 | 启动线程分析 | 用户态 | 内核态 | 用户线程 | 内核线程 | 轻量级进程 )
- 【Android 启动过程】Activity 启动源码分析 ( Activity -> AMS、主线程阶段 )
- 【Android 启动过程】Activity 启动源码分析 ( AMS -> ActivityThread、AMS 线程阶段 二 )
- 【Android 启动过程】Activity 启动源码分析 ( ActivityThread -> Activity、主线程阶段 一 )
- 【Android 启动过程】Activity 启动源码分析 ( ActivityThread -> Activity、主线程阶段 二 )
- MySQL-8.0.32 启动失败问题的分析
- Oracle硬解析和软解析的区别分析
- Android启动过程分析详解手机开发
- NodeManager代码分析之NodeManager启动过程详解大数据
- 解决服务mssql启动出错的实例分析(启动服务mssql出错)
- 步骤Oracle中开启审计后步骤分析(oracle中启动审计后)
- PHP语言中global和$GLOBALS[]的分析之二
- SQLServer2005安装提示服务无法启动原因分析及解决
- 深入Java分布式计算的使用分析