Flume-ng Startup Process Analysis

Time: 2023-09-14 08:58:00

// Load the Flume configuration file and initialize the Sink, Source, and Channel factories
PropertiesFileConfigurationProvider configurationProvider =
    new PropertiesFileConfigurationProvider(agentName, configurationFile);
application = new Application();
// configurationProvider.getConfiguration() instantiates the Sinks, Sources, and Channels
application.handleConfigurationEvent(configurationProvider.getConfiguration());

// ------- getConfiguration() -------
// Maps that hold every Sink, Source, and Channel
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
// Instantiate the channels first
loadChannels(agentConf, channelComponentMap);
// Register each Source's channels with its ChannelSelector;
// the Source obtains its Channels through the ChannelSelector
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
// Register the Channel with each Sink
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
conf.addChannel(channelName, channelComponent.channel);
for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
  conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
  conf.addSinkRunner(entry.getKey(), entry.getValue());
}
return conf;

// ------- application.handleConfigurationEvent(conf) -------
stopAllComponents();
startAllComponents(conf);

final Application appReference = application;
// Hook that is invoked when the program shuts down
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
  @Override
  public void run() {
    appReference.stop();
  }
});
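
The loading order in getConfiguration() (channels first, then sources, then sinks) matters because sources and sinks both refer to channels by name. The following is a minimal sketch of that ordering, not Flume's own code: the class WiringOrderSketch and its helper methods are hypothetical, and the channels are plain placeholder objects. Only the property keys (`<agent>.channels`, `<agent>.sources.<name>.channels`, `<agent>.sinks.<name>.channel`) follow the usual Flume agent configuration layout.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/** Hypothetical stand-in for the wiring order in getConfiguration(); not Flume's classes. */
class WiringOrderSketch {

  /** Returns the source-to-channel and channel-to-sink links, resolving channels first. */
  static List<String> wire(String agent, Properties props) {
    // Step 1: create the channels first so later steps can look them up by name.
    Map<String, Object> channels = new LinkedHashMap<>();
    for (String name : names(props, agent + ".channels")) {
      channels.put(name, new Object()); // placeholder for a real Channel instance
    }
    List<String> links = new ArrayList<>();
    // Step 2: a source may write to one or more channels.
    for (String source : names(props, agent + ".sources")) {
      for (String ch : names(props, agent + ".sources." + source + ".channels")) {
        requireChannel(channels, ch, source);
        links.add(source + " -> " + ch);
      }
    }
    // Step 3: a sink reads from exactly one channel.
    for (String sink : names(props, agent + ".sinks")) {
      String ch = props.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
      requireChannel(channels, ch, sink);
      links.add(ch + " -> " + sink);
    }
    return links;
  }

  private static void requireChannel(Map<String, Object> channels, String ch, String owner) {
    if (!channels.containsKey(ch)) {
      throw new IllegalStateException(owner + " refers to unknown channel '" + ch + "'");
    }
  }

  /** Splits a whitespace-separated property value into names; empty if the key is absent. */
  private static List<String> names(Properties props, String key) {
    List<String> out = new ArrayList<>();
    for (String s : props.getProperty(key, "").trim().split("\\s+")) {
      if (!s.isEmpty()) {
        out.add(s);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Properties p = new Properties();
    p.setProperty("a1.channels", "c1");
    p.setProperty("a1.sources", "r1");
    p.setProperty("a1.sinks", "k1");
    p.setProperty("a1.sources.r1.channels", "c1");
    p.setProperty("a1.sinks.k1.channel", "c1");
    System.out.println(wire("a1", p)); // prints [r1 -> c1, c1 -> k1]
  }
}
```

Because the channel map is filled before anything else, a source or sink that names a channel that does not exist can be rejected at load time rather than at run time.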

startAllComponents(conf):

// Components are started through the LifecycleSupervisor class
// Start a MonitorRunnable to supervise each Channel
for (Entry<String, Channel> entry :
    materializedConfiguration.getChannels().entrySet()) {
  try {
    logger.info("Starting Channel " + entry.getKey());
    supervisor.supervise(entry.getValue(),
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  } catch (Exception e) {
    logger.error("Error while starting {}", entry.getValue(), e);
  }
}

// Wait for the channels to start
for (Channel ch : materializedConfiguration.getChannels().values()) {
  while (ch.getLifecycleState() != LifecycleState.START
      && !supervisor.isComponentInErrorState(ch)) {
    try {
      logger.info("Waiting for channel: " + ch.getName() +
          " to start. Sleeping for 500 ms");
      Thread.sleep(500);
    } catch (InterruptedException e) {
      logger.error("Interrupted while waiting for channel to start.", e);
      Throwables.propagate(e);
    }
  }
}

// Start a MonitorRunnable to supervise each sink
for (Entry<String, SinkRunner> entry :
    materializedConfiguration.getSinkRunners().entrySet()) {
  try {
    logger.info("Starting Sink " + entry.getKey());
    supervisor.supervise(entry.getValue(),
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  } catch (Exception e) {
    logger.error("Error while starting {}", entry.getValue(), e);
  }
}

// Start a MonitorRunnable to supervise each source
for (Entry<String, SourceRunner> entry :
    materializedConfiguration.getSourceRunners().entrySet()) {
  try {
    logger.info("Starting Source " + entry.getKey());
    supervisor.supervise(entry.getValue(),
        new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
  } catch (Exception e) {
    logger.error("Error while starting {}", entry.getValue(), e);
  }
}

this.loadMonitoring();
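
In the listing above, startAllComponents() starts the channels, blocks until each one is either started or in an error state, and only then starts the sinks and finally the sources. The likely reason is that the consuming side should be ready before anything begins producing events, though the listing itself does not say so. The polling wait can be sketched in isolation as follows. ChannelWaitSketch, its State enum, and the demo() helper are hypothetical names for illustration, and an AtomicReference stands in for a real channel's lifecycle state.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical sketch of the "wait until the channel has started" loop. */
class ChannelWaitSketch {

  enum State { IDLE, START, ERROR }

  /** Polls until the state leaves IDLE; returns true for START and false for ERROR. */
  static boolean awaitStart(AtomicReference<State> state, long pollMillis) {
    State current;
    while ((current = state.get()) == State.IDLE) {
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and give up, much like Throwables.propagate(e).
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while waiting for start", e);
      }
    }
    return current == State.START;
  }

  /** Simulates a channel that reaches START about 30 ms after being asked to start. */
  static boolean demo() {
    AtomicReference<State> channel = new AtomicReference<>(State.IDLE);
    Thread starter = new Thread(() -> {
      try {
        Thread.sleep(30);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      channel.set(State.START);
    });
    starter.start();
    return awaitStart(channel, 5);
  }

  public static void main(String[] args) {
    System.out.println("channel started: " + demo());
  }
}
```

Checking for the error state as well as the started state is what keeps the loop from spinning forever when a channel cannot start.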
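
LifecycleSupervisor.java

The class responsible for starting and monitoring Flume components; among other things, it restarts components that fail.
Several of the more important fields inside LifecycleSupervisor: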

// Thread pool that runs the monitoring tasks
ScheduledThreadPoolExecutor monitorService;
Map<LifecycleAware, ScheduledFuture<?>> monitorFutures;
Map<LifecycleAware, Supervisoree> supervisedProcesses;

// Start supervision
public synchronized void supervise(LifecycleAware lifecycleAware,
    SupervisorPolicy policy, LifecycleState desiredState) {

  // Component status
  Supervisoree process = new Supervisoree();
  process.status = new Status();
  process.policy = policy;
  process.status.desiredState = desiredState;
  process.status.error = false;

  // Monitoring task; this is the thread that calls start() on the component.
  MonitorRunnable monitorRunnable = new MonitorRunnable();
  // Call chain triggered by the MonitorRunnable:
  //   ----> lifecycleAware.start();
  //     For a sink, for example:
  //     ----> sinkRunner.start();
  //             runnerThread = new Thread(runner);
  //             runnerThread.setName("SinkRunner-PollingRunner-" +
  //                 policy.getClass().getSimpleName());
  //             runnerThread.start();
  //       ----> the new thread calls this in a loop:
  //             DefaultSinkProcessor.process();
  //         ----> the sink takes data from the channel and processes it:
  //               sink.process();
  monitorRunnable.lifecycleAware = lifecycleAware;
  monitorRunnable.supervisoree = process;
  monitorRunnable.monitorService = monitorService;

  supervisedProcesses.put(lifecycleAware, process);

  // Check the component's running state every three seconds
  ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
      monitorRunnable, 0, 3, TimeUnit.SECONDS);
  monitorFutures.put(lifecycleAware, future);
}
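
The restart-on-failure behaviour comes from scheduling the monitoring task at a fixed delay: each run checks whether the component has reached its desired state and, if not, tries to start it again. Below is a minimal, self-contained sketch of that idea. It is not Flume's MonitorRunnable. SupervisorSketch, FlakyComponent, and demo() are hypothetical names, the delay is 20 ms instead of 3 seconds so the example finishes quickly, and the component is rigged to fail its first two start attempts.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of "supervise and restart on failure"; not Flume's implementation. */
class SupervisorSketch {

  /** Hypothetical component whose first `failures` calls to start() throw. */
  static class FlakyComponent {
    final AtomicInteger attempts = new AtomicInteger();
    final int failures;
    volatile boolean started;

    FlakyComponent(int failures) {
      this.failures = failures;
    }

    void start() {
      if (attempts.incrementAndGet() <= failures) {
        throw new IllegalStateException("start failed");
      }
      started = true;
    }
  }

  /** Schedules a monitor that retries start() until the component is running. */
  static ScheduledFuture<?> supervise(ScheduledThreadPoolExecutor pool,
      FlakyComponent component, long delayMillis) {
    Runnable monitor = () -> {
      if (component.started) {
        return; // already in the desired state, nothing to do
      }
      try {
        component.start();
      } catch (RuntimeException e) {
        // Swallowed on purpose: if a scheduled task throws, the executor
        // cancels all of its future runs, which would end the retries.
      }
    };
    return pool.scheduleWithFixedDelay(monitor, 0, delayMillis, TimeUnit.MILLISECONDS);
  }

  /** Runs the sketch and returns how many start attempts were made. */
  static int demo() {
    ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
    FlakyComponent component = new FlakyComponent(2);
    ScheduledFuture<?> future = supervise(pool, component, 20);
    try {
      long deadline = System.currentTimeMillis() + 5000;
      while (!component.started && System.currentTimeMillis() < deadline) {
        Thread.sleep(5);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      future.cancel(false);
      pool.shutdownNow();
    }
    return component.attempts.get();
  }

  public static void main(String[] args) {
    System.out.println("start attempts: " + demo());
  }
}
```

With two rigged failures the sketch should report three start attempts: two that throw and a third that succeeds. Keeping the returned ScheduledFuture, as the monitorFutures map does in the listing above, is what makes it possible to cancel the monitoring of a single component later.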
