Implementing Event Sourced Entities with Spring

Event Sourced Entities persist their state with ACID semantics, scale horizontally, and isolate failures. They use the Event Sourcing Model: rather than persisting the current state, they persist all of the events that led to the current state. Kalix stores these events in a journal.

An Event Sourced Entity must not update its in-memory state directly as a result of a command. If handling a command requires a change to the state, the command handler should emit events. The Entity then receives those events in its event handlers, and only there should the in-memory state be changed.

When you need to read state in your service, ask yourself: what events should I be listening to? When you need to write state, ask yourself: what events should I be emitting?
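The split between command handling and event handling can be sketched in plain Java, without the Kalix SDK. Everything in this block (the class name, the `Deposited` event, the in-memory list standing in for the journal) is a hypothetical illustration of the pattern, not how Kalix implements it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the command -> event -> state flow (no Kalix SDK involved).
final class CommandEventFlowSketch {

  // Hypothetical domain event: the only thing that gets persisted.
  record Deposited(int amount) {}

  private final List<Deposited> journal = new ArrayList<>(); // stands in for the journal
  private int balance = 0;                                   // in-memory state

  // Command handler: validates and emits an event, never changes `balance` itself.
  String deposit(int amount) {
    if (amount <= 0) {
      return "rejected"; // invalid commands emit nothing
    }
    Deposited event = new Deposited(amount);
    journal.add(event); // 1. store the event
    applyEvent(event);  // 2. only then let the event handler change the state
    return "OK";
  }

  // Event handler: the single place where the state changes.
  private void applyEvent(Deposited event) {
    balance += event.amount();
  }

  int balance() {
    return balance;
  }

  List<Deposited> journal() {
    return List.copyOf(journal);
  }

  public static void main(String[] args) {
    CommandEventFlowSketch entity = new CommandEventFlowSketch();
    System.out.println(entity.deposit(40)); // OK
    System.out.println(entity.deposit(-5)); // rejected
    System.out.println(entity.balance());   // 40
  }
}
```

Because the state is only ever derived from stored events, rebuilding the entity from its journal yields the same state that was held in memory.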

To load an Entity, Kalix reads the journal and replays events to compute the Entity’s current state. As an optimization, by default, Event Sourced Entities persist state snapshots periodically. This allows Kalix to recreate an Entity from the most recent snapshot plus any events saved after the snapshot.
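As a rough illustration of why snapshots help, the sketch below replays a journal in plain Java, once from the beginning and once from a snapshot plus the events stored after it. The `QuantityAdded` and `Snapshot` types and the use of a list index as a sequence number are assumptions made for this example only. They do not describe the Kalix storage format.

```java
import java.util.List;

final class ReplaySketch {

  // Hypothetical event and snapshot types, for illustration only.
  record QuantityAdded(int quantity) {}
  record Snapshot(int sequenceNr, int totalQuantity) {}

  // Replaying means folding the events, in order, over a starting state.
  static int replay(int startState, List<QuantityAdded> events) {
    int state = startState;
    for (QuantityAdded event : events) {
      state += event.quantity();
    }
    return state;
  }

  // Recover from a snapshot plus only the events stored after it.
  static int recover(Snapshot snapshot, List<QuantityAdded> journal) {
    List<QuantityAdded> tail = journal.subList(snapshot.sequenceNr(), journal.size());
    return replay(snapshot.totalQuantity(), tail);
  }

  public static void main(String[] args) {
    List<QuantityAdded> journal = List.of(
        new QuantityAdded(2), new QuantityAdded(3), new QuantityAdded(5), new QuantityAdded(1));

    int full = replay(0, journal);
    // Snapshot taken after the first three events.
    Snapshot snapshot = new Snapshot(3, replay(0, journal.subList(0, 3)));
    int fromSnapshot = recover(snapshot, journal);

    System.out.println(full + " " + fromSnapshot); // 11 11
  }
}
```

Both paths reach the same state, but the second one only has to process the events that follow the snapshot.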

In contrast with typical create, read, update, delete (CRUD) systems, event sourcing allows the state of the Entity to be reliably replicated to other services. Event Sourced Entities use offset tracking in the journal to record which portions of the system have replicated which events.

Event Sourced Entities offer strong consistency guarantees. Kalix distributes Entities across every node in a stateful service deployment; at any given time, each Entity lives on exactly one node. If a command for an Entity arrives at a node that does not host that Entity, the proxy forwards the command to the node that does. This forwarding is transparent, so your code does not need to be aware of it. Because each Entity lives on exactly one node, that node can handle the messages for each Entity sequentially. Hence, there are no concurrency concerns with Event Sourced Entities: each Entity handles one message at a time.
To learn more about event sourcing, check out the free Lightbend Academy course, Reactive Architecture: CQRS & Event Sourcing.

Event Sourced Entities persist changes as events and snapshots. Kalix needs to serialize that data to send it to the underlying data store. However, we recommend that you do not persist your service’s public API messages. Converting between a public API type and an internal one may introduce some overhead, but it allows the service’s public interface to evolve independently of the data storage format, which should be private.

The steps necessary to implement an Event Sourced Entity include:

  1. Modeling the entity’s state and its domain events.

  2. Implementing behavior in command and event handlers.

  3. Creating and initializing the Entity.

The following sections walk through these steps using a shopping cart service as an example (working sample available here).

Model the entity

Through our "Shopping Cart" Event Sourced Entity we expect to manage our cart, adding and removing items as we please. Being event-sourced means it will represent changes to state as a series of domain events. So let’s have a look at what kind of model we expect to store and the events our entity might generate.

src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
public record ShoppingCart(String cartId, List<LineItem> items) { (1)

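  public record LineItem(String productId, String name, int quantity) { (2)
    // Returns a copy of this line item with a different quantity.
    public LineItem withQuantity(int quantity) {
      return new LineItem(productId, name, quantity);
    }
  }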
}
1 Our ShoppingCart is fairly simple, being composed only of a cartId and a list of line items.
2 A LineItem represents a single product and the quantity we intend to buy.
Above we are taking advantage of the Java record to reduce the amount of boilerplate code, but you can use regular classes so long as they can be serialized to JSON (e.g. using Jackson annotations).

Another fundamental aspect of our entity will be its domain events. For now, we will have 3 different events: ItemAdded, ItemRemoved and CheckedOut, defined as below:

src/main/java/com/example/shoppingcart/domain/ShoppingCartEvent.java
import kalix.springsdk.annotations.TypeName;
public sealed interface ShoppingCartEvent { (1)

  @TypeName("item-added") (2)
  record ItemAdded(ShoppingCart.LineItem item) implements ShoppingCartEvent {}

  @TypeName("item-removed")
  record ItemRemoved(String productId) implements ShoppingCartEvent {}

  @TypeName("checked-out")
  record CheckedOut(int timestamp) implements ShoppingCartEvent {}
}
1 The 3 types of event all derive from the same type ShoppingCartEvent.
2 Includes the logical type name using @TypeName annotation.
The use of logical names for subtypes is essential for maintainability. Since the type information is persisted, omitting the logical name might cause the fully qualified class name (FQCN) to be used instead, which would prevent the application from correctly deserializing the events after a package change.
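To make the risk concrete, here is a small plain-Java sketch that contrasts resolving a persisted event by a logical name with resolving it by class name. The registry, the method names and the package `com.example.oldpackage` are hypothetical and only illustrate the idea. This is not how Kalix resolves `@TypeName` internally.

```java
import java.util.Map;

final class TypeNameSketch {

  // Stand-ins for the event records of this guide.
  record ItemAdded(String productId) {}
  record ItemRemoved(String productId) {}

  // Hypothetical registry: stable logical name -> whatever class currently implements it.
  static final Map<String, Class<?>> REGISTRY = Map.of(
      "item-added", ItemAdded.class,
      "item-removed", ItemRemoved.class);

  // Resolving by logical name keeps working when the class is moved or renamed,
  // because only the registry entry has to be updated.
  static Class<?> resolve(String persistedTypeName) {
    Class<?> type = REGISTRY.get(persistedTypeName);
    if (type == null) {
      throw new IllegalArgumentException("Unknown event type: " + persistedTypeName);
    }
    return type;
  }

  // Resolving by class name fails as soon as the persisted name no longer exists.
  static boolean resolvableByClassName(String fullyQualifiedClassName) {
    try {
      Class.forName(fullyQualifiedClassName);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(resolve("item-added").getSimpleName());                    // ItemAdded
    System.out.println(resolvableByClassName("com.example.oldpackage.ItemAdded")); // false
  }
}
```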

Implementing behavior

Now that we have our Entity state defined along with its events, the remaining steps can be summarized as follows:

  • declare your entity and pick an entity key (it needs to be unique as it will be used for sharding purposes);

  • define an access point (i.e. a route path) to your entity;

  • implement how each command is handled and which event(s) it generates;

  • provide a handler for each event and how it affects the entity’s state.

Let’s have a look at what our shopping cart entity will look like for the first 2 steps from the above list:

src/main/java/com/example/shoppingcart/ShoppingCartEntity.java
@EntityKey("cartId") (2)
@EntityType("shopping-cart") (3)
@RequestMapping("/cart") (4)
public class ShoppingCartEntity extends EventSourcedEntity<ShoppingCart> { (1)

}
1 Create a class that extends EventSourcedEntity<S>, where S is the state type this entity will store (i.e. ShoppingCart).
2 Annotate the class with @EntityKey and pass the name of the key that will be used as the entity’s unique identifier.
3 Annotate the class with @EntityType and pass a unique name for this entity type.
4 Use Spring’s RequestMapping annotation to define the route to your entity.
The EntityKey cartId must match a path parameter (i.e. cartId) and such value needs to be unique per entity. On the other hand, the EntityType shopping-cart is common for all instances of this entity but must be stable - cannot be changed after a production deploy - and unique across the different entity types.

Generated Entity Keys

In some cases, you may wish to generate an Entity key. This is typically useful when creating an entity whose key is a surrogate key. To indicate to Kalix that an Entity key should be generated rather than extracted from the path, annotate the corresponding command method with @GenerateEntityKey. You will often need to access the generated entity key from inside the entity’s code. This can be done using the EntityContext.entityId method, as exemplified below:

src/main/java/com/example/shoppingcart/ShoppingCartEntity.java
  @GenerateEntityKey (1)
  @PostMapping("/create")
  public Effect<String> create() {
    return effects()
        .reply(commandContext().entityId()); (2)
  }
1 Annotate the command handler with @GenerateEntityKey.
2 Access the generated id and reply.
This will generate a version 4 (random) UUID for the Entity. Only version 4 UUIDs are currently supported for generated Entity keys.
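For reference, this is what a version 4 UUID looks like when produced with the JDK. Kalix generates the key itself when `@GenerateEntityKey` is used, so the helper below is purely illustrative and `newEntityKey` is a hypothetical name.

```java
import java.util.UUID;

final class GeneratedKeySketch {

  // Hypothetical helper: the JDK's randomUUID() produces version 4 (random) UUIDs.
  static String newEntityKey() {
    return UUID.randomUUID().toString();
  }

  public static void main(String[] args) {
    String key = newEntityKey();
    System.out.println(key);                            // a random 36-character UUID string
    System.out.println(UUID.fromString(key).version()); // 4
  }
}
```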

Updating state

Having created the basis of our entity, we will now define how each command is handled. In the example below, we define a new endpoint that will add a new line item to a given shopping cart. It returns an Effect to emit an event and then sends a reply once the event is stored successfully. The state is updated by the event handler.

The only way for a command handler to modify the Entity’s state is by emitting an event. Any modifications made directly to the state (or instance variables) from the command handler are not persisted. When the Entity is passivated and reloaded, those modifications will not be present.
src/main/java/com/example/shoppingcart/ShoppingCartEntity.java
@PostMapping("/{cartId}/add")
public Effect<String> addItem(@RequestBody LineItem item) {
  if (item.quantity() <= 0) { (1)
    return effects().error("Quantity for item " + item.productId() + " must be greater than zero.");
  }

  var event = new ShoppingCartEvent.ItemAdded(item); (2)

  return effects()
      .emitEvent(event) (3)
      .thenReply(newState -> "OK"); (4)
}

@EventHandler (5)
public ShoppingCart itemAdded(ShoppingCartEvent.ItemAdded itemAdded) {
  return currentState().onItemAdded(itemAdded); (6)
}
1 The validation ensures the quantity of items added is greater than zero and it fails for calls with illegal values by returning an Effect with effects().error.
2 From the current incoming LineItem we create a new ItemAdded event representing the change of the cart.
3 We store the event by returning an Effect with effects().emitEvent.
4 The acknowledgment that the command was successfully processed is only sent if the event was successfully stored and applied, otherwise there will be an error reply. The lambda parameter newState gives us access to the new state returned by applying such event.
5 The event handler must be marked with @EventHandler and take a single parameter whose type matches the emitted event type (i.e. ItemAdded).
6 Return the new state to be stored - the logic for state transition is defined inside the ShoppingCart domain model.
There must be one event handler declared for each type of event the Event Sourced Entity emits (e.g. itemAdded receives a parameter of type ItemAdded, the same type emitted in the addItem command handler).

As mentioned above, the business logic that allows us to transition between states was placed on the domain model as seen below:

src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
public ShoppingCart onItemAdded(ShoppingCartEvent.ItemAdded itemAdded) {
  var item = itemAdded.item();
  var lineItem = updateItem(item, this); (1)
  List<LineItem> lineItems =
      removeItemByProductId(this, item.productId()); (2)
  lineItems.add(lineItem); (3)
  lineItems.sort(Comparator.comparing(LineItem::productId));
  return new ShoppingCart(cartId, lineItems); (4)
}

private static List<LineItem> removeItemByProductId(
    ShoppingCart cart, String productId) {
  return cart.items().stream()
      .filter(lineItem -> !lineItem.productId().equals(productId))
      .collect(Collectors.toList());
}

private static LineItem updateItem(LineItem item, ShoppingCart cart) {
  return cart.findItemByProductId(item.productId())
      .map(li -> li.withQuantity(li.quantity() + item.quantity()))
      .orElse(item);
}

public Optional<LineItem> findItemByProductId(String productId) {
  Predicate<LineItem> lineItemExists =
      lineItem -> lineItem.productId().equals(productId);
  return items.stream().filter(lineItemExists).findFirst();
}
1 For an existing item, we will make sure to sum the existing quantity with the incoming one.
2 Returns an updated list of items without the existing item.
3 Adds the updated item to the shopping cart.
4 Returns a new instance of the shopping cart with the updated line items.
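The effect of this state transition can be checked in isolation. The following self-contained sketch uses a condensed re-implementation of the merge logic with hypothetical `Cart` and `LineItem` records, so it runs without the sample project. It shows that adding the same product twice sums the quantities and leaves the previous cart instance untouched.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class CartMergeSketch {

  record LineItem(String productId, String name, int quantity) {
    LineItem withQuantity(int quantity) {
      return new LineItem(productId, name, quantity);
    }
  }

  record Cart(String cartId, List<LineItem> items) {

    // Condensed equivalent of onItemAdded: merge quantities, keep the items sorted.
    Cart onItemAdded(LineItem added) {
      List<LineItem> updated = new ArrayList<>();
      LineItem merged = added;
      for (LineItem existing : items) {
        if (existing.productId().equals(added.productId())) {
          merged = existing.withQuantity(existing.quantity() + added.quantity());
        } else {
          updated.add(existing);
        }
      }
      updated.add(merged);
      updated.sort(Comparator.comparing(LineItem::productId));
      return new Cart(cartId, List.copyOf(updated)); // a new, immutable cart
    }
  }

  public static void main(String[] args) {
    Cart empty = new Cart("cart-1", List.of());
    Cart cart = empty
        .onItemAdded(new LineItem("kalix-mug", "Kalix Mug", 2))
        .onItemAdded(new LineItem("akka-tshirt", "Akka Tshirt", 10))
        .onItemAdded(new LineItem("akka-tshirt", "Akka Tshirt", 5));

    System.out.println(cart.items().size());            // 2
    System.out.println(cart.items().get(0).quantity()); // 15 (akka-tshirt sorts first)
    System.out.println(empty.items().isEmpty());        // true, the original is unchanged
  }
}
```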

Retrieving state

To access the current state of the entity we can use currentState(), as you have probably noticed from the examples above. However, what if this is the first command we are receiving for this entity? The following example shows the implementation of the read-only command handler (accessed through GET /cart/myCartId):

src/main/java/com/example/shoppingcart/ShoppingCartEntity.java
private final String entityId;

public ShoppingCartEntity(EventSourcedEntityContext context) {
  this.entityId = context.entityId(); (1)
}

@Override
public ShoppingCart emptyState() { (2)
  return new ShoppingCart(entityId, Collections.emptyList());
}

@GetMapping("/{cartId}") (3)
public Effect<ShoppingCart> getCart() {
  return effects().reply(currentState()); (4)
}
1 Stores the entityId on an internal attribute so we can use it later.
2 Provides the initial state. Overriding emptyState() is optional, but if you do not override it, be prepared for currentState() to return null when the first command or event is received.
3 Marks this method as a command handler for GET requests.
4 Returns the current state as reply for the request.
For simplicity, we are returning the internal state directly to the requester. In a real-world scenario, it’s usually better to convert this internal domain model into a public model so that the internal representation is free to evolve without breaking client code.
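One way to do such a conversion is a small mapping function between the internal records and a separate set of API records. In the sketch below, `CartView`, `CartItemView` and `toView` are hypothetical names chosen for this example. They are not part of the sample project or the Kalix SDK.

```java
import java.util.List;
import java.util.stream.Collectors;

final class PublicModelSketch {

  // Internal domain model, mirroring the records used in this guide.
  record LineItem(String productId, String name, int quantity) {}
  record ShoppingCart(String cartId, List<LineItem> items) {}

  // Hypothetical public API model: exposes only what clients need to see.
  record CartItemView(String productId, int quantity) {}
  record CartView(String cartId, int totalQuantity, List<CartItemView> items) {}

  // Maps the internal state to the public representation.
  static CartView toView(ShoppingCart cart) {
    List<CartItemView> items = cart.items().stream()
        .map(li -> new CartItemView(li.productId(), li.quantity()))
        .collect(Collectors.toList());
    int total = cart.items().stream().mapToInt(LineItem::quantity).sum();
    return new CartView(cart.cartId(), total, items);
  }

  public static void main(String[] args) {
    ShoppingCart cart = new ShoppingCart("cart-1", List.of(
        new LineItem("akka-tshirt", "Akka Tshirt", 15),
        new LineItem("kalix-mug", "Kalix Mug", 2)));
    System.out.println(toView(cart).totalQuantity()); // 17
  }
}
```

With this split, a field can be renamed or added in `ShoppingCart` without changing the JSON that clients receive, as long as `toView` is adjusted accordingly.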

Snapshots

Snapshots are an important optimization for Event Sourced Entities that emit many events. Rather than reading the entire journal upon loading or restart, Kalix can initialize the Entity from a snapshot.

Snapshots are stored and handled automatically by Kalix without any specific code required. Snapshots are stored after a configured number of events:

src/main/resources/application.conf
kalix.event-sourced-entity.snapshot-every = 100

When the Event Sourced Entity is loaded again, the snapshot will be loaded before any other events are received.

Testing the Entity

The following snippet shows how the EventSourcedTestKit is used to test the ShoppingCartEntity implementation. Kalix provides two main APIs for unit tests, the EventSourcedTestKit and the EventSourcedResult. The former gives us the overall state of the entity and all the events produced by all the calls to the Entity, while the latter only holds the effects produced by each individual call to the Entity.

src/test/java/com/example/shoppingcart/domain/ShoppingCartTest.java
package com.example.shoppingcart.domain;

import com.example.shoppingcart.ShoppingCartEntity;
import kalix.javasdk.testkit.EventSourcedResult;
import kalix.springsdk.testkit.EventSourcedTestKit;
import org.junit.jupiter.api.Test;

import java.util.List;

import static com.example.shoppingcart.domain.ShoppingCartEvent.*;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class ShoppingCartTest {

  private final ShoppingCart.LineItem akkaTshirt = new ShoppingCart.LineItem("akka-tshirt", "Akka Tshirt", 10);


  @Test
  public void testAddLineItem() {

    EventSourcedTestKit<ShoppingCart, ShoppingCartEntity> testKit = EventSourcedTestKit.of(ShoppingCartEntity::new); (1)
    {
      EventSourcedResult<String> result = testKit.call(e -> e.addItem(akkaTshirt)); (2)
      assertEquals("OK", result.getReply()); (3)
      assertEquals(10, result.getNextEventOfType(ItemAdded.class).item().quantity()); (4)
    }

    // actually we want more akka tshirts
    {
      EventSourcedResult<String> result = testKit.call(e -> e.addItem(akkaTshirt.withQuantity(5))); (5)
      assertEquals("OK", result.getReply());
      assertEquals(5, result.getNextEventOfType(ItemAdded.class).item().quantity());
    }

    {
      assertEquals(2, testKit.getAllEvents().size()); (6)
      EventSourcedResult<ShoppingCart> result = testKit.call(ShoppingCartEntity::getCart); (7)
      assertEquals(new ShoppingCart("testkit-entity-id", List.of(akkaTshirt.withQuantity(15))), result.getReply());
    }

  }

}
1 Creates the TestKit passing the constructor of the Entity.
2 Calls the method addItem from the Entity in the EventSourcedTestKit with quantity 10.
3 Asserts the return value is "OK".
4 Returns the next event of type ItemAdded and asserts on the quantity.
5 Adds a new item with quantity 5.
6 Asserts that the total number of events should be 2.
7 Calls the getCart method and asserts that quantity should be 15.

EventSourcedResult

Calling a command handler through the TestKit gives us back an EventSourcedResult. This class has methods that we can use to assert the handling of the command, such as:

  • getReply() - the response from the command handler if there was one; if not, an exception is thrown, failing the test.

  • getAllEvents() - all the events emitted by handling the command.

  • getState() - the state of the entity after applying any events the command handler emitted.

  • getNextEventOfType(ExpectedEvent.class) - checks the next of the emitted events against an event type, returns it for inspection if it matches, or fails the test if it does not. The event is consumed once it is inspected, and the next call will look for a subsequent event.
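The "consume and advance" behavior described for getNextEventOfType can be pictured with a small cursor over the emitted events. The class below is a hypothetical stand-in written for this explanation. It is not the TestKit implementation, and the exception types it throws are assumptions.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

final class EventCursorSketch {

  // Stand-ins for the domain events of this guide.
  record ItemAdded(String productId) {}
  record ItemRemoved(String productId) {}

  private final Iterator<?> remaining;

  EventCursorSketch(List<?> emittedEvents) {
    this.remaining = emittedEvents.iterator();
  }

  // Consumes the next event and returns it if it has the expected type.
  <E> E nextEventOfType(Class<E> expected) {
    if (!remaining.hasNext()) {
      throw new NoSuchElementException("No more events to inspect");
    }
    Object next = remaining.next(); // the event is consumed even if the check fails
    if (!expected.isInstance(next)) {
      throw new AssertionError("Expected " + expected.getSimpleName()
          + " but found " + next.getClass().getSimpleName());
    }
    return expected.cast(next);
  }

  public static void main(String[] args) {
    EventCursorSketch cursor = new EventCursorSketch(
        List.of(new ItemAdded("akka-tshirt"), new ItemRemoved("akka-tshirt")));
    System.out.println(cursor.nextEventOfType(ItemAdded.class).productId());   // akka-tshirt
    System.out.println(cursor.nextEventOfType(ItemRemoved.class).productId()); // akka-tshirt
  }
}
```

Because each call advances the cursor, assertions have to be written in the same order in which the command handler emitted the events.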

EventSourcedTestKit

For the above example, this class provides access to all the command handlers of the ShoppingCart entity for unit testing. In addition, it has the following methods:

  • getState() - the current state of the entity, updated after each call that emits events.

  • getAllEvents() - all events emitted since the creation of the testkit instance.