Flink 源码走读(一)
2023-06-13 09:15:00 时间
实例以yarn-per-job为例。
flink提交作业是通过flink run进行提交的,可以从提交脚本中看到启动类即程序的入口是:
org.apache.flink.client.cli.CliFrontend
定位到源码中main函数,查看执行逻辑
/** Submits the job based on the arguments. */
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
int retCode = 31;
try {
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
} finally {
System.exit(retCode);
}
}
进入parseAndRun 方法,开始到真正执行过程
public int parseAndRun(String[] args) {
// check for action
if (args.length < 1) {
CliFrontendParser.printHelp(customCommandLines);
System.out.println("Please specify an action.");
return 1;
}
// get action
String action = args[0];
// remove action from parameters
final String[] params = Arrays.copyOfRange(args, 1, args.length);
try {
// do action
switch (action) {
case ACTION_RUN:
run(params);
return 0;
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
case ACTION_INFO:
info(params);
return 0;
case ACTION_CANCEL:
cancel(params);
return 0;
case ACTION_STOP:
stop(params);
return 0;
case ACTION_SAVEPOINT:
savepoint(params);
return 0;
case "-h":
case "--help":
CliFrontendParser.printHelp(customCommandLines);
return 0;
case "-v":
case "--version":
String version = EnvironmentInformation.getVersion();
String commitID = EnvironmentInformation.getRevisionInformation().commitId;
System.out.print("Version: " + version);
System.out.println(
commitID.equals(EnvironmentInformation.UNKNOWN)
? ""
: ", Commit ID: " + commitID);
return 0;
default:
System.out.printf("\"%s\" is not a valid action.\n", action);
System.out.println();
System.out.println(
"Valid actions are \"run\", \"run-application\", \"list\", \"info\", \"savepoint\", \"stop\", or \"cancel\".");
System.out.println();
System.out.println(
"Specify the version option (-v or --version) to print Flink version.");
System.out.println();
System.out.println(
"Specify the help option (-h or --help) to get help on the command.");
return 1;
}
} catch (CliArgsException ce) {
return handleArgException(ce);
} catch (ProgramParametrizationException ppe) {
return handleParametrizationException(ppe);
} catch (ProgramMissingJobException pmje) {
return handleMissingJobException();
} catch (Exception e) {
return handleError(e);
}
}
由action常量可知有 run 、 run-application、info、list、cancel、stop、savepoint几种action
相关文章
- 大数据ClickHouse(十九):Flink 写入 ClickHouse API
- 快收藏!优化 Apache Flink 应用程序的 7 个技巧!
- Flink 自定义SQL实现Hudi MOR表压缩
- Flink CDC 新一代数据集成框架
- Flink状态管理
- 什么是Flink?Flink能用来做什么?[通俗易懂]
- flink中文社区_flink demo
- 【Flink教程-已解决】在idea中测试flink的时候,提示读取文件时候错误,提示文件不存在解决方案
- Flink作业反压处理
- Flink运行方式及对比
- Flink开发-生成Jar(Maven依赖处理)
- Flink实战-安装及部署
- Flink创始团队二次创业再被收购,Kafka母公司与阿里“遭遇战”已经开始
- 【Flink SQL】Apache Calcite 架构剖析
- Flink 1.17发布后数据开发领域需要关注的一些点
- Flink读取Oracle数据源的研究(flink读oracle)
- Flink技术与Oracle数据库结合,助力数据分析(flink与oracle)