public interface SystemConsumer
SystemConsumer is the interface that must be implemented by any system that wishes to integrate with Samza. Examples of systems that one might want to integrate include Kafka, Hadoop, Kestrel, and SQS.
SamzaContainer uses SystemConsumer to read messages from the underlying system, and funnels the messages to the appropriate StreamTask instances. The basic flow is for the SamzaContainer to poll for all SystemStreamPartitions, feed all IncomingMessageEnvelopes to the appropriate StreamTask, and then repeat. If no IncomingMessageEnvelopes are returned, the SamzaContainer polls again, but with a timeout of 10ms.
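The basic flow described above can be sketched as a small loop. This is a minimal illustration, not SamzaContainer's actual code: `PollLoopSketch`, its nested `Consumer` interface, and `runLoop` are hypothetical names, partitions and messages are plain strings, and the initial timeout of 0 is an assumption. Only the 10 ms timeout after an empty poll comes from the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class PollLoopSketch {
    /** Hypothetical stand-in for SystemConsumer: partitions and messages are plain strings. */
    interface Consumer {
        List<String> poll(Map<String, Integer> partitions, long timeoutMs)
                throws InterruptedException;
    }

    // From the description: an empty poll is followed by a poll with a 10 ms timeout.
    static final long EMPTY_POLL_TIMEOUT_MS = 10;

    /** Runs a fixed number of poll cycles and returns everything handed to the (imaginary) tasks. */
    static List<String> runLoop(Consumer consumer, Map<String, Integer> partitions, int cycles)
            throws InterruptedException {
        List<String> delivered = new ArrayList<>();
        long timeoutMs = 0; // assumption: do not wait while messages keep arriving
        for (int i = 0; i < cycles; i++) {
            List<String> envelopes = consumer.poll(partitions, timeoutMs);
            delivered.addAll(envelopes); // stands in for feeding the appropriate StreamTask
            timeoutMs = envelopes.isEmpty() ? EMPTY_POLL_TIMEOUT_MS : 0;
        }
        return delivered;
    }
}
```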
The SamzaContainer treats SystemConsumer in the following way:
There are generally three implementation styles to this interface:
Thread-based implementations typically use a series of threads to read from an underlying system asynchronously, and put the resulting messages into a queue, which is then read from whenever the poll method is invoked. The poll method's parameters map very closely to Java's BlockingQueue interface. BlockingEnvelopeMap is a helper class that makes it easy to implement thread-based implementations of SystemConsumer.
Selector-based implementations typically set up an NIO-based non-blocking socket that can be selected for new data when poll is called.
Synchronous implementations simply fetch directly from the underlying system whenever poll is invoked. Synchronous implementations must take great care to adhere to the timeout rules defined in the poll method.
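To illustrate the thread-based style, here is a minimal sketch of a queue-backed consumer built directly on `java.util.concurrent.BlockingQueue`. It is not BlockingEnvelopeMap and does not use Samza types: `QueueBackedConsumerSketch` and `enqueue` are hypothetical names, there is a single queue instead of one per SystemStreamPartition, and a negative timeout simply blocks until a message arrives (the "at head" check is left out).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueBackedConsumerSketch {
    // Reader threads fill this queue; poll drains it.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Called by the (hypothetical) reader threads whenever a message arrives. */
    void enqueue(String message) {
        queue.add(message);
    }

    /** Returns at most maxMessages, waiting up to timeoutMs for the first one. */
    List<String> poll(int maxMessages, long timeoutMs) throws InterruptedException {
        List<String> result = new ArrayList<>();
        if (maxMessages <= 0) {
            return result;
        }
        // Assumption: a negative timeout means "block until something arrives".
        String first = timeoutMs < 0
                ? queue.take()
                : queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first == null) {
            return result; // timeout hit, nothing available
        }
        result.add(first);
        queue.drainTo(result, maxMessages - 1); // take whatever else is ready, without waiting
        return result;
    }

    public static void main(String[] args) throws Exception {
        QueueBackedConsumerSketch consumer = new QueueBackedConsumerSketch();
        Thread reader = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                consumer.enqueue("msg-" + i);
            }
        });
        reader.start();
        reader.join();
        System.out.println(consumer.poll(3, 0));  // up to three queued messages
        System.out.println(consumer.poll(3, 50)); // the remaining messages
        System.out.println(consumer.poll(3, 50)); // empty list once the timeout expires
    }
}
```

Because the queue separates reading from polling, the poll call never touches the underlying system and its blocking behaviour is governed entirely by the timeout.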
Modifier and Type | Field and Description |
---|---|
static int | BLOCK_ON_OUTSTANDING_MESSAGES: A constant that can be used in the poll method's timeout parameter to denote that the poll invocation should block until at least one message is available for one of the SystemStreamPartitions supplied, or until all SystemStreamPartitions supplied are at head (have no new messages available since the last poll invocation was made for each SystemStreamPartition). |
Modifier and Type | Method and Description |
---|---|
java.util.List<IncomingMessageEnvelope> | poll(java.util.Map<SystemStreamPartition,java.lang.Integer> systemStreamPartitions, long timeout): Poll the SystemConsumer to get any available messages from the underlying system. |
void | register(SystemStreamPartition systemStreamPartition, java.lang.String offset): Register a SystemStreamPartition to this SystemConsumer. |
void | start(): Tells the SystemConsumer to connect to the underlying system, and prepare to begin serving messages when poll is invoked. |
void | stop(): Tells the SystemConsumer to close all connections, release all resources, and shut down everything. |
static final int BLOCK_ON_OUTSTANDING_MESSAGES
void start()
void stop()
void register(SystemStreamPartition systemStreamPartition, java.lang.String offset)
systemStreamPartition
- The SystemStreamPartition object representing the Samza SystemStreamPartition to receive messages from.
offset
- String representing the offset of the point in the stream to start reading messages from. This is an inclusive parameter; if "7" were specified, the first message for the system/stream/partition to be consumed and returned would be a message whose offset is "7".
java.util.List<IncomingMessageEnvelope> poll(java.util.Map<SystemStreamPartition,java.lang.Integer> systemStreamPartitions, long timeout) throws java.lang.InterruptedException
If the underlying implementation does not take care to adhere to the timeout parameter, the SamzaContainer's performance will suffer drastically. Specifically, if poll blocks when it's not supposed to, it will block the entire main thread in SamzaContainer, and no messages will be processed while blocking is occurring.
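One way a synchronous implementation might stay within the timeout is to compute a deadline once and re-check it after every fetch. The sketch below is illustrative only: `DeadlinePollSketch` and the `fetchOnce` supplier are hypothetical, `fetchOnce` is assumed to be a quick, non-blocking read of the underlying system, and only the `timeout >= 0` case is covered.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class DeadlinePollSketch {
    /**
     * Fetches until a non-empty batch arrives or timeoutMs has elapsed.
     * fetchOnce is a hypothetical non-blocking read of the underlying system.
     */
    static List<String> poll(Supplier<List<String>> fetchOnce, long timeoutMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            List<String> batch = fetchOnce.get();
            if (!batch.isEmpty()) {
                return batch; // messages available: return immediately
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return Collections.emptyList(); // timeout hit, nothing new
            }
            // Back off briefly, but never sleep past the deadline.
            TimeUnit.NANOSECONDS.sleep(
                    Math.min(remainingNanos, TimeUnit.MILLISECONDS.toNanos(1)));
        }
    }
}
```

The important property is that no single step can wait longer than the time remaining, so the caller's thread is released close to the requested timeout.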
systemStreamPartitions
- A map from SystemStreamPartition to maximum number of messages to return for the SystemStreamPartition. Polling with {stream1: 100, stream2: 1000} tells the SystemConsumer that it can return between 0 and 100 messages (inclusive) for stream1, and between 0 and 1000 messages (inclusive) for stream2. If SystemConsumer has messages available for other registered SystemStreamPartitions, but they are not in the systemStreamPartitions map in a given poll invocation, they can't be returned. It is illegal to pass in SystemStreamPartitions that have not been registered with the SystemConsumer first.
timeout
- If timeout < 0, poll will block unless all SystemStreamPartitions are at "head" (the underlying system has been checked, and returned an empty set). If at head, an empty list is returned. If timeout >= 0, poll will return any messages that are currently available for any of the SystemStreamPartitions specified. If no new messages are available, it will wait up to timeout milliseconds for messages from any SystemStreamPartition to become available. It will return an empty list if the timeout is hit, and no new messages are available.
java.lang.InterruptedException
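The register and poll contracts above can be illustrated with a small in-memory sketch. Everything here is a stand-in rather than Samza code: `InMemoryConsumerSketch` is a hypothetical class, partitions and messages are plain strings, offsets are assumed to be integer indexes into a list, and the timeout parameter is omitted because the data is already in memory and poll never blocks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InMemoryConsumerSketch {
    // partition name -> messages; the list index plays the role of the offset (assumption)
    private final Map<String, List<String>> log;
    // partition name -> offset of the next message to return
    private final Map<String, Integer> nextOffset = new HashMap<>();

    InMemoryConsumerSketch(Map<String, List<String>> log) {
        this.log = log;
    }

    /** The offset is inclusive: the first message returned is the one at this offset. */
    void register(String partition, String offset) {
        nextOffset.put(partition, Integer.parseInt(offset));
    }

    /** Returns between 0 and the given maximum number of messages for each requested partition. */
    List<String> poll(Map<String, Integer> maxPerPartition) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : maxPerPartition.entrySet()) {
            String partition = entry.getKey();
            Integer from = nextOffset.get(partition);
            if (from == null) {
                // Polling a partition that was never registered is illegal.
                throw new IllegalArgumentException("Not registered: " + partition);
            }
            List<String> messages = log.getOrDefault(partition, Collections.<String>emptyList());
            int to = Math.min(messages.size(), from + entry.getValue());
            if (from < to) {
                result.addAll(messages.subList(from, to));
                nextOffset.put(partition, to);
            }
        }
        return result;
    }
}
```

With ten messages in a partition named stream1, registering it at offset "7" and polling with {stream1: 2} returns the messages at offsets 7 and 8; a further poll with {stream1: 100} returns only the message at offset 9, and a poll after that returns an empty list.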