Message Broker Testing

§Message Broker Testing

When you decouple communication through a Broker, you can test from both ends of the Topic. When your Service publishes events into a Topic (as described in Declaring a Topic), your tests should verify that the proper data is pushed into the Topic. At the same time, when your Service subscribes to an upstream Topic, you may want to test how it behaves when events arrive.

No broker is started for either publish or consumption tests. Instead, Lagom provides in-memory implementations of the Broker API to make tests faster. Integration tests against a complete broker should be implemented later, but they are out of scope for this documentation. The provided in-memory implementation of the Broker API runs locally and provides exactly-once delivery. If you want to test your code under scenarios with message loss (at-most-once) or message duplicates (at-least-once), you are responsible for producing that behaviour yourself by skipping messages or injecting duplicates.
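Skipping messages or injecting duplicates can be done before anything reaches the broker, by reshaping the list of messages a test sends. The following is a minimal sketch and not part of Lagom: `DeliveryFaults`, `duplicateEvery` and `dropEvery` are hypothetical helper names introduced here for illustration.

```scala
// Hypothetical test helpers (not a Lagom API): they rewrite the ordered list
// of messages a test is about to send, so the consumer sees duplicates or gaps.
object DeliveryFaults {

  // Simulates at-least-once redelivery: every n-th message (1-based) appears twice.
  def duplicateEvery[A](messages: Seq[A], n: Int): Seq[A] = {
    require(n > 0, "n must be positive")
    messages.zipWithIndex.flatMap {
      case (msg, idx) if (idx + 1) % n == 0 => Seq(msg, msg)
      case (msg, _)                         => Seq(msg)
    }
  }

  // Simulates at-most-once loss: every n-th message (1-based) is dropped.
  def dropEvery[A](messages: Seq[A], n: Int): Seq[A] = {
    require(n > 0, "n must be positive")
    messages.zipWithIndex.collect {
      case (msg, idx) if (idx + 1) % n != 0 => msg
    }
  }
}
```

A consumption test could then send each element of, for example, `DeliveryFaults.duplicateEvery(messages, 2)` into the in-memory topic and assert that the Service under test tolerates the repeated message.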

The Lagom in-memory broker implementation also helps you test message serialisation and deserialisation. This is only available in the tools for testing publishing, though, since the publishing end is the one responsible for describing the messages sent over the wire. When you test the consuming end of a topic, no serialisation or deserialisation is run under the covers.

The following code samples use the HelloService and AnotherService already presented in previous sections. HelloService publishes GreetingMessages on the "greetings" topic, and AnotherService subscribes to those messages using atLeastOnce semantics.

§Testing publish

When a Service publishes data into a Topic, the descriptor lists a TopicCall on the public API. Testing event publishing is very similar to testing the ServiceCalls in your Service API (see Service testing).

"The PublishService" should {
  "publish events on the topic" in ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
    new PublishApplication(ctx) with LocalServiceLocator
      with TestTopicComponents
  } { server =>

    implicit val system = server.actorSystem
    implicit val mat = server.materializer

    val client: PublishService = server.serviceClient.implement[PublishService]
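    // Subscribe to the published topic through the client. The accessor name
    // messageTopic() is an assumption; use the TopicCall declared in PublishService.
    // TestSink comes from akka.stream.testkit.scaladsl.
    val source = client.messageTopic().subscribe.atMostOnceSource
    source
      .runWith(TestSink.probe[PubMessage])
      .request(1)
      .expectNext should ===(PubMessage("msg 1"))
  }
}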

To start the application with a stubbed broker, mix TestTopicComponents into your test application.

Use ServiceTest to create a client to your Service, and use that client to subscribe to the published topics. Finally, after interacting with the Service to cause the emission of some events, you can assert that the events were published on the Topic.

The producer end is responsible for describing the public API and for providing the serialisable mappings for all messages exchanged (both in ServiceCalls and TopicCalls). The tests that verify the behaviour of the publishing operations should therefore also test that the messages can be serialised and deserialised.
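The property being checked is a round trip: decoding the encoded form of a message should give back the original message. The following is a minimal sketch of that check in plain Scala. The `PubMessage` shape, `PubMessageCodec` and `RoundTrip` are all assumptions made for illustration; the string codec only stands in for whatever serialiser your descriptor actually declares.

```scala
// Assumed message shape, for illustration only.
final case class PubMessage(message: String)

// Hypothetical wire format standing in for the real message serialiser.
object PubMessageCodec {
  private val Prefix = "msg:"

  def encode(m: PubMessage): String = Prefix + m.message

  def decode(wire: String): Either[String, PubMessage] =
    if (wire.startsWith(Prefix)) Right(PubMessage(wire.drop(Prefix.length)))
    else Left(s"unexpected payload: $wire")
}

object RoundTrip {
  // True when decoding the encoded form yields the original value.
  def holds[A](value: A)(encode: A => String, decode: String => Either[String, A]): Boolean =
    decode(encode(value)) == Right(value)
}
```

A test would call `RoundTrip.holds(PubMessage("msg 1"))(PubMessageCodec.encode, PubMessageCodec.decode)` for a representative set of messages and assert that the result is true.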

§Testing subscription

Testing the consumption of messages requires starting the Service under test with a stub of the upstream Service producing data into the topic. The following snippet demonstrates how to achieve it.

  1. An in-memory Topic is required, along with a means to send messages into it. Using the ProducerStubFactory, you can obtain a ProducerStub for a given topic name.
  2. With the producerStub instance, a service stub can be built to replace the production-ready upstream service. This stub has to use the topic bound to the ProducerStub created in the previous step.
  3. Use the ProducerStub in the tests to send messages into the topic, and interact normally with the service under test to verify the Service code.

class AnotherServiceSpec extends WordSpec with Matchers with Eventually with ScalaFutures {
  var producerStub: ProducerStub[GreetingMessage] = _

  "The AnotherService" should {
    "publish updates on greetings message" in
      ServiceTest.withServer(ServiceTest.defaultSetup) { ctx =>
        new AnotherApplication(ctx) with LocalServiceLocator {

          // (1) creates an in-memory topic and binds it to a producer stub
          val stubFactory = new ProducerStubFactory(actorSystem, materializer)
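          // "greetings" is the topic name HelloService publishes on
          producerStub = stubFactory.producer[GreetingMessage]("greetings")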

          // (2) Override the default Hello service with our service stub
          // which gets the producer stub injected
          override lazy val helloService = new HelloServiceStub(producerStub)
        }
      } { server =>

        // (3) produce a message in the stubbed topic via its producer
        producerStub.send(GreetingMessage("Hi there!"))

        // create a service client to assert the message was consumed
        eventually(timeout(Span(5, Seconds))) {
          // Cannot use async specs here: eventually only retries when an exception is raised.
          // If the future fails on the first attempt, eventually won't retry, even though the future would succeed later.
          // see for detail info.
          val futureResp = server.serviceClient.implement[AnotherService].foo.invoke()
          whenReady(futureResp) { resp =>
            resp should ===("Hi there!")
          }
        }
      }
  }
}

// (2) a Service stub that will use the in-memory topic bound to
// our producer stub
class HelloServiceStub(stub: ProducerStub[GreetingMessage])
  extends HelloService {
  override def greetingsTopic(): Topic[GreetingMessage] = stub.topic

  override def hello(id: String): ServiceCall[NotUsed, String] = ???

  override def useGreeting(id: String): ServiceCall[GreetingMessage, Done] = ???
}
