Migrating to Akka Persistence Typed

§Migrating to Akka Persistence Typed

With the support for Akka Persistence Typed in Lagom it is possible to migrate existing code from Lagom Persistence (classic) to Akka Persistence Typed. There’s a few steps to consider in order to be able to read an existing journal.

Note: the only limitation when migrating from from Lagom Persistence (classic) to Akka Persistence Typed is that a full cluster shutdown is required. Even though all durable data is compatible, Lagom Persistence (classic) and Akka Persistence Typed can’t coexist.

Before you start, make sure you have read the page Domain Modelling with Akka Persistence Typed and you understand how to model a domain using Akka Persistence Typed.

§Migrating the model

Similarly to Lagom’s Persistent Entity, to create an Akka Persistence Typed EventSourcedBehavior you need:

  • a persistenceId: PersistenceId
  • an emptyState which represents the State before any event was ever persisted
  • a function (State, Command) => ReplyEffect to handle the commands, persist events and return a reply
  • a function (State, Event) => State to handle events and mutate the State
EventSourcedBehavior
  .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
    persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
    emptyState = ShoppingCart.empty,
    commandHandler = (cart, cmd) => cart.applyCommand(cmd),
    eventHandler = (cart, evt) => cart.applyEvent(evt)
  )

Instead of using a builder and adding multiple command and event handlers, in Akka Persistence Typed, you must define a command handler and an event handler function in which you can use Scala’s pattern matching to select the specific logic.

def applyCommand(cmd: ShoppingCartCommand): ReplyEffect[ShoppingCartEvent, ShoppingCart] =
  cmd match {
    case x: UpdateItem => onUpdateItem(x)
    case x: Checkout   => onCheckout(x)
    case x: Get        => onReadState(x)
  }

def applyEvent(evt: ShoppingCartEvent): ShoppingCart =
  evt match {
    case ItemUpdated(productId, quantity) => updateItem(productId, quantity)
    case CheckedOut                       => checkout
  }

This migration guide will not go into more details related to writing command and event handlers. Refer to the Akka Persistence Typed docs or the section on domain modeling with Akka Persistence Typed for more information.

§Commands

Command classes are the other set of classes most impacted by the migration. First, a Command will no longer need to extend the ReplyType[R] of the Lagom API. That type was used to specify a type R for the reply produced by the Command. To specify the type R of the reply add a replyTo: ActorRef[R] field in the command.

Before:

sealed trait ShoppingCartCommand[R] extends ReplyType[R]

case class UpdateItem(productId: String, quantity: Int) extends ShoppingCartCommand[Summary]

object UpdateItem {
  implicit val format: Format[UpdateItem] = Json.format
}

After:

sealed trait ShoppingCartCommand extends ShoppingCartCommandSerializable
case class UpdateItem(productId: String, quantity: Int, replyTo: ActorRef[Confirmation])
    extends ShoppingCartCommand

The replyTo: ActorRef[R] is necessary to know where to send the response to. It must be added to all command classes and adding it has implication on the serialization of those classes. Make sure to review the Serialization section below and the Serialization pages later in this reference documentation.

§Replies

In Akka Typed, it’s not possible to return an exception to the caller. All communication between the actor and the caller must be done via the replyTo:ActorRef[R] passed in the command. Therefore, if you want to signal a rejection, you most have it encoded in your reply protocol.

See for example the Confirmation ADT below:

sealed trait Confirmation
case class Accepted(summary: Summary) extends Confirmation
case class Rejected(reason: String) extends Confirmation

Then, all the command handlers must produce a ReplyEffect. For operations that don’t mutate the model, use Effect.reply directly and for operations that persist events use Effect.persist(...).thenReply to create a ReplyEffect instance:

private def onCheckout(cmd: Checkout): ReplyEffect[ShoppingCartEvent, ShoppingCart] =
  if (items.isEmpty)
    Effect.reply(cmd.replyTo)(Rejected("Cannot checkout empty cart"))
  else
    Effect
      .persist(CheckedOut)
      .thenReply(cmd.replyTo)(updatedCart => Accepted(toSummary(updatedCart)))

See Modelling Commands and Replies for more details.

§Registration

In order to shard and distribute the EventSourcedBehavior instances across the cluster you will no longer use Lagom’s persistentEntityRegistry. Instead, Lagom now provides direct access to clusterSharding, an instance of Akka’s ClusterSharding extension you can use to initialize the sharding of EventSourcedBehavior.

Before, in the ShoppingCartLoader class we’d use the Lagom provided persistentEntityRegistry instance to register a macwire provided instance:

// Register the ShoppingCart persistent entity
persistentEntityRegistry.register(wire[ShoppingCartEntity])

