zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

Flink-checkpoint配置及重启策略

配置flink 策略 重启 CheckPoint
2023-09-11 14:14:34 时间

Flink-checkpoint配置及重启策略

val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    //--------------- checkpoint配置 ----------------
    env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setCheckpointStorage("hdfs://localhost:9083/flink/checkpoint")
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // 执行checkpoint,JobManager向source发送barrier直到checkpoint生成的最大时间间隔
    env.getCheckpointConfig.setCheckpointTimeout(60000L)
    // 最大并行checkpoint,最多允许出现几个checkpoint
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(2)
    /**
      * 两次checkpoint最小间隔时间(尾到头的时间),如果两次CP间隔为1s,第一次cp耗时800ms,
      * 为保证两次最小间隔为500ms,第二次cp需要向后推移300ms
      * 配置了最小间隔,会使setMaxConcurrentCheckpoints配置失效
      */
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)

    /**
      * 倾向于从checkpoint中进行恢复,即使savepoint数据比checkpoint更近
      * 但是此配置Flink官方已经不推荐使用【https://issues.apache.org/jira/browse/FLINK-20427】
      * 因为使用这种方式会造成数据重复sink,在一些较为严谨的使用场景下,会造成数据异常
      * 不建议使用
      */
    //env.getCheckpointConfig.setPreferCheckpointForRecovery(true)

    /**
      * 最多容忍几次checkpoint失败
      * 如果为0则不容忍任何checkpoint失败,checkpoint失败即任务失败
      */
    env.getCheckpointConfig.setTolerableCheckpointFailureNumber(5)

    //--------------- 重启策略 ----------------
    /**
      * RestartStrategies.RestartStrategyConfiguration fixedDelayRestart(restartAttempts: Int, delayBetweenAttempts: Long)
      * restartAttempts:尝试重启次数
      * delayBetweenAttempts:重启间隔
      * RestartStrategies.fixedDelayRestart(3, 60000L): restartAttempts为尝试重启次数,delayBetweenAttempts为重启间隔
      */
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 60000L))

    /**
      * RestartStrategies.FailureRateRestartStrategyConfiguration failureRateRestart(failureRate: Int, failureInterval: Time, delayInterval: Time)
      * failureRate:失败次数
      * failureInterval:失败时间间隔(总的)
      * delayInterval:两次尝试重启的时间间隔
      */
    env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.minutes(5), Time.seconds(10)))