
Flink Primer (8): How Are Timestamps Used? Watermark Usage and Principles...


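How are timestamps used in Flink? Watermark usage and principles

1. Introduction to Watermark

A Watermark is a mechanism Flink introduced to support event-time window computation; in essence it is itself a timestamp.

2. What Watermarks are for

Watermarks are used to handle out-of-order events, and handling them correctly is usually done by combining the watermark mechanism with windows.

In stream processing, an event takes time to travel from where it is produced, through the source, to an operator. In most cases records reach the operator in the order in which the events occurred, but network delays, backpressure and similar causes can still deliver them out of order (out-of-order, also called late elements).

We cannot wait indefinitely for late elements, however. There has to be a mechanism that guarantees the window is triggered and computed once a certain point in time has passed. That mechanism is the watermark.

3. How to use Watermark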
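Before turning to the Flink API, the idea from section 2 can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Flink's implementation: the class `BoundedLatenessSketch`, its methods, and the rule "watermark = highest event timestamp seen minus a tolerated delay" are hypothetical choices made for the example. A window [start, end) is treated as ready to fire once the watermark reaches end - 1.

```java
/** Hypothetical sketch of a bounded-delay watermark; this is not a Flink class. */
class BoundedLatenessSketch {
    private final long maxDelayMs;                  // how long we tolerate stragglers
    private long maxTimestampSeen = Long.MIN_VALUE; // highest event timestamp so far

    BoundedLatenessSketch(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    /** Record an event timestamp; an out-of-order event never moves the watermark back. */
    void onEvent(long eventTimestampMs) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestampMs);
    }

    /** Watermark = highest timestamp seen minus the tolerated delay. */
    long currentWatermark() {
        return maxTimestampSeen == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxTimestampSeen - maxDelayMs;
    }

    /** A window [start, end) may fire once the watermark has reached its last timestamp. */
    boolean canFire(long windowEndMs) {
        return currentWatermark() >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        BoundedLatenessSketch wm = new BoundedLatenessSketch(3_000);
        long[] arrivals = {1_000, 4_000, 2_000, 9_000, 13_000}; // 2_000 arrives out of order
        for (long ts : arrivals) {
            wm.onEvent(ts);
            System.out.println("event=" + ts
                    + " watermark=" + wm.currentWatermark()
                    + " window[0,10000) can fire=" + wm.canFire(10_000));
        }
    }
}
```

With a tolerated delay of 3 seconds, the window ending at 10 000 ms stays open while the event stamped 9 000 arrives and only becomes eligible to fire once an event stamped 13 000 pushes the watermark to 10 000.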

3.1 Setting the time characteristic on StreamExecutionEnvironment

    // --------------------------------------------------------------------------------------------
    //  Time characteristic
    // --------------------------------------------------------------------------------------------

    /**
     * Sets the time characteristic for all streams create from this environment, e.g., processing
     * time, event time, or ingestion time.
     *
     * <p>If you set the characteristic to IngestionTime of EventTime this will set a default
     * watermark update interval of 200 ms. If this is not applicable for your application
     * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
     *
     * @param characteristic The time characteristic.
     */
    @PublicEvolving
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
        if (characteristic == TimeCharacteristic.ProcessingTime) {
            getConfig().setAutoWatermarkInterval(0);
        } else {
            getConfig().setAutoWatermarkInterval(200);
        }
    }

    /**
     * Gets the time characteristic.
     *
     * @see #setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
     *
     * @return The time characteristic.
     */
    @PublicEvolving
    public TimeCharacteristic getStreamTimeCharacteristic() {
        return timeCharacteristic;
    }

Here TimeCharacteristic is an enum that defines which notion of time the system uses as its reference:

/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * <p>Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * <p>Ingestion time is often a good compromise between processing time and event time.
     * It does not need any special manual form of watermark generation, and events are typically
     * not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event
     * time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * <p>Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * <p>Operations based on event time are very predictable - the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}

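The three values stand for: EventTime, the time at which the event was produced; IngestionTime, the time at which the event entered Flink; and ProcessingTime, the time at which the event is processed.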
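To make the difference concrete, here is a small plain-Java sketch in which one record carries all three timestamps and is assigned to a 10-second tumbling window under each notion of time. The types `TimeNotionSketch`, `Notion` and `Stamped`, as well as the helper `windowStart`, are hypothetical names invented for this illustration and are not part of Flink.

```java
/** Hypothetical sketch: these types only mirror the idea and are not Flink classes. */
class TimeNotionSketch {

    enum Notion { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME }

    /** One record together with the three timestamps that can be attached to it. */
    static final class Stamped {
        final long eventTimeMs;      // when the event was produced
        final long ingestionTimeMs;  // when it entered the streaming dataflow
        final long processingTimeMs; // wall-clock time at the operator

        Stamped(long eventTimeMs, long ingestionTimeMs, long processingTimeMs) {
            this.eventTimeMs = eventTimeMs;
            this.ingestionTimeMs = ingestionTimeMs;
            this.processingTimeMs = processingTimeMs;
        }

        /** Pick the timestamp that the chosen notion of time would use. */
        long timestamp(Notion notion) {
            switch (notion) {
                case EVENT_TIME:
                    return eventTimeMs;
                case INGESTION_TIME:
                    return ingestionTimeMs;
                default:
                    return processingTimeMs;
            }
        }
    }

    /** Start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    public static void main(String[] args) {
        // Produced at 9.5 s, ingested at 10.2 s, processed at 21 s.
        Stamped record = new Stamped(9_500, 10_200, 21_000);
        for (Notion notion : Notion.values()) {
            System.out.println(notion + " -> window starting at "
                    + windowStart(record.timestamp(notion), 10_000));
        }
    }
}
```

The same record lands in three different windows depending on the notion of time. Only the event-time assignment is independent of how quickly the record travelled through the system.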


3.2 How Watermarks are produced

/**
 * A Watermark tells operators that no elements with a timestamp older or equal
 * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
 * sources and propagate through the operators of the topology. Operators must themselves emit
 * watermarks to downstream operators using
 * {@link org.apache.flink.streaming.api.operators.Output#emitWatermark(Watermark)}. Operators that
 * do not internally buffer elements can always forward the watermark that they receive. Operators
 * that buffer elements, such as window operators, must forward a watermark after emission of
 * elements that is triggered by the arriving watermark.
 *
 * <p>In some cases a watermark is only a heuristic and operators should be able to deal with
 * late elements. They can either discard those or update the result and emit updates/retractions
 * to downstream operations.
 *
 * <p>When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}.
 * When an operator receives this it will know that no more input will be arriving in the future.
 */

Output is defined as follows:

/**
 * A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
 * of this interface that can be used to emit elements and other messages, such as barriers
 * and watermarks, from an operator.
 *
 * @param <T> The type of the elements that can be emitted.
 */

The method that emits a Watermark:

    /**
     * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
     * operators.
     *
     * <p>A watermark specifies that no element with a timestamp lower or equal to the watermark
     * timestamp will be emitted in the future.
     */
    void emitWatermark(Watermark mark);
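The Watermark Javadoc above says that a buffering operator must forward a watermark only after it has emitted the elements that the watermark releases. The following plain-Java sketch illustrates that ordering. `RecordingOutput` and `SortingOperator` are hypothetical stand-ins written for this example; they imitate the shape of an output and an operator but are not Flink's `Output` or any real Flink operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch of "emit released elements first, then forward the watermark". */
class WatermarkForwardingSketch {

    /** Stand-in for an output: records everything that is sent downstream. */
    static final class RecordingOutput {
        final List<String> log = new ArrayList<>();

        void collect(long timestamp) {
            log.add("element@" + timestamp);
        }

        void emitWatermark(long watermark) {
            log.add("watermark@" + watermark);
        }
    }

    /** Buffers element timestamps and releases them in order once a watermark covers them. */
    static final class SortingOperator {
        private final PriorityQueue<Long> buffer = new PriorityQueue<>();
        private final RecordingOutput output;

        SortingOperator(RecordingOutput output) {
            this.output = output;
        }

        void processElement(long timestamp) {
            buffer.add(timestamp);
        }

        void processWatermark(long watermark) {
            // First emit every buffered element the watermark declares complete ...
            while (!buffer.isEmpty() && buffer.peek() <= watermark) {
                output.collect(buffer.poll());
            }
            // ... and only then forward the watermark downstream.
            output.emitWatermark(watermark);
        }
    }

    public static void main(String[] args) {
        RecordingOutput out = new RecordingOutput();
        SortingOperator op = new SortingOperator(out);
        op.processElement(5);
        op.processElement(3);
        op.processElement(8);
        op.processWatermark(5);              // releases 3 and 5
        op.processWatermark(Long.MAX_VALUE); // final watermark flushes the rest
        out.log.forEach(System.out::println);
    }
}
```

The final watermark with timestamp `Long.MAX_VALUE` flushes whatever is still buffered, which matches the remark that it tells an operator no more input will arrive.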

3.3 How operators process Watermarks

OneInputStreamOperator#processElement

TwoInputStreamOperator#processElement1 

TwoInputStreamOperator#processElement2 
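The methods listed above deliver elements; watermarks reach an operator through analogous callbacks. For an operator with two inputs, one question is which watermark it may pass on. The sketch below assumes the operator keeps one watermark per input and forwards only the smaller of the two, and only when that minimum advances. The class `TwoInputWatermarkSketch` is hypothetical; its method names merely echo the one-callback-per-input pattern and the code is not taken from Flink.

```java
/** Hypothetical sketch: combine the watermarks of two inputs by taking their minimum. */
class TwoInputWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Watermark arriving on the first input; returns true if the combined watermark advanced. */
    boolean processWatermark1(long watermark) {
        input1Watermark = watermark;
        return advance();
    }

    /** Watermark arriving on the second input; returns true if the combined watermark advanced. */
    boolean processWatermark2(long watermark) {
        input2Watermark = watermark;
        return advance();
    }

    /** The operator can only be as far along in event time as its slowest input. */
    private boolean advance() {
        long min = Math.min(input1Watermark, input2Watermark);
        if (min > combinedWatermark) {
            combinedWatermark = min;
            return true; // a real operator would emit the new watermark downstream here
        }
        return false;
    }

    long currentWatermark() {
        return combinedWatermark;
    }

    public static void main(String[] args) {
        TwoInputWatermarkSketch op = new TwoInputWatermarkSketch();
        System.out.println("input1=10 advanced: " + op.processWatermark1(10));
        System.out.println("input2=7  advanced: " + op.processWatermark2(7));
        System.out.println("input2=20 advanced: " + op.processWatermark2(20));
        System.out.println("combined watermark: " + op.currentWatermark());
    }
}
```

Taking the minimum is the conservative choice: an element with an earlier timestamp may still arrive on the slower input, so the operator must not claim more progress than that input has reported.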

[Figure: class diagram of the operator relationships]

Taking WindowOperator as an example:

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        final Collection<W> elementWindows = windowAssigner.assignWindows(
            element.getValue(), element.getTimestamp(), windowAssignerContext);

        //if element is handled by none of assigned elementWindows
        boolean isSkippedElement = true;

        final K key = this.<K>getKeyedStateBackend().getCurrentKey();

        if (windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<W> mergingWindows = getMergingWindowSet();

            for (W window: elementWindows) {

                // adding the new window might result in a merge, in that case the actualWindow
                // is the merged window and we work with that. If we don't merge then
                // actualWindow == window
                W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                    @Override
                    public void merge(W mergeResult,
                            Collection<W> mergedWindows, W stateWindowResult,
                            Collection<W> mergedStateWindows) throws Exception {

                        if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                            throw new UnsupportedOperationException("The end timestamp of an " +
                                    "event-time window cannot become earlier than the current watermark " +
                                    "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                    " window: " + mergeResult);
                        } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                            throw new UnsupportedOperationException("The end timestamp of a " +
                                    "processing-time window cannot become earlier than the current processing time " +
                                    "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                    " window: " + mergeResult);
                        }

                        triggerContext.key = key;
                        triggerContext.window = mergeResult;

                        triggerContext.onMerge(mergedWindows);

                        for (W m: mergedWindows) {
                            triggerContext.window = m;
                            triggerContext.clear();
                            deleteCleanupTimer(m);
                        }

                        // merge the merged state windows into the newly resulting state window
                        windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                    }
                });

                // drop if the window is already late
                if (isWindowLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                isSkippedElement = false;

                W stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }

                windowState.setCurrentNamespace(stateWindow);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = actualWindow;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(actualWindow, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(actualWindow);
            }

            // need to make sure to update the merging state in state
            mergingWindows.persist();
        } else {
            for (W window: elementWindows) {

                // drop if the window is already late
                if (isWindowLate(window)) {
                    continue;
                }
                isSkippedElement = false;

                windowState.setCurrentNamespace(window);
                windowState.add(element.getValue());

                triggerContext.key = key;
                triggerContext.window = window;

                TriggerResult triggerResult = triggerContext.onElement(element);

                if (triggerResult.isFire()) {
                    ACC contents = windowState.get();
                    if (contents == null) {
                        continue;
                    }
                    emitWindowContents(window, contents);
                }

                if (triggerResult.isPurge()) {
                    windowState.clear();
                }
                registerCleanupTimer(window);
            }
        }

        // side output input event if
        // element not handled by any window
        // late arriving tag has been set
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
            if (lateDataOutputTag != null){
                sideOutput(element);
            } else {
                this.numLateRecordsDropped.inc();
            }
        }
    }
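The listing calls `isWindowLate` and `isElementLate` without showing them. The plain-Java sketch below gives one plausible reading of those checks for the event-time case, modelled on the comparison `mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark()` that appears in the merge callback above. The class `LatenessSketch` and its fields are hypothetical, and the exact conditions in Flink may differ in detail.

```java
/** Hypothetical sketch of event-time lateness checks; not the WindowOperator source. */
class LatenessSketch {
    private final long allowedLatenessMs;
    private long currentWatermark = Long.MIN_VALUE;

    LatenessSketch(long allowedLatenessMs) {
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /** Watermarks only move forward. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Last timestamp that still belongs to the window [start, end). */
    static long maxTimestamp(long windowEndMs) {
        return windowEndMs - 1;
    }

    /** A window is late once the watermark has passed its last timestamp plus the allowed lateness. */
    boolean isWindowLate(long windowEndMs) {
        return maxTimestamp(windowEndMs) + allowedLatenessMs <= currentWatermark;
    }

    /** An element is late once the watermark has passed its timestamp plus the allowed lateness. */
    boolean isElementLate(long elementTimestampMs) {
        return elementTimestampMs + allowedLatenessMs <= currentWatermark;
    }

    public static void main(String[] args) {
        LatenessSketch check = new LatenessSketch(2_000);
        check.advanceWatermark(10_000);
        System.out.println("window [0,10000) late at wm=10000: " + check.isWindowLate(10_000));
        check.advanceWatermark(12_000);
        System.out.println("window [0,10000) late at wm=12000: " + check.isWindowLate(10_000));
        System.out.println("element@9500 late at wm=12000: " + check.isElementLate(9_500));
    }
}
```

Under this reading, allowed lateness keeps a window usable for a while after the watermark has passed its end. An element is dropped or sent to the side output only when every window it belongs to is already late.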

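Summary:

In stream processing, an event takes time to travel from where it is produced, through the source, to an operator. Records usually arrive in the order in which the events occurred, but network delays, backpressure and similar causes can produce out-of-order (late) elements. Since we cannot wait for late elements indefinitely, a mechanism is needed that guarantees the window is triggered once a certain point in time has passed. This is where the watermark comes in: once a watermark arrives, all data with timestamps earlier than the watermark is considered to have arrived, even if some delayed records still show up afterwards.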

