Integrating with Akka

§Advanced Topic: Integrating with Akka

Lagom is built with Akka as one of the underlying technologies. Nonetheless, writing simple Lagom services generally won’t require interacting with Akka directly.

More advanced users may want direct access, as described in this section.

§Usage from Service Implementation

Most Akka functions are accessible through an ActorSystem object. You can inject the current ActorSystem into your service implementations or persistent entities with ordinary dependency injection.
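
As a rough sketch of what that injection might look like with compile-time wiring (Macwire) in a Lagom Scala application: the `EchoService`, `EchoServiceImpl` and `EchoApplication` names below are hypothetical and exist only for this illustration. The sketch assumes the usual Lagom application cake, where `LagomApplication` already exposes an `actorSystem` member that `wire` can pick up by type.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import com.lightbend.lagom.scaladsl.api.{Descriptor, Service, ServiceCall}
import com.lightbend.lagom.scaladsl.server.{LagomApplication, LagomApplicationContext}
import com.softwaremill.macwire.wire
import play.api.libs.ws.ahc.AhcWSComponents
import scala.concurrent.Future

// Hypothetical service, used only to demonstrate the wiring.
trait EchoService extends Service {
  def systemName: ServiceCall[NotUsed, String]

  override def descriptor: Descriptor = {
    import Service._
    named("echo").withCalls(call(systemName))
  }
}

// The ActorSystem arrives as an ordinary constructor parameter.
class EchoServiceImpl(system: ActorSystem) extends EchoService {
  override def systemName: ServiceCall[NotUsed, String] = ServiceCall { _ =>
    Future.successful(system.name)
  }
}

// Left abstract because a concrete application still has to supply
// a service locator (for example in an application loader).
abstract class EchoApplication(context: LagomApplicationContext)
    extends LagomApplication(context)
    with AhcWSComponents {

  // `wire` resolves the ActorSystem constructor argument from the
  // `actorSystem` member inherited from LagomApplication.
  override lazy val lagomServer = serverFor[EchoService](wire[EchoServiceImpl])
}
```

Nothing Akka-specific is needed in the wiring itself; the implementation simply declares the dependency and the application cake supplies it.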

Let’s look at an example of a WorkerService that accepts job requests and delegates the work to actors running on other nodes in the service’s cluster.

import akka.actor.ActorSystem
import akka.cluster.Cluster
import akka.cluster.routing.{ClusterRouterGroup, ClusterRouterGroupSettings}
import akka.routing.ConsistentHashingGroup
import akka.pattern.ask
import akka.util.Timeout
import com.lightbend.lagom.scaladsl.api.ServiceCall
import scala.concurrent.duration._

class WorkerServiceImpl(system: ActorSystem) extends WorkerService {
  if (Cluster.get(system).selfRoles("worker-node")) {
    // start a worker actor on each node that has the "worker-node" role
    system.actorOf(Worker.props, "worker")
  }

  // Start a consistent hashing group router that delegates jobs
  // to the workers. Jobs are hashed by their task, so jobs with
  // the same task are delegated to the same worker node.
  val workerRouter = {
    val paths = List("/user/worker")
    val groupConf = ConsistentHashingGroup(paths, hashMapping = {
      case Job(_, task, _) => task
    })
    val routerProps = ClusterRouterGroup(groupConf,
      ClusterRouterGroupSettings(
        totalInstances = 1000,
        routeesPaths = paths,
        allowLocalRoutees = true,
        useRoles = Set("worker-node")
      )
    ).props
    system.actorOf(routerProps, "workerRouter")
  }

  def doWork = ServiceCall { job =>
    implicit val timeout: Timeout = Timeout(5.seconds)
    (workerRouter ? job).mapTo[JobAccepted]
  }
}

Notice how the ActorSystem is injected through the constructor. We create worker actors on each node that has the “worker-node” role. We create a consistent hashing group router that delegates jobs to the workers. Details on these features are in the Akka documentation.
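
To build some intuition for why hashing on the task keeps related jobs on one worker, here is a minimal, library-free sketch of a hash ring. It is not Akka's `ConsistentHashingGroup` implementation. `HashRing`, `nodeFor`, the node names and the virtual-node count are all invented for this illustration.

```scala
import scala.collection.immutable.TreeMap
import scala.util.hashing.MurmurHash3

// Illustrative hash ring only; Akka's router uses its own implementation.
final class HashRing(nodes: Seq[String], virtualNodes: Int = 10) {
  require(nodes.nonEmpty, "at least one node is required")

  // Place several virtual points per node on the ring to spread the load.
  private val ring: TreeMap[Int, String] = TreeMap(
    (for {
      node <- nodes
      i    <- 0 until virtualNodes
    } yield MurmurHash3.stringHash(s"$node#$i") -> node): _*
  )

  // Find the first point at or after the key's hash, wrapping around
  // to the start of the ring when the hash is past the last point.
  def nodeFor(key: String): String = {
    val hash = MurmurHash3.stringHash(key)
    val it = ring.iteratorFrom(hash)
    if (it.hasNext) it.next()._2 else ring.head._2
  }
}

object HashRingDemo {
  def main(args: Array[String]): Unit = {
    val nodes = Seq("node-a", "node-b", "node-c") // hypothetical node names
    val ring = new HashRing(nodes)

    // The same task key always resolves to the same node.
    val first = ring.nodeFor("resize-image")
    val second = ring.nodeFor("resize-image")
    assert(first == second)
    assert(nodes.contains(first))
  }
}
```

In the service above, the `hashMapping` partial function plays the role of choosing the key: it extracts `task` from each `Job`, and the router maps that key to a routee.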

The worker actor looks like this:

import akka.actor.{Actor, Props}
import akka.event.Logging

object Worker {
  def props = Props[Worker]
}

class Worker extends Actor {
  private val log = Logging.getLogger(context.system, this)

  override def receive = {
    case job @ Job(id, task, payload) =>
      log.info("Working on job: {}", job)
      sender() ! JobAccepted(id)
      // perform the work...
      context.stop(self)
  }
}

The messages are ordinary case classes. Because they are sent across nodes in the service’s cluster, they need proper serialization, so each one has a Play JSON format defined in its companion object:

import play.api.libs.json.{Format, Json}

case class Job(jobId: String, task: String, payload: String)
object Job {
  implicit val format: Format[Job] = Json.format
}
case class JobAccepted(jobId: String)
object JobAccepted {
  implicit val format: Format[JobAccepted] = Json.format
}
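
A quick way to check such a format is a JSON round trip. The sketch below assumes the `Job` case class and its companion from the listing above are on the classpath. The `JobJsonRoundTrip` object and the sample values are made up for illustration.

```scala
import play.api.libs.json.Json

// Hypothetical check; relies on the Job case class and implicit Format[Job] shown above.
object JobJsonRoundTrip {
  def main(args: Array[String]): Unit = {
    val job = Job("job-1", "resize-image", "cat.png")

    // Json.toJson picks up the implicit Format[Job] from the companion object.
    val json = Json.toJson(job)

    // Reading the JSON back should yield an equal case class instance.
    val decoded = json.as[Job]
    assert(decoded == job)
  }
}
```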

These formats need to be added to the serialization registry, as described in the cluster serialization documentation.
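
A registry for the two messages might look roughly like the sketch below. `WorkerSerializerRegistry` is a hypothetical name, and `Job` and `JobAccepted` are the case classes shown earlier. The cluster serialization documentation remains the authoritative reference.

```scala
import com.lightbend.lagom.scaladsl.playjson.{JsonSerializer, JsonSerializerRegistry}
import scala.collection.immutable

// Hypothetical registry; Job and JobAccepted are the message classes shown earlier.
object WorkerSerializerRegistry extends JsonSerializerRegistry {
  // One JsonSerializer per message type; each uses the implicit Format
  // from that type's companion object.
  override def serializers: immutable.Seq[JsonSerializer[_]] = immutable.Seq(
    JsonSerializer[Job],
    JsonSerializer[JobAccepted]
  )
}
```

The registry then has to be made known to the application, typically by overriding `jsonSerializerRegistry` in the application cake. How exactly that is done depends on the Lagom version and the components mixed in.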
