当前位置: 首页 > news >正文

flink理论干货笔记(7)及spark论文相关思考

601. 注册指标,可以从getRuntimeContext(). getMetricGroup()返回MetricGroup对象,然后就能创建和注册新指标;度量类型包括Counters、Gauges、Histograms、Meters

602. Counter是计数器,方法有inc()、dec(),也可以自定义Counter;Gauges是测量,来自接口flink.metrics.Gauge,返回值类型没有限制;Histogram是直方图,方法有update,可以利用flink-metrics-dropwizard注册一个Codahale/DropWizard直方图;Meter是仪表,可用markEvent()注册事件的发生,meter()注册仪表。可用flink-metrics-dropwizard打包器注册仪表

603. 范围(scope)包括系统范围和用户范围,用metrics.scope.delimiter分割。用户范围可用MetricGroup#addGroup和#getMetricIndentifier以及#getScopeComponents。系统范围包含上下文信息,比如任务属于哪个作业,配置有metrics.scope.jm/tm/task/operator等

604. reporter是报告,具体配置有metrics.reporter.name.xxx等,可通过flink. metrics. reporter.  MetricReporter接口编写自己的报告,比如JMX报告、Ganglia报告、Graphite报告、Prometheus报告、PrometheusPushGateway报告、StatsD报告、Datadog报告、Slf4j报告等。

605. 系统指标包括5列:范围、中缀、度量、描述、类型。具体指标大类分为cpu、内存、线程、垃圾收集、类加载器、网络、集群、可用性、检查点、io、连接器、系统资源、系统cpu、系统内存、系统网络等。

606. 延迟跟踪要设置latencyTrackingInterval,此时源会定期发送特殊记录叫做LatencyMarker,所有中间算子都会保存每个源的最后一个延迟列表,以计算延迟分布。建议延迟指标只用于调试,因为会影响性能。

607. rest api可以查询度量指标,包括TM、JM、job等信息; 日志可使用log4j或logback,前者配置有log4j-cli.prooerties、log4j-yarn-session.properties、log4j.properties;logback配置有-Dlogback. configuration等,文件有logback.xml

608. 历史服务器提供了rest api,可查询JM归档的已完成作业的状态和统计信息。用historyserver.sh 启动,端口为8082;历史服务器的相关配置有historyserver.archive.fs.dir、historyserver.archive.fs.refresh-interval以及historyserver.web.tmpdir

609. 监控检查点包括:检查点计数(各种状态的)、最新完成的检查点、最新失败的检查点、最新保存点、最新还原

610. 检查点历史选项卡包括:id、状态、触发时间、最新确认、状态大小、端到端持续时间、对齐期间缓冲

611. 反压是指生成数据比下游算子消耗的速度快,于是下游记录往相反方向传播到上游。sink向上游算子施加压力

612. 采样线程,是反压检测通过反复获取正在运行的任务的堆栈跟踪样本来工作,即Thread.getStackTrace(),如果线程卡在某个内部方法调用,则存在反压。默认情况,JM为每个任务每50ms触发100个堆栈跟踪,来确定反压。

613. 以下配置JM的样本数:web. backpressure. refresh-interval、web.backpressure. num-samples、web. backpressure. delay-between-samples;反压状态有ok、low、high,其中high表示任务被加压,用红色表示 

614. flink可以监控正在运行的作业的状态和统计信息,和完成的作业,用rest api表示。默认监听8081,可在rest.port配置。它和web仪表盘服务器在同一端口

615. rest api在flink-runtime中,具体是runtime.webmonitor.WebMonitorEndpoint,用netty和netty router库来处理rest请求。相关接口包括MessageHeaders、AbstractRestHandler等,实例有JobExceptionsHandler和JobExceptionHeaders

616. rest api具体分为/cluster、/config、/jars、/jobmanager、/jobs、/overview、/savepoint-disposal、/taskmanagers,其中/jobs下的url最多

617. /jobs可以取消job(用cancel)、使用保存点取消job(用cancel-with-savepoint) ;/jars可以上传jar(用post)

618. 可用currentLowWatermark监控最低水印,表示当前事件时间,该值由上游算子收到的所有水印的最小值来计算。

