@InterfaceStability.Unstable public interface StreamApplication
The following example removes page views older than 1 hour from the input stream:
public class PageViewCounter implements StreamApplication {
public void init(StreamGraph graph, Config config) {
MessageStream<PageViewEvent> pageViewEvents =
graph.getInputStream("pageViewEvents", (k, m) -> (PageViewEvent) m);
OutputStream<String, PageViewEvent, PageViewEvent> recentPageViewEvents =
graph.getOutputStream("recentPageViewEvents", m -> m.memberId, m -> m);
pageViewEvents
.filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
.sendTo(filteredPageViewEvents);
}
}
The example above can be run using an ApplicationRunner:
public static void main(String[] args) {
CommandLine cmdLine = new CommandLine();
Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
PageViewCounter app = new PageViewCounter();
LocalApplicationRunner runner = new LocalApplicationRunner(config);
runner.run(app);
runner.waitForFinish();
}
Implementation Notes: Currently StreamApplications are wrapped in a StreamTask
during execution.
A new StreamApplication instance will be created and initialized when planning the execution, as well as for each
StreamTask
instance used for processing incoming messages. Execution is synchronous and thread-safe
within each StreamTask
.
Modifier and Type | Method and Description |
---|---|
void |
init(StreamGraph graph,
Config config)
Describes and initializes the transforms for processing message streams and generating results.
|
void init(StreamGraph graph, Config config)
The StreamGraph
provides access to input and output streams. Input MessageStream
s can be
transformed into other MessageStream
s or sent to an OutputStream
using the MessageStream
operators.
Most operators accept custom functions for doing the transformations. These functions are InitableFunction
s
and are provided the Config
and TaskContext
during their own initialization. The config and the
context can be used, for example, to create custom metrics or access durable state stores.
A shared context between InitableFunction
s for different operators within a task instance can be set
up by providing a ContextManager
using StreamGraph.withContextManager(org.apache.samza.operators.ContextManager)
.
graph
- the StreamGraph
to get input/output streams fromconfig
- the configuration for the application