public abstract class BlockingEnvelopeMap extends java.lang.Object implements SystemConsumer
BlockingEnvelopeMap is a helper class for SystemConsumer implementations. Samza's poll() requirements make implementing SystemConsumers somewhat tricky. BlockingEnvelopeMap is provided to help other developers write SystemConsumers. The intended audience is not those writing Samza jobs, but rather those extending Samza to consume from new types of stream providers and other systems.
 SystemConsumers that extend BlockingEnvelopeMap need to add messages using put (or putAll),
 and update noMoreMessage using setIsAtHead. The noMoreMessage variable is used
 to determine whether a SystemStreamPartition is "caught up" (has read all
 possible messages from the underlying system). For example, with a Kafka
 system, noMoreMessage would be set to true when the last message offset
 returned is equal to the offset high watermark for a given topic/partition.
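
The bookkeeping described above can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions, not Samza code: `HeadTrackingSketch`, `onFetch`, and `buffered` are hypothetical names, partitions are plain strings, and messages are represented by their offsets. In a real consumer, the marked lines would be calls to the protected `put` (or `putAll`) and `setIsAtHead` methods of BlockingEnvelopeMap.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for the pattern described above; NOT Samza's implementation.
// Partitions are strings and messages are offsets (both hypothetical simplifications).
class HeadTrackingSketch {
    private final Map<String, BlockingQueue<Long>> queues = new ConcurrentHashMap<>();
    private final Map<String, Boolean> noMoreMessage = new ConcurrentHashMap<>();

    // Mirrors register(): create a buffer for the partition, initially not caught up.
    void register(String partition) {
        queues.putIfAbsent(partition, new LinkedBlockingQueue<>());
        noMoreMessage.putIfAbsent(partition, false);
    }

    // Called by a hypothetical fetcher thread after each fetch from the source system.
    void onFetch(String partition, List<Long> offsets, long highWatermark) {
        BlockingQueue<Long> queue = queues.get(partition);
        if (queue == null) {
            throw new IllegalArgumentException("Partition not registered: " + partition);
        }
        for (Long offset : offsets) {
            queue.add(offset); // stands in for put(ssp, envelope); the queue is unbounded here
        }
        if (!offsets.isEmpty()) {
            long last = offsets.get(offsets.size() - 1);
            // stands in for setIsAtHead(ssp, ...): caught up when the last offset
            // returned equals the high watermark, as in the Kafka example above
            noMoreMessage.put(partition, last == highWatermark);
        }
        // Assumption: an empty fetch leaves the flag unchanged in this sketch.
    }

    boolean isAtHead(String partition) {
        return noMoreMessage.getOrDefault(partition, false);
    }

    int buffered(String partition) {
        BlockingQueue<Long> queue = queues.get(partition);
        return queue == null ? 0 : queue.size();
    }
}
```

With a high watermark of 9, fetching offsets 5 and 6 leaves the partition behind, while a later fetch ending at offset 9 marks it as caught up.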
 
| Modifier and Type | Class and Description | 
|---|---|
| class  | BlockingEnvelopeMap.BlockingEnvelopeMapMetrics | 
| class  | BlockingEnvelopeMap.BufferGauge | 
| class  | BlockingEnvelopeMap.BufferSizeGauge | 
Fields inherited from interface SystemConsumer: BLOCK_ON_OUTSTANDING_MESSAGES

| Constructor and Description | 
|---|
| BlockingEnvelopeMap() | 
| BlockingEnvelopeMap(MetricsRegistry metricsRegistry) | 
| BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) | 
| BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, java.lang.String metricsGroupName) | 

| Modifier and Type | Method and Description | 
|---|---|
| long | getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition) | 
| int | getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) | 
| protected boolean | isAtHead(SystemStreamPartition systemStreamPartition) | 
| protected java.util.concurrent.BlockingQueue<IncomingMessageEnvelope> | newBlockingQueue() | 
| java.util.Map<SystemStreamPartition,java.util.List<IncomingMessageEnvelope>> | poll(java.util.Set<SystemStreamPartition> systemStreamPartitions, long timeout) Poll the SystemConsumer to get any available messages from the underlying system. | 
| protected void | put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) Place a new IncomingMessageEnvelope on the queue for the specified SystemStreamPartition. | 
| protected void | putAll(SystemStreamPartition systemStreamPartition, java.util.List<IncomingMessageEnvelope> envelopes) Place a collection of IncomingMessageEnvelope on the queue for the specified SystemStreamPartition. | 
| void | register(SystemStreamPartition systemStreamPartition, java.lang.String offset) Register a SystemStreamPartition to this SystemConsumer. | 
| protected java.lang.Boolean | setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) | 

Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SystemConsumer: start, stop

public BlockingEnvelopeMap()
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry)
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock)
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, java.lang.String metricsGroupName)
public void register(SystemStreamPartition systemStreamPartition, java.lang.String offset)
Specified by: register in interface SystemConsumer
Parameters:
systemStreamPartition - The SystemStreamPartition object representing the Samza SystemStreamPartition to receive messages from.
offset - String representing the offset of the point in the stream to start reading messages from. This is an inclusive parameter; if "7" were specified, the first message for the system/stream/partition to be consumed and returned would be a message whose offset is "7". Note: For broadcast streams, different tasks may checkpoint the same SSP with different values. It is the system's responsibility to select the lowest one.

protected java.util.concurrent.BlockingQueue<IncomingMessageEnvelope> newBlockingQueue()
public java.util.Map<SystemStreamPartition,java.util.List<IncomingMessageEnvelope>> poll(java.util.Set<SystemStreamPartition> systemStreamPartitions, long timeout) throws java.lang.InterruptedException
If the underlying implementation does not take care to adhere to the timeout parameter, the SamzaContainer's performance will suffer drastically. Specifically, if poll blocks when it's not supposed to, it will block the entire main thread in SamzaContainer, and no messages will be processed while blocking is occurring.
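
The timeout contract that poll documents for its timeout parameter (block when the timeout is negative unless the partition is at head, otherwise wait at most timeout milliseconds) can be modeled for a single queue as follows. This is a hedged sketch, not Samza's implementation: `PollTimeoutSketch` and its `poll` helper are hypothetical, the at-head state is passed in as a plain boolean, and returning already-buffered messages before considering the timeout is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Simplified single-partition model of the timeout contract; NOT Samza's implementation.
class PollTimeoutSketch {
    static <T> List<T> poll(BlockingQueue<T> queue, boolean atHead, long timeoutMs)
            throws InterruptedException {
        List<T> out = new ArrayList<>();
        queue.drainTo(out); // return whatever is already buffered without waiting
        if (!out.isEmpty()) {
            return out;
        }
        T first;
        if (timeoutMs < 0) {
            if (atHead) {
                return out; // caught up: empty result, no blocking
            }
            first = queue.take(); // block until a message arrives
        } else {
            // wait at most timeoutMs; poll returns null if the timeout is hit
            first = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        }
        if (first != null) {
            out.add(first);
            queue.drainTo(out); // pick up anything that arrived alongside it
        }
        return out;
    }
}
```

Every branch except the negative-timeout, not-at-head case returns within a bounded time, which is the property the caller's main thread depends on.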
Specified by: poll in interface SystemConsumer
Parameters:
systemStreamPartitions - A set of SystemStreamPartition to poll for new messages. If SystemConsumer has messages available for other registered SystemStreamPartitions, but they are not in the systemStreamPartitions set in a given poll invocation, they can't be returned. It is illegal to pass in SystemStreamPartitions that have not been registered with the SystemConsumer first.
timeout - If timeout < 0, poll will block unless all SystemStreamPartition are at "head" (the underlying system has been checked, and returned an empty set). If at head, an empty map is returned. If timeout >= 0, poll will return any messages that are currently available for any of the SystemStreamPartitions specified. If no new messages are available, it will wait up to timeout milliseconds for messages from any SystemStreamPartition to become available. It will return an empty map if the timeout is hit, and no new messages are available.
Throws:
java.lang.InterruptedException - Thrown when a blocking poll has been interrupted by another thread.

protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws java.lang.InterruptedException
Place a new IncomingMessageEnvelope on the queue for the specified SystemStreamPartition.
Parameters:
systemStreamPartition - SystemStreamPartition that owns the envelope
envelope - Message for specified SystemStreamPartition
Throws:
java.lang.InterruptedException - from underlying concurrent collection

protected void putAll(SystemStreamPartition systemStreamPartition, java.util.List<IncomingMessageEnvelope> envelopes) throws java.lang.InterruptedException
Place a collection of IncomingMessageEnvelope on the queue for the specified SystemStreamPartition. Insertion of all the messages into the queue is not guaranteed to be done atomically.
Parameters:
systemStreamPartition - SystemStreamPartition that owns the envelope
envelopes - Messages for specified SystemStreamPartition
Throws:
java.lang.InterruptedException - from underlying concurrent collection

public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition)
public long getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition)
protected java.lang.Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead)
protected boolean isAtHead(SystemStreamPartition systemStreamPartition)