Message Broker API

§Message Broker API

The Lagom Message Broker API provides a distributed publish-subscribe model that services can use to share data via topics. A topic is simply a channel that allows services to push and pull data. In Lagom, topics are strongly typed, hence both the subscriber and producer can know in advance what the expected data flowing through will be.

§Declaring a topic

To publish data to a topic a service needs to declare the topic in its service descriptor.

import com.lightbend.lagom.scaladsl.api.broker.Topic
import com.lightbend.lagom.scaladsl.api.Service
import com.lightbend.lagom.scaladsl.api.ServiceCall
import play.api.libs.json.Format
import play.api.libs.json.Json

object HelloService {
  val TOPIC_NAME = "greetings"
}
trait HelloService extends Service {
  final override def descriptor = {
    import Service._
    named("brokerdocs")
      .withCalls(
        pathCall("/api/hello/:id", hello _),
        pathCall("/api/hello/:id", useGreeting _)
      )
      .withTopics(
        topic(HelloService.TOPIC_NAME, greetingsTopic)
      )
      .withAutoAcl(true)
  }

  // The topic handle
  def greetingsTopic(): Topic[GreetingMessage]

  def hello(id: String): ServiceCall[NotUsed, String]
  def useGreeting(id: String): ServiceCall[GreetingMessage, Done]
}

The syntax for declaring a topic is similar to the one used already to define services’ endpoints. The Descriptor.withTopics method accepts a sequence of topic calls, each topic call can be defined via the topic method on the Service object. The latter takes a topic name (i.e., the topic identifier), and a method reference that returns a Topic instance.

Data flowing through a topic is serialized to JSON by default. Of course, it is possible to use a different serialization format, and you can do so by providing an implicit message serializer for each topic defined in a service descriptor. This is the same approach described on message serialization when presenting Service Descriptors. You may want to review the message serializers documentation too.

§Partitioning topics

Kafka will distribute messages for a particular topic across many partitions, so that the topic can scale. Messages sent to different partitions may be processed out of order, so if the ordering of the messages you are publishing matters, you need to ensure that the messages are partitioned in such a way that order is preserved. Typically, this means ensuring each message for a particular entity goes to the same partition.

Lagom allows this by allowing you to configure a partition key strategy, which extracts the partition key out of a message. Kafka will then use this key to help decide what partition to send each message to. The partition can be selected using the partitionKeyStrategy property, by passing a PartitionKeyStrategy to it:

named("blogpostservice")
  .withTopics(
    topic("blogposts", blogPostEvents)
      .addProperty(
        KafkaProperties.partitionKeyStrategy,
        PartitionKeyStrategy[BlogPostEvent](_.postId)
      )
  )

§Implementing a topic

The primary source of messages that Lagom is designed to produce is persistent entity events. Rather than publishing events in an ad-hoc fashion in response to particular things happening, it is better to take the stream of events from your persistent entities, and adapt that to a stream of messages sent to the message broker. In this way, you can ensure at least once processing of events by both publishers and consumers, which allows you to guarantee a very strong level of consistency throughout your system.

Lagom’s TopicProducer helper provides two methods for publishing a persistent entities event stream, singleStreamWithOffset for use with non sharded read side event streams, and taggedStreamWithOffset for use with sharded read side event streams. Both of these methods take a callback which takes the last offset that the topic producer published, and allows resumption of the event stream from that offset via the PersistentEntityRegistry.eventStream method for obtaining a read-side stream. For more details on read-side streams, see Persistent Read-Side's.

Lagom will, in the case of the singleStreamWithOffset method, ensure that your topic producer only runs on one node of your cluster, or with the taggedStreamWithOffset method will distribute the tags evenly across the cluster to distribute the publishing load.

Here’s an example of publishing a single, non sharded event stream:

override def greetingsTopic(): Topic[GreetingMessage] =
  TopicProducer.singleStreamWithOffset { fromOffset =>
    persistentEntityRegistry
      .eventStream(HelloEventTag.INSTANCE, fromOffset)
      .map(ev => (convertEvent(ev), ev.offset))
  }

private def convertEvent(helloEvent: EventStreamElement[HelloEvent]): GreetingMessage = {
  helloEvent.event match {
    case GreetingMessageChanged(msg) => GreetingMessage(msg)
  }
}

Note that the read-side event stream you passed to the topic producer is, by default, “activated” as soon as the service is started. You can change that default behavior using the Projections API. That means all events persisted by your services will eventually be published to the connected topic. Also, you will generally want to map your domain events into some other type, so that other service won’t be tightly coupled to another service’s domain model events. In other words, domain model events are an implementation detail of the service, and should not be leaked.

§Filtering events

You may not want all events persisted by your services to be published. If that is the case then you can filter the event stream:

override def greetingsTopic(): Topic[GreetingMessage] =
  TopicProducer.singleStreamWithOffset { fromOffset =>
    persistentEntityRegistry
      .eventStream(HelloEventTag.INSTANCE, fromOffset)
      .mapConcat(filterEvents)
  }

private def filterEvents(ev: EventStreamElement[HelloEvent]) = ev match {
  // Only publish greetings where the message is "Hello".
  case ev @ EventStreamElement(_, GreetingMessageChanged("Hello"), offset) =>
    immutable.Seq((convertEvent(ev), offset))
  case _ => Nil
}

When an event is filtered, the TopicProducer does not publish the event. It also does not advance the offset. If the TopicProducer restarts then it will resume from the last offset. If a large number of events are filtered then the last offset could be quite far behind, and so all those events will be reprocessed and filtered out again. You need to be aware that this may occur and keep the number of consecutively filtered elements relatively low and also minimize the time and resources required to perform the filtering.

