Kafka Connector
Kafka I/O : QuickStart
Samza offers built-in integration with Apache Kafka for stream processing. A common pattern in Samza applications is to read messages from one or more Kafka topics, process them and emit results to other Kafka topics or databases.
The hello-samza
project includes multiple examples on interacting with Kafka from your Samza jobs. Each example also includes instructions on how to run them and view results.
-
High Level Streams API Example with a corresponding tutorial
-
Low Level Task API Example with a corresponding tutorial
Concepts
####KafkaSystemDescriptor
Samza refers to any IO source (eg: Kafka) it interacts with as a system, whose properties are set using a corresponding SystemDescriptor
. The KafkaSystemDescriptor
allows you to describe the Kafka cluster you are interacting with and specify its properties.
####KafkaInputDescriptor
A Kafka cluster usually has multiple topics (a.k.a streams). The KafkaInputDescriptor
allows you to specify the properties of each Kafka topic your application should read from. For each of your input topics, you should create a corresponding instance of KafkaInputDescriptor
by providing a topic-name and a serializer.
The above example describes an input Kafka stream from the “page-view-topic” which Samza de-serializes into a JSON payload. Samza provides default serializers for common data-types like string, avro, bytes, integer etc.
####KafkaOutputDescriptor
Similarly, the KafkaOutputDescriptor
allows you to specify the output streams for your application. For each output topic you write to, you should create an instance of KafkaOutputDescriptor
.
Configuration
#####Configuring Kafka producer and consumer
The KafkaSystemDescriptor
allows you to specify any Kafka producer or Kafka consumer) property which are directly passed over to the underlying Kafka client. This allows for
precise control over the KafkaProducer and KafkaConsumer used by Samza.
####Accessing an offset which is out-of-range This setting determines the behavior if a consumer attempts to read an offset that is outside of the current valid range maintained by the broker. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.
#####Ignoring checkpointed offsets
Samza periodically persists the last processed Kafka offsets as a part of its checkpoint. During startup, Samza resumes consumption from the previously checkpointed offsets by default. You can over-ride this behavior and configure Samza to ignore checkpoints with KafkaInputDescriptor#shouldResetOffset()
.
Once there are no checkpoints for a stream, the #withOffsetDefault(..)
determines whether we start consumption from the oldest or newest offset.
The above example configures Samza to ignore checkpointed offsets for page-view-topic
and consume from the oldest available offset during startup. You can configure this behavior to apply to all topics in the Kafka cluster by using KafkaSystemDescriptor#withDefaultStreamOffsetDefault
.
Code walkthrough: High Level Streams API
In this section, we walk through a complete example that reads from a Kafka topic, filters a few messages and writes them to another topic.
-
Lines 1-3 create a KafkaSystemDescriptor defining the coordinates of our Kafka cluster
-
Lines 4-6 defines a KafkaInputDescriptor for our input topic -
page-views
-
Lines 7-9 defines a KafkaOutputDescriptor for our output topic -
filtered-page-views
-
Line 9 creates a MessageStream for the input topic so that you can chain operations on it later
-
Line 10 creates an OuputStream for the output topic
-
Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream