@InterfaceStability.Unstable
public interface MessageStream<M>

A stream of messages that can be transformed into another MessageStream.
A MessageStream corresponding to an input stream can be obtained using StreamGraph.getInputStream(java.lang.String, org.apache.samza.serializers.Serde<M>).

Type Parameters:
M - the type of messages in this stream

| Modifier and Type | Method | Description |
|---|---|---|
| `MessageStream<M>` | `filter(FilterFunction<? super M> filterFn)` | Applies the provided function to messages in this MessageStream and returns the filtered MessageStream. |
| `<OM> MessageStream<OM>` | `flatMap(FlatMapFunction<? super M,? extends OM> flatMapFn)` | Applies the provided 1:n function to transform a message in this MessageStream to n messages in the transformed MessageStream. |
| `<K,OM,JM> MessageStream<JM>` | `join(MessageStream<OM> otherStream, JoinFunction<? extends K,? super M,? super OM,? extends JM> joinFn, Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, java.time.Duration ttl, java.lang.String id)` | Joins this MessageStream with another MessageStream using the provided pairwise JoinFunction. |
| `<OM> MessageStream<OM>` | `map(MapFunction<? super M,? extends OM> mapFn)` | Applies the provided 1:1 function to messages in this MessageStream and returns the transformed MessageStream. |
| `MessageStream<M>` | `merge(java.util.Collection<? extends MessageStream<? extends M>> otherStreams)` | Merges all otherStreams with this MessageStream. |
| `static <T> MessageStream<T>` | `mergeAll(java.util.Collection<? extends MessageStream<? extends T>> streams)` | Merges all streams. |
| `<K,V> MessageStream<KV<K,V>>` | `partitionBy(java.util.function.Function<? super M,? extends K> keyExtractor, java.util.function.Function<? super M,? extends V> valueExtractor, KVSerde<K,V> serde, java.lang.String id)` | Re-partitions this MessageStream using keys from the keyExtractor by creating a new intermediate stream on the job.default.system. |
| `<K,V> MessageStream<KV<K,V>>` | `partitionBy(java.util.function.Function<? super M,? extends K> keyExtractor, java.util.function.Function<? super M,? extends V> valueExtractor, java.lang.String id)` | Same as calling partitionBy(Function, Function, KVSerde, String) with a null KVSerde. |
| `void` | `sendTo(OutputStream<M> outputStream)` | Allows sending messages in this MessageStream to an OutputStream. |
| `void` | `sink(SinkFunction<? super M> sinkFn)` | Allows sending messages in this MessageStream to an output system using the provided SinkFunction. |
| `<K,WV> MessageStream<WindowPane<K,WV>>` | `window(Window<M,K,WV> window, java.lang.String id)` | Groups the messages in this MessageStream according to the provided Window semantics (e.g. tumbling, sliding or session windows) and returns the transformed MessageStream of WindowPanes. |

<OM> MessageStream<OM> map(MapFunction<? super M,? extends OM> mapFn)
Applies the provided 1:1 function to messages in this MessageStream and returns the transformed MessageStream.
Type Parameters:
OM - the type of messages in the transformed MessageStream
Parameters:
mapFn - the function to transform a message to another message
Returns:
the transformed MessageStream
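As a rough illustration of the 1:1 contract, the sketch below applies a function to an in-memory `List` rather than to a real MessageStream. `MapSketch` is a hypothetical helper, not part of Samza, and `java.util.function.Function` stands in for `MapFunction`; only the per-message semantics are modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory model of map(); not Samza code.
final class MapSketch {
  // Applies fn to every message exactly once, producing exactly one output per input (1:1).
  static <M, OM> List<OM> map(List<M> messages, Function<? super M, ? extends OM> fn) {
    List<OM> out = new ArrayList<>(messages.size());
    for (M m : messages) {
      out.add(fn.apply(m));
    }
    return out;
  }

  static List<Integer> demo() {
    // Transform each String message into its length.
    return map(List.of("a", "bb", "ccc"), String::length);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [1, 2, 3]
  }
}
```

The output list always has the same size as the input, which is what distinguishes map from flatMap.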

<OM> MessageStream<OM> flatMap(FlatMapFunction<? super M,? extends OM> flatMapFn)
Applies the provided 1:n function to transform a message in this MessageStream to n messages in the transformed MessageStream.
Type Parameters:
OM - the type of messages in the transformed MessageStream
Parameters:
flatMapFn - the function to transform a message to zero or more messages
Returns:
the transformed MessageStream
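To show that "n" may be zero, here is a minimal in-memory sketch. `FlatMapSketch` is hypothetical, and the flat-map function is modeled as a `java.util.function.Function` that returns a `Collection`, an assumption standing in for `FlatMapFunction`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;

// Hypothetical in-memory model of flatMap(); not Samza code.
final class FlatMapSketch {
  // Each input message may yield zero or more output messages (1:n); order is preserved.
  static <M, OM> List<OM> flatMap(List<M> messages,
      Function<? super M, ? extends Collection<? extends OM>> fn) {
    List<OM> out = new ArrayList<>();
    for (M m : messages) {
      out.addAll(fn.apply(m));
    }
    return out;
  }

  static List<String> demo() {
    // Split each line into words; the empty line yields no messages at all.
    return flatMap(List.of("hello world", "", "samza"),
        line -> line.isEmpty() ? List.<String>of() : Arrays.asList(line.split(" ")));
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [hello, world, samza]
  }
}
```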

MessageStream<M> filter(FilterFunction<? super M> filterFn)
Applies the provided function to messages in this MessageStream and returns the filtered MessageStream.
The FilterFunction is a predicate that determines whether a message in this MessageStream should be retained in the filtered MessageStream.
Parameters:
filterFn - the predicate to filter messages from this MessageStream
Returns:
the filtered MessageStream
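A minimal sketch of the retain-or-drop decision, using `java.util.function.Predicate` as a stand-in for `FilterFunction`. `FilterSketch` is a hypothetical helper working on an in-memory list, not Samza's operator.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory model of filter(); not Samza code.
final class FilterSketch {
  // Keeps a message only if the predicate returns true; relative order is preserved.
  static <M> List<M> filter(List<M> messages, Predicate<? super M> filterFn) {
    List<M> out = new ArrayList<>();
    for (M m : messages) {
      if (filterFn.test(m)) {
        out.add(m);
      }
    }
    return out;
  }

  static List<Integer> demo() {
    // Retain only even-valued messages.
    return filter(List.of(1, 2, 3, 4, 5, 6), n -> n % 2 == 0);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [2, 4, 6]
  }
}
```

Note that the element type does not change, matching the `MessageStream<M>` return type of filter.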

void sink(SinkFunction<? super M> sinkFn)
Allows sending messages in this MessageStream to an output system using the provided SinkFunction.
Offers more control over processing and sending messages than sendTo(OutputStream), since the SinkFunction has access to the MessageCollector and TaskCoordinator.
This can also be used to send output to a system (e.g. a database) that doesn't have a corresponding Samza SystemProducer implementation.
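The database case can be sketched as follows. Everything here is hypothetical: `Sink` is a one-method stand-in for SinkFunction (which, per the text above, also has access to a MessageCollector and TaskCoordinator; both are omitted), and `FakeUserTable` is an in-memory map playing the role of a database without a SystemProducer.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of sink(): a terminal step handing every message to a side-effecting function.
final class SinkSketch {
  // Stand-in for SinkFunction; MessageCollector and TaskCoordinator are intentionally left out.
  interface Sink<M> {
    void apply(M message);
  }

  // Stand-in for a database table with no Samza SystemProducer: an in-memory upsert by key.
  static final class FakeUserTable implements Sink<Map.Entry<String, String>> {
    final Map<String, String> rows = new LinkedHashMap<>();

    @Override
    public void apply(Map.Entry<String, String> message) {
      // Later writes for the same user id overwrite earlier ones.
      rows.put(message.getKey(), message.getValue());
    }
  }

  // Like sink(), this returns nothing: the stream ends here.
  static <M> void sink(List<M> messages, Sink<? super M> sinkFn) {
    for (M m : messages) {
      sinkFn.apply(m);
    }
  }

  static Map<String, String> demo() {
    FakeUserTable table = new FakeUserTable();
    sink(List.of(Map.entry("u1", "a@x"), Map.entry("u2", "b@x"), Map.entry("u1", "c@x")), table);
    return table.rows;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // {u1=c@x, u2=b@x}
  }
}
```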
Parameters:
sinkFn - the function to send messages in this stream to an external system

void sendTo(OutputStream<M> outputStream)
Allows sending messages in this MessageStream to an OutputStream.
When sending messages to an OutputStream<KV<K, V>>, messages are partitioned using their serialized key.
When sending messages to any other OutputStream<M>, messages are partitioned using a null partition key.
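The practical consequence of partitioning by the serialized key is that equal key bytes always land in the same partition. The sketch below assumes a simple hash-modulo partitioner and a UTF-8 key serde purely for illustration; the actual placement is decided by the underlying system producer, and `SendToSketch` and its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.OptionalInt;

// Hypothetical model of partition selection for sendTo(); not Samza code.
final class SendToSketch {
  // Hypothetical key serde: UTF-8 bytes of a String key.
  static byte[] serializeKey(String key) {
    return key.getBytes(StandardCharsets.UTF_8);
  }

  // Assumed hash partitioner over the serialized key bytes.
  static OptionalInt partitionFor(byte[] serializedKey, int numPartitions) {
    if (serializedKey == null) {
      // Non-KV output stream: null partition key, so no partition is chosen here.
      return OptionalInt.empty();
    }
    return OptionalInt.of(Math.floorMod(Arrays.hashCode(serializedKey), numPartitions));
  }

  public static void main(String[] args) {
    // Two separately created but equal keys serialize to the same bytes, hence the same partition.
    System.out.println(partitionFor(serializeKey("user-42"), 8)
        .equals(partitionFor(serializeKey(new String("user-42")), 8))); // true
  }
}
```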
Parameters:
outputStream - the output stream to send messages to

<K,WV> MessageStream<WindowPane<K,WV>> window(Window<M,K,WV> window, java.lang.String id)
Groups the messages in this MessageStream according to the provided Window semantics (e.g. tumbling, sliding or session windows) and returns the transformed MessageStream of WindowPanes.
Use the Windows helper methods to create the appropriate windows.
The id must be unique for each operator in this application. It is used as part of the unique ID for any state stores and streams created by this operator (the full ID also contains the job name, job id and operator type). If the application logic is changed, this ID must be reused in the new operator to retain state from the previous version, and changed for the new operator to discard the state from the previous version.
Type Parameters:
K - the type of key in the message in this MessageStream. If a key is specified, panes are emitted per-key.
WV - the type of value in the WindowPane in the transformed MessageStream
Parameters:
window - the window to group and process messages from this MessageStream
id - the unique id of this operator in this application
Returns:
the windowed MessageStream
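To make "panes are emitted per-key" concrete, the sketch below assigns keyed messages to fixed-size tumbling windows and counts messages per pane. It is a hypothetical model, not Samza's Windows or WindowPane API: `Event` and its explicit timestamps are assumptions chosen so the output is deterministic, and trigger and emission timing are not modeled.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of a keyed tumbling window; not Samza's Windows/WindowPane API.
final class WindowSketch {
  // A message with an explicit key and timestamp (assumed fields for illustration).
  static final class Event {
    final String key;
    final long timestampMs;

    Event(String key, long timestampMs) {
      this.key = key;
      this.timestampMs = timestampMs;
    }
  }

  // Assigns every event to exactly one non-overlapping window per key and counts events per pane.
  // Pane ids look like "key@windowStartMs"; a TreeMap keeps them sorted for stable output.
  static Map<String, Integer> tumblingCounts(List<Event> events, long windowSizeMs) {
    Map<String, Integer> panes = new TreeMap<>();
    for (Event e : events) {
      long windowStart = e.timestampMs - Math.floorMod(e.timestampMs, windowSizeMs);
      panes.merge(e.key + "@" + windowStart, 1, Integer::sum);
    }
    return panes;
  }

  static Map<String, Integer> demo() {
    List<Event> events = List.of(
        new Event("a", 100), new Event("a", 900), new Event("b", 950), new Event("a", 1200));
    return tumblingCounts(events, 1000);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // {a@0=2, a@1000=1, b@0=1}
  }
}
```

Because windows of one key never overlap, each event contributes to exactly one pane; sliding or session windows would relax that.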

<K,OM,JM> MessageStream<JM> join(MessageStream<OM> otherStream, JoinFunction<? extends K,? super M,? super OM,? extends JM> joinFn, Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, java.time.Duration ttl, java.lang.String id)
Joins this MessageStream with another MessageStream using the provided pairwise JoinFunction.
Messages in each stream are retained for the provided ttl and join results are emitted as matches are found.
Both inputs being joined must have the same number of partitions, and should be partitioned by the join key.
The id must be unique for each operator in this application. It is used as part of the unique ID for any state stores and streams created by this operator (the full ID also contains the job name, job id and operator type). If the application logic is changed, this ID must be reused in the new operator to retain state from the previous version, and changed for the new operator to discard the state from the previous version.
Type Parameters:
K - the type of join key
OM - the type of messages in the other stream
JM - the type of messages resulting from the joinFn
Parameters:
otherStream - the other MessageStream to be joined with
joinFn - the function to join messages from this and the other MessageStream
keySerde - the serde for the join key
messageSerde - the serde for messages in this stream
otherMessageSerde - the serde for messages in the other stream
ttl - the ttl for messages in each stream
id - the unique id of this operator in this application
Returns:
the joined MessageStream
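The interaction between ttl and "emitted as matches are found" can be sketched for a single partition, which is also why both inputs must be co-partitioned: matching messages have to meet in the same task. This is a hypothetical model, not Samza's join implementation. It assumes each side retains only the latest message per key, and the string concatenation stands in for a JoinFunction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-partition model of a ttl-bounded stream-stream join; not Samza code.
final class JoinSketch {
  // One arriving message: which input it came from, its join key, payload and arrival time.
  static final class Msg {
    final boolean fromThis;
    final String key;
    final String value;
    final long timeMs;

    Msg(boolean fromThis, String key, String value, long timeMs) {
      this.fromThis = fromThis;
      this.key = key;
      this.value = value;
      this.timeMs = timeMs;
    }
  }

  // Assumption: each side keeps only the latest message per key; a retained message older
  // than ttlMs no longer matches. Results are emitted as soon as a match is found.
  static List<String> join(List<Msg> arrivals, long ttlMs) {
    Map<String, Msg> thisSide = new HashMap<>();
    Map<String, Msg> otherSide = new HashMap<>();
    List<String> results = new ArrayList<>();
    for (Msg m : arrivals) {
      Map<String, Msg> own = m.fromThis ? thisSide : otherSide;
      Map<String, Msg> opposite = m.fromThis ? otherSide : thisSide;
      Msg match = opposite.get(m.key);
      if (match != null && m.timeMs - match.timeMs > ttlMs) {
        opposite.remove(m.key); // expired: retained for longer than the ttl
        match = null;
      }
      if (match != null) {
        // Stand-in for a JoinFunction: "thisValue+otherValue".
        results.add(m.fromThis ? m.value + "+" + match.value : match.value + "+" + m.value);
      }
      own.put(m.key, m);
    }
    return results;
  }

  static List<String> demo() {
    return join(List.of(
        new Msg(true, "k1", "L1", 0),
        new Msg(false, "k1", "R1", 5),
        new Msg(false, "k2", "R2", 6),
        new Msg(true, "k1", "L3", 12),
        new Msg(true, "k2", "L2", 20)), 10);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [L1+R1, L3+R1]
  }
}
```

In the demo, L2 finds R2 but R2 arrived 14 ms earlier, beyond the 10 ms ttl, so no result is emitted for k2.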

MessageStream<M> merge(java.util.Collection<? extends MessageStream<? extends M>> otherStreams)
Merges all otherStreams with this MessageStream.
The merged stream contains messages from all streams in the order they arrive.
Parameters:
otherStreams - other MessageStreams to be merged with this MessageStream
Returns:
the merged MessageStream
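"In the order they arrive" means messages from different inputs are interleaved, not concatenated. The sketch below models that with an assumed arrival timestamp on each message; `MergeSketch` and `Arrival` are hypothetical, and no ordering beyond arrival order is implied.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of merge(): interleave several streams by arrival time; not Samza code.
final class MergeSketch {
  // A message tagged with the (assumed) time it arrived at the task.
  static final class Arrival {
    final long arrivalMs;
    final String value;

    Arrival(long arrivalMs, String value) {
      this.arrivalMs = arrivalMs;
      this.value = value;
    }
  }

  static List<String> merge(List<Arrival> self, Collection<? extends List<Arrival>> otherStreams) {
    List<Arrival> all = new ArrayList<>(self);
    for (List<Arrival> other : otherStreams) {
      all.addAll(other);
    }
    // List.sort is stable, so messages with equal arrival times keep their input order.
    all.sort(Comparator.comparingLong(x -> x.arrivalMs));
    List<String> out = new ArrayList<>();
    for (Arrival a : all) {
      out.add(a.value);
    }
    return out;
  }

  static List<String> demo() {
    List<Arrival> self = List.of(new Arrival(1, "a1"), new Arrival(4, "a2"));
    List<Arrival> b = List.of(new Arrival(2, "b1"));
    List<Arrival> c = List.of(new Arrival(3, "c1"), new Arrival(5, "c2"));
    return merge(self, List.of(b, c));
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [a1, b1, c1, a2, c2]
  }
}
```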

static <T> MessageStream<T> mergeAll(java.util.Collection<? extends MessageStream<? extends T>> streams)
Merges all streams.
The merged MessageStream contains messages from all streams in the order they arrive.
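The wildcards in the mergeAll signature let streams of different subtypes be merged into a stream of their common supertype, and an empty collection is rejected. The sketch below shows both points on in-memory lists. `MergeAllSketch` is hypothetical, and for brevity it concatenates inputs instead of interleaving them by arrival.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical model of mergeAll(); arrival-order interleaving is omitted (inputs are concatenated).
final class MergeAllSketch {
  // The wildcards mirror the documented signature: any collection of streams whose
  // element types are subtypes of T can be merged into a stream of T.
  static <T> List<T> mergeAll(Collection<? extends List<? extends T>> streams) {
    if (streams.isEmpty()) {
      // Documented behavior: an empty collection of streams is an IllegalArgumentException.
      throw new IllegalArgumentException("No streams to merge.");
    }
    List<T> merged = new ArrayList<>();
    for (List<? extends T> stream : streams) {
      merged.addAll(stream);
    }
    return merged;
  }

  static List<Number> demo() {
    List<Integer> ints = List.of(1, 2);
    List<Double> doubles = List.of(2.5);
    List<List<? extends Number>> streams = List.of(ints, doubles);
    return mergeAll(streams);
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [1, 2, 2.5]
  }
}
```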
Type Parameters:
T - the type of messages in each of the streams
Parameters:
streams - MessageStreams to be merged
Returns:
the merged MessageStream
Throws:
java.lang.IllegalArgumentException - if streams is empty

<K,V> MessageStream<KV<K,V>> partitionBy(java.util.function.Function<? super M,? extends K> keyExtractor, java.util.function.Function<? super M,? extends V> valueExtractor, KVSerde<K,V> serde, java.lang.String id)
Re-partitions this MessageStream using keys from the keyExtractor by creating a new intermediate stream on the job.default.system. This intermediate stream is both an output and an input to the job.
Uses the provided KVSerde for serialization of keys and values. If the provided serde is null, uses the default serde provided via StreamGraph.setDefaultSerde(org.apache.samza.serializers.Serde<?>), which must be a KVSerde. If the default serde is not a KVSerde, a runtime exception will be thrown. If no default serde has been provided before calling this method, a KVSerde<NoOpSerde, NoOpSerde> is used.
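The serde rules above form a short precedence chain. The sketch below encodes that chain with hypothetical marker types (`Serde`, `KVSerde`, `StringSerde` here are stand-ins, not Samza's classes). The documentation only promises "a runtime exception" for a non-KV default; `IllegalStateException` is an assumption.

```java
// Hypothetical model of the documented serde precedence for partitionBy; not Samza's classes.
final class SerdeResolutionSketch {
  // Marker types standing in for Serde, KVSerde and a non-KV serde.
  interface Serde {}

  static final class KVSerde implements Serde {
    final String name;

    KVSerde(String name) {
      this.name = name;
    }
  }

  static final class StringSerde implements Serde {}

  static KVSerde resolve(KVSerde provided, Serde defaultSerde) {
    if (provided != null) {
      return provided; // an explicitly provided KVSerde is used as-is
    }
    if (defaultSerde == null) {
      return new KVSerde("NoOp/NoOp"); // no default set: stands in for KVSerde<NoOpSerde, NoOpSerde>
    }
    if (defaultSerde instanceof KVSerde) {
      return (KVSerde) defaultSerde; // the default serde is used only if it is a KVSerde
    }
    // Exception type is an assumption; the documentation only says "a runtime exception".
    throw new IllegalStateException("Default serde must be a KVSerde");
  }

  public static void main(String[] args) {
    System.out.println(resolve(null, null).name); // NoOp/NoOp
  }
}
```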
The number of partitions for this intermediate stream is determined as follows:
If the stream is an eventual input to a join(org.apache.samza.operators.MessageStream<OM>, org.apache.samza.operators.functions.JoinFunction<? extends K, ? super M, ? super OM, ? extends JM>, org.apache.samza.serializers.Serde<K>, org.apache.samza.serializers.Serde<M>, org.apache.samza.serializers.Serde<OM>, java.time.Duration, java.lang.String), and the number of partitions for the other stream is known, then the number of partitions for this stream is set to the number of partitions in the other input stream.
Else, the number of partitions is set to the value of the job.intermediate.stream.partitions configuration, if present.
Else, the number of partitions is set to the max of the number of partitions for all input and output streams (excluding intermediate streams).
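The three rules above are checked in order, and the first one that applies wins. A minimal sketch, assuming the inputs have already been collected: `PartitionCountSketch` and its parameters are hypothetical names, and an empty first argument covers both "not a join input" and "other side unknown".

```java
import java.util.Collection;
import java.util.List;
import java.util.OptionalInt;

// Hypothetical model of the documented partition-count rules for the intermediate stream.
final class PartitionCountSketch {
  static int intermediatePartitions(
      OptionalInt otherJoinInputPartitions, // empty if not a join input, or if the other side is unknown
      OptionalInt configuredPartitions, // value of job.intermediate.stream.partitions, if present
      Collection<Integer> inputOutputPartitions) { // partition counts of non-intermediate inputs and outputs
    if (otherJoinInputPartitions.isPresent()) {
      return otherJoinInputPartitions.getAsInt();
    }
    if (configuredPartitions.isPresent()) {
      return configuredPartitions.getAsInt();
    }
    // Throws NoSuchElementException if there are no input or output streams at all.
    return inputOutputPartitions.stream().mapToInt(Integer::intValue).max().getAsInt();
  }

  public static void main(String[] args) {
    System.out.println(intermediatePartitions(OptionalInt.empty(), OptionalInt.empty(), List.of(4, 32, 12))); // 32
  }
}
```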
The id must be unique for each operator in this application. It is used as part of the unique ID for any state stores and streams created by this operator (the full ID also contains the job name, job id and operator type). If the application logic is changed, this ID must be reused in the new operator to retain state from the previous version, and changed for the new operator to discard the state from the previous version.
Unlike sendTo(org.apache.samza.operators.OutputStream<M>), messages with a null key are all sent to partition 0.
Type Parameters:
K - the type of output key
V - the type of output value
Parameters:
keyExtractor - the Function to extract the message and partition key from the input message. Messages with a null key are all sent to partition 0.
valueExtractor - the Function to extract the value from the input message
serde - the KVSerde to use for (de)serializing the key and value.
id - the unique id of this operator in this application
Returns:
the repartitioned MessageStream
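The roles of keyExtractor and valueExtractor, and the documented null-key rule, can be seen in a small in-memory model. `PartitionBySketch` is hypothetical; the hash of the UTF-8 key bytes is an assumed partitioner used only to show that equal keys end up together, while the "null key goes to partition 0" branch follows the text above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Hypothetical in-memory model of partitionBy(); not Samza code.
final class PartitionBySketch {
  // Splits each message into a key and a value and routes it to a partition of an imaginary
  // intermediate stream. Entries are recorded as "key=value" strings for readability.
  static <M> Map<Integer, List<String>> partitionBy(List<M> messages,
      Function<? super M, String> keyExtractor,
      Function<? super M, String> valueExtractor,
      int numPartitions) {
    Map<Integer, List<String>> partitions = new TreeMap<>();
    for (M m : messages) {
      String key = keyExtractor.apply(m);
      String value = valueExtractor.apply(m);
      int partition = key == null
          ? 0 // documented: messages with a null key all go to partition 0
          : Math.floorMod(Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)), numPartitions); // assumed hash
      partitions.computeIfAbsent(partition, p -> new ArrayList<>()).add(key + "=" + value);
    }
    return partitions;
  }

  static Map<Integer, List<String>> demo() {
    // Messages are "user:page"; page views without a user prefix have no key.
    List<String> views = List.of("alice:home", "bob:cart", "home", "alice:checkout", "about");
    return partitionBy(views,
        v -> v.contains(":") ? v.substring(0, v.indexOf(':')) : null,
        v -> v.substring(v.indexOf(':') + 1),
        4);
  }

  public static void main(String[] args) {
    System.out.println(demo().get(0).contains("null=home")); // true
  }
}
```

Keyed messages for the same user always share a partition, which is what a downstream join or window on that key relies on.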

<K,V> MessageStream<KV<K,V>> partitionBy(java.util.function.Function<? super M,? extends K> keyExtractor, java.util.function.Function<? super M,? extends V> valueExtractor, java.lang.String id)
Same as calling partitionBy(Function, Function, KVSerde, String) with a null KVSerde.
Uses the default serde provided via StreamGraph.setDefaultSerde(org.apache.samza.serializers.Serde<?>), which must be a KVSerde. If the default serde is not a KVSerde, a runtime exception will be thrown. If no default serde has been provided before calling this method, a KVSerde<NoOpSerde, NoOpSerde> is used.
Type Parameters:
K - the type of output key
V - the type of output value
Parameters:
keyExtractor - the Function to extract the message and partition key from the input message
valueExtractor - the Function to extract the value from the input message
id - the unique id of this operator in this application
Returns:
the repartitioned MessageStream