public abstract class BlockingEnvelopeMap extends java.lang.Object implements SystemConsumer
BlockingEnvelopeMap is a helper class for SystemConsumer implementations. Samza's poll() requirements make implementing SystemConsumers somewhat tricky. BlockingEnvelopeMap is provided to help other developers write SystemConsumers. The intended audience is not those writing Samza jobs, but rather those extending Samza to consume from new types of stream providers and other systems.
SystemConsumers that extend BlockingEnvelopeMap need to add messages using put (or putAll), and update noMoreMessage using setIsAtHead. The noMoreMessage variable is used to determine whether a SystemStreamPartition is "caught up" (has read all possible messages from the underlying system). For example, with a Kafka system, noMoreMessage would be set to true when the last message offset returned is equal to the offset high watermark for a given topic/partition.
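To make the put/setIsAtHead contract concrete, here is a minimal, self-contained Java model. It does not use Samza: Partition and Envelope are hypothetical stand-ins for SystemStreamPartition and IncomingMessageEnvelope, ToyEnvelopeMap only mimics the bookkeeping described above, and ToyFetcher is an assumed fetch step that reads from an in-memory list instead of Kafka. In this model a partition counts as caught up when the last offset returned is the last offset in the list. Records require Java 16 or later.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-ins for SystemStreamPartition and IncomingMessageEnvelope.
record Partition(String system, String stream, int partition) {}
record Envelope(Partition partition, long offset, String message) {}

// Toy model of the put / setIsAtHead bookkeeping; not Samza's implementation.
class ToyEnvelopeMap {
  private final Map<Partition, BlockingQueue<Envelope>> queues = new ConcurrentHashMap<>();
  // Plays the role of the noMoreMessage flag: true once a partition is caught up.
  private final Map<Partition, Boolean> noMoreMessage = new ConcurrentHashMap<>();

  void register(Partition p) {
    queues.putIfAbsent(p, new LinkedBlockingQueue<>());
    noMoreMessage.putIfAbsent(p, false);
  }

  void put(Partition p, Envelope e) {
    queues.get(p).add(e); // unbounded queue, so add() never blocks or fails
  }

  void setIsAtHead(Partition p, boolean atHead) {
    noMoreMessage.put(p, atHead);
  }

  boolean isAtHead(Partition p) {
    return noMoreMessage.getOrDefault(p, false);
  }

  int queued(Partition p) {
    return queues.get(p).size();
  }
}

// Hypothetical fetch step: copies up to maxBatch messages from an in-memory log,
// then marks the partition caught up if the batch reached the log's last offset.
class ToyFetcher {
  static long fetch(ToyEnvelopeMap map, Partition p, List<String> log, long nextOffset, int maxBatch) {
    long end = Math.min(log.size(), nextOffset + maxBatch);
    for (long o = nextOffset; o < end; o++) {
      map.put(p, new Envelope(p, o, log.get((int) o)));
    }
    map.setIsAtHead(p, end == log.size());
    return end; // offset to start from on the next fetch
  }
}
```

After a first batch of two messages from a three-message log the partition is not yet at head; after the second batch it is, which is the signal poll would use to decide that nothing more is coming.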
Modifier and Type | Class and Description |
---|---|
class | BlockingEnvelopeMap.BlockingEnvelopeMapMetrics |
class | BlockingEnvelopeMap.BufferGauge |
class | BlockingEnvelopeMap.BufferSizeGauge |
Fields inherited from interface SystemConsumer: BLOCK_ON_OUTSTANDING_MESSAGES
Constructor and Description |
---|
BlockingEnvelopeMap() |
BlockingEnvelopeMap(MetricsRegistry metricsRegistry) |
BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) |
BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, java.lang.String metricsGroupName) |
Modifier and Type | Method and Description |
---|---|
long | getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition) |
int | getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) |
protected boolean | isAtHead(SystemStreamPartition systemStreamPartition) |
protected java.util.concurrent.BlockingQueue<IncomingMessageEnvelope> | newBlockingQueue() |
java.util.Map<SystemStreamPartition,java.util.List<IncomingMessageEnvelope>> | poll(java.util.Set<SystemStreamPartition> systemStreamPartitions, long timeout): Poll the SystemConsumer to get any available messages from the underlying system. |
protected void | put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope): Place a new IncomingMessageEnvelope on the queue for the specified SystemStreamPartition. |
protected void | putAll(SystemStreamPartition systemStreamPartition, java.util.List<IncomingMessageEnvelope> envelopes): Place a collection of IncomingMessageEnvelope objects on the queue for the specified SystemStreamPartition. |
void | register(SystemStreamPartition systemStreamPartition, java.lang.String offset): Register a SystemStreamPartition to this SystemConsumer. |
protected java.lang.Boolean | setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) |
Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SystemConsumer: start, stop
public BlockingEnvelopeMap()
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry)
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock)
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, java.lang.String metricsGroupName)
public void register(SystemStreamPartition systemStreamPartition, java.lang.String offset)
Register a SystemStreamPartition to this SystemConsumer.
Specified by:
register in interface SystemConsumer
Parameters:
systemStreamPartition - The SystemStreamPartition object representing the Samza SystemStreamPartition to receive messages from.
offset - String representing the offset of the point in the stream to start reading messages from. The offset is inclusive: if "7" is specified, the first message consumed and returned for the system/stream/partition will be the message whose offset is "7". Note: For broadcast streams, different tasks may checkpoint the same SystemStreamPartition with different offsets. It is the system's responsibility to select the lowest one.
protected java.util.concurrent.BlockingQueue<IncomingMessageEnvelope> newBlockingQueue()
public java.util.Map<SystemStreamPartition,java.util.List<IncomingMessageEnvelope>> poll(java.util.Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws java.lang.InterruptedException
If the underlying implementation does not take care to adhere to the timeout parameter, the SamzaContainer's performance will suffer drastically. Specifically, if poll blocks when it's not supposed to, it will block the entire main thread in SamzaContainer, and no messages will be processed while blocking is occurring.
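The requirement above that poll honor its timeout can be illustrated with a small, self-contained Java sketch. ToyTimedPoller is a hypothetical model, not BlockingEnvelopeMap's own code: partitions are plain strings, and only the timeout >= 0 case is modeled. It drains whatever is available for the requested partitions; otherwise it waits on a monitor until a producer signals or a fixed deadline passes, and on timeout it returns an empty map rather than null.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.TimeUnit;

// Hypothetical model of a timeout-respecting poll(); partitions are plain strings.
// Only the timeout >= 0 case is modeled.
class ToyTimedPoller {
  private final Map<String, Queue<String>> queues = new HashMap<>();

  synchronized void register(String partition) {
    queues.putIfAbsent(partition, new ArrayDeque<>());
  }

  // Producer side: enqueue a message and wake up any poller waiting on this monitor.
  synchronized void put(String partition, String message) {
    queues.get(partition).add(message);
    notifyAll();
  }

  // Returns available messages for the requested partitions, waiting at most timeoutMs
  // if none are available; returns an empty map (never null) when the timeout is hit.
  synchronized Map<String, List<String>> poll(Set<String> partitions, long timeoutMs)
      throws InterruptedException {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    while (true) {
      Map<String, List<String>> result = drain(partitions);
      if (!result.isEmpty()) {
        return result;
      }
      long remainingNs = deadline - System.nanoTime();
      if (remainingNs <= 0) {
        return result;
      }
      // Releases the monitor while waiting; the loop re-checks after put(),
      // spurious wakeups, or the deadline passing.
      TimeUnit.NANOSECONDS.timedWait(this, remainingNs);
    }
  }

  private Map<String, List<String>> drain(Set<String> partitions) {
    Map<String, List<String>> out = new HashMap<>();
    for (String p : partitions) {
      Queue<String> q = queues.get(p);
      if (q == null) {
        throw new IllegalArgumentException("partition not registered: " + p);
      }
      if (!q.isEmpty()) {
        out.put(p, new ArrayList<>(q));
        q.clear();
      }
    }
    return out;
  }
}
```

The key design choice is computing the deadline once and waiting only for the remaining time on each loop iteration, so repeated wakeups can never stretch the total wait past the timeout. A wait with no deadline would stall the calling thread in exactly the way described above.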
Specified by:
poll in interface SystemConsumer
Parameters:
systemStreamPartitions - A set of SystemStreamPartitions to poll for new messages. If the SystemConsumer has messages available for other registered SystemStreamPartitions that are not in the systemStreamPartitions set for a given poll invocation, those messages can't be returned. It is illegal to pass in SystemStreamPartitions that have not first been registered with the SystemConsumer.
timeout - If timeout < 0, poll will block unless all SystemStreamPartitions are at "head" (the underlying system has been checked and returned an empty set); if they are at head, an empty map is returned. If timeout >= 0, poll will return any messages that are currently available for any of the specified SystemStreamPartitions. If no new messages are available, it will wait up to timeout milliseconds for messages from any SystemStreamPartition to become available, and will return an empty map if the timeout is hit with no new messages available.
Throws:
java.lang.InterruptedException - Thrown when a blocking poll has been interrupted by another thread.
protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws java.lang.InterruptedException
Place a new IncomingMessageEnvelope on the queue for the specified SystemStreamPartition.
Parameters:
systemStreamPartition - SystemStreamPartition that owns the envelope
envelope - Message for the specified SystemStreamPartition
Throws:
java.lang.InterruptedException - from the underlying concurrent collection
protected void putAll(SystemStreamPartition systemStreamPartition, java.util.List<IncomingMessageEnvelope> envelopes) throws java.lang.InterruptedException
Place a collection of IncomingMessageEnvelope objects on the queue for the specified SystemStreamPartition.
Insertion of all the messages into the queue is not guaranteed to be done atomically.
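Because putAll is not guaranteed to be atomic, a reader draining the queue concurrently can observe only part of a batch. The sketch below is a hypothetical model that assumes putAll inserts envelopes one at a time with the queue's blocking put(); it uses strings in place of IncomingMessageEnvelope and a bounded java.util.concurrent.ArrayBlockingQueue so that the partial state is reproducible rather than a race. The BlockingQueue documentation likewise notes that bulk operations such as addAll are not necessarily performed atomically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model: putAll as a sequence of blocking put() calls on one queue.
class ToyBatchWriter {
  static <T> void putAll(BlockingQueue<T> queue, List<T> batch) throws InterruptedException {
    for (T item : batch) {
      queue.put(item); // each insert is atomic on its own; the batch as a whole is not
    }
  }

  // Runs putAll on a background thread against a bounded queue and records what a
  // concurrent reader sees before and after the batch has been fully inserted.
  static List<List<String>> demo() throws InterruptedException {
    BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    Thread writer = new Thread(() -> {
      try {
        putAll(queue, List.of("m0", "m1", "m2"));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    writer.start();
    while (queue.size() < 2) {
      Thread.onSpinWait(); // wait until the queue is full; "m2" cannot be inserted yet
    }
    List<List<String>> snapshots = new ArrayList<>();
    snapshots.add(new ArrayList<>(queue)); // partial batch: [m0, m1]
    queue.take();                          // frees a slot so the writer can finish
    writer.join();
    snapshots.add(new ArrayList<>(queue)); // remaining: [m1, m2]
    return snapshots;
  }
}
```

The reader sees [m0, m1] before m2 exists in the queue. Code that needs all-or-nothing visibility for a batch would have to add its own synchronization on top.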
Parameters:
systemStreamPartition - SystemStreamPartition that owns the envelopes
envelopes - Messages for the specified SystemStreamPartition
Throws:
java.lang.InterruptedException - from the underlying concurrent collection
public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition)
public long getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition)
protected java.lang.Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead)
protected boolean isAtHead(SystemStreamPartition systemStreamPartition)