Relational Database Read-Side Support

This page is specifically about Lagom’s support for relational database read-sides. Before reading this, you should familiarize yourself with Lagom’s general read-side support.

§Query the Read-Side Database

Let us first look at how a service implementation can retrieve data from a relational database.


import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.jdbc.JdbcSession;
import org.pcollections.PSequence;
import org.pcollections.TreePVector;

import javax.inject.Inject;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class BlogServiceImpl implements BlogService {

    private final JdbcSession jdbcSession;

    @Inject
    public BlogServiceImpl(JdbcSession jdbcSession) {
        this.jdbcSession = jdbcSession;
    }

    @Override
    public ServiceCall<NotUsed, PSequence<PostSummary>> getPostSummaries() {
        return request -> {
            // withConnection borrows a pooled connection and returns a CompletionStage
            return jdbcSession.withConnection(connection -> {
                // try-with-resources closes the statement and result set promptly
                try (PreparedStatement statement = connection.prepareStatement(
                             "SELECT id, title FROM blogsummary");
                     ResultSet rs = statement.executeQuery()) {
                    PSequence<PostSummary> summaries = TreePVector.empty();

                    while (rs.next()) {
                        summaries = summaries.plus(
                                new PostSummary(rs.getString("id"), rs.getString("title"))
                        );
                    }

                    return summaries;
                }
            });
        };
    }
}

Note that the JdbcSession is injected in the constructor. JdbcSession provides access to a connection from the connection pool through the withConnection method, and manages transactions through the withTransaction method. Importantly, JdbcSession also runs the blocking JDBC calls in a thread pool designed to handle them, which is why the withConnection and withTransaction methods return CompletionStage.
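To illustrate why these methods return CompletionStage, here is a minimal sketch of the general pattern using only the JDK. It assumes a hypothetical BlockingCallRunner class that is not part of Lagom, and JdbcSession's actual implementation may differ. Blocking work is handed to a dedicated thread pool and the caller immediately receives a CompletionStage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for the idea behind JdbcSession; not Lagom code.
final class BlockingCallRunner implements AutoCloseable {

    // A blocking operation that may throw a checked exception, as JDBC calls do.
    @FunctionalInterface
    interface BlockingCall<T> {
        T run() throws Exception;
    }

    // Dedicated pool so that blocking work never occupies the caller's threads.
    private final ExecutorService blockingPool;

    BlockingCallRunner(int poolSize) {
        this.blockingPool = Executors.newFixedThreadPool(poolSize, runnable -> {
            Thread thread = new Thread(runnable, "blocking-jdbc");
            thread.setDaemon(true); // do not keep the JVM alive for idle workers
            return thread;
        });
    }

    // Schedules the call on the dedicated pool and returns without waiting for it.
    <T> CompletionStage<T> submit(BlockingCall<T> call) {
        CompletableFuture<T> result = new CompletableFuture<>();
        blockingPool.execute(() -> {
            try {
                result.complete(call.run());
            } catch (Exception e) {
                // Failures surface through the CompletionStage, not as a thrown exception.
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    @Override
    public void close() {
        blockingPool.shutdown();
    }
}
```

In this sketch a caller would write something like runner.submit(() -> loadSummaries()), where loadSummaries is a hypothetical blocking method, and then compose the returned stage with thenApply or similar rather than waiting on it.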

§Update the Read-Side

We need to transform the events generated by the Persistent Entities into database tables that can be queried as illustrated in the previous section. For that we will implement a ReadSideProcessor with assistance from the JdbcReadSide support component. It will consume events produced by persistent entities and update one or more JDBC tables that are optimized for queries.

This is what a ReadSideProcessor class looks like before filling in the implementation details:

import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor;
import com.lightbend.lagom.javadsl.persistence.jdbc.JdbcReadSide;
import org.pcollections.PSequence;

import javax.inject.Inject;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class BlogEventProcessor extends ReadSideProcessor<BlogEvent> {

    private final JdbcReadSide readSide;

    @Inject
    public BlogEventProcessor(JdbcReadSide readSide) {
        this.readSide = readSide;
    }

    @Override
    public ReadSideHandler<BlogEvent> buildHandler() {
        // TODO build read side handler
        return null;
    }

    @Override
    public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
        // TODO return the tag for the events
        return null;
    }
}

You can see that we have injected the JDBC read-side support; it will be needed later.

You should already have implemented tagging for your events as described in the Read-Side documentation, so first we’ll implement the aggregateTags method in our read-side processor stub, like so:

@Override
public PSequence<AggregateEventTag<BlogEvent>> aggregateTags() {
    return BlogEvent.TAG.allTags();
}

§Building the read-side handler

The other method on the ReadSideProcessor is buildHandler. It is responsible for creating the ReadSideHandler that will handle events. It also gives you the opportunity to register two callbacks: a global prepare callback and a regular prepare callback.

JdbcReadSide has a builder method for creating a builder for these handlers. The resulting handler automatically manages transactions and handles read-side offsets for you. The builder can be created like so:

JdbcReadSide.ReadSideHandlerBuilder<BlogEvent> builder =
        readSide.builder("blogsummaryoffset");

The argument passed to this method is an identifier for the read-side processor that Lagom should use when it persists the offset. Lagom will store the offsets in a table that it will automatically create itself if it doesn’t exist. If you would prefer that Lagom didn’t automatically create this table for you, you can turn off this feature by setting lagom.persistence.jdbc.create-tables.auto=false in application.conf. The DDL for the schema for this table is as follows:

CREATE TABLE read_side_offsets (
  read_side_id VARCHAR(255),
  tag VARCHAR(255),
  sequence_offset BIGINT,
  time_uuid_offset CHAR(36),
  PRIMARY KEY (read_side_id, tag)
)
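
As a sketch, the setting mentioned above would appear in application.conf as shown here. With automatic creation turned off, the read_side_offsets table has to be created by other means, for example by running the DDL above as part of your own schema management.

```
# application.conf
lagom.persistence.jdbc.create-tables.auto = false
```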

§Global prepare

The global prepare callback runs at least once across the whole cluster. It is intended for doing things like creating tables and preparing any data that needs to be available before read side processing starts. Read side processors may be sharded across many nodes, and so tasks like creating tables should usually only be done from one node.

The global prepare callback is run from an Akka cluster singleton. It may be run multiple times: every time a new node becomes the singleton, the callback will be run. Consequently, the task must be idempotent. If it fails, it will be run again using an exponential backoff, and the read side processing of the whole cluster will not start until it has run successfully.
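
To make the retry behavior concrete, here is a minimal JDK-only sketch of running an idempotent task with exponential backoff. The BackoffSketch class, its method names, and the doubling strategy are assumptions made for illustration. This is not Lagom's implementation, and Lagom performs the retries for you.

```java
import java.time.Duration;

// Illustrative only: not Lagom's implementation or its configured defaults.
final class BackoffSketch {

    // Delay before retry number `attempt` (0 = first retry): `min` doubled
    // once per earlier attempt, capped at `max`.
    static Duration delayFor(int attempt, Duration min, Duration max) {
        long cap = max.toMillis();
        long delay = min.toMillis();
        for (int i = 0; i < attempt && delay < cap; i++) {
            delay *= 2;
        }
        return Duration.ofMillis(Math.min(delay, cap));
    }

    // Runs the task until it succeeds, sleeping between failed attempts.
    // The task must be idempotent because it may run more than once.
    // Returns the number of attempts that were needed.
    static int runUntilSuccess(Runnable idempotentTask, Duration min, Duration max) {
        for (int attempt = 0; ; attempt++) {
            try {
                idempotentTask.run();
                return attempt + 1;
            } catch (RuntimeException e) {
                sleep(delayFor(attempt, min, max));
            }
        }
    }

    private static void sleep(Duration duration) {
        try {
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", e);
        }
    }
}
```

With a minimum of 100 ms and a maximum of 1000 ms, the delays in this sketch grow as 100, 200, 400, 800 and then stay at 1000 ms.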

Setting a global prepare callback is completely optional. You may prefer to manage database tables manually, but in development and test environments it is very convenient to let this callback create them for you.

Below is an example method that we’ve implemented to create tables:

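private void createTable(Connection connection) throws SQLException {
    // IF NOT EXISTS keeps the callback idempotent, since it may run more than once.
    try (PreparedStatement statement = connection.prepareStatement(
            "CREATE TABLE IF NOT EXISTS blogsummary ( " +
            "id VARCHAR(64), title VARCHAR(256), PRIMARY KEY (id))")) {
        statement.execute();
    }
}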

It can then be registered as the global prepare callback in the buildHandler method:

builder.setGlobalPrepare(this::createTable);

§Event handlers

Each event handler takes a connection and an event, and updates the read-side accordingly.

Here’s an example callback for handling the PostAdded event:

private void processPostAdded(Connection connection, BlogEvent.PostAdded event) throws SQLException {
    // Close the statement as soon as the insert has run.
    try (PreparedStatement statement = connection.prepareStatement(
            "INSERT INTO blogsummary (id, title) VALUES (?, ?)")) {
        statement.setString(1, event.getPostId());
        statement.setString(2, event.getContent().getTitle());
        statement.execute();
    }
}

This can then be registered with the builder using setEventHandler:

builder.setEventHandler(BlogEvent.PostAdded.class, this::processPostAdded);

Once you have finished registering all your event handlers, you can invoke the build method and return the built handler:

return builder.build();
