@InterfaceStability.Evolving public interface TaskApplication extends SamzaApplication<TaskApplicationDescriptor>
TaskApplication describes the inputs, outputs, state, configuration and the processing logic for the application in Samza's Low Level API.
A typical TaskApplication implementation consists of the following stages:
1. Configuring the inputs, outputs and state (tables) using the appropriate SystemDescriptors, StreamDescriptors and TableDescriptors.
2. Adding these descriptors to the provided TaskApplicationDescriptor.
3. Implementing a StreamTask or AsyncStreamTask that operates on each IncomingMessageEnvelope one at a time.
4. Setting, via TaskApplicationDescriptor.withTaskFactory(org.apache.samza.task.TaskFactory), a TaskFactory that creates instances of the task above. The TaskFactory implementation must be Serializable.
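The Serializable requirement in step 4 is why a plain lambda can serve as the task factory: in Java, a lambda is serializable when its target type extends Serializable. The self-contained sketch below illustrates that language rule only. Task, SerializableTaskFactory and PageViewTaskStub are hypothetical stand-ins that mirror the shape of a task factory; they are not the real org.apache.samza.task types, and the round trip merely imitates how a framework might ship a factory to where tasks are created. It also shows that the same lambda typed as a java.util.function.Supplier cannot be serialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

public class SerializableFactoryDemo {

  // Hypothetical stand-ins: they mirror the shape of a task factory, not the real Samza API.
  public interface Task {
    String name();
  }

  public interface SerializableTaskFactory extends Serializable {
    Task createInstance();
  }

  public static class PageViewTaskStub implements Task {
    @Override
    public String name() {
      return "page-view-task";
    }
  }

  // The target type extends Serializable, so the compiler emits a serializable lambda.
  public static SerializableTaskFactory serializableFactory() {
    return () -> new PageViewTaskStub();
  }

  // Same lambda body, but Supplier is not Serializable, so neither is this lambda.
  public static Supplier<Task> plainSupplier() {
    return () -> new PageViewTaskStub();
  }

  // Writes the object with Java serialization and reads it back.
  public static Object roundTrip(Object obj) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return in.readObject();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  // Returns false when serialization fails, e.g. with NotSerializableException.
  public static boolean survivesSerialization(Object obj) {
    try {
      roundTrip(obj);
      return true;
    } catch (UncheckedIOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    SerializableTaskFactory copy = (SerializableTaskFactory) roundTrip(serializableFactory());
    System.out.println(copy.createInstance().name());           // page-view-task
    System.out.println(survivesSerialization(plainSupplier())); // false
  }
}
```

In the Samza example, the cast (StreamTaskFactory) () -> new PageViewTask() plays the role of the SerializableTaskFactory return type here: it gives the lambda a Serializable target type, which is what step 4 asks for.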
The following example TaskApplication removes page views older than 1 hour from the input stream and writes the remaining, recent page views to an output stream:
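import java.time.Duration;
import org.apache.samza.application.TaskApplication;
import org.apache.samza.application.descriptors.TaskApplicationDescriptor;
import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.kafka.descriptors.KafkaInputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaOutputDescriptor;
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.StreamTaskFactory;
import org.apache.samza.task.TaskCoordinator;
// PageViewEvent is the application's own event class (not shown); each public class goes in its own file.
public class PageViewFilter implements TaskApplication {
  @Override
  public void describe(TaskApplicationDescriptor appDescriptor) {
    KafkaSystemDescriptor trackingSystemDescriptor = new KafkaSystemDescriptor("tracking");
    KafkaInputDescriptor<PageViewEvent> inputStreamDescriptor =
        trackingSystemDescriptor.getInputDescriptor("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
    KafkaOutputDescriptor<PageViewEvent> outputStreamDescriptor =
        trackingSystemDescriptor.getOutputDescriptor("recentPageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
    appDescriptor
        .withInputStream(inputStreamDescriptor)
        .withOutputStream(outputStreamDescriptor)
        // The cast gives the lambda a Serializable target type (TaskFactory extends Serializable).
        .withTaskFactory((StreamTaskFactory) () -> new PageViewTask());
  }
}
public class PageViewTask implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope message, MessageCollector collector, TaskCoordinator coordinator) {
    PageViewEvent m = (PageViewEvent) message.getValue();
    // Forward only events created within the last hour; older events are dropped.
    if (m.getCreationTime() > System.currentTimeMillis() - Duration.ofHours(1).toMillis()) {
      // Arguments: output stream, partition key, message key, message.
      collector.send(new OutgoingMessageEnvelope(
          new SystemStream("tracking", "recentPageViewEvent"), message.getKey(), message.getKey(), m));
    }
  }
}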
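PageViewTask reads System.currentTimeMillis() directly, which makes the one-hour cutoff awkward to unit test. One option, sketched below under that assumption, is to pull the predicate into a small class that takes a java.time.Clock. RecentPageViewFilter and isRecent are hypothetical names introduced for illustration, not part of Samza. The comparison is the same strict `>` used in process, so an event exactly one hour old is dropped.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical helper, not a Samza class: the age check from PageViewTask.process
// with the clock injected so the cutoff can be tested deterministically.
public final class RecentPageViewFilter {

  private final Clock clock;
  private final Duration maxAge;

  public RecentPageViewFilter(Clock clock, Duration maxAge) {
    this.clock = clock;
    this.maxAge = maxAge;
  }

  // True if the event was created strictly after (now - maxAge).
  public boolean isRecent(long creationTimeMillis) {
    return creationTimeMillis > clock.millis() - maxAge.toMillis();
  }

  public static void main(String[] args) {
    long now = 10_000_000L;
    Clock fixed = Clock.fixed(Instant.ofEpochMilli(now), ZoneOffset.UTC);
    RecentPageViewFilter filter = new RecentPageViewFilter(fixed, Duration.ofHours(1));
    long oneHour = Duration.ofHours(1).toMillis();            // 3,600,000 ms
    System.out.println(filter.isRecent(now - oneHour + 1));   // true: just inside the window
    System.out.println(filter.isRecent(now - oneHour));       // false: exactly one hour old
    System.out.println(filter.isRecent(now - 2 * oneHour));   // false: two hours old
  }
}
```

Inside a task, such a filter could be constructed with Clock.systemUTC() and called as filter.isRecent(m.getCreationTime()), while tests pass a Clock.fixed instance instead.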