Flink Getting Started (Part 8): How Are Timestamps Used in Flink? Watermark Usage and Principles
1. Introduction to watermarks
A watermark is the mechanism Flink provides for event-time window computation. At its core, a watermark is itself a timestamp.
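Because a watermark is just a timestamp, a common way to derive it is to track the largest event timestamp seen so far and subtract a fixed bound on how far out of order events may arrive. The following is a minimal plain-Java sketch of that idea, not Flink code. The class name `BoundedWatermarkTracker` and the 3000 ms bound are assumptions for illustration. Flink ships a comparable strategy (`BoundedOutOfOrdernessTimestampExtractor`), but this sketch is not its implementation.

```java
// Hypothetical, Flink-free sketch: a watermark derived from event timestamps
// under an assumed bound on out-of-orderness.
final class BoundedWatermarkTracker {
    private final long maxOutOfOrdernessMs;
    // Largest event timestamp observed so far.
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedWatermarkTracker(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Called for every element; only the running maximum matters.
    void onEvent(long eventTimestampMs) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMs);
    }

    // The watermark is itself a timestamp: "events up to here are assumed complete".
    long currentWatermark() {
        if (maxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, event time has not advanced
        }
        return maxTimestamp - maxOutOfOrdernessMs;
    }
}
```

Feeding events 10000, 12000, 11000, 15000 and 9000 with a 3000 ms bound yields watermarks 7000, 9000, 9000, 12000 and 12000. The final event (9000) therefore arrives behind the watermark and would be treated as late.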
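2. What watermarks are for
Watermarks exist to handle out-of-order events, and out-of-order events are usually handled correctly by combining the watermark mechanism with windows.
An event takes time to travel from where it is produced, through the source, to an operator. In most cases data reaches an operator in the order in which the events were produced, but network delays, backpressure and similar factors can still cause events to arrive out of order (out-of-order, or late, elements).
However, we cannot wait for late elements indefinitely: there has to be a mechanism guaranteeing that, after a certain point, a window is triggered and computed. That mechanism is the watermark.
3. How to use watermarks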
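Before looking at the API, the way a watermark and a window cooperate can be modeled without Flink. The sketch below rests on several assumptions: tumbling windows, a simple element count as the result, no allowed lateness, and a hypothetical class `TumblingCountWindows`. Elements are buffered per window, a window fires only once the watermark reaches its last timestamp, and elements for windows that have already fired are dropped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not Flink's WindowOperator): count elements per event-time
// tumbling window and fire a window only once the watermark passes its end.
final class TumblingCountWindows {
    private final long sizeMs;
    // window start -> number of elements buffered for that window
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;
    private int droppedLate = 0;

    TumblingCountWindows(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    void onElement(long timestampMs) {
        long start = timestampMs - Math.floorMod(timestampMs, sizeMs);
        long maxTimestamp = start + sizeMs - 1; // last timestamp belonging to the window
        if (maxTimestamp <= currentWatermark) {
            droppedLate++; // the window has already fired: too late
            return;
        }
        pending.merge(start, 1, Integer::sum); // out-of-order is fine while the window is open
    }

    // Returns "start-end:count" for every window completed by this watermark.
    List<String> onWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<String> fired = new ArrayList<>();
        while (!pending.isEmpty()) {
            Map.Entry<Long, Integer> first = pending.firstEntry();
            long start = first.getKey();
            if (start + sizeMs - 1 > currentWatermark) {
                break; // this window and all later ones are still open
            }
            fired.add(start + "-" + (start + sizeMs) + ":" + first.getValue());
            pending.pollFirstEntry();
        }
        return fired;
    }

    int droppedLate() {
        return droppedLate;
    }
}
```

With 5000 ms windows, elements at 1000, 6000 and 3000 (the last one out of order) are all accepted; a watermark of 4000 fires nothing, a watermark of 5000 fires "0-5000:2", a later element at 2000 is dropped, and a final `Long.MAX_VALUE` watermark fires "5000-10000:1".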
3.1 Setting the time characteristic on StreamExecutionEnvironment
// --------------------------------------------------------------------------------------------
//  Time characteristic
// --------------------------------------------------------------------------------------------

/**
 * Sets the time characteristic for all streams created from this environment, e.g., processing
 * time, event time, or ingestion time.
 *
 * If you set the characteristic to IngestionTime or EventTime this will set a default
 * watermark update interval of 200 ms. If this is not applicable for your application
 * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
 *
 * @param characteristic The time characteristic.
 */
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

/**
 * Gets the time characteristic.
 *
 * @see #setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)
 *
 * @return The time characteristic.
 */
@PublicEvolving
public TimeCharacteristic getStreamTimeCharacteristic() {
    return timeCharacteristic;
}
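Note that setStreamTimeCharacteristic does not generate watermarks itself. It only sets the interval at which they are generated: 200 ms for IngestionTime and EventTime, 0 for ProcessingTime, where 0 switches periodic generation off. The effect of that interval can be pictured with a small simulation. This is a hypothetical sketch: the class `PeriodicWatermarkSimulation`, its parameters and the bounded-out-of-orderness rule are assumptions, not Flink code. On every tick of the interval, the current watermark is computed and emitted only if it advanced.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of periodic watermark emission; this is not Flink code.
final class PeriodicWatermarkSimulation {

    /**
     * @param arrivalMs  processing time at which each element arrives (ascending)
     * @param eventTsMs  event timestamp carried by each element
     * @param intervalMs watermark interval; 0 disables periodic emission
     * @param boundMs    assumed maximum out-of-orderness
     * @return watermarks in the order they would be emitted
     */
    static List<Long> emittedWatermarks(long[] arrivalMs, long[] eventTsMs,
                                        long intervalMs, long boundMs) {
        List<Long> emitted = new ArrayList<>();
        if (intervalMs <= 0 || arrivalMs.length == 0) {
            return emitted; // like ProcessingTime: no periodic watermarks at all
        }
        long maxTs = Long.MIN_VALUE;
        long lastEmitted = Long.MIN_VALUE;
        int next = 0;
        long end = arrivalMs[arrivalMs.length - 1];
        for (long tick = intervalMs; tick <= end + intervalMs; tick += intervalMs) {
            // absorb every element that arrived before this tick
            while (next < arrivalMs.length && arrivalMs[next] < tick) {
                maxTs = Math.max(maxTs, eventTsMs[next]);
                next++;
            }
            if (maxTs == Long.MIN_VALUE) {
                continue; // no data yet
            }
            long candidate = maxTs - boundMs;
            if (candidate > lastEmitted) { // only emit when event time advances
                lastEmitted = candidate;
                emitted.add(candidate);
            }
        }
        return emitted;
    }
}
```

With a 200 ms interval, a 1000 ms bound, arrivals at 50, 120, 260, 390 and 610 ms carrying event times 5000, 4800, 5600, 5300 and 7000, the emitted watermarks are 4000, 4600 and 6000. The tick at 600 ms emits nothing because event time did not advance. With an interval of 0 nothing is emitted.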
TimeCharacteristic is an enum that defines which notion of time the system uses as its reference:
/**
 * The time characteristic defines how the system determines time for time-dependent
 * order and operations that depend on time (such as time windows).
 */
@PublicEvolving
public enum TimeCharacteristic {

    /**
     * Processing time for operators means that the operator uses the system clock of the machine
     * to determine the current time of the data stream. Processing-time windows trigger based
     * on wall-clock time and include whatever elements happen to have arrived at the operator at
     * that point in time.
     *
     * Using processing time for window operations results in general in quite non-deterministic
     * results, because the contents of the windows depends on the speed in which elements arrive.
     * It is, however, the cheapest method of forming windows and the method that introduces the
     * least latency.
     */
    ProcessingTime,

    /**
     * Ingestion time means that the time of each individual element in the stream is determined
     * when the element enters the Flink streaming data flow. Operations like windows group the
     * elements based on that time, meaning that processing speed within the streaming dataflow
     * does not affect windowing, but only the speed at which sources receive elements.
     *
     * Ingestion time is often a good compromise between processing time and event time.
     * It does not need any special manual form of watermark generation, and events are typically
     * not too much out-of-order when they arrive at operators; in fact, out-of-orderness can
     * only be introduced by streaming shuffles or split/join/union operations. The fact that
     * elements are not very much out-of-order means that the latency increase is moderate,
     * compared to event time.
     */
    IngestionTime,

    /**
     * Event time means that the time of each individual element in the stream (also called event)
     * is determined by the event's individual custom timestamp. These timestamps either exist in
     * the elements from before they entered the Flink streaming dataflow, or are user-assigned at
     * the sources. The big implication of this is that it allows for elements to arrive in the
     * sources and in all operators out of order, meaning that elements with earlier timestamps may
     * arrive after elements with later timestamps.
     *
     * Operators that window or order data with respect to event time must buffer data until they
     * can be sure that all timestamps for a certain time interval have been received. This is
     * handled by the so called "time watermarks".
     *
     * Operations based on event time are very predictable: the result of windowing operations
     * is typically identical no matter when the window is executed and how fast the streams
     * operate. At the same time, the buffering and tracking of event time is also costlier than
     * operating with processing time, and typically also introduces more latency. The amount of
     * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
     * time span between the arrival of early and late elements is. With respect to the
     * "time watermarks", this means that the cost typically depends on how early or late the
     * watermarks can be generated for their timestamp.
     *
     * In relation to {@link #IngestionTime}, the event time is similar, but refers to the
     * event's original time, rather than the time assigned at the data source. Practically, that
     * means that event time has generally more meaning, but also that it takes longer to determine
     * that all elements for a certain time have arrived.
     */
    EventTime
}
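The three values correspond to the time at which an event was generated (EventTime), the time at which it entered Flink (IngestionTime), and the time at which it is processed (ProcessingTime).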
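Which of the three timestamps is used decides which window an element lands in. The sketch below illustrates this with epoch-aligned one-minute tumbling windows. It is plain Java, not Flink code, and the element type `TimedElement`, the enum `Characteristic` and the sample times are hypothetical.

```java
// Hypothetical illustration (not Flink code): which 1-minute tumbling window an
// element lands in depends on which of its three timestamps the job uses.
final class TimeCharacteristicDemo {

    enum Characteristic { EVENT_TIME, INGESTION_TIME, PROCESSING_TIME }

    // Assumed element shape; all times are epoch milliseconds.
    static final class TimedElement {
        final long eventTime;      // when the event happened at its producer
        final long ingestionTime;  // when it entered the Flink source
        final long processingTime; // wall clock of the operator handling it

        TimedElement(long eventTime, long ingestionTime, long processingTime) {
            this.eventTime = eventTime;
            this.ingestionTime = ingestionTime;
            this.processingTime = processingTime;
        }
    }

    static long timestampFor(TimedElement e, Characteristic c) {
        switch (c) {
            case EVENT_TIME:     return e.eventTime;
            case INGESTION_TIME: return e.ingestionTime;
            default:             return e.processingTime;
        }
    }

    // Start of the epoch-aligned tumbling window containing the chosen timestamp.
    static long windowStart(TimedElement e, Characteristic c, long sizeMs) {
        long ts = timestampFor(e, c);
        return ts - Math.floorMod(ts, sizeMs);
    }
}
```

An event produced at 59,500 ms, ingested at 60,200 ms and processed at 121,000 ms (for example after a backpressure stall) falls into the windows starting at 0, 60,000 and 120,000 ms respectively. Only the event-time assignment stays the same no matter how fast the pipeline runs.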
3.2 How watermarks are generated
/**
 * A Watermark tells operators that no elements with a timestamp older or equal
 * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
 * sources and propagate through the operators of the topology. Operators must themselves emit
 * watermarks to downstream operators using
 * {@link org.apache.flink.streaming.api.operators.Output#emitWatermark(Watermark)}. Operators that
 * do not internally buffer elements can always forward the watermark that they receive. Operators
 * that buffer elements, such as window operators, must forward a watermark after emission of
 * elements that is triggered by the arriving watermark.
 *
 * In some cases a watermark is only a heuristic and operators should be able to deal with
 * late elements. They can either discard those or update the result and emit updates/retractions
 * to downstream operations.
 *
 * When a source closes it will emit a final watermark with timestamp {@code Long.MAX_VALUE}.
 * When an operator receives this it will know that no more input will be arriving in the future.
 */
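The contract in this Javadoc can be illustrated with a buffering operator that releases elements in timestamp order. The sketch is hypothetical (the class `SortingOperatorSketch` and its string output format are not Flink APIs). When a watermark arrives, the operator first emits every buffered element the watermark completes and only then forwards the watermark. An element at or below the current watermark violates the watermark's promise; this sketch simply marks it as late, whereas a real operator would discard it or emit an update. The final `Long.MAX_VALUE` watermark flushes everything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical buffering operator (not a Flink class) that follows the Watermark
// contract: emit what the watermark completes, then forward the watermark.
final class SortingOperatorSketch {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private final List<String> output = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    void processElement(long timestamp) {
        if (timestamp <= currentWatermark) {
            output.add("late:" + timestamp); // the watermark promised this would not happen
            return;
        }
        buffer.add(timestamp);
    }

    void processWatermark(long watermark) {
        if (watermark <= currentWatermark) {
            return; // watermarks only move forward
        }
        currentWatermark = watermark;
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            output.add("record:" + buffer.poll());
        }
        output.add("watermark:" + watermark); // forwarded after the triggered emissions
    }

    List<String> output() {
        return output;
    }
}
```

Feeding elements 5, 3 and 8, then watermark 4, then element 2, then `Long.MAX_VALUE` produces record:3, watermark:4, late:2, record:5, record:8 and finally the `Long.MAX_VALUE` watermark.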
The Output interface referenced above is defined as follows:
/**
 * A {@link org.apache.flink.streaming.api.operators.StreamOperator} is supplied with an object
 * of this interface that can be used to emit elements and other messages, such as barriers
 * and watermarks, from an operator.
 *
 * @param <T> The type of the elements that can be emitted.
 */
The method that emits a watermark:
/**
 * Emits a {@link Watermark} from an operator. This watermark is broadcast to all downstream
 * operators.
 *
 * A watermark specifies that no element with a timestamp lower or equal to the watermark
 * timestamp will be emitted in the future.
 */
void emitWatermark(Watermark mark);
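The phrase "broadcast to all downstream operators" is the key difference between watermarks and ordinary records. A record goes to one downstream channel, while a watermark must reach every channel, because each downstream instance needs to see event time advance even if it received no records. The following hypothetical model illustrates this; the class `BroadcastingOutputSketch`, its key-hash routing and its string format are assumptions, not Flink's output implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of an operator's output with several downstream channels.
// Records are routed to exactly one channel; watermarks go to all of them.
final class BroadcastingOutputSketch {
    private final List<List<String>> channels = new ArrayList<>();

    BroadcastingOutputSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayList<>());
        }
    }

    // Route a record by key hash (an assumed partitioning scheme).
    void collect(String key, long timestamp) {
        int target = Math.floorMod(key.hashCode(), channels.size());
        channels.get(target).add(key + "@" + timestamp);
    }

    // Broadcast: every downstream instance must learn that event time advanced,
    // including instances that received no records at all.
    void emitWatermark(long watermark) {
        for (List<String> channel : channels) {
            channel.add("WM(" + watermark + ")");
        }
    }

    List<String> channel(int i) {
        return channels.get(i);
    }
}
```

With two channels, a single record for key "a" lands on one channel only, but a following watermark appears on both. Without the broadcast, the idle channel's downstream windows would never fire.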
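3.3 How operators handle watermarks
Operators receive records through the following methods (watermarks arrive through the matching processWatermark, processWatermark1 and processWatermark2 methods):
OneInputStreamOperator#processElement
TwoInputStreamOperator#processElement1
TwoInputStreamOperator#processElement2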
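A two-input operator receives watermarks from both inputs independently, so it needs a rule for its own event-time clock. A common rule is to keep one watermark per input and forward the minimum whenever that minimum advances. As far as I know, Flink's AbstractStreamOperator does something along these lines in processWatermark1/processWatermark2. The class below is a hypothetical stand-in, not that code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-input operator: its event-time clock is the minimum of the
// watermarks seen on both inputs, forwarded only when that minimum advances.
final class TwoInputWatermarkSketch {
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;
    private final List<Long> forwarded = new ArrayList<>();

    void processWatermark1(long watermark) {
        input1Watermark = watermark;
        advance();
    }

    void processWatermark2(long watermark) {
        input2Watermark = watermark;
        advance();
    }

    private void advance() {
        long newMin = Math.min(input1Watermark, input2Watermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            forwarded.add(newMin); // would be emitted downstream
        }
    }

    List<Long> forwarded() {
        return forwarded;
    }
}
```

Taking the minimum is the conservative choice. A watermark of 100 on input 1 alone forwards nothing, because input 2 might still deliver older data. Subsequent watermarks 50 (input 2), 200 (input 2) and 150 (input 1) forward 50, 100 and 150.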
[Figure: operator class relationship diagram]
Taking WindowOperator as an example, its processElement method looks like this:
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    final Collection<W> elementWindows = windowAssigner.assignWindows(
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    // if element is handled by none of assigned elementWindows
    boolean isSkippedElement = true;

    final K key = this.<K>getKeyedStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) {
        MergingWindowSet<W> mergingWindows = getMergingWindowSet();

        for (W window : elementWindows) {
            // adding the new window might result in a merge, in that case the actualWindow
            // is the merged window and we work with that. If we don't merge then
            // actualWindow == window
            W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                @Override
                public void merge(W mergeResult,
                        Collection<W> mergedWindows, W stateWindowResult,
                        Collection<W> mergedStateWindows) throws Exception {

                    if ((windowAssigner.isEventTime()
                            && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                        throw new UnsupportedOperationException("The end timestamp of an " +
                            "event-time window cannot become earlier than the current watermark " +
                            "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                            " window: " + mergeResult);
                    } else if (!windowAssigner.isEventTime()
                            && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                        throw new UnsupportedOperationException("The end timestamp of a " +
                            "processing-time window cannot become earlier than the current processing time " +
                            "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                            " window: " + mergeResult);
                    }

                    triggerContext.key = key;
                    triggerContext.window = mergeResult;
                    triggerContext.onMerge(mergedWindows);

                    for (W m : mergedWindows) {
                        triggerContext.window = m;
                        triggerContext.clear();
                        deleteCleanupTimer(m);
                    }

                    // merge the merged state windows into the newly resulting state window
                    windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                }
            });

            // drop if the window is already late
            if (isWindowLate(actualWindow)) {
                mergingWindows.retireWindow(actualWindow);
                continue;
            }
            isSkippedElement = false;

            W stateWindow = mergingWindows.getStateWindow(actualWindow);
            if (stateWindow == null) {
                throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
            }

            windowState.setCurrentNamespace(stateWindow);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = actualWindow;

            TriggerResult triggerResult = triggerContext.onElement(element);
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(actualWindow, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(actualWindow);
        }

        // need to make sure to update the merging state in state
        mergingWindows.persist();
    } else {
        for (W window : elementWindows) {
            // drop if the window is already late
            if (isWindowLate(window)) {
                continue;
            }
            isSkippedElement = false;

            windowState.setCurrentNamespace(window);
            windowState.add(element.getValue());

            triggerContext.key = key;
            triggerContext.window = window;

            TriggerResult triggerResult = triggerContext.onElement(element);
            if (triggerResult.isFire()) {
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                emitWindowContents(window, contents);
            }

            if (triggerResult.isPurge()) {
                windowState.clear();
            }
            registerCleanupTimer(window);
        }
    }

    // side output input event if
    // element not handled by any window
    // late arriving tag has been set
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
        if (lateDataOutputTag != null) {
            sideOutput(element);
        } else {
            this.numLateRecordsDropped.inc();
        }
    }
}
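processElement relies on helpers that are not shown above: isWindowLate, isElementLate and the cleanup time they are based on. The following standalone sketch restates those lateness rules for event-time windows. The helper names mirror WindowOperator's, but the class `LatenessRules` is hypothetical and written from my understanding of the behavior, not copied from Flink. In the DataStream API, allowed lateness and the late-data side output are configured with `allowedLateness(...)` and `sideOutputLateData(...)` on a windowed stream.

```java
// Standalone sketch of the event-time lateness rules used by processElement.
// Names mirror WindowOperator's helpers; this is not the Flink source.
final class LatenessRules {
    private LatenessRules() {}

    // Time after which a window's state may be cleaned up: the window's last timestamp
    // plus allowed lateness, saturating at Long.MAX_VALUE on overflow.
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanup = windowMaxTimestamp + allowedLateness;
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    // A window is late once the watermark has reached its cleanup time.
    static boolean isWindowLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }

    // An element is late when its timestamp plus allowed lateness is at or behind the watermark.
    static boolean isElementLate(long elementTimestamp, long allowedLateness, long currentWatermark) {
        return elementTimestamp + allowedLateness <= currentWatermark;
    }

    // Mirrors the final if-block: what happens to an element that no window accepted.
    static String routeSkippedElement(long elementTimestamp, long allowedLateness,
                                      long currentWatermark, boolean hasLateOutputTag) {
        if (!isElementLate(elementTimestamp, allowedLateness, currentWatermark)) {
            return "no action";
        }
        return hasLateOutputTag ? "side output" : "dropped (numLateRecordsDropped++)";
    }
}
```

For a window [0, 5000) (last timestamp 4999) with 1000 ms of allowed lateness, the window is still accepting elements at watermark 5500 but is late at watermark 6000. An element with timestamp 3000 arriving at that point goes to the side output if a late-data tag is set, and is otherwise counted as dropped.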
Summary:
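An event takes time to travel from where it is produced, through the source, to an operator. In most cases data reaches an operator in the order in which the events were produced, but network delays, backpressure and similar factors can still cause out-of-order (late) elements. We cannot wait for late elements forever, so there must be a mechanism that forces a window to be triggered and computed after a certain point. This is where the watermark comes in: when a watermark arrives, it signals that all data with timestamps at or before the watermark's timestamp is assumed to have arrived (even though delayed data may still show up afterwards).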