Lagom Kafka Client

§Lagom Kafka Client

Lagom provides an implementation of the Message Broker API that uses Kafka. In the remainder you will learn how to add the dependency in your build, and how to configure and tune topic’s publishers and subscribers.

§Dependency

To use this feature add the following in your project’s build.

In Maven:

<dependency>
    <groupId>com.lightbend.lagom</groupId>
    <artifactId>lagom-javadsl-kafka-broker_2.11</artifactId>
    <version>${lagom.version}</version>
</dependency>

In sbt:

libraryDependencies += lagomJavadslKafkaBroker

When importing the Lagom Kafka Broker module keep in mind that the Lagom Kafka Broker module requires one implementation of a Lagom Persistence so make sure your dependencies include either Lagom Persistence Cassandra or Lagom Persistence JDBC

§Configuration

The Lagom Kafka Client implementation is built using akka-stream-kafka. The akka-stream-kafka library wraps the official Apache Java Kafka client and exposes a (Akka) stream based API to publish/consume messages to/from Kafka. Therefore, we have effectively three libraries at play, with each of them exposing its own configuration. Let’s explore the configuration keys exposed by each layer, starting with the one sitting at the top, i.e., the Lagom Kafka Client.

§Lagom Kafka Client

lagom.broker.kafka {
  # The name of the Kafka service to look up out of the service locator.
  # If this is an empty string, then a service locator lookup will not be done,
  # and the brokers configuration will be used instead.
  service-name = "kafka_native"
  service-name = ${?KAFKA_SERVICE_NAME}

  # The URLs of the Kafka brokers. Separate each URL with a comma.
  # This will be ignored if the service-name configuration is non empty.
  brokers = ${lagom.broker.defaults.kafka.brokers}

  client {
    default {
      # Exponential backoff for failures
      failure-exponential-backoff {
        # minimum (initial) duration until processor is started again
        # after failure
        min = 3s

        # the exponential back-off is capped to this duration
        max = 30s

        # additional random delay is based on this factor
        random-factor = 0.2
      }
    }

    # configuration used by the Lagom Kafka producer
    producer = ${lagom.broker.kafka.client.default}
    producer.role = ""

    # configuration used by the Lagom Kafka consumer
    consumer {
      failure-exponential-backoff = ${lagom.broker.kafka.client.default.failure-exponential-backoff}

      # The number of offsets that will be buffered to allow the consumer flow to
      # do its own buffering. This should be set to a number that is at least as
      # large as the maximum amount of buffering that the consumer flow will do,
      # if the consumer buffer buffers more than this, the offset buffer will
      # backpressure and cause the stream to stop.
      offset-buffer = 100

      # Number of messages batched together by the consumer before the related messages'
      # offsets are committed to Kafka.
      # By increasing the batching-size you are trading speed with the risk of having
      # to re-process a larger number of messages if a failure occurs.
      # The value provided must be strictly greater than zero.
      batching-size = 20

      # Interval of time waited by the consumer before the currently batched messages'
      # offsets are committed to Kafka.
      # This parameter is useful to ensure that messages' offsets are always committed
      # within a fixed amount of time.
      # The value provided must be strictly greater than zero.
      batching-interval = 5 seconds
    }
  }
}

First, notice that the service-name is set to “kafka_native” by default. This property defines how the kafka broker URI will be looked up in the service locator (since v1.3.1). If you choose you can disable the lookup by setting the service-name to an empty string and pass the location of your Kafka brokers via the key lagom.broker.kafka.brokers. This setting is mapped to Kafka’s boot-strap server list so only a few of the brokers need to be specified since the rest will be discovered dynamically. In production, you will usually want to have at least two brokers for resiliency. Make sure to separate each broker URL with a comma.

Second, we have configuration that is specific to the publisher and the subscriber. The lagom.broker.kafka.client.default.failure-exponential-backoff defines configuration for what to do when a publisher or subscriber stream fails. Specifically, it allows you to configure the backoff time that is awaited before restarting a publishing/consuming stream. Failure can happen for different reasons, for instance it may be due to an application error, or because of a network error. Independently of the cause, Lagom will keep retrying to restart the stream (whilst waiting longer and longer between each failed retry). As you can see, both the publisher and subscriber use the same defaults, but different values for either of them can be set.

Third, the consumer has a few more configuration keys allowing you to decide how often the read-side offset is persisted in the datastore. When tuning these values, you are trading performances (storing the offset every time a message is consumed can be costly), with the risk of having to re-process some message if a failure occurs.

§Akka Stream Kafka configuration

See the akka-stream-kafka reference.conf to find out about the available configuration parameters.

§Apache Java Kafka Client

See the Producer Configs documentation to learn about the exposed configuration for the publisher. While, for the subscriber, see the New Consumer Configs. The only caveat is that if you need to change the value of any of the configuration provided by the Java Kafka Client, you must prepend the desired configuration key you want to change with akka.kafka.consumer.kafka-clients, for the consumer, or akka.kafka.producer.kafka-clients. For instance, let’s assume you’d like to change the consumer’s request.timeout.ms, you should add the following in the service’s application.conf:

akka.kafka.producer.kafka-clients {
  request.timeout.ms = 30000
}

§Subscriber only Services

Sometimes you will implement a Lagom Service that will only consume from the Kafka Topic. In that case you can import the Lagom Kafka Client alone (instead of importing the Lagom Kafka Broker and a Lagom Persistence implementation).

In Maven:

<dependency>
    <groupId>com.lightbend.lagom</groupId>
    <artifactId>lagom-javadsl-kafka-client_2.11</artifactId>
    <version>${lagom.version}</version>
</dependency>

In sbt:

libraryDependencies += lagomJavadslKafkaClient

If/when your subscriber-only service evolves to include features that publish data to a topic, you will need to depend on Lagom Kafka Broker and remove the dependency to Lagom Kafka Client. The Lagom Kafka Broker module includes the Lagom Kafka Client module.

§Consuming Topics from 3rd parties

You may want your Lagom service to consume data produced on services not implemented in Lagom. In that case, as described in the Service Clients section, you can create a third-party-service-api module in your Lagom project. That module will contain a Service Descriptor declaring the topic you will consume from. Once you have your ThirdPartyService interface and related classes implemented, you should add third-party-service-api as a dependency on your fancy-service-impl. Finally, you can consume from the topic described in ThirdPartyService as documented in the Subscribe to a topic section.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.