619. flink的类加载包括,java类路径(jdk库、flink依赖的库),和动态用户代码(动态提交的作业);来自会话(rest/cli)提交的作业的类都是动态加载的

620. yarn的类加载情况:当直接向yarn提交作业时,会启动专用TM/JM,此时不涉及动态类加载,因为jvm在类路径同时有flink框架类和用户代码类。在启动yarn会话时,所有作业的类都是动态加载的。mesos的类加载情况同yarn ;docker/k8s的类加载,作业代码都是动态加载的

621. 动态类加载中,有两个类加载的层次,1.java应用程序类加载器,包含所有类 2. 动态用户代码类加载器,从用户jar加载类。默认情况,flink反转类加载顺序。好处是可以使用与flink核心不同的库版本,可避免常见的依赖冲突。但也可能导致问题,需要用classloader. resolve-order配置解析顺序,可以是parent-first或child-first

622. 避免动态类加载,而是将jar放到/lib目录,注意AppClassLoader是FlinkUserCodeClassLoader的父级,最终导致只加载一次类。有时需要手动加载类(通过反射动态加载),需要可以访问作业类的类加载器 

623. xxx can not be cast xxx异常表示该类的多个版本由不同类加载器加载,原因是库与类加载方法不兼容。

624. maven-shade-plugin允许编译后更改类的包,可以解决与flink的依赖冲突

625. flink可自定义日志记录概述,在env.java.opts.jobmanager和env. java. opts. taskmanager,或者FLINK_LOG_PREFIX

626. java flight recorder可用于性能分析,也是事件收集框架。其实在jvm参数配置,例如-XX:+FlightRecorder;JITWatch是hotspot jit的日志分析器和可视化工具。

627. checkstyle idea插件需要配置,并在tools/maven下创建checkstyle.xml,还要配置checkstyle.suppressions.file,最终会针对任何checkstyle违规行为发出警告。可以扫描整个模块。此外还有scala的checkstyle,对应tools/maven/scalastyle_config.xml 

628. maven3.3.x构建flink会有问题?不会正确遮蔽某些依赖项?因此建议用maven3.2.5?

629. 要选择正确版本的hadoop,一般只能从hadoop2.4.0开始支持flink,可以在mvn命令最后加上-Dhadoop.version=2.6.1;也可以使用cdh版的hadoop,比如2.6.1-cdh5.0.0 ;构建flink要选择正确版本的scala,比如scala2.11或2.12

630. 若出现File name too long的异常,需要在pom.xml中设置-Xmax-classfile-name,具体是scala-maven-plugin下

631. TM中有多个slot,每个slot都可运行一个并行任务管道。内部用SlotSharingGroup和CoLocationGroup决定哪些任务可以共享哪些slot

632. JM会跟踪分布式任务,决定何时安排下一个任务,并对已完成的任务或执行失败做出反应。JM接收JobGraph,它由算子(JobVertex)和中间结果(IntermediateDataSet)组成的数据流表示。JM将JobGraph转换为ExecutionGraph,后者是JG的并行版本。对于每个JobVertex,都有并行的ExecutionVertex,而EV会跟踪特定子任务的执行状态。来自同一个JV的所有EV都保存在一个ExecutionJobVertex中。除了顶点,EG还包含IntermediateResult和IntermediateResultPartition,前者跟踪IntermediateDataSet状态,后者是每个分区的状态。

633. 作业的阶段有created、running、finished、cancelling、cancelled、restarted、failing、failed、suspended这些状态

634. 任务也有生命周期,StreamTask是flink流处理中所有不同任务子类型的基础。问题:任务生命周期和算子生命周期啥关系?

635. 算子生命周期包括:初始化阶段(setup方法,udf为setRuntimeContext;initializeState方法; open方法,udf为open方法)、处理阶段(processElement方法,udf为run方法;processWatermark方法)、检查点阶段(snapshotState方法)、终止阶段(close方法,udf为close方法;dispose方法)

636. 任务生命周期,包括常规无故障执行,和由于某些原因被取消(即中断执行)。前者主要是setInitialState、invoke、setup-operators、task-specific-init、initialize-operator-states、run、close-operators、dispose-operators、task-specific-cleanup、common-cleanup。 

637. 思考:flink python 流api用的是Jython框架。和pyflink啥关系? 