That registration can be removed.

After, we use the Lagom provided clusterSharding instance to initialize the sharding of the event source Behavior under the ShoppingCart.typeKey identifier:

// in Akka Typed, this is the equivalent of Lagom's PersistentEntityRegistry.register
clusterSharding.init(
  Entity(ShoppingCart.typeKey) {
    ctx => ShoppingCart.behavior(ctx)
  }
)

To avoid entityId collisions across the cluster, initializing the sharding of a Behavior requires specifying an EntityTypeKey which acts as a namespacing. The EntityTypeKey is defined by a name and a type. The type indicates the kind of commands that can be sent to that sharded Behavior. In our example, we defined typeKey in object ShoppingCart:

object ShoppingCart {

  val typeKey = EntityTypeKey[ShoppingCartCommand]("ShoppingCartEntity")

  def empty: ShoppingCart = ShoppingCart(items = Map.empty)

  //#akka-persistence-typed-lagom-tagger-adapter
  def behavior(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = {
    //#akka-persistence-behavior-definition
    EventSourcedBehavior
      .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
        persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
        emptyState = ShoppingCart.empty,
        commandHandler = (cart, cmd) => cart.applyCommand(cmd),
        eventHandler = (cart, evt) => cart.applyEvent(evt)
      )
    //#akka-persistence-behavior-definition
      .withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
  }
  //#akka-persistence-typed-lagom-tagger-adapter

  implicit val format: Format[ShoppingCart] = Json.format
}

§Sending a command

In order to send commands to your Behavior instance you will have to obtain a reference to the actor where the Behavior run and send commands to it.

Before:

/**
  * Looks up the shopping cart entity for the given ID.
  */
private def entityRef(id: String) =
  persistentEntityRegistry.refFor[ShoppingCartEntity](id)

override def get(id: String): ServiceCall[NotUsed, String] = ServiceCall { _ =>
  entityRef(id)
    .ask(Get)
    .map(cart => asShoppingCartView(id, cart))
}

After:

/**
 * Looks up the shopping cart entity for the given ID.
 */
private def entityRef(id: String): EntityRef[ShoppingCartCommand] =
  clusterSharding.entityRefFor(ShoppingCart.typeKey, id)

implicit val timeout = Timeout(5.seconds)

override def get(id: String): ServiceCall[NotUsed, String] = ServiceCall { _ =>
  entityRef(id)
    .ask { reply: ActorRef[Summary] => Get(reply) }
    .map { cart => asShoppingCartView(id, cart) }
}

That is, instead of injecting a persistentEntityRegistry, use a clusterSharding instance. Instead of getting a PersistentEntityRef[T] you will obtain an EntityRef[T]. Both PersistentEntityRef[T] and EntityRef[T] provide a method called ask but their signatures are different. EntityRef[T] is part of the API of Akka Cluster Sharding and it expects a ActorRef[R] => C factory method which given a reference to a replyTo actor of type ActorRef[R] will produce a command C (see reply => Get(reply) in the code above). Then the ask method also expects an implicit timeout. The result is a Future[R] with the reply instance produced in the EventSourceBehavior.

§ Registration: caveats

Even though there is no longer a PersistentEntity instance to register, the persistentEntityRegistry is still necessary to build TopicProducer’s. When writing a Service implementation that includes a Topic Implementation the TopicProducer API requires an eventStream that is provided by the persistentEntityRegistry. This means that in some cases you will have to inject both the persistentEntityRegistry and the clusterSharding.

That is, even if you don’t register a PersistentEntity, the events produced by Akka Persistence Typed are still compatible with PersistentEntityRegistry.eventStream as long as they are properly tagged so the projections (Read Sides and Topic Producers) don’t change.

§Maintaining compatibility

Migrating to Akka Persistence Typed requires maintaining compatibility with data previously produced and persisted in the database journal. This requires focusing on three areas: De/Serialization of events (detailed later), PersistenceId and tagging.

In order to be able to read existing events using Akka Persistence Typed you must use a PersistenceId that produces an identical persistenceId string as internally done by Lagom’s PersistenceEntity’s API.

EventSourcedBehavior
  .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
    persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
    emptyState = ShoppingCart.empty,
    commandHandler = (cart, cmd) => cart.applyCommand(cmd),
    eventHandler = (cart, evt) => cart.applyEvent(evt)
  )