§Offset storage

Lagom will use your configured persistence API provider to store the offsets for your event streams. To read more about offset storage, see the Cassandra offset documentation, JDBC database offset documentation and Slick database offset documentation.

§Subscribe to a topic

To subscribe to a topic, a service just needs to call Topic.subscribe on the topic of interest. For instance, imagine that a service wants to collect all greetings messages published by the HelloService. The first thing you should do is inject a HelloService (See the section on using service clients for a complete explanation on using a client to another service). Then, subscribe to the greetings topic, and hook your logic to apply to each messages that published to the topic.

helloService
  .greetingsTopic()
  .subscribe // <-- you get back a Subscriber instance
  .atLeastOnce(
    Flow.fromFunction(doSomethingWithTheMessage)
  )

When calling Topic.subscribe you will get back a Subscriber instance. In the above code snippet we have subscribed to the greetings topic using at-least-once delivery semantics. That means each message published to the greetings topic is received at least once, but possibly more. The subscriber also offers a atMostOnceSource that gives you at-most-once delivery semantics. If in doubt, prefer using at-least-once delivery semantics.

Finally, subscribers are grouped together via Subscriber.withGroupId. A subscriber group allows many nodes in your cluster to consume a message stream while ensuring that each message is only handled once by each node in your cluster. Without subscriber groups, all of your nodes for a particular service would get every message in the stream, leading to their processing being duplicated. By default, Lagom will use a group id that has the same name as the service consuming the topic.

§Consuming message metadata

Your broker implementation may provide additional metadata with messages which you can consume. This can be accessed by invoking the Subscriber.withMetadata method, which returns a subscriber that wraps the messages in a Message.

import com.lightbend.lagom.scaladsl.api.broker.Message
import com.lightbend.lagom.scaladsl.broker.kafka.KafkaMetadataKeys

helloService
  .greetingsTopic()
  .subscribe
  .withMetadata
  .atLeastOnce(
    Flow[Message[GreetingMessage]].map { msg =>
      val greetingMessage = msg.payload
      val messageKey      = msg.messageKeyAsString
      val kafkaHeaders    = msg.get(KafkaMetadataKeys.Headers)
      println(s"Message: $greetingMessage Key: $messageKey Headers: $kafkaHeaders")
      Done
    }
  )

The messageKeyAsString method is provided as a convenience for accessing the message key. Other properties can be accessed using the get method. A full list of the metadata keys available for Kafka can be found here.

§Skipping messages

You may only want to apply your logic to a subset of the messages that the topic publishes and skip the others. The Flow that is passed to Subscriber.atLeastOnce must emit exactly one Done element for each element that it receives. It must also emit them in the same order that the elements were received. This means that you must not use methods such as filter or collect on the Flow which would drop elements.

The easiest way to achieve this is to use a total function which returns Done for the elements that should be skipped. For example:

helloService
  .greetingsTopic()
  .subscribe
  .atLeastOnce(
    Flow[GreetingMessage].map {
      case msg @ GreetingMessage("Kia ora") => doSomethingWithTheMessage(msg)
      case _                                => Done // Skip all messages where the message is not "Kia ora".
    }
  )

§Polymorphic event streams

Typically you will want to publish more than one type of event to a particular topic. This can be done by creating an interface that each event implements. In order to successfully serialize these events to and from JSON, you will have to include some extra information on your JSON representation of the data.

For example, consider a situation where you have a blog post created event and a blog post published event. Here’s what your event structure might look like:

sealed trait BlogPostEvent {
  def postId: String
}

case class BlogPostCreated(postId: String, title: String) extends BlogPostEvent

case class BlogPostPublished(postId: String) extends BlogPostEvent

And that’s how your Play JSON formatters could look like:

case object BlogPostCreated {
  implicit val blogPostCreatedFormat: Format[BlogPostCreated] = Json.format
}

case object BlogPostPublished {
  implicit val blogPostPublishedFormat: Format[BlogPostPublished] = Json.format
}

You will have to implement a custom Message Serializer that adds extra information on each JSON message so that you know what deserializer to use on the other end. The resulting JSON for the BlogPostCreated event will look like this:

{
 "postId": "23",
 "title": "new post",
 "event_type": "postCreated"
}

While the JSON for the BlogPostPublished event will look like this:

{
 "postId": "23",
 "event_type": "postPublished"
}

You can do that using Play JSON transformers:

object BlogPostEvent {
  implicit val reads: Reads[BlogPostEvent] = {
    (__ \ "event_type").read[String].flatMap {
      case "postCreated"   => implicitly[Reads[BlogPostCreated]].map(identity)
      case "postPublished" => implicitly[Reads[BlogPostPublished]].map(identity)
      case other           => Reads(_ => JsError(s"Unknown event type $other"))
    }
  }
  implicit val writes: Writes[BlogPostEvent] = Writes { event =>
    val (jsValue, eventType) = event match {
      case m: BlogPostCreated   => (Json.toJson(m)(BlogPostCreated.blogPostCreatedFormat), "postCreated")
      case m: BlogPostPublished => (Json.toJson(m)(BlogPostPublished.blogPostPublishedFormat), "postPublished")
    }
    jsValue.transform(__.json.update((__ \ 'event_type).json.put(JsString(eventType)))).get
  }
}

This approach has an added maintenance cost. Fortunately there are libraries that expand Play JSON features and provide support for algebraic data type serialization. For example: Play JSON Derived Codecs.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.