Integrating with Akka

§Advanced Topic: Integrating with Akka

Lagom is built with Akka as one of the underlying technologies. Nonetheless, writing simple Lagom services generally won’t require interacting with Akka directly.

More advanced users may want direct access, as described in this section.

§Usage from Service Implementation

Pretty much everything in Akka is accessible through an ActorSystem object. You can inject the current ActorSystem into your service implementations or persistent entities with ordinary dependency injection.

Let’s look at an example of a WorkerService that accepts job requests and delegates the work to actors running on other nodes in the service’s cluster.

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.routing.ClusterRouterGroup
import akka.cluster.routing.ClusterRouterGroupSettings
import akka.routing.ConsistentHashingGroup
import akka.pattern.ask
import akka.util.Timeout
import com.lightbend.lagom.scaladsl.api.ServiceCall
import scala.concurrent.duration._

class WorkerServiceImpl(system: ActorSystem) extends WorkerService {
  if (Cluster.get(system).selfRoles("worker-node")) {
    // start a worker actor on each node that has the "worker-node" role
    system.actorOf(Worker.props, "worker")
  }

  // start a consistent hashing group router,
  // which will delegate jobs to the workers. It is grouping
  // the jobs by their task, i.e. jobs with same task will be
  // delegated to same worker node
  val workerRouter = {
    val paths = List("/user/worker")
    val groupConf = ConsistentHashingGroup(paths, hashMapping = {
      case Job(_, task, _) => task
    })
    val routerProps = ClusterRouterGroup(
      groupConf,
      ClusterRouterGroupSettings(
        totalInstances = 1000,
        routeesPaths = paths,
        allowLocalRoutees = true,
        useRoles = Set("worker-node")
      )
    ).props
    system.actorOf(routerProps, "workerRouter")
  }

  def doWork = ServiceCall { job =>
    implicit val timeout = Timeout(5.seconds)
    (workerRouter ? job).mapTo[JobAccepted]
  }
}

Notice how the ActorSystem is injected through the constructor. We create worker actors on each node that has the “worker-node” role. We create a consistent hashing group router that delegates jobs to the workers. Details on these features are in the Akka documentation.

The worker actor looks like this:

import akka.actor.Actor
import akka.actor.Props
import akka.event.Logging

object Worker {
  def props = Props[Worker]
}

class Worker extends Actor {
  private val log = Logging.getLogger(context.system, this)

  override def receive = {
    case job @ Job(id, task, payload) =>
      log.info("Working on job: {}", job)
      sender ! JobAccepted(id)
      // perform the work...
      context.stop(self)
  }
}

The messages are ordinary case classes. Note that they extend Jsonable since they need proper Serialization when they are sent across nodes in the cluster of the service, and the have formats created for them:

import play.api.libs.json.Format
import play.api.libs.json.Json

case class Job(jobId: String, task: String, payload: String)
object Job {
  implicit val format: Format[Job] = Json.format
}
case class JobAccepted(jobId: String)
object JobAccepted {
  implicit val format: Format[JobAccepted] = Json.format
}

These formats needed to be added to the serialization registry, as described in the cluster serialization documentation.

§Updating Akka version

If you want to use a newer version of Akka, one that is not used by Lagom yet, you can add the following to your build.sbt file:

// The newer Akka version you want to use.
val akkaVersion = "2.6.0-RC2"

// Akka dependencies used by Lagom
dependencyOverrides ++= Seq(
  "com.typesafe.akka" %% "akka-actor"                  % akkaVersion,
  "com.typesafe.akka" %% "akka-remote"                 % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster"                % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-sharding"       % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-tools"          % akkaVersion,
  "com.typesafe.akka" %% "akka-cluster-typed"          % akkaVersion,
  "com.typesafe.akka" %% "akka-coordination"           % akkaVersion,
  "com.typesafe.akka" %% "akka-discovery"              % akkaVersion,
  "com.typesafe.akka" %% "akka-distributed-data"       % akkaVersion,
  "com.typesafe.akka" %% "akka-serialization-jackson"  % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence"            % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence-query"      % akkaVersion,
  "com.typesafe.akka" %% "akka-slf4j"                  % akkaVersion,
  "com.typesafe.akka" %% "akka-stream"                 % akkaVersion,
  "com.typesafe.akka" %% "akka-protobuf-v3"            % akkaVersion,
  "com.typesafe.akka" %% "akka-actor-typed"            % akkaVersion,
  "com.typesafe.akka" %% "akka-persistence-typed"      % akkaVersion,
  "com.typesafe.akka" %% "akka-multi-node-testkit"     % akkaVersion % Test,
  "com.typesafe.akka" %% "akka-testkit"                % akkaVersion % Test,
  "com.typesafe.akka" %% "akka-stream-testkit"         % akkaVersion % Test,
  "com.typesafe.akka" %% "akka-actor-testkit-typed"    % akkaVersion % Test,
  // Use "sbt-dependency-graph" or any other dependency report generator to
  // make sure you add all the necessary dependencies on this list
)

Of course, other Akka artifacts can be added transitively. Use sbt-dependency-graph to better inspect your build and check which ones you need to add explicitly.

Note: When doing such updates, keep in mind that you need to follow Akka’s Binary Compatibility Rules. And if you are manually adding other Akka artifacts, remember to keep the version of all the Akka artifacts consistent since mixed versioning is not allowed.

§Adding other Akka dependencies

If you want to use Akka artifacts that are not added transtively by Lagom, you can use com.lightbend.lagom.core.LagomVersions.akka to ensure all the artifacts will use a consistent version. For example:

import com.lightbend.lagom.core.LagomVersion

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-typed" % LagomVersion.akka
)

Note: When resolving dependencies, sbt will get the newest one declared for this project or added transitively. It means that if Play depends on a newer Akka (or Akka HTTP) version than the one you are declaring, Play version wins. See more details about how sbt does evictions here.

Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.