@InterfaceStability.Unstable public final class Windows extends java.lang.Object
APIs for creating different types of Windows.
Groups incoming messages in a MessageStream into finite windows for processing.
Each window is uniquely identified by its WindowKey. A window can have one or more associated
Triggers that determine when results from the Window are emitted. Each emitted result contains one
or more messages in the window and is called a WindowPane.
A window can have early triggers that allow emitting WindowPanes speculatively before all data
for the window has arrived, or late triggers that allow handling late arrivals of data.
                                        window wk1
                                        +-----------+--------+-----------+
                                        |           |        |           |
                                        |  pane 1   | pane 2 |  pane 3   |
                                        +-----------+--------+-----------+
    -----------------------------------
       incoming message stream ------+
    -----------------------------------
                                        window wk2
                                        +---------+-----------+---------+
                                        |  pane 1 |  pane 2   |  pane 3 |
                                        |         |           |         |
                                        +---------+-----------+---------+

                                        window wk3
                                        +----------+-----------+---------+
                                        |          |           |         |
                                        |  pane 1  |  pane 2   |  pane 3 |
                                        |          |           |         |
                                        +----------+-----------+---------+
A Window can be one of the following types:
- Tumbling window: groups the MessageStream into fixed-size, non-overlapping processing time based windows.
- Session window: groups the MessageStream into sessions.
  A session captures some period of activity over a MessageStream.
  The boundary for a session is defined by a sessionGap. All messages that arrive within
  the gap are grouped into the same session.
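The session boundary rule can be illustrated with a small standalone sketch. It is plain Java, not Samza code: the class name `SessionSketch`, the method `sessions`, and the choice to close a session when the idle time is greater than or equal to the gap are assumptions made for illustration only.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of gap-based sessionization; not Samza code. */
final class SessionSketch {

    /**
     * Splits ascending arrival timestamps (in milliseconds) into sessions.
     * A new session starts whenever the time elapsed since the previous
     * message is at least sessionGap (boundary handling is an assumption).
     */
    static List<List<Long>> sessions(List<Long> arrivalTimesMs, Duration sessionGap) {
        long gapMs = sessionGap.toMillis();
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long t : arrivalTimesMs) {
            if (!current.isEmpty() && t - current.get(current.size() - 1) >= gapMs) {
                result.add(current);          // the previous session is complete
                current = new ArrayList<>();  // start a new session
            }
            current.add(t);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> arrivals = List.of(0L, 4_000L, 9_000L, 25_000L, 27_000L);
        System.out.println(sessions(arrivals, Duration.ofSeconds(10)));
        // prints [[0, 4000, 9000], [25000, 27000]]
    }
}
```

With a 10 second gap, the 16 second pause between the third and fourth message closes the first session. In a keyed session window the same rule would apply independently to each key.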
A Window is said to be "keyed" when the incoming messages are first grouped based on their key
and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants
of the window types above.
The value for the window can be updated incrementally by providing an initialValue Supplier
and an aggregating FoldLeftFunction. The initial value supplier is invoked every time a new window is
created. The aggregating function is invoked for each incoming message for the window. If these are not provided,
the emitted WindowPane will contain a collection of messages in the window.
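The interaction between the initial value supplier and the aggregating function can be sketched as an ordinary left fold. The snippet below is a standalone illustration and does not use Samza: `FoldSketch` and `foldWindow` are hypothetical names, and `BiFunction` stands in for FoldLeftFunction.

```java
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

/** Standalone sketch of incremental window aggregation; not Samza code. */
final class FoldSketch {

    /**
     * Folds the messages of a single window into one value.
     * The supplier is called once, when the window is created;
     * the aggregator is called once per incoming message.
     */
    static <M, WV> WV foldWindow(List<M> messages,
                                 Supplier<? extends WV> initialValue,
                                 BiFunction<? super M, WV, WV> aggregator) {
        WV value = initialValue.get();
        for (M message : messages) {
            value = aggregator.apply(message, value);
        }
        return value;
    }

    public static void main(String[] args) {
        // Maximum of the integers parsed from the messages of one window.
        Integer max = foldWindow(List.of("3", "9", "4"), () -> 0,
                (m, current) -> Math.max(Integer.parseInt(m), current));
        System.out.println(max); // prints 9
    }
}
```

Because only the running value is kept, a fold of this shape avoids buffering every message of the window, which is the main reason to prefer it over collecting the messages.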
Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of finer granularity are not supported.
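As a rough illustration of what millisecond granularity implies, the sketch below converts a few `java.time.Duration` values with `Duration.toMillis()`. That the window implementation performs exactly this conversion is an assumption; `GranularitySketch` and `toWindowMillis` are hypothetical names.

```java
import java.time.Duration;

/** Standalone sketch of millisecond truncation; not Samza code. */
final class GranularitySketch {

    /** Converts a duration to whole milliseconds, dropping any finer remainder. */
    static long toWindowMillis(Duration duration) {
        return duration.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(toWindowMillis(Duration.ofSeconds(10)));      // prints 10000
        System.out.println(toWindowMillis(Duration.ofNanos(1_500_000))); // prints 1
        System.out.println(toWindowMillis(Duration.ofNanos(999_999)));   // prints 0
    }
}
```

Under this assumption, any sub-millisecond component of an interval or sessionGap would be lost, so durations are best specified in whole milliseconds or coarser units.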
| Modifier and Type | Method and Description |
|---|---|
| static <M,K> Window<M,K,java.util.Collection<M>> | keyedSessionWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration sessionGap) Creates a Window that groups incoming messages into sessions per-key based on the provided sessionGap. |
| static <M,K,WV> Window<M,K,WV> | keyedSessionWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration sessionGap, java.util.function.Supplier<? extends WV> initialValue, FoldLeftFunction<? super M,WV> aggregator) Creates a Window that groups incoming messages into sessions per-key based on the provided sessionGap and applies the provided fold function to them. |
| static <M,K> Window<M,K,java.util.Collection<M>> | keyedTumblingWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration interval) Creates a Window that groups incoming messages into fixed-size, non-overlapping processing time based windows using the provided keyFn. |
| static <M,K,WV> Window<M,K,WV> | keyedTumblingWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration interval, java.util.function.Supplier<? extends WV> initialValue, FoldLeftFunction<? super M,WV> aggregator) Creates a Window that groups incoming messages into fixed-size, non-overlapping processing time based windows based on the provided keyFn and applies the provided fold function to them. |
| static <M> Window<M,java.lang.Void,java.util.Collection<M>> | tumblingWindow(java.time.Duration duration) Creates a Window that groups incoming messages into fixed-size, non-overlapping processing time based windows. |
| static <M,WV> Window<M,java.lang.Void,WV> | tumblingWindow(java.time.Duration interval, java.util.function.Supplier<? extends WV> initialValue, FoldLeftFunction<? super M,WV> aggregator) Creates a Window that windows values into fixed-size processing time based windows and aggregates them applying the provided function. |
public static <M,K,WV> Window<M,K,WV> keyedTumblingWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration interval, java.util.function.Supplier<? extends WV> initialValue, FoldLeftFunction<? super M,WV> aggregator)
Creates a Window that groups incoming messages into fixed-size, non-overlapping processing
time based windows based on the provided keyFn and applies the provided fold function to them.

The below example computes the maximum value per-key over fixed-size 10 second windows.

    MessageStream<UserClick> stream = ...;
    Function<UserClick, String> keyFn = ...;
    Supplier<Integer> initialValue = () -> 0;
    // parseIntField is a placeholder for application code that extracts an int from a UserClick.
    FoldLeftFunction<UserClick, Integer> maxAggregator = (m, c) -> Math.max(parseIntField(m), c);
    MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
        Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10), initialValue, maxAggregator));
Type Parameters:
- M - the type of the input message
- WV - the type of the WindowPane output value
- K - the type of the key in the Window

Parameters:
- keyFn - the function to extract the window key from a message
- interval - the duration in processing time
- initialValue - the initial value supplier for the aggregator. Invoked when a new window is created.
- aggregator - the function to incrementally update the window value. Invoked when a new message arrives for the window.

Returns:
- the created Window function.

public static <M,K> Window<M,K,java.util.Collection<M>> keyedTumblingWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration interval)
Creates a Window that groups incoming messages into fixed-size, non-overlapping
processing time based windows using the provided keyFn.

The below example groups the stream into fixed-size 10 second windows for each key.

    MessageStream<UserClick> stream = ...;
    Function<UserClick, String> keyFn = ...;
    MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
        Windows.keyedTumblingWindow(keyFn, Duration.ofSeconds(10)));
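To make the per-key grouping concrete, the following standalone sketch assigns messages to panes by key and by the start of the fixed-size interval in which they arrive. It is not Samza code: `KeyedTumblingSketch`, `windowStart`, and `panes` are hypothetical names, and aligning window boundaries to multiples of the interval since time 0 is an assumption.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.function.ToLongFunction;

/** Standalone sketch of keyed tumbling windows; not Samza code. */
final class KeyedTumblingSketch {

    /** Start of the tumbling window containing timestampMs (windows assumed aligned to 0). */
    static long windowStart(long timestampMs, Duration interval) {
        long intervalMs = interval.toMillis();
        return Math.floorDiv(timestampMs, intervalMs) * intervalMs;
    }

    /** Groups messages into panes identified by "key@windowStart". */
    static <M, K> Map<String, List<M>> panes(List<M> messages,
                                             Function<? super M, ? extends K> keyFn,
                                             ToLongFunction<? super M> arrivalMs,
                                             Duration interval) {
        Map<String, List<M>> result = new TreeMap<>();
        for (M message : messages) {
            long start = windowStart(arrivalMs.applyAsLong(message), interval);
            String paneId = keyFn.apply(message) + "@" + start;
            result.computeIfAbsent(paneId, id -> new ArrayList<>()).add(message);
        }
        return result;
    }

    public static void main(String[] args) {
        // Each entry is (userId, arrival time in milliseconds).
        List<Map.Entry<String, Long>> clicks = List.of(
                Map.entry("alice", 1_000L), Map.entry("bob", 2_000L),
                Map.entry("alice", 9_500L), Map.entry("alice", 12_000L));
        Map<String, List<Map.Entry<String, Long>>> grouped =
                panes(clicks, e -> e.getKey(), e -> e.getValue(), Duration.ofSeconds(10));
        System.out.println(grouped.keySet()); // prints [alice@0, alice@10000, bob@0]
    }
}
```

The two early clicks from alice share one pane, while her click at 12 seconds falls into the next 10 second interval and therefore into a separate pane.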
public static <M,WV> Window<M,java.lang.Void,WV> tumblingWindow(java.time.Duration interval, java.util.function.Supplier<? extends WV> initialValue, FoldLeftFunction<? super M,WV> aggregator)
Creates a Window that groups incoming messages into fixed-size processing time based windows and
aggregates them by applying the provided function.

The below example computes the maximum value over fixed-size 10 second windows.

    MessageStream<String> stream = ...;
    Supplier<Integer> initialValue = () -> 0;
    FoldLeftFunction<String, Integer> maxAggregator = (m, c) -> Math.max(Integer.parseInt(m), c);
    MessageStream<WindowPane<Void, Integer>> windowedStream = stream.window(
        Windows.tumblingWindow(Duration.ofSeconds(10), initialValue, maxAggregator));
Type Parameters:
- M - the type of the input message
- WV - the type of the WindowPane output value

Parameters:
- interval - the duration in processing time
- initialValue - the initial value supplier for the aggregator. Invoked when a new window is created.
- aggregator - the function to incrementally update the window value. Invoked when a new message arrives for the window.

Returns:
- the created Window function

public static <M> Window<M,java.lang.Void,java.util.Collection<M>> tumblingWindow(java.time.Duration duration)
Creates a Window that groups incoming messages into fixed-size, non-overlapping
processing time based windows.

The below example groups the stream into fixed-size 10 second windows and computes a windowed-percentile.

    MessageStream<Long> stream = ...;
    Function<Collection<Long>, Long> percentile99 = ...;
    MessageStream<WindowPane<Void, Collection<Long>>> windowedStream =
        stream.window(Windows.tumblingWindow(Duration.ofSeconds(10)));
    MessageStream<Long> windowedPercentiles =
        windowedStream.map(windowPane -> percentile99.apply(windowPane.getMessage()));
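The example leaves the percentile99 function unspecified. One possible shape for it, shown here purely as an assumption, is a nearest-rank percentile over the collected values of a pane. `PercentileSketch` is a hypothetical name and the snippet does not depend on Samza.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Standalone sketch of a nearest-rank 99th percentile; not Samza code. */
final class PercentileSketch {

    /** Returns the nearest-rank 99th percentile of a non-empty collection. */
    static long percentile99(Collection<Long> values) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("cannot compute a percentile of an empty window");
        }
        List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        // 1-based rank = ceil(0.99 * n), computed with integer arithmetic.
        int rank = (99 * sorted.size() + 99) / 100;
        return sorted.get(rank - 1);
    }

    public static void main(String[] args) {
        List<Long> values = new ArrayList<>();
        for (long v = 1; v <= 100; v++) {
            values.add(v);
        }
        System.out.println(percentile99(values)); // prints 99
    }
}
```

A method like this could be adapted to the `Function<Collection<Long>, Long>` in the example with a method reference such as `PercentileSketch::percentile99`.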
Type Parameters:
- M - the type of the input message

Parameters:
- duration - the duration in processing time

Returns:
- the created Window function

public static <M,K,WV> Window<M,K,WV> keyedSessionWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration sessionGap, java.util.function.Supplier<? extends WV> initialValue, FoldLeftFunction<? super M,WV> aggregator)
Creates a Window that groups incoming messages into sessions per-key based on the provided
sessionGap and applies the provided fold function to them.

A session captures some period of activity over a MessageStream.
A session is considered complete when no new messages arrive within the sessionGap. All messages
that arrive within the gap are grouped into the same session.

The below example computes the maximum value per-key over a session window with a gap of 10 seconds.

    MessageStream<UserClick> stream = ...;
    Supplier<Integer> initialValue = () -> 0;
    // parseIntField is a placeholder for application code that extracts an int from a UserClick.
    FoldLeftFunction<UserClick, Integer> maxAggregator = (m, c) -> Math.max(parseIntField(m), c);
    Function<UserClick, String> userIdExtractor = m -> m.getUserId();
    MessageStream<WindowPane<String, Integer>> windowedStream = stream.window(
        Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10), initialValue, maxAggregator));
Type Parameters:
- M - the type of the input message
- K - the type of the key in the Window
- WV - the type of the output value in the WindowPane

Parameters:
- keyFn - the function to extract the window key from a message
- sessionGap - the timeout gap for defining the session
- initialValue - the initial value supplier for the aggregator. Invoked when a new window is created.
- aggregator - the function to incrementally update the window value. Invoked when a new message arrives for the window.

Returns:
- the created Window function

public static <M,K> Window<M,K,java.util.Collection<M>> keyedSessionWindow(java.util.function.Function<? super M,? extends K> keyFn, java.time.Duration sessionGap)
Creates a Window that groups incoming messages into sessions per-key based on the provided
sessionGap.

A session captures some period of activity over a MessageStream. The
boundary for the session is defined by a sessionGap. All messages that arrive within
the gap are grouped into the same session.

The below example groups the stream into per-key session windows with a gap of 10 seconds.

    MessageStream<UserClick> stream = ...;
    Function<UserClick, String> userIdExtractor = m -> m.getUserId();
    MessageStream<WindowPane<String, Collection<UserClick>>> windowedStream = stream.window(
        Windows.keyedSessionWindow(userIdExtractor, Duration.ofSeconds(10)));