638. env.create_python_source(…).flat_map(…).key_by(…).time_window(…).reduce(…).output()
env. execute() ; 一些重要的类有SourceFunction、FlatMapFunction、KeySelector、ReduceFunction等

639. env.read_text_file(…)可获得一个DataStream,而data.write_as_text(…)可写入文件。而data.output()只对本地机器的开发调试有用。

640. flink python的流api算子有map、flat_map、filter、key_by、reduce、count_window、time_window、apply、union、split、select、iterate等 

641. flink python流数据源可用以下方式获取:read_text_file、from_elements、generate_sequence;flink python流数据写入可用write_as_text、output、write_to_socket等

642. env.set_parallelism()可指定运行环境级别并行度而系统级别的并行度要在flink-conf.yaml指定,具体是parallelism.default参数

643. 问题:bin下的pyflink-stream.sh没了,改成pyflink-shell.sh了? 

644. flink python批api常见类有GroupReduceFunction等,可自定义类Adder用在下面的reduce_group中,用法举例:
env=get_environment()
data. flat_map(…).group_by(…).reduce_group(Adder(),…).output()
env. execute(local=True) 

645. flink-conf.yaml的python.binary.path设置二进制文件;flink 支持python3吗?

646. flink python的批api算子有map、flat_map、map_partition、filter、reduce、reduce_group、sum、max、min、aggregate、and_agg、join、where、equal_to、co_group、cross、zip_with_index、group_by、union等 

647. flink python批数据源有read_text、read_csv、from_elements、generate_sequence;python元组映射到flink Tuple类型,它包含各种类型的固定数量的字段(最多25个);数据接收方式有write_text、write_csv、output  

648. 广播变量允许为算子操作的所有并行实例提供数据集,通过名称注册with_broadcast_set(DataSet,String),访问使用self. context. get_broadcast_variable(String) 

649. 问题:流计算有广播变量吗? bin下的pyflink.sh没了,改成pyflink-shell.sh了?

650. 问题:spark streaming有水印吗?flink有慢节点的说法吗?flink的dataset也是像rdd那样弹性的吗? 

651. 问题:flink的流批一体是说流和批都统一用一套底层api(或流处理引擎)?还是说,所有数据都要么看成无限流,要么看成有限流? 

652. 思考:spark才是真正的流批一体吧,因为不管流还是批,都用的RDD(或者dataframe、dataset)!? 

653. 问题:flink中有像spark rdd那样通用的数据结构和类吗?flink dataset能和spark rdd类比吗?前者不是不可变的? 

654. 每个rdd有一个持久化的优先级,来指定内存中的哪些数据被优先写入到磁盘; RDD的persist方法表明该RDD在后续操作还会用到;RDD是不可变的,不需要创建快照

655. RDD[Int]是一个整型rdd,但一般忽略类型是因为scala可以进行类型推断;spark用反射解决scala的闭包对象问题

656. spark的transformation算子有map、filter、flapmap、sample、groupbykey、reducebykey、union、join、cogroup、crossProduct、mapValues、sort、partitionBy;spark的action算子有count、collect、reduce、lookup、save

657. 问题:RDD根据Partitioner类来划分数据集?比如基于哈希或范围划分?flink dataset和rdd一样,也是有分区的吗?应该是

658. rdd利用血缘关系恢复,比用检查点恢复节省时间;spark.textFile(…).map(…).partitionBy (myFun).persist() ;spark和flink基本分区策略一样,也是hash和range;join一般来说是一种宽依赖。除非父RDD已经基于hash策略被划分过了

659. RDD中,partitions表示文件中每个文件块的分区(包含文件块在每个分区对象的偏移量),preferredLocations表示文件块所在的节点,iterator表示读取这些文件块

660. 在RDD上调用map,会返回一个MappedRDD对象,这个对象与其父对象有相同的分区和preferredLocations

661. 两个RDD调用union会返回一个RDD,每个子分区都是通过窄依赖于同一个父级分区计算而来。注意union不会丢弃重复的值。

662. sample即抽样,类似于映射。RDD会为每一个分区保存一个种子来确定如何对父级记录采样。 

