
Apache Samza 1.0

We’re thrilled to announce the release of Apache Samza 1.0.

Today Samza forms the backbone of hundreds of real-time production applications across a multitude of companies, such as LinkedIn, VMware, Slack, and Redfin, among many others. This release of Samza adds a variety of features and capabilities to Samza’s existing arsenal, coupled with new and improved documentation, code snippets, examples, and a brand-new website design! Here are a few selected highlights:

  • Stable high-level APIs that allow creating complex processing pipelines with ease.

  • Beam Samza Runner now marries Beam’s best-in-class support for event-time-based windowed processing and sophisticated triggering with Samza’s stable and scalable stateful processing model.

  • Table API that provides a common abstraction for accessing remote or local databases. Developers are now able to “join” an input event stream with such a Table.

  • Integration Test Framework to enable effortless testing of Samza jobs without deploying Kafka, YARN, or ZooKeeper clusters.

  • Support for Apache Log4j2 allowing improved logging performance, customization, and efficiency.

  • Upgraded Kafka client and consumer.

  • An interactive shell for Samza SQL for seamless formulation, development, and testing of SamzaSQL queries.

  • Side-input support that allows using log-compacted data sources to populate KV state for Samza applications.

  • An improved website with detailed documentation and lots of code samples!

In addition, Samza 1.0 brings numerous bug-fixes, upgrades, and improvements listed below.

New features

Samza 1.0 brings full support for the following features:

Improved Stable High Level APIs

Samza 1.0 brings Descriptor APIs that allow applications to specify their input and output systems and streams in code. Samza’s new Context APIs provide applications unified access to job-level, container-level, task-level, and application-level context and capabilities. This also simplifies Samza’s ApplicationRunner interface.

This API evolution requires a few simple modifications to application code, which we describe in detail in our upgrade steps below.

Beam Runner Support

Samza’s Beam Runner enables executing Apache Beam pipelines over Samza. This enables Samza applications to create complex processing pipelines that require event-time based processing, varying types of event-time based windowing, and more. This feature is supported in both the YARN and standalone deployment models.
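
To make this concrete, here is a minimal sketch of a Beam pipeline executed on Samza via the Beam Java SDK’s SamzaRunner. The windowed count is illustrative only; the toy source, window size, and class name are assumptions, not the authoritative API reference.

    import org.apache.beam.runners.samza.SamzaRunner;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.joda.time.Duration;

    public class BeamOnSamzaSketch {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        options.setRunner(SamzaRunner.class); // execute this Beam pipeline on Samza

        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(Create.of("samza", "beam", "samza")) // toy bounded source
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))) // event-time windows
            .apply(Count.perElement()); // per-window element counts
        pipeline.run().waitUntilFinish();
      }
    }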

Joining Streams and Tables

Samza’s Table API provides developers with unified access to local and remote data sources such as Key-Value stores or web services, while providing features such as rate-limiting, throttling, and caching capabilities. This provides first-class API primitives for building Stream-Table join jobs. Learn more about the use, semantics, and examples for Table API here.
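
As a rough illustration of the stream-table join primitive, the sketch below implements a StreamTableJoinFunction; the PageView, Profile, and EnrichedPageView types are hypothetical stand-ins for your own classes.

    import org.apache.samza.operators.KV;
    import org.apache.samza.operators.functions.StreamTableJoinFunction;

    // Joins each keyed PageView message with the Profile record sharing its member id.
    public class PageViewEnricher
        implements StreamTableJoinFunction<Integer, KV<Integer, PageView>, KV<Integer, Profile>, EnrichedPageView> {

      @Override
      public EnrichedPageView apply(KV<Integer, PageView> pageView, KV<Integer, Profile> profile) {
        return new EnrichedPageView(pageView.getValue(), profile.getValue());
      }

      @Override
      public Integer getMessageKey(KV<Integer, PageView> pageView) {
        return pageView.getKey(); // key used to look up the table record
      }

      @Override
      public Integer getRecordKey(KV<Integer, Profile> profile) {
        return profile.getKey();
      }
    }

In the application’s describe() method, one would then obtain the table via appDesc.getTable(tableDescriptor) and call pageViews.join(table, new PageViewEnricher()).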

Test Samza without ZooKeeper, YARN, or Kafka

Samza 1.0 brings a test framework that allows testing Samza applications using in-memory input and output. Users can now set up test pipelines for their applications without needing to set up any other services, such as Kafka, YARN, or ZooKeeper.
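
For example, a test might look like the following sketch; MyStreamApplication and PageViewEvent are hypothetical, and the descriptor package locations are an assumption based on the test framework’s documented class names.

    import java.time.Duration;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.samza.serializers.NoOpSerde;
    import org.apache.samza.test.framework.TestRunner;
    import org.apache.samza.test.framework.system.descriptors.InMemoryInputDescriptor;
    import org.apache.samza.test.framework.system.descriptors.InMemoryOutputDescriptor;
    import org.apache.samza.test.framework.system.descriptors.InMemorySystemDescriptor;

    public class MyApplicationTest {

      public void testApplication() {
        List<PageViewEvent> input = Arrays.asList(new PageViewEvent("a"), new PageViewEvent("b"));

        InMemorySystemDescriptor system = new InMemorySystemDescriptor("test");
        InMemoryInputDescriptor<PageViewEvent> in =
            system.getInputDescriptor("page-views", new NoOpSerde<>());
        InMemoryOutputDescriptor<PageViewEvent> out =
            system.getOutputDescriptor("filtered-views", new NoOpSerde<>());

        TestRunner.of(new MyStreamApplication()) // the application under test
            .addInputStream(in, input)           // feed input from memory
            .addOutputStream(out, 1)             // capture output (1 partition)
            .run(Duration.ofSeconds(2));         // runs locally; no Kafka, YARN, or ZooKeeper
      }
    }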

Log4j 2 support

Samza now supports Apache Log4j 2 for system and application logging. Log4j 2 is an upgrade to Log4j that provides significant improvements over its predecessor, Log4j 1.x, such as better throughput and latency, custom log levels, and a pluggable logging architecture.

Kafka upgrade

This release upgrades Samza to use Kafka’s high-level consumer (Kafka v0.11.1.62). This brings latency and throughput benefits for Samza applications that consume from Kafka, in addition to bug-fixes. It also means Samza applications can now make better use of the underlying Kafka cluster.

SamzaSQL Shell

SamzaSQL now provides a shell for users to type in their SQL queries, while Samza does the heavy lifting of wiring the inputs and outputs and sizing the application in the background. This is great for testing and experimenting with queries while formulating your application logic, and is especially well-suited for data scientists and tinkerers.

Side-inputs

Samza 1.0 brings the ability to leverage existing log-compacted data sources (e.g., Kafka topics) to populate KV state for Samza applications. If your data processing pipeline involves Hadoop-to-Kafka push, this feature alleviates the need for your Samza job to create separate Kafka-topics to back KV state.

Improved website, documentation, and samples

We’ve re-designed the Samza website making it easier to find details on key Samza concepts and patterns. All documentation has been revised and rewritten, keeping in mind the feedback we got from our customers. We’ve revised and added sample application code to showcase Samza 1.0 and the use of its new APIs.

Enhancements and Upgrades

This release brings the following enhancements, upgrades, and capabilities:

New and improved documentation, code snippets, and examples

API enhancements and simplifications

SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and low-level APIs in YARN and standalone environment

SAMZA-1804: System and stream descriptors

SAMZA-1858: Public APIs for shared context

SAMZA-1763: Add async methods to Table API

SAMZA-1786: Introduce the metadata store abstraction

SAMZA-1859: Zookeeper implementation of MetadataStore

SAMZA-1788: Add the LocationIdProvider abstraction

Upgrades and Bug-fixes

SAMZA-1768: Handle corrupted OFFSET file

SAMZA-1817: Long classpath support for non-split deployments

SAMZA-1719: Add caching support to table-API

SAMZA-1783: Add Log4j2 functionality in Samza

SAMZA-1868: Refactor KafkaSystemAdmin from using SimpleConsumer

SAMZA-1776: Refactor KafkaSystemConsumer to remove the usage of deprecated SimpleConsumer client

SAMZA-1730: Adding state validation in StreamProcessor before any lifecycle operation and group coordination

SAMZA-1695: Clear events in ScheduleAfterDebounceTime on session expiration

SAMZA-1647: Fix race conditions in StreamProcessor

SAMZA-1371: Some Samza Containers get stuck at “Starting BrokerProxy”

Performance and Testing

SAMZA-1648: Integration Test Framework & Collection Stream Impl

SAMZA-1748: Failure tests in the standalone deployment

A source download of Samza 1.0 is available here, and is also available in Apache’s Maven repository.

Community Developments

A symposium on stream processing with Apache Samza and Apache Kafka was held on July 19th and again on October 23rd. Both were attended by more than 350 participants from across the industry. They featured in-depth talks on Samza’s Beam integration, its use at LinkedIn for real-time notifications, Kafka replication at Uber, Kafka Cruise Control, and many other topics.

Samza was also the focus of a talk at Strange Loop ’18, focusing in depth on its scalability, performance, extensibility, and programmability.

Upgrading your application to Apache Samza 1.0

Congratulations on your decision to upgrade to Samza 1.0!

First, please follow the upgrade steps below, depending on whether you’ve been using the low-level (i.e., Task) API or the high-level (i.e., fluent) Samza API.

Overview of New APIs

Instead of configuration, input and output systems are now defined in code using SystemDescriptors. Each SystemDescriptor encapsulates the specifics of a particular type of system, e.g., Kafka, EventHub, Brooklin, etc. These system descriptors are then used to obtain StreamDescriptors for input and output streams.

Applications access all context-based objects, such as Stores, Configuration, MetricsRegistry, Tables, TaskModel, CallbackScheduler, etc., via a unified Context API.
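
Putting the two together, a bare-bones TaskApplication might look like the following sketch; MyStreamTask, PageViewEvent, the stream names, and the exact import paths are illustrative assumptions.

    import org.apache.samza.application.TaskApplication;
    import org.apache.samza.application.TaskApplicationDescriptor;
    import org.apache.samza.operators.KV;
    import org.apache.samza.serializers.JsonSerde;
    import org.apache.samza.system.kafka.KafkaInputDescriptor;
    import org.apache.samza.system.kafka.KafkaOutputDescriptor;
    import org.apache.samza.system.kafka.KafkaSystemDescriptor;
    import org.apache.samza.task.StreamTaskFactory;

    public class MyTaskApplication implements TaskApplication {
      @Override
      public void describe(TaskApplicationDescriptor appDesc) {
        // The SystemDescriptor captures system-level details: here, a Kafka cluster named "kafka".
        KafkaSystemDescriptor kafka = new KafkaSystemDescriptor("kafka");

        // StreamDescriptors for the individual input and output streams.
        KafkaInputDescriptor<KV<String, PageViewEvent>> input =
            kafka.getInputDescriptor("page-views", new JsonSerde<>());
        KafkaOutputDescriptor<KV<String, PageViewEvent>> output =
            kafka.getOutputDescriptor("filtered-page-views", new JsonSerde<>());

        appDesc.withInputStream(input);
        appDesc.withOutputStream(output);
        appDesc.withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
      }
    }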

Upgrade Steps for Low-level API Applications

  1. Update your application to use Samza version 1.0.0.

  2. Samza 1.0’s Context API necessitates some minimal changes to the init method of your Task class (the one that implements the StreamTask interface); see the sketch at the end of this step.

    The signature of the init method now has a single Context argument.

    So, to obtain the TaskModel and details (e.g., the TaskName) in your task’s init, use context.getTaskContext().getTaskModel()

    For the MetricsRegistry for the task, use context.getTaskContext().getTaskMetricsRegistry()

    For the Application’s configuration, use context.getJobContext().getConfig()

    For KV stores, use context.getTaskContext().getStore(storeName)

    For Table objects, use context.getTaskContext().getTable(tableId)

    For scheduling a timer-callback from your task, use context.getTaskContext().getCallbackScheduler().scheduleCallback(key, timestamp, callback).

    For the ContainerModel, use context.getContainerContext().getContainerModel()

    For the MetricsRegistry for the container, use context.getContainerContext().getContainerMetricsRegistry()
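
    Putting these accessors together, an upgraded task might look like the following sketch; the store name, metric names, and PageViewEvent type are hypothetical.

      import org.apache.samza.config.Config;
      import org.apache.samza.context.Context;
      import org.apache.samza.metrics.Counter;
      import org.apache.samza.storage.kv.KeyValueStore;
      import org.apache.samza.system.IncomingMessageEnvelope;
      import org.apache.samza.task.InitableTask;
      import org.apache.samza.task.MessageCollector;
      import org.apache.samza.task.StreamTask;
      import org.apache.samza.task.TaskCoordinator;

      public class MyExistingSamzaTask implements StreamTask, InitableTask {
        private KeyValueStore<String, PageViewEvent> store;
        private Counter processed;

        @Override
        @SuppressWarnings("unchecked")
        public void init(Context context) throws Exception {
          Config config = context.getJobContext().getConfig(); // job-level config, use as needed
          this.store = (KeyValueStore<String, PageViewEvent>)
              context.getTaskContext().getStore("my-store"); // task-level KV store
          this.processed = context.getTaskContext()
              .getTaskMetricsRegistry().newCounter("my-app", "processed"); // task-level metrics
        }

        @Override
        public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
            TaskCoordinator coordinator) {
          processed.inc();
          // ... existing processing logic remains unchanged ...
        }
      }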

  3. In Samza 1.0, a Samza application’s inputs, outputs, and processing task should be specified in code, rather than in config. To do that, create a class (say, MySamzaTaskApplication) that implements the TaskApplication interface. In the describe() method of this class, do the following:

    a. For each system your job consumes from or produces to (e.g., Kafka), define a SystemDescriptor.

    For example, for Kafka,

      KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka");
    

    Similarly, for other systems such as EventHub, use

      EventHubsSystemDescriptor eventHubsSystemDescriptor = new EventHubsSystemDescriptor("eventhub");
    

    For HDFS use a GenericSystemDescriptor,

      GenericSystemDescriptor hdfsDescriptor = new GenericSystemDescriptor("hdfs", "org.apache.samza.system.hdfs.HdfsSystemFactory");
    

    You may choose to use config variables for the system names and system factories.

    b. For each input of your job, create an input descriptor,

    For example, for Kafka input,

    KafkaInputDescriptor<KV<String, PageViewEvent>> inputDescriptor = kafkaSystemDescriptor.getInputDescriptor("streamID", new JsonSerde<>());
    

    For EventHubs input,

    EventHubsInputDescriptor eventHubsInputDescriptor = eventHubsSystemDescriptor.getInputDescriptor("streamID", "namespace", "entityPath", new MyFavouriteSerde());
    

    For other input types, simply use a GenericInputDescriptor; see samza-hello-world for examples and details on usage.

    c. For each output of your job, create an output descriptor,

    For example, for Kafka output,

    KafkaOutputDescriptor kafkaOutputDescriptor = kafkaSystemDescriptor.getOutputDescriptor("streamID", new JsonSerde<>());
    

    For EventHub output,

    EventHubsOutputDescriptor eventHubsOutputDescriptor = eventHubsSystemDescriptor.getOutputDescriptor("streamID", "namespace", "entityPath", new MyFavouriteSerde());
    

    For other output types, simply use a GenericOutputDescriptor; see samza-hello-world for examples and details on usage.

    d. Add the input and output descriptors to the application descriptor,

    appDesc.withInputStream(inputDescriptor);
    appDesc.withOutputStream(outputDescriptor);
    

    e. Set the Task factory and point it to your pre-existing Task class.

    appDesc.withTaskFactory((StreamTaskFactory) () -> new MyExistingSamzaTask());
    
  4. In your application’s config, set the following property to point to the TaskApplication class implemented above:

     <property name="app.class" value="class-path-to-TaskApplication-class">
    

    Note that this config is called app.class and not task.class.

  5. In your application’s config, set the task.systems config. This config should be the union of all input and output systems used by your application.

    For example, an application that reads from or writes to the kafka and wikipedia systems should use:

    <property name="task.systems" value="kafka, wikipedia"/>
    

    Note that the system names used here should match the system names used when creating the SystemDescriptors in step 3.a above.

  6. Remove the task.class and task.inputs configs from your job’s configuration. These configs are not needed in Samza 1.0.

Upgrade Steps for High-level API Applications

  1. Update your application to use Samza version 1.0.0.

  2. In Samza 1.0, a Samza application’s inputs, outputs, and processing logic are specified in code, rather than in config. To do that, update your StreamApplication class:

    a. For each system your job consumes from or produces to (e.g., Kafka), define a SystemDescriptor.

    For example, for Kafka,

    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka");
    

    Similarly, for other systems such as EventHub, use

    EventHubsSystemDescriptor eventHubsSystemDescriptor = new EventHubsSystemDescriptor("eventhub");
    

    For HDFS use a GenericSystemDescriptor,

    GenericSystemDescriptor hdfsDescriptor = new GenericSystemDescriptor("hdfs", "org.apache.samza.system.hdfs.HdfsSystemFactory");
    

    You may choose to use config variables for the system names and system factories.

    b. For each input of your job, define an input descriptor. For example, for Kafka input,

    KafkaInputDescriptor<KV<String, PageViewEvent>> inputDescriptor = kafkaSystemDescriptor.getInputDescriptor("streamID", new JsonSerde<>());
    

    For EventHubs input,

    EventHubsInputDescriptor eventHubsInputDescriptor = eventHubsSystemDescriptor.getInputDescriptor("streamID", "namespace", "entityPath", new MyFavouriteSerde());
    

    For other input types, simply use a GenericInputDescriptor; see samza-hello-world for examples and details on usage.

    c. For each output of your job, define an output descriptor,

    For example, for Kafka output,

    KafkaOutputDescriptor kafkaOutputDescriptor = kafkaSystemDescriptor.getOutputDescriptor("streamID", new JsonSerde<>());
    

    For EventHub output,

    EventHubsOutputDescriptor eventHubsOutputDescriptor = eventHubsSystemDescriptor.getOutputDescriptor("streamID", "namespace", "entityPath", new MyFavouriteSerde());
    

    For other output types, simply use a GenericOutputDescriptor; see samza-hello-world for examples and details on usage.

    d. Obtain the input and output stream objects by using the input and output descriptors,

    For example,

    MessageStream<PageViewEvent> inputStream = appDesc.getInputStream(pageViewInputDescriptor);
    

    Or,

    OutputStream<GenericRecord> outputStream = appDesc.getOutputStream(outputDescriptor);
    

    The type parameter of the resulting MessageStream or OutputStream depends on the type used in the respective input or output descriptor.

    e. Update your fluent expressions to use the input and output streams from d, as in the sketch below.
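
    For example, a minimal sketch (with a hypothetical isMember predicate on a PageViewEvent stream):

    MessageStream<KV<String, PageViewEvent>> pageViews = appDesc.getInputStream(inputDescriptor);
    OutputStream<KV<String, PageViewEvent>> filteredViews = appDesc.getOutputStream(outputDescriptor);

    pageViews
        .filter(pv -> pv.getValue().isMember()) // hypothetical predicate
        .sendTo(filteredViews);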

  3. In Samza 1.0, the framework creates a computation graph for each application, based on the application’s fluent API logic. Therefore, all operators and user-defined functions (UDFs) used in an app’s describe() method need to be defined in static classes and methods, as in the sketch below.
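
    For instance, the filter from the previous sketch would move into a static class along the following lines; the import paths and PageViewEvent type are illustrative assumptions.

    import org.apache.samza.application.StreamApplication;
    import org.apache.samza.application.StreamApplicationDescriptor;
    import org.apache.samza.operators.KV;
    import org.apache.samza.operators.functions.FilterFunction;

    public class MyStreamApplication implements StreamApplication {

      // UDFs referenced in describe() must live in static (or top-level) classes.
      public static class MemberFilter implements FilterFunction<KV<String, PageViewEvent>> {
        @Override
        public boolean apply(KV<String, PageViewEvent> pageView) {
          return pageView.getValue().isMember(); // hypothetical predicate
        }
      }

      @Override
      public void describe(StreamApplicationDescriptor appDesc) {
        // ... descriptor and stream setup as in step 2 ...
        // pageViews.filter(new MemberFilter()).sendTo(filteredViews);
      }
    }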

  4. Add task.systems to your application configuration. This config should be the union of all input and output systems used by your application.

    For example, an application that reads from or writes to the kafka and wikipedia systems should use:

    <property name="task.systems" value="kafka, wikipedia"/>
    

    Note that the system names used here should match the system names used when creating the SystemDescriptors in step 2.a above.

  5. Remove the task.inputs config from your job’s configuration. This config is not needed in Samza 1.0.

  6. If you implement InitableFunction, you will need to make a change, since its init method now takes a single Context argument.

    a. Samza 1.0’s Context API necessitates some minimal changes to the init method.
    The signature of the init method now has a single Context argument.

    So, to obtain the TaskModel and details (e.g., the TaskName) in your task’s init, use context.getTaskContext().getTaskModel()

    For the MetricsRegistry for the task, use context.getTaskContext().getTaskMetricsRegistry()

    For the Application’s configuration, use context.getJobContext().getConfig()

    For KV stores, use context.getTaskContext().getStore(storeName)

    For Table objects, use context.getTaskContext().getTable(tableId)

    For scheduling a timer-callback from your task, use context.getTaskContext().getCallbackScheduler().scheduleCallback(key, timestamp, callback).

    For the ContainerModel, use context.getContainerContext().getContainerModel()

    For the MetricsRegistry for the container, use context.getContainerContext().getContainerMetricsRegistry()

    b. If you are using ContextManager, then you will need to use ApplicationContainerContextFactory and/or ApplicationTaskContextFactory instead. You can access your application-defined context(s) in InitableFunction.init through Context#getApplicationContainerContext and/or Context#getApplicationTaskContext, as in the sketch below.
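
    As a rough sketch, an InitableFunction can read an application-defined container context as shown below; MyEnricher, MyAppContainerContext, and PageViewEvent are hypothetical.

    import org.apache.samza.context.ApplicationContainerContext;
    import org.apache.samza.context.Context;
    import org.apache.samza.operators.functions.InitableFunction;
    import org.apache.samza.operators.functions.MapFunction;

    public class MyEnricher implements MapFunction<PageViewEvent, PageViewEvent>, InitableFunction {

      @Override
      public void init(Context context) {
        // Replaces the removed ContextManager: fetch the object produced by the
        // ApplicationContainerContextFactory registered with the application,
        // then downcast it to the application-defined type.
        ApplicationContainerContext shared = context.getApplicationContainerContext();
        // e.g.: this.client = ((MyAppContainerContext) shared).getClient();
      }

      @Override
      public PageViewEvent apply(PageViewEvent pageView) {
        return pageView; // enrichment using the shared context would go here
      }
    }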

Upgrade Steps for Samza SQL

No changes are required for upgrading your application to Samza 1.0.