当前位置: 首页 > news >正文

聊聊flink的checkpoint配置

为什么80%的码农都做不了架构师?>>>   hot3.png

本文主要研究下flink的checkpoint配置

实例

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
  • 使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode);interval用于指定checkpoint的触发间隔(单位milliseconds),而CheckpointingMode默认是CheckpointingMode.EXACTLY_ONCE,也可以指定为CheckpointingMode.AT_LEAST_ONCE
  • 也可以通过StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode,一般对于超低延迟的应用(大概几毫秒)可以使用CheckpointingMode.AT_LEAST_ONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以
  • checkpointTimeout用于指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉
  • minPauseBetweenCheckpoints用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1
  • maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个,用于包装topology不会花太多的时间在checkpoints上面;如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)
  • enableExternalizedCheckpoints用于开启checkpoints的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state;ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state
  • failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行

flink-conf.yaml相关配置

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false
  • state.backend用于指定checkpoint state存储的backend,默认为none
  • state.backend.async用于指定backend是否使用异步snapshot(默认为true),有些不支持async或者只支持async的state backend可能会忽略这个参数
  • state.backend.fs.memory-threshold,默认为1024,用于指定存储于files的state大小阈值,如果小于该值则会存储在root checkpoint metadata file
  • state.backend.incremental,默认为false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend会忽略该配置
  • state.backend.local-recovery,默认为false
  • state.checkpoints.dir,默认为none,用于指定checkpoint的data files和meta data存储的目录,该目录必须对所有参与的TaskManagers及JobManagers可见
  • state.checkpoints.num-retained,默认为1,用于指定保留的已完成的checkpoints个数
  • state.savepoints.dir,默认为none,用于指定savepoints的默认目录
  • taskmanager.state.local.root-dirs,默认为none

小结

  • 可以通过使用StreamExecutionEnvironment.enableCheckpointing方法来设置开启checkpoint;具体可以使用enableCheckpointing(long interval),或者enableCheckpointing(long interval, CheckpointingMode mode)
  • checkpoint的高级配置可以配置checkpointTimeout(用于指定checkpoint执行的超时时间,单位milliseconds),minPauseBetweenCheckpoints(用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint),maxConcurrentCheckpoints(用于指定运行中的checkpoint最多可以有多少个,如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数大于1的值不起作用),enableExternalizedCheckpoints(用于开启checkpoints的外部持久化,在job failed的时候externalized checkpoint state无法自动清理,但是在job canceled的时候可以配置是删除还是保留state)
  • 在flink-conf.yaml里头也有checkpoint的相关配置,主要是state backend的配置,比如state.backend.async、state.backend.incremental、state.checkpoints.dir、state.savepoints.dir等

doc

  • Checkpointing

转载于:https://my.oschina.net/go4it/blog/2985851

相关文章:

  • 堆的python实现及其应用
  • 创建一种深思熟虑的文化
  • 亚马逊Alexa借助神经网络生成播音员声音
  • 将VCSA 6.5添加到AD域
  • nginx 4层tcp代理获取真实ip
  • 刘鹏教授在新闻出版大数据应用管理技术专题培训班上作报告!
  • Mybatis配置返回为修改影响条数
  • spring源码-aop源码-5.1
  • 洛谷P2805 植物大战僵尸
  • python之上下文管理器与contextlib
  • 数据类型之函数笔记
  • Flutter redux 进阶
  • 为什么携程要做好持续交付?
  • 变频电源老化测试重要吗?需要做老化测试吗
  • JS笔记1
  • 《Java8实战》-第四章读书笔记(引入流Stream)
  • 2018天猫双11|这就是阿里云!不止有新技术,更有温暖的社会力量
  • Angular数据绑定机制
  • JAVA并发编程--1.基础概念
  • JAVA多线程机制解析-volatilesynchronized
  • Js基础——数据类型之Null和Undefined
  • sessionStorage和localStorage
  • 当SetTimeout遇到了字符串
  • 基于Mobx的多页面小程序的全局共享状态管理实践
  • 开发了一款写作软件(OSX,Windows),附带Electron开发指南
  • 配置 PM2 实现代码自动发布
  • 批量截取pdf文件
  • 前端之React实战:创建跨平台的项目架构
  • 用Python写一份独特的元宵节祝福
  • ​力扣解法汇总1802. 有界数组中指定下标处的最大值
  • $con= MySQL有关填空题_2015年计算机二级考试《MySQL》提高练习题(10)
  • (11)工业界推荐系统-小红书推荐场景及内部实践【粗排三塔模型】
  • (C#)获取字符编码的类
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (大众金融)SQL server面试题(1)-总销售量最少的3个型号的车及其总销售量
  • (附源码)springboot高校宿舍交电费系统 毕业设计031552
  • (考研湖科大教书匠计算机网络)第一章概述-第五节1:计算机网络体系结构之分层思想和举例
  • (蓝桥杯每日一题)平方末尾及补充(常用的字符串函数功能)
  • (论文阅读11/100)Fast R-CNN
  • (三) diretfbrc详解
  • .desktop 桌面快捷_Linux桌面环境那么多,这几款优秀的任你选
  • .NET BackgroundWorker
  • .NET Micro Framework初体验(二)
  • .net MySql
  • @Builder用法
  • @property python知乎_Python3基础之:property
  • [ 网络基础篇 ] MAP 迈普交换机常用命令详解
  • [ACM] hdu 1201 18岁生日
  • [Android]竖直滑动选择器WheelView的实现
  • [ArcPy百科]第三节: Geometry信息中的空间参考解析
  • [BZOJ4337][BJOI2015]树的同构(树的最小表示法)
  • [C++]二叉搜索树
  • [daily][archlinux][game] 几个linux下还不错的游戏
  • [HeadFrist-HTMLCSS学习笔记][第一章Web语言:开始了解HTML]