663. join即连接两个rdd,可能会产生两个宽、两个窄依赖,或一宽一窄。怎么理解? 如果两个rdd都是一样的hash/范围划分策略,那么就会产生窄依赖?如果一个父rdd有一种策略,而另一个不具有,那么同时产生窄和宽依赖?结果rdd都是一个划分策略,要么继承父rdd,要么是默认的hash划分策略

664. yarn等调度器对spark的任务调度,是根据数据存储(即数据本地性)来确定的。比如某个分区数据刚好在内存中,那么该任务会分配给该节点。

665. 如果节点故障,任务失败,那么会将该stage的任务转移到其他节点重新执行。如果某个任务很慢,那么会在其他节点上执行该任务的拷贝。最先得到的结果为最终的结果。spark为何那么聪明,源码怎么写?(虽然mapreduce也这么做) 

666. rdd模型将作业分解为多个相互独立的任务,这使的多用户集群能够共享资源。spark允许多线程同时提交作业,同时允许公平调度,就像Hadoop fair scheduler那样。同时支持延迟调度,即支持数据本地化访问。甚至允许取消作业来为高优先级的作业腾出资源。spark也能使用mesos和yarn等。spark还使用Sparrow系统来扩展支持分布式调度,允许多个spark应用以去中性化方式在同一集群上排队。

667. spark解析器(shell)本质上就是基于scala解析器,可以进行交互式查询。spark解析器在scala基础上做了两个改变:类传输用http,以及代码生成器的改动。

668. rdd可以存于内存或磁盘,而前者分为未序列化对象和序列化对象。对于内存情况,使用LRU回收算法管理,为最新的rdd分区腾出内存空间。

669. 多个spark实例可以共享rdd,通过Tachyon实现(目前叫Alluxio)。 思考:flink的多个作业能否基于alluxio共享?  事实上除了flink spark,甚至hbase storm kafka Hadoop tf caffe 都是可以基于alluxio的!另外,alluxio的作者李浩源和spark的作者都是uc伯克利大学的博士!!后者是alluxio的第三作者!

670. 用血缘关系恢复rdd,对于很长的血缘是很耗时的,因此需要检查点,特别是有宽依赖的长血缘,比如pagerank。

671. 用REPLICATE标志来持久化为rdd设置检查点。现在spark的检查点是否支持自动化?根据数据集大小和所需时间,自动决定哪些rdd需要检查点。

672. 由于rdd的不可变特性,使的它的检查点更容易,即不需要关注一致性问题,同时可在后台进行,而不需要暂停程序或分布式快照。

673. pregel和spark啥关系?基于spark pregel实现的pagerank?pregel是谷歌针对迭代图应用的模型。也有超步的概念。rdd可以表达pregel模型的关键是,每次迭代都是将相同的自定义函数作用于所有节点。

674. 那么是否可以基于rdd实现列式存储? 已知rdd的每条记录都有作为唯一标识的key,能用于数据划分。另外rdd的不可变性的灵感来源于java字符串的不可变?基于机器学习的数据分析方法比基于数仓的方法更高级?

675. spark sql/shark能代替传统数仓?因为hive是离线的,需要从磁盘读取,而rdd是在内存中的? spark sql是按列压缩存储的? spark sql使用局部DAG执行(PDE)来扩展,从而优化sql查询。在开始执行后,基于观测到的统计数据,选择一个更好的连接策略或更合适的并发度。 

676. spark sql和hive是兼容的,支持hive所有sql和udf,同时引入复杂的分析函数增强了sql,支持java/scala/python,实验表明spark sql是hive的100倍。mapreduce不存在数据划分,而spark存在? 

677. spark sql的执行包括:查询解析、生成逻辑计划、生成物理计划。查询解析得到抽象语法树,然后转换成逻辑计划。然后进行基本的逻辑优化,如谓词下推,得到物理计划。 

678. 与hive不同,spark sql的优化器采用额外的规则优化,如推送LIMIT到各个分区,并创建由rdd转换而不是mapreduce任务构成的物理计划。

679. spark内存存储的默认方式是将数据分区作为jvm对象的集合存储。spark sql将基本类型的列以jvm原始数组形式存储。每一列仅创建一个jvm对象,可以带来快速的gc和紧凑的数据表示。spark sql的压缩手段有字典编码、游程编码、位填充。列式存储能带来更好的缓存行为,特别是需要在特定列上频繁聚合的分析查询。 

680. spark sql允许两个表基于公共键进行协同划分,提供快速的join操作,使用distribute by来声明表,指定对某个列进行划分。

681. 映射修剪是基于自然聚合列对数据进行分区修剪的过程。PDE(局部dag执行)是在分布式环境中动态的查询优化,它允许运行时收集数据统计信息进行动态改变查询计划。PDE会在全局收集可定制的统计信息,以及物化map任务输出时每个分区的粒度。然后基于这些信息来改变dag,或通过选择不同的操作,或改变其参数(如并行度)。 

682. PDE收集的统计信息可通过可插拔累加器来自定义,如分区大小、记录计数(用于数据倾斜检测),“重量级列表”(经常出现在数据集中的项目),近似直方图(用来估计分区的数据布局)。PDE统计信息由worker传送给master,然后汇总给优化器。master就能进行各种运行优化。一些优化例子包括join算法选择、并行度、倾斜处理。

683. join算法选择取决于表的大小(决定是广播连接还是shuffle连接)。并行度选择基于单个分区的大小来决定运行时reduce任务的数目。同时会将许多小的细粒度分区合并为粗粒度的分区,来减少reduce任务数。

684. 问题:目前PDE在spark sql还是spark core中? flink有类似PDE的动态优化吗?比如能否动态改变并行度?  基准测试和微基准测试啥关系? 

685. spark sql比hive快的其中一个原因是前者使用了基于哈希的shuffle,而后者只能用基于排序的,因为排序比哈希的计算量大。spark sql能够进行较好的估算,判断出动态优化后某些表会变得很小,就可以执行map join,而不是一开始的shuffle join,从而提高了性能。 

686. 问题:spark有不能从故障中完美恢复的情况吗?sql2rdd函数是spark sql定义的吗?它返回TableRDD? sql可以表达机器学习?应该可以,见gp以及shark。sql也可以直接调用scala,比如调用kmeans或logistic回归。

687. graphx作者不是spark作者,后者只是把它集成到spark,见博士论文引用112

688. DStreams结构将各运算流化成为一系列短时间间隔的无状态、确定性的批计算。Dstreams通过推测性执行来从慢任务中恢复。spark中也能通过rdd整合不同的计算,比如流计算和批处理

689. spark streaming能用于0.5-2s延迟的应用,如网站活动统计数据、集群监控、垃圾邮件检测等,但不适用于高频交易等。思考:spark streaming是无状态的流计算,那它如何处理有状态的流任务?  spark streaming的中间状态保存在rdd中? 

690. counts=ones.runningReduce((a,b)=>a+b)是spark streaming的一个算子举例

691. Dstreams也能像flink那样,处理无序/乱序的记录吗?  可以,Dstreams提供了两种方法:
1) Dstreams可以在每个批次之前等待一个空闲时间
2) 允许用户自行对迟到数据进行纠正。比如增量reduce操作,即在老计数基础上再加新纪录的计数,来避免重复计算。 

692. Dstreams提供了多个有状态的转换操作,包括:窗口(如window)、增量式聚合(如reduceByWindow)、状态跟踪(如updateStateByKey)。events.track(…)用于计算数据流中活跃的会话数。foreachRDD在每一个rdd上执行一段用户代码,例如打印。Dstreams提供了exactly-once的一致性语义。

693. 利用slice和topK算子,计算一段时间内最流行的词:counts.slice("21:00","21:05").topK(10) 

694. Dstreams将任务分为小的且确定性的批量操作,会导致最小的延迟时间变长,但可让系统采用更高效的恢复技术。spark streaming会在集群上进行负载均衡,应对故障或启动慢节点恢复。其中所有状态都以rdd来保存。

695. master跟踪Dstreams血缘,并调度任务来计算新的rdd分区。worker接收数据,保存输入分区和已计算的rdd,并执行任务。client发送数据给系统。注意:master既有rdd血缘,也有Dstreams血缘,同时还有input tracker、block tracker、task scheduler。而woker有input receiver、task execution、block manager

696. spark集群的worker通过ntp进行时钟同步,这样就能确定spark streaming的时间周期。

