Message Broker API

§Message Broker API

The Lagom Message Broker API provides a distributed publish-subscribe model that services can use to share data via topics. A topic is simply a channel that allows services to push and pull data. In Lagom, topics are strongly typed, hence both the subscriber and producer can know in advance what the expected data flowing through will be.

§Declaring a topic

To publish data to a topic a service needs to declare the topic in its service descriptor.

import com.lightbend.lagom.javadsl.api.*;
import com.lightbend.lagom.javadsl.api.broker.Topic;

import static com.lightbend.lagom.javadsl.api.Service.*;

public interface HelloService extends Service {
  String GREETINGS_TOPIC = "greetings";

  @Override
  default Descriptor descriptor() {
    return named("helloservice")
        .withCalls(
            pathCall("/api/hello/:id", this::hello), pathCall("/api/hello/:id", this::useGreeting))
        // here we declare the topic(s) this service will publish to
        .withTopics(topic(GREETINGS_TOPIC, this::greetingsTopic))
        .withAutoAcl(true);
  }
  // The topic handle
  Topic<GreetingMessage> greetingsTopic();

  ServiceCall<NotUsed, String> hello(String id);

  ServiceCall<GreetingMessage, Done> useGreeting(String id);
}

The syntax for declaring a topic is similar to the one used already to define services’ endpoints. The Descriptor.withTopics method accepts a sequence of topic calls, each topic call can be defined via the Service.topic static method. The latter takes a topic name (i.e., the topic identifier), and a reference to a method that returns a Topic instance.

Data flowing through a topic is serialized to JSON by default. Of course, it is possible to use a different serialization format, and you can do so by passing a different message serializer for each topic defined in a service descriptor. For instance, using the above service definition, here is how you could have passed a custom serializer: topic("greetings", this::greetingsTopic).withMessageSerializer(<your-custom-serializer>).

§Partitioning topics

Kafka will distribute messages for a particular topic across many partitions, so that the topic can scale. Messages sent to different partitions may be processed out of order, so if the ordering of the messages you are publishing matters, you need to ensure that the messages are partitioned in such a way that order is preserved. Typically, this means ensuring each message for a particular entity goes to the same partition.

Lagom allows this by allowing you to configure a partition key strategy, which extracts the partition key out of a message. Kafka will then use this key to help decide what partition to send each message to. The partition can be selected using the partitionKeyStrategy property, by passing a PartitionKeyStrategy to it:

return named("blogpostservice")
    .withTopics(
        topic("blogposts", this::blogPostEvents)
            .withProperty(KafkaProperties.partitionKeyStrategy(), BlogPostEvent::getPostId));

§Implementing a topic

The primary source of messages that Lagom is designed to produce is persistent entity events. Rather than publishing events in an ad-hoc fashion in response to particular things happening, it is better to take the stream of events from your persistent entities, and adapt that to a stream of messages sent to the message broker. In this way, you can ensure at least once processing of events by both publishers and consumers, which allows you to guarantee a very strong level of consistency throughout your system.

Lagom’s TopicProducer helper provides two methods for publishing a persistent entities event stream, singleStreamWithOffset for use with non sharded read side event streams, and taggedStreamWithOffset for use with sharded read side event streams. Both of these methods take a callback which takes the last offset that the topic producer published, and allows resumption of the event stream from that offset via the PersistentEntityRegistry.eventStream method for obtaining a read-side stream. For more details on read-side streams, see Persistent Read-Side's.

Lagom will, in the case of the singleStreamWithOffset method, ensure that your topic producer only runs on one node of your cluster, or with the taggedStreamWithOffset method will distribute the tags evenly across the cluster to distribute the publishing load.

Here’s an example of publishing a single, non sharded event stream:

public Topic<GreetingMessage> greetingsTopic() {
  return TopicProducer.singleStreamWithOffset(
      offset -> {
        return persistentEntityRegistry
            .eventStream(HelloEventTag.INSTANCE, offset)
            .map(this::convertEvent);
      });
}

Note that the read-side event stream you passed to the topic producer is “activated” as soon as the service is started. That means all events persisted by your services will eventually be published to the connected topic. Also, you will generally want to map your domain events into some other type, so that other service won’t be tightly coupled to another service’s domain model events. In other words, domain model events are an implementation detail of the service, and should not be leaked.

§Filtering events

You may not want all events persisted by your services to be published. If that is the case then you can filter the event stream:

public Topic<GreetingMessage> greetingsTopic() {
  return TopicProducer.singleStreamWithOffset(
      offset -> {
        return persistentEntityRegistry
            .eventStream(HelloEventTag.INSTANCE, offset)
            .mapConcat(this::filterHelloGreetings);
      });
}

private List<Pair<GreetingMessage, Offset>> filterHelloGreetings(Pair<HelloEvent, Offset> pair) {
  if (pair.first() instanceof AbstractGreetingMessageChanged) {
    AbstractGreetingMessageChanged msg = (AbstractGreetingMessageChanged) pair.first();
    // Only publish greetings where the message is "Hello".
    if (msg.getMessage().equals("Hello")) {
      return Arrays.asList(convertEvent(pair));
    }
  }
  return Collections.emptyList();
}

When an event is filtered, the TopicProducer does not publish the event. It also does not advance the offset. If the TopicProducer restarts then it will resume from the last offset. If a large number of events are filtered then the last offset could be quite far behind, and so all those events will be reprocessed and filtered out again. You need to be aware that this may occur and keep the number of consecutively filtered elements relatively low and also minimize the time and resources required to perform the filtering.

§Offset storage