The code above uses PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId). There are three important pieces on that statement that we must review:

  1. The first argument of PersistenceId.apply() must be the same value you used in Lagom Persistence (classic). This first argument is known as the typeHint and is used by the journal as a mechanism to avoid ID collision between different types. In Lagom Persistence (classic) the type hint defaults to the classname of your PersistentEntity but it can be overwritten (review your code or the persisted data on your database). In our case, we are using entityContext.entityTypeKey.name because we defined the type key as EntityTypeKey[ShoppingCartCommand]("ShoppingCartEntity") where "ShoppingCartEntity" is the classname of the code we had in the implementation based on Lagom Persistence (classic).
  2. The second argument must be the business id of your Aggregate. In this case, we can use entityContext.entityId because we’re using that same business id for the sharded actor.
  3. An (optional) third argument specifying a separator. Lagom uses the "|" as a separator and, since PersistenceId also uses "|" as a default we’re not specifying a separator.

Even if you use the appropriate PersistenceId, you need to use a compatible serializer for your events. Read more about De/Serialization in the section below.

Finally, only tagged events are readable by Lagom projections (either Read Sides and Topic Producers), and Lagom projections expect event tags to honour certain semantics. Finally, for events to be consumed in the correct order you must keep tagging the events in the same way as in your previous Lagom application.

Lagom provides an AkkaTaggerAdapter utility class that can be used to convert an existing Lagom AggregateEventTag to the appropriated tagging function expected by Akka Persistence Typed. When defining the EventSourcedBehavior specify a tagger using withTagger with the AkkaTaggerAdapter.fromLagom:

def behavior(entityContext: EntityContext[ShoppingCartCommand]): Behavior[ShoppingCartCommand] = {
  //#akka-persistence-behavior-definition
  EventSourcedBehavior
    .withEnforcedReplies[ShoppingCartCommand, ShoppingCartEvent, ShoppingCart](
      persistenceId = PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId),
      emptyState = ShoppingCart.empty,
      commandHandler = (cart, cmd) => cart.applyCommand(cmd),
      eventHandler = (cart, evt) => cart.applyEvent(evt)
    )
  //#akka-persistence-behavior-definition
    .withTagger(AkkaTaggerAdapter.fromLagom(entityContext, ShoppingCartEvent.Tag))
}

§Serialization

All the classes sent over the wire or stored on the database will still need to be serializable. Persisted events need to be read.

Existing code creating and registering serializers is 100% valid except for Command classes. In Akka Typed, it is required to add a replyTo: ActorRef[Reply] field on messages that need a reference to reply back. In order to serialize a class that includes an ActorRef[T] field the class must use the Akka Jackson serializer. Read more on the serialization section of the docs.

To convert your Command classes to use Akka Jackson serialization instead of Lagom Play-JSON you need to follow these steps:

First,create a marker trait. For example:

/**
 * This is a marker trait for shopping cart commands.
 * We will serialize them using the Akka Jackson serializer that is able to
 * deal with the replyTo field. See application.conf
 */
trait ShoppingCartCommandSerializable

Then, use the regular Akka serialization binding mechanism so all classes extending that trait use the Akka Jackson JSON serializer:

akka.actor {
  serialization-bindings {
    # commands won't use Lagom Play-Json serializers but Akka Jackson serializers
    "com.example.shoppingcart.impl.ShoppingCartCommandSerializable" = jackson-json
  }
}

Then, remove all code that’s play-json from your Command classes and companion objects.

Before:

sealed trait ShoppingCartCommand[R] extends ReplyType[R]

case class UpdateItem(productId: String, quantity: Int) extends ShoppingCartCommand[Summary]

object UpdateItem {
  implicit val format: Format[UpdateItem] = Json.format
}

After:

sealed trait ShoppingCartCommand extends ShoppingCartCommandSerializable
case class UpdateItem(productId: String, quantity: Int, replyTo: ActorRef[Confirmation])
    extends ShoppingCartCommand

Note how the type of the reply is no longer specified via ReplyType[T] but as the type of the protocol the replyTo: ActorRef[T] actor.

And finally, remove all commands from JsonSerialiserRegistry

override def serializers: Seq[JsonSerializer[_]] = Seq(
  JsonSerializer[ItemUpdated],
  JsonSerializer[CheckedOut.type],
  JsonSerializer[UpdateItem],
  JsonSerializer[Checkout.type],
  JsonSerializer[Get.type],
  JsonSerializer[ShoppingCart],
  JsonSerializer[ShoppingCartException]
)
override def serializers: Seq[JsonSerializer[_]] = Seq(
  // state and events can use play-json, but commands should use jackson because of ActorRef[T] (see application.conf)
  JsonSerializer[ShoppingCart],
  JsonSerializer[ItemUpdated],
  JsonSerializer[CheckedOut.type],
  // the replies use play-json as well
  JsonSerializer[Summary],
  JsonSerializer[Confirmation],
  JsonSerializer[Accepted],
  JsonSerializer[Rejected]
)

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.