697. 为了支持spark streaming,作者对spark做了很多修改和优化,比如重写spark数据层、修改spark调度器、重写spark存储层、lineage截断、master恢复等。 Dstreams利用并行恢复和推测执行,来恢复工作节点状态。rdd可以模拟任何分布式系统,且在大多数情况下都是高效的,除非系统对网络延迟非常敏感。

698. 思考:mapreduce也能模拟任何分布式计算任务? mapreduce提供map来执行本地计算,以及reduce来用于所有节点间的通信。与BSP吻合的算法都可以用RDD进行有效的评估。Pregel就是基于BSP的

699. 对于io密集型应用,使用具有数据本地性的rdd非常重要,相对于mapreduce,尽管后者也能很好的处理cpu密集型应用。

700. 问题:rdd集群中的timestep是啥?同一个算子在所有节点执行一次,就叫一个timestep?spark的广播通过BitTorrent实现??见5.4.2 
  


 
 

相关文章:

  • react native 使用阿里字体图标库
  • 本地部署docker实践
  • 【cocos2dx】记录问题,粒子不会通过setOpacity调整整体透明度
  • Docker部署可能遇到的问题
  • Java配置41-搭建Kafka服务器
  • VSCode中ESLint插件修复+配置教程
  • 来!PyFlink 作业的多种部署模式
  • 电脑重装系统后Win11安全中心无法打开如何解决
  • ue4打包出现问题解决[Callstack] 0x00007ffa47e6474c KERNELBASE.dll!UnknownFunction []
  • 【Matlab】状态空间模型的极点配置法 place() 函数
  • Chrome常用插件收集整理
  • [车联网安全自学篇] Android安全之APK内存敏感信息泄露挖掘【静态分析】
  • cpacr_el1等特殊寄存器
  • 代码层走进“百万级”分布式ID设计
  • 开源众包-项目大厅数据爬取
  • 【挥舞JS】JS实现继承,封装一个extends方法
  • 11111111
  • C学习-枚举(九)
  • Essential Studio for ASP.NET Web Forms 2017 v2,新增自定义树形网格工具栏
  • Java|序列化异常StreamCorruptedException的解决方法
  • javascript 哈希表
  • JavaScript-Array类型
  • LeetCode算法系列_0891_子序列宽度之和
  • Linux编程学习笔记 | Linux多线程学习[2] - 线程的同步
  • mysql 5.6 原生Online DDL解析
  • Octave 入门
  • Spark in action on Kubernetes - Playground搭建与架构浅析
  • 经典排序算法及其 Java 实现
  • 利用jquery编写加法运算验证码
  • 移动端 h5开发相关内容总结(三)
  • 用quicker-worker.js轻松跑一个大数据遍历
  • 在electron中实现跨域请求,无需更改服务器端设置
  • scrapy中间件源码分析及常用中间件大全
  • 没有任何编程基础可以直接学习python语言吗?学会后能够做什么? ...
  • 通过调用文摘列表API获取文摘
  • # 学号 2017-2018-20172309 《程序设计与数据结构》实验三报告
  • (2)MFC+openGL单文档框架glFrame
  • (2021|NIPS,扩散,无条件分数估计,条件分数估计)无分类器引导扩散
  • (阿里云万网)-域名注册购买实名流程
  • (二十三)Flask之高频面试点
  • (附源码)spring boot网络空间安全实验教学示范中心网站 毕业设计 111454
  • (规划)24届春招和25届暑假实习路线准备规划
  • (一)SpringBoot3---尚硅谷总结
  • (转)拼包函数及网络封包的异常处理(含代码)
  • (转)自己动手搭建Nginx+memcache+xdebug+php运行环境绿色版 For windows版
  • .axf 转化 .bin文件 的方法
  • .Net MVC4 上传大文件,并保存表单
  • .NET NPOI导出Excel详解
  • .NET 设计模式初探
  • .NET 使用配置文件
  • .NET/C# 项目如何优雅地设置条件编译符号?
  • .NET3.5下用Lambda简化跨线程访问窗体控件,避免繁复的delegate,Invoke(转)
  • .NET设计模式(8):适配器模式(Adapter Pattern)
  • .net专家(高海东的专栏)
  • @vue/cli 3.x+引入jQuery