Lagom will use your configured persistence API provider to store the offsets for your event streams. To read more about offset storage, see the Cassandra offset documentation, JDBC database offset documentation and JPA database offset documentation.

§Subscribe to a topic

To subscribe to a topic, a service just needs to call Topic.subscribe() on the topic of interest. For instance, imagine that a service wants to collect all greetings messages published by the HelloService (refer to the code snippet above). The first thing you should do is inject a HelloService.

private final HelloService helloService;

@Inject
public AnotherServiceImpl(HelloService helloService) {
  this.helloService = helloService;
}

Then, subscribe to the greetings topic, and hook your logic to apply to each messages that published to the topic.

helloService
    .greetingsTopic()
    .subscribe() // <-- you get back a Subscriber instance
    .atLeastOnce(
        Flow.fromFunction(
            (GreetingMessage message) -> {
              return doSomethingWithTheMessage(message);
            }));

When calling Topic.subscribe() you will get back a Subscriber instance. In the above code snippet we have subscribed to the greetings topic using at-least-once delivery semantics. That means each message published to the greetings topic is received at least once, but possibly more. The subscriber also offers a atMostOnceSource that gives you at-most-once delivery semantics. If in doubt, prefer using at-least-once delivery semantics.

Finally, subscribers are grouped together via Subscriber.withGroupId. A subscriber group allows many nodes in your cluster to consume a message stream while ensuring that each message is only handled once by each node in your cluster. Without subscriber groups, all of your nodes for a particular service would get every message in the stream, leading to their processing being duplicated. By default, Lagom will use a group id that has the same name as the service consuming the topic.

§Consuming message metadata

Your broker implementation may provide additional metadata with messages which you can consume. This can be accessed by invoking the Subscriber.withMetadata() method, which returns a subscriber that wraps the messages in a Message.

helloService
    .greetingsTopic()
    .subscribe()
    .withMetadata()
    .atLeastOnce(
        Flow.fromFunction(
            (Message<GreetingMessage> msg) -> {
              GreetingMessage payload = msg.getPayload();
              String messageKey = msg.messageKeyAsString();
              Optional<Headers> kafkaHeaders = msg.get(KafkaMetadataKeys.HEADERS);
              System.out.println(
                  "Message: " + payload + " Key: " + messageKey + " Headers: " + kafkaHeaders);
              return Done.getInstance();
            }));

The messageKeyAsString method is provided as a convenience for accessing the message key. Other properties can be accessed using the get method. A full list of the metadata keys available for Kafka can be found here.

§Skipping messages

You may only want to apply your logic to a subset of the messages that the topic publishes and skip the others. The Flow that is passed to Subscriber.atLeastOnce must emit exactly one Done element for each element that it receives. It must also emit them in the same order that the elements were received. This means that you must not use methods such as filter or collect on the Flow which would drop elements.

The easiest way to achieve this is to use a total function which returns Done for the elements that should be skipped. For example:

helloService
    .greetingsTopic()
    .subscribe()
    .atLeastOnce(
        Flow.fromFunction(
            (GreetingMessage message) -> {
              if (message.getMessage().equals("Kia ora")) {
                return doSomethingWithTheMessage(message);
              } else {
                // Skip all messages where the message is not "Kia ora".
                return Done.getInstance();
              }
            }));

§Polymorphic event streams

Typically you will want to publish more than one type of event to a particular topic. This can be done by creating an interface that each event implements. In order to successfully serialize these events to and from JSON, a few extra annotations are needed to instruct Jackson to describe and consume the type of the event in the produced JSON.

For example, consider a situation where you have a blog post created event and a blog post published event. Here’s what your event structure might look like:

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeName;

@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = Void.class)
@JsonSubTypes({
  @JsonSubTypes.Type(BlogPostEvent.BlogPostCreated.class),
  @JsonSubTypes.Type(BlogPostEvent.BlogPostPublished.class)
})
public interface BlogPostEvent {

  String getPostId();

  @JsonTypeName("created")
  final class BlogPostCreated implements BlogPostEvent {
    private final String postId;
    private final String title;

    @JsonCreator
    public BlogPostCreated(String postId, String title) {
      this.postId = postId;
      this.title = title;
    }

    public String getPostId() {
      return postId;
    }

    public String getTitle() {
      return title;
    }
  }

  @JsonTypeName("published")
  final class BlogPostPublished implements BlogPostEvent {
    private final String postId;

    @JsonCreator
    public BlogPostPublished(String postId) {
      this.postId = postId;
    }

    public String getPostId() {
      return postId;
    }
  }
}

The @JsonTypeInfo annotation describes how the type of the event will be serialized. In this case, it’s saying each event type will be identified by its name, and that name will go into a property called type. The @JsonTypeName on each event subclass says what the name of that event should be. And the @JsonSubTypes annotation is used to tell Jackson what the possible sub types of the event are, so that it knows where to look when deserializing.

The resulting JSON for the BlogPostCreated event will look like this:

{
  "type": "created",
  "postId": "1234",
  "title": "Some title"
}

While the JSON for the BlogPostPublished event will look like this:

{
  "type": "published",
  "postId": "1234",
}

Finally, note the defaultImpl = Void.class in the @JsonSubTypes annotation. This tells Jackson that if it comes across an event type that it doesn’t recognize the name for, to deserialize it as null. This is optional, but can be important for ensuring forwards compatibility in your services, if a service adds a new event subclass that it publishes, often you want your existing services that consume that event stream to just ignore it. Setting this will allow them to do that, otherwise, you’ll have to upgrade all the services that consume that event stream to explicitly ignore it before you upgrade the producer that produces the events.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.