Publish-Subscribe

§Publish-Subscribe

Publish–subscribe is a well known messaging pattern. Senders of messages, called publishers, do not target the messages directly to specific receivers, but instead publish messages to topics without knowledge of which receivers, called subscribers, if any, there may be. Similarly, a subscriber express interest in a topic and receive messages published to that topic, without knowledge of which publishers, if any, there are.

§Dependency

To use this feature add the following in your project’s build:

libraryDependencies += lagomScaladslPubSub

§Usage from Service Implementation

Let’s look at an example of a service that publishes temperature measurements of hardware devices. A device can submit its current temperature and interested parties can get a stream of the temperature samples.

The service API is defined as:

trait SensorService extends Service {
  def registerTemperature(id: String): ServiceCall[Temperature, NotUsed]

  def temperatureStream(id: String): ServiceCall[NotUsed, Source[Temperature, NotUsed]]

  def descriptor = {
    import Service._

    named("/sensorservice").withCalls(
      pathCall("/device/:id/temperature", registerTemperature _),
      pathCall("/device/:id/temperature/stream", temperatureStream _)
    )
  }
}

The implementation of this interface looks like:

import com.lightbend.lagom.scaladsl.pubsub.PubSubRegistry
import com.lightbend.lagom.scaladsl.pubsub.TopicId

class SensorServiceImpl(pubSub: PubSubRegistry) extends SensorService {
  def registerTemperature(id: String) = ServiceCall { temperature =>
    val topic = pubSub.refFor(TopicId[Temperature](id))
    topic.publish(temperature)
    Future.successful(NotUsed.getInstance())
  }

  def temperatureStream(id: String) = ServiceCall { _ =>
    val topic = pubSub.refFor(TopicId[Temperature](id))
    Future.successful(topic.subscriber)
  }
}

When a device submits its current temperature it is published to a topic that is unique for that device. Note that the topic where the message is published to is defined by the message class, here Temperature, and an optional classifier, here the device id. The messages of this topic will be instances of the message class or subclasses thereof. The qualifier can be used to distinguish topics that are using the same message class. The empty string can be used as qualifier if the message class is enough to define the topic identity.

Use the method publish of the PubSubRef representing a given topic to publish a single message, see registerTemperature in the above code.

Use the method subscriber of the PubSubRef to acquire a stream Source of messages published to a given topic, see temperatureStream in the above code.

It is also possible to publish a stream of messages to a topic as is illustrated by this variant of the SensorService:

import akka.stream.Materializer

class SensorServiceImpl(pubSub: PubSubRegistry)
  (implicit materializer: Materializer) extends SensorService {

  def registerTemperature(id: String) = ServiceCall { temperatures =>
    val topic = pubSub.refFor(TopicId[Temperature](id))
    temperatures.runWith(topic.publisher)
    Future.successful(NotUsed.getInstance())
  }

  def temperatureStream(id: String) = ServiceCall { _ =>
    val topic = pubSub.refFor(TopicId[Temperature](id))
    Future.successful(topic.subscriber)
  }
}

Note how the incoming Source in registerTemperature is connected to the publisher Sink of the topic with the runWith method. Also note that we now have an implicit Materializer injected into the constructor, this is needed when running a stream. You can of course apply ordinary stream transformations of the incoming stream before connecting it to the publisher.

§Usage from Persistent Entity

You can publish messages from a Persistent Entity. First you must inject the PubSubRegistry to get hold of a PubSubRef for a given topic.

import com.lightbend.lagom.scaladsl.persistence.PersistentEntity
import com.lightbend.lagom.scaladsl.pubsub.{PubSubRegistry, TopicId}

final class Post(pubSubRegistry: PubSubRegistry) extends PersistentEntity {
  private val publishedTopic = pubSubRegistry.refFor(TopicId[PostPublished])

A command handler that publishes messages, in this case the PostPublished event, may look like this:

.onCommand[Publish.type, Done] {
  case (Publish, ctx, state) =>
    ctx.thenPersist(PostPublished(entityId)) { evt =>
      ctx.reply(Done)
      publishedTopic.publish(evt)
    }
}

To complete the picture, a service method that delivers these PostPublished events as a stream:

class BlogServiceImpl(pubSubRegistry: PubSubRegistry) extends BlogService {
  private val publishedTopic = pubSubRegistry.refFor(TopicId[PostPublished])

  override def publishedStream = ServiceCall { _ =>
    Future.successful(publishedTopic.subscriber)
  }
}

§Limitations

This feature is specifically for providing publish and subscribe functionality within a single services cluster. To publish and subscribe between services, you should instead use Lagom’s message broker support.

Published messages may be lost. For example in case of networks problems messages might not be delivered to all subscribers. Future version of Lagom may include intra-service pub-sub with at-least-once delivery, in the meantime you can achieve at-least-once delivery by using Lagom’s message broker support.

Note that anytime you fallback to message broker support you will expose your messages via a public topic making them part of your public API.

The registry of subscribers is eventually consistent, i.e. new subscribers are not immediately visible at other nodes, but typically the information will be fully replicated to all other nodes after a few seconds.

§Serialization

The published messages must be serializable since they will be sent across the nodes in the cluster of the service. JSON is the recommended serialization format for these messages. The Serialization section describes how to register serializers for the messages.

§Underlying Implementation

It is implemented with Akka Distributed Publish Subscribe.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.