@InterfaceStability.Unstable
public interface StreamApplication
The following example removes page views older than 1 hour from the input stream:
public class PageViewCounter implements StreamApplication {
  public void init(StreamGraph graph, Config config) {
    MessageStream<PageViewEvent> pageViewEvents =
        graph.getInputStream("pageViewEvents", (k, m) -> (PageViewEvent) m);
    OutputStream<String, PageViewEvent, PageViewEvent> recentPageViewEvents =
        graph.getOutputStream("recentPageViewEvents", m -> m.memberId, m -> m);

    // Keep only page views created within the last hour and forward them to the output stream.
    pageViewEvents
        .filter(m -> m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis())
        .sendTo(recentPageViewEvents);
  }
}
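The filter predicate in the example can be tried on its own, outside Samza. The following is a minimal sketch using plain Java collections. `RecentFilterSketch`, its nested `PageViewEvent` class, and the `isRecent` and `recentMemberIds` helpers are hypothetical stand-ins introduced for illustration and are not part of the Samza API. The current time is passed in as a parameter so that the cutoff is deterministic.

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RecentFilterSketch {
    // Hypothetical stand-in for the PageViewEvent type used in the example above.
    static final class PageViewEvent {
        final String memberId;
        final long creationTime;

        PageViewEvent(String memberId, long creationTime) {
            this.memberId = memberId;
            this.creationTime = creationTime;
        }

        long getCreationTime() {
            return creationTime;
        }
    }

    // Same comparison as the filter above: true only if the event is strictly newer than one hour.
    static boolean isRecent(PageViewEvent event, long nowMillis) {
        return event.getCreationTime() > nowMillis - Duration.ofHours(1).toMillis();
    }

    // Applies the predicate to a list and returns the member ids of the surviving events.
    static List<String> recentMemberIds(List<PageViewEvent> events, long nowMillis) {
        return events.stream()
            .filter(e -> isRecent(e, nowMillis))
            .map(e -> e.memberId)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<PageViewEvent> events = Arrays.asList(
            new PageViewEvent("m1", now - Duration.ofMinutes(10).toMillis()),
            new PageViewEvent("m2", now - Duration.ofHours(2).toMillis()));
        System.out.println(recentMemberIds(events, now)); // prints [m1]
    }
}
```

Note that an event created exactly one hour ago is dropped, because the comparison is strict.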
The example above can be run using an ApplicationRunner:
public static void main(String[] args) {
  CommandLine cmdLine = new CommandLine();
  Config config = cmdLine.loadConfig(cmdLine.parser().parse(args));
  PageViewCounter app = new PageViewCounter();
  LocalApplicationRunner runner = new LocalApplicationRunner(config);
  runner.run(app);
  runner.waitForFinish();
}
Implementation Notes: Currently StreamApplications are wrapped in a StreamTask during execution.
A new StreamApplication instance will be created and initialized when planning the execution, as well as for each
StreamTask instance used for processing incoming messages. Execution is synchronous and thread-safe
within each StreamTask.
| Modifier and Type | Method and Description |
|---|---|
| void | init(StreamGraph graph, Config config) Describes and initializes the transforms for processing message streams and generating results. |
void init(StreamGraph graph, Config config)
The StreamGraph provides access to input and output streams. Input MessageStreams can be
transformed into other MessageStreams or sent to an OutputStream using the MessageStream
operators.
Most operators accept custom functions for doing the transformations. These functions are InitableFunctions
and are provided the Config and TaskContext during their own initialization. The config and the
context can be used, for example, to create custom metrics or access durable state stores.
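The init-then-apply lifecycle described above can be sketched without the Samza classes. In the following minimal sketch, `InitableFilter` and `MaxAgeFilter` are hypothetical stand-ins rather than Samza's actual interfaces, a plain `Map` takes the place of Config, the TaskContext argument is omitted, and the key `sketch.max.age.ms` is made up for illustration. It shows only the general pattern: the function reads its settings once during initialization and then uses them for every message.

```java
import java.util.HashMap;
import java.util.Map;

public class InitableFunctionSketch {
    // Hypothetical stand-in for an initable filter function; not Samza's actual interface.
    interface InitableFilter<M> {
        // Called once before any message is processed; the default does nothing.
        default void init(Map<String, String> config) { }

        boolean apply(M message);
    }

    // Keeps creation timestamps newer than a maximum age read from configuration during init.
    static final class MaxAgeFilter implements InitableFilter<Long> {
        private final long nowMillis;
        private long maxAgeMillis;

        MaxAgeFilter(long nowMillis) {
            this.nowMillis = nowMillis;
        }

        @Override
        public void init(Map<String, String> config) {
            // "sketch.max.age.ms" is a made-up key; the fallback is one hour in milliseconds.
            maxAgeMillis = Long.parseLong(config.getOrDefault("sketch.max.age.ms", "3600000"));
        }

        @Override
        public boolean apply(Long creationTime) {
            return creationTime > nowMillis - maxAgeMillis;
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("sketch.max.age.ms", "60000");

        MaxAgeFilter filter = new MaxAgeFilter(1_000_000L);
        filter.init(config); // in a real job the framework, not user code, would make this call

        System.out.println(filter.apply(990_000L)); // 10 s old, within the 60 s limit
        System.out.println(filter.apply(900_000L)); // 100 s old, outside the limit
    }
}
```

Reading the limit in `init` rather than in the constructor keeps the function itself free of configuration lookups at construction time, which matches the description of functions receiving their configuration during their own initialization.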
A shared context between InitableFunctions for different operators within a task instance can be set
up by providing a ContextManager using StreamGraph.withContextManager(org.apache.samza.operators.ContextManager).
graph - the StreamGraph to get input/output streams from
config - the configuration for the application