Implementing Event Sourced Entities

Event Sourced Entities persist their state with ACID semantics, scale horizontally, and isolate failures. They use the Event Sourcing Model: instead of persisting the current state, they persist all the events that led to the current state. Kalix stores these events in a journal.

An Event Sourced Entity must not update its in-memory state directly as a result of a command. The handling of a command, if it results in changes being required to state, should emit events. These events will then be received, at which point the in-memory state can and should be changed in response.

When you need to read state in your service, ask yourself "what events should I be listening to?" When you need to write state, ask yourself "what events should I be emitting?"
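The split between command handling and event handling can be sketched in plain Java, independent of the Kalix SDK. The names below (`CounterSketch`, `handleIncrease`, `applyEvent`, `process`) are hypothetical and only illustrate the pattern: the command handler validates the request and returns events without touching state, and the event handler is the only place where state changes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, SDK-free sketch of the command/event split.
final class CounterSketch {
  // An event records a fact that has already happened.
  static final class Increased {
    final int amount;
    Increased(int amount) { this.amount = amount; }
  }

  private int value = 0;                                      // in-memory state
  private final List<Increased> journal = new ArrayList<>();  // stand-in for the journal

  // Command handler: validates the command and decides which events to emit.
  // It does not modify `value` itself.
  List<Increased> handleIncrease(int amount) {
    if (amount <= 0) {
      throw new IllegalArgumentException("amount must be greater than zero");
    }
    return List.of(new Increased(amount));
  }

  // Event handler: the only place where state changes.
  void applyEvent(Increased event) {
    value += event.amount;
  }

  // What a runtime would do around the two handlers: store the event, then apply it.
  void process(int amount) {
    for (Increased event : handleIncrease(amount)) {
      journal.add(event);
      applyEvent(event);
    }
  }

  int value() { return value; }
  int journalSize() { return journal.size(); }
}
```

A rejected command emits no events, so neither the journal nor the state changes.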

To load an Entity, Kalix reads the journal and replays events to compute the Entity’s current state. As an optimization, by default, Event Sourced Entities persist state snapshots periodically. This allows Kalix to recreate an Entity from the most recent snapshot plus any events saved after the snapshot.
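Recovery from a snapshot plus the events stored after it can be sketched as a fold over the tail of the journal. This is a minimal, SDK-free illustration; `RecoverySketch`, `Snapshot`, and the integer "events" are assumptions made for the example and are not Kalix types.

```java
import java.util.List;

// Hypothetical, SDK-free sketch of recovery from a snapshot plus later events.
final class RecoverySketch {
  // A snapshot stores the state and the sequence number of the last event it includes.
  static final class Snapshot {
    final int state;
    final int lastSequenceNr;
    Snapshot(int state, int lastSequenceNr) {
      this.state = state;
      this.lastSequenceNr = lastSequenceNr;
    }
  }

  // journal.get(i) is the event with sequence number i + 1.
  // Each event is simply an amount to add to the state.
  static int recover(Snapshot snapshot, List<Integer> journal) {
    int state = snapshot == null ? 0 : snapshot.state;           // empty state without a snapshot
    int from = snapshot == null ? 0 : snapshot.lastSequenceNr;   // skip events the snapshot covers
    for (int i = from; i < journal.size(); i++) {
      state += journal.get(i);                                   // event handler: fold event into state
    }
    return state;
  }
}
```

Replaying the full journal and replaying only the events after the snapshot must produce the same state; the snapshot only shortens the replay.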

In contrast with typical create, read, update, delete (CRUD) systems, event sourcing allows the state of the Entity to be reliably replicated to other services. Event Sourced Entities use offset tracking in the journal to record which portions of the system have replicated which events.
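The idea behind offset tracking can be sketched as a per-consumer position in an ordered journal. The class below is a hypothetical, SDK-free illustration (`OffsetTrackingSketch` and `poll` are not Kalix APIs), and it simplifies by advancing the offset as soon as events are handed out rather than after the consumer acknowledges them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, SDK-free sketch of per-consumer offset tracking over a journal.
final class OffsetTrackingSketch {
  private final List<String> journal;                            // ordered events
  private final Map<String, Integer> offsets = new HashMap<>();  // consumer -> number of events delivered

  OffsetTrackingSketch(List<String> journal) {
    this.journal = journal;
  }

  // Returns the events this consumer has not seen yet and advances its offset.
  List<String> poll(String consumer) {
    int from = offsets.getOrDefault(consumer, 0);
    List<String> pending = new ArrayList<>(journal.subList(from, journal.size()));
    offsets.put(consumer, journal.size());
    return pending;
  }
}
```

Because each consumer has its own offset, a consumer that starts late still receives every event from the beginning of the journal.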

Event Sourced Entities offer strong consistency guarantees. Kalix distributes Entities across every instance of a stateful service deployment, and at any given time each Entity lives on exactly one instance. If a command for an Entity arrives at an instance that does not host that Entity, the Kalix Runtime forwards the command to the instance that does. This forwarding is transparent, so your code does not need to know about it. Because each Entity lives on exactly one instance, messages can be handled sequentially. Hence, there are no concurrency concerns relating to Event Sourced Entities: each Entity handles one message at a time.
To learn more about event sourcing, check out the free Lightbend Academy course, Reactive Architecture: CQRS & Event Sourcing.

Event Sourced Entities persist changes as events and snapshots. Kalix needs to serialize that data to send it to the underlying data store. This is done with Protocol Buffers using protobuf types.

While Protocol Buffers are the recommended format for persisting state, we recommend that you do not persist your service’s public protobuf messages. This may introduce some overhead to convert from one type to the other but allows the service public interface logic to evolve independently of the data storage format, which should be private.

The steps necessary to implement an Event Sourced Entity include:

  1. Defining the API and domain objects in .proto files.

  2. Implementing behavior in command and event handlers.

  3. Creating and initializing the Entity.

The sections on this page walk through these steps using a shopping cart service as an example.

Defining the proto files

Our Event Sourced Entity example is a shopping cart service.

The following shoppingcart_domain.proto file defines our shopping cart Event Sourced Entity. The entity manages the line items of a cart and stores the events ItemAdded, ItemRemoved, and CheckedOut to represent changes to the cart. Real-world entities store much more data, often structured data, because they represent an Entity in the domain-driven design sense of the term.

Java
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// These are the messages that get persisted - the events, plus the current
// state (Cart) for snapshots.

syntax = "proto3";

package com.example.shoppingcart.domain; (1)

option java_outer_classname = "ShoppingCartDomain"; (2)

message LineItem {
  string productId = 1;
  string name = 2;
  int32 quantity = 3;
}

// The item added event.
message ItemAdded {
  LineItem item = 1;
}

// The item removed event.
message ItemRemoved {
  string productId = 1;
}

// The checked out event.
message CheckedOut {
}

// The shopping cart state.
message Cart {
  repeated LineItem items = 1;
  bool checked_out = 2;
}
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart.domain.
2 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartDomain.
Scala
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// These are the messages that get persisted - the events, plus the current
// state (Cart) for snapshots.

syntax = "proto3";

package com.example.shoppingcart.domain; (1)

message LineItem {
  string productId = 1;
  string name = 2;
  int32 quantity = 3;
}

// The item added event.
message ItemAdded {
  LineItem item = 1;
}

// The item removed event.
message ItemRemoved {
  string productId = 1;
}

// The checked out event.
message CheckedOut {
}

// The shopping cart state.
message Cart {
  repeated LineItem items = 1;
  bool checked_out = 2;
}
1 Any classes generated from this protobuf file will be in the Scala package com.example.shoppingcart.domain.

The shoppingcart_api.proto file defines the commands we can send to the shopping cart service to manipulate or access the cart’s state. They make up the service API:

Java
src/main/proto/com/example/shoppingcart/shoppingcart_api.proto
// This is the public API offered by the shopping cart entity.

syntax = "proto3";

package com.example.shoppingcart; (1)

import "kalix/annotations.proto"; (2)
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "ShoppingCartApi"; (3)

message AddLineItem { (4)
  string cart_id = 1 [(kalix.field).id = true]; (5)
  string product_id = 2;
  string name = 3;
  int32 quantity = 4;
}

message RemoveLineItem {
  string cart_id = 1 [(kalix.field).id = true];
  string product_id = 2;
}

message GetShoppingCart {
  string cart_id = 1 [(kalix.field).id = true];
}

message CheckoutShoppingCart {
  string cart_id = 1 [(kalix.field).id = true];
}

message LineItem {
  string product_id = 1;
  string name = 2;
  int32 quantity = 3;
}

message Cart { (6)
  repeated LineItem items = 1;
  bool checked_out = 2;
}

service ShoppingCartService { (7)
  option (kalix.codegen) = { (8)
    event_sourced_entity: { (9)
      name: "com.example.shoppingcart.domain.ShoppingCart" (10)
      type_id: "shopping-cart" (11)
      state: "com.example.shoppingcart.domain.Cart" (12)
      events: [
        "com.example.shoppingcart.domain.ItemAdded", (13)
        "com.example.shoppingcart.domain.ItemRemoved",
        "com.example.shoppingcart.domain.CheckedOut"]
    }
  };

  rpc AddItem (AddLineItem) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/items/add"
      body: "*"
    };
  }

  rpc RemoveItem (RemoveLineItem) returns (google.protobuf.Empty) {
    option (google.api.http).post = "/cart/{cart_id}/items/{product_id}/remove";
  }

  rpc GetCart (GetShoppingCart) returns (Cart) {
    option (google.api.http) = {
      get: "/carts/{cart_id}"
      additional_bindings: {
          get: "/carts/{cart_id}/items"
          response_body: "items"
      } };
  }

  rpc Checkout (CheckoutShoppingCart) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/checkout"
      body: "*"
    };
  }
}
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart.
2 Import the Kalix protobuf annotations or options.
3 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartApi.
4 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
5 Every Command must contain a string field that contains the entity ID and is marked with the (kalix.field).id option.
6 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
7 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
8 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin.
9 event_sourced_entity indicates that we want the codegen to generate an Event Sourced Entity for this service.
10 name denotes the base name for the Event Sourced Entity. The code generation creates initial sources ShoppingCart, ShoppingCartTest and ShoppingCartIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
11 type_id is a unique identifier of the "state storage." The entity name may be changed even after data has been created, but the type_id can’t.
12 state points to the protobuf message representing the entity’s state which is kept by Kalix. It is stored as snapshots.
13 events points to the protobuf message representing the entity’s events, which are stored by Kalix.
Scala
src/main/proto/com/example/shoppingcart/shoppingcart_api.proto
// This is the public API offered by the shopping cart entity.

syntax = "proto3";

package com.example.shoppingcart; (1)

import "kalix/annotations.proto"; (2)
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

message AddLineItem { (3)
  string cart_id = 1 [(kalix.field).id = true]; (4)
  string product_id = 2;
  string name = 3;
  int32 quantity = 4;
}

message RemoveLineItem {
  string cart_id = 1 [(kalix.field).id = true];
  string product_id = 2;
}

message GetShoppingCart {
  string cart_id = 1 [(kalix.field).id = true];
}

message CheckoutShoppingCart {
  string cart_id = 1 [(kalix.field).id = true];
}

message LineItem {
  string product_id = 1;
  string name = 2;
  int32 quantity = 3;
}

message Cart { (5)
  repeated LineItem items = 1;
  bool checked_out = 2;
}

service ShoppingCartService { (6)
  option (kalix.codegen) = { (7)
    event_sourced_entity: { (8)
      name: "com.example.shoppingcart.domain.ShoppingCart" (9)
      type_id: "shopping-cart" (10)
      state: "com.example.shoppingcart.domain.Cart" (11)
      events: [
        "com.example.shoppingcart.domain.ItemAdded", (12)
        "com.example.shoppingcart.domain.ItemRemoved",
        "com.example.shoppingcart.domain.CheckedOut"]
    }
  };

  rpc AddItem (AddLineItem) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/items/add"
      body: "*"
    };
  }

  rpc RemoveItem (RemoveLineItem) returns (google.protobuf.Empty) {
    option (google.api.http).post = "/cart/{cart_id}/items/{product_id}/remove";
  }

  rpc GetCart (GetShoppingCart) returns (Cart) {
    option (google.api.http) = {
      get: "/carts/{cart_id}"
      additional_bindings: {
          get: "/carts/{cart_id}/items"
          response_body: "items"
      } };
  }

  rpc Checkout (CheckoutShoppingCart) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/checkout"
      body: "*"
    };
  }
}
1 Any classes generated from this protobuf file will be in the Scala package com.example.shoppingcart.
2 Import the Kalix protobuf annotations or options.
3 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
4 Every Command must contain a string field that contains the entity ID and is marked with the (kalix.field).id option.
5 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
6 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
7 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin.
8 event_sourced_entity indicates that we want the codegen to generate an Event Sourced Entity for this service.
9 name denotes the base name for the Event Sourced Entity. The code generation creates initial sources ShoppingCart, ShoppingCartTest and ShoppingCartIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
10 type_id is a unique identifier of the "state storage." The entity name may be changed even after data has been created, but the type_id can’t.
11 state points to the protobuf message representing the entity’s state which is kept by Kalix. It is stored as snapshots.
12 events points to the protobuf message representing the entity’s events, which are stored by Kalix.

Event Sourced Entity’s Effect API

The Event Sourced Entity’s Effect defines the operations that Kalix should perform when an incoming command is handled by an Event Sourced Entity.

An Event Sourced Entity Effect can either:

  • emit events and send a reply to the caller

  • directly reply to the caller if the command is not requesting any state change

  • reject the command by returning an error

  • instruct Kalix to delete the entity

Implementing behavior

An Event Sourced Entity implementation is a class where you define how each command is handled. The class ShoppingCart is generated for us based on the shoppingcart_api.proto and shoppingcart_domain.proto definitions. Once the file exists, it is not overwritten, so you can freely add logic to it. ShoppingCart extends the generated class AbstractShoppingCart, which we should not change because it is regenerated whenever we update the protobuf descriptors. AbstractShoppingCart contains all method signatures corresponding to the API of the service. If you change the API, you will see compilation errors in the ShoppingCart class until you implement the methods required by AbstractShoppingCart.

Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
public class ShoppingCart extends AbstractShoppingCart { (1)
  @SuppressWarnings("unused")
  private final String entityId;

  public ShoppingCart(EventSourcedEntityContext context) { this.entityId = context.entityId(); }

  @Override
  public ShoppingCartDomain.Cart emptyState() { (2)
    return ShoppingCartDomain.Cart.getDefaultInstance();
  }
1 Extends the generated AbstractShoppingCart, which extends EventSourcedEntity.
2 Defines the initial, empty, state that is used before any updates.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
class ShoppingCart(context: EventSourcedEntityContext) extends AbstractShoppingCart { (1)

  @nowarn("msg=unused")
  private val entityId = context.entityId

  override def emptyState: Cart = Cart.defaultInstance (2)
1 Extends the generated AbstractShoppingCart, which extends EventSourcedEntity.
2 Defines the initial, empty, state that is used before any updates.

We need to implement all methods our Event Sourced Entity offers as command handlers.

The code-generation will generate an implementation class with an initial empty implementation which we’ll discuss below.

Command handlers are implemented in the ShoppingCart class as methods that override abstract methods from AbstractShoppingCart. The methods take the current state as the first parameter and the request message as the second parameter. They return an Effect, which describes the next processing actions, such as emitting events and sending a reply.

When adding or changing the rpc definitions, including name, parameter, and return messages, in the .proto files the corresponding methods are regenerated in the abstract class (AbstractShoppingCart). This means that the Java compiler will assist you with such changes. The IDE can typically fill in missing method signatures and such.

Updating state

In the example below, the AddItem service call uses the request message AddLineItem. It returns an Effect to emit an event and then sends a reply once the event is stored successfully. The state is updated by the event handler.

The only way for a command handler to modify the Entity’s state is by emitting an event. Any modifications made directly to the state (or instance variables) from the command handler are not persisted. When the Entity is passivated and reloaded, those modifications will not be present.
Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<Empty> addItem(
    ShoppingCartDomain.Cart currentState,
    ShoppingCartApi.AddLineItem command) {
  if (currentState.getCheckedOut())
    return effects().error("Cart is already checked out.");
  if (command.getQuantity() <= 0) { (1)
    return effects().error("Quantity for item " + command.getProductId() + " must be greater than zero.");
  }

  ShoppingCartDomain.ItemAdded event = (2)
      ShoppingCartDomain.ItemAdded.newBuilder()
          .setItem(
              ShoppingCartDomain.LineItem.newBuilder()
                  .setProductId(command.getProductId())
                  .setName(command.getName())
                  .setQuantity(command.getQuantity())
                  .build())
          .build();

  return effects()
          .emitEvent(event) (3)
          .thenReply(newState -> Empty.getDefaultInstance()); (4)
}
1 The validation ensures the quantity of items added is greater than zero and it fails calls with illegal values by returning an Effect with effects().error.
2 From the current incoming AddLineItem we create a new ItemAdded event representing the change of the cart.
3 We store the event by returning an Effect with effects().emitEvent.
4 The acknowledgment that the command was successfully processed is only sent if the event was successfully stored, otherwise there will be an error reply.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
override def addItem(currentState: Cart, addLineItem: shoppingcart.AddLineItem): EventSourcedEntity.Effect[Empty] =
  if (currentState.checkedOut)
    effects.error("Cart is already checked out")
  else if (addLineItem.quantity <= 0)
    effects.error(s"Quantity for item ${addLineItem.productId} must be greater than zero.") (1)
  else {
    val event = ItemAdded( (2)
      Some(LineItem(productId = addLineItem.productId, name = addLineItem.name, quantity = addLineItem.quantity)))
    effects
      .emitEvent(event) (3)
      .thenReply(_ => Empty.defaultInstance) (4)
  }
1 The validation ensures the quantity of items added is greater than zero and it fails calls with illegal values by returning an Effect with effects.error.
2 From the current incoming AddLineItem we create a new ItemAdded event representing the change of the cart.
3 We store the event by returning an Effect with effects.emitEvent.
4 The acknowledgment that the command was successfully processed is only sent if the event was successfully stored, otherwise there will be an error reply.

The new state is created from the event and the previous state in the event handler. Event handlers are implemented in the ShoppingCart class as methods that override abstract methods from AbstractShoppingCart. Event handlers are generated for each event declared in (kalix.codegen).event_sourced_entity.events.

Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public ShoppingCartDomain.Cart itemAdded(
    ShoppingCartDomain.Cart currentState,
    ShoppingCartDomain.ItemAdded itemAdded) {
  ShoppingCartDomain.LineItem item = itemAdded.getItem();
  ShoppingCartDomain.LineItem lineItem = updateItem(item, currentState);
  List<ShoppingCartDomain.LineItem> lineItems =
      removeItemByProductId(currentState, item.getProductId());
  lineItems.add(lineItem);
  lineItems.sort(Comparator.comparing(ShoppingCartDomain.LineItem::getProductId));
  return ShoppingCartDomain.Cart.newBuilder().addAllItems(lineItems).build();
}

private ShoppingCartDomain.LineItem updateItem(
    ShoppingCartDomain.LineItem item, ShoppingCartDomain.Cart cart) {
  return findItemByProductId(cart, item.getProductId())
      .map(li -> li.toBuilder().setQuantity(li.getQuantity() + item.getQuantity()).build())
      .orElse(item);
}

private Optional<ShoppingCartDomain.LineItem> findItemByProductId(
    ShoppingCartDomain.Cart cart, String productId) {
  Predicate<ShoppingCartDomain.LineItem> lineItemExists =
      lineItem -> lineItem.getProductId().equals(productId);
  return cart.getItemsList().stream().filter(lineItemExists).findFirst();
}

private List<ShoppingCartDomain.LineItem> removeItemByProductId(
    ShoppingCartDomain.Cart cart, String productId) {
  return cart.getItemsList().stream()
      .filter(lineItem -> !lineItem.getProductId().equals(productId))
      .collect(Collectors.toList());
}
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
override def itemAdded(currentState: Cart, itemAdded: ItemAdded): Cart = {
  val cart = currentState.items.map(lineItem => lineItem.productId -> lineItem).toMap
  val item = cart.get(itemAdded.getItem.productId) match {
    case Some(existing) => existing.copy(quantity = existing.quantity + itemAdded.getItem.quantity)
    case None           => itemAdded.getItem
  }
  val updatedCart = cart + (item.productId -> item)
  currentState.withItems(updatedCart.values.toSeq)
}

Retrieving state

The following example shows the implementation of the GetCart command handler. This command handler is read-only. It does not update the state; it just returns it:

Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<ShoppingCartApi.Cart> getCart(
    ShoppingCartDomain.Cart currentState, (1)
    ShoppingCartApi.GetShoppingCart command) {
  List<ShoppingCartApi.LineItem> apiItems =
      currentState.getItemsList().stream()
          .map(this::convert)
          .sorted(Comparator.comparing(ShoppingCartApi.LineItem::getProductId))
          .collect(Collectors.toList());
  ShoppingCartApi.Cart apiCart =
          ShoppingCartApi.Cart.newBuilder().addAllItems(apiItems)
              .setCheckedOut(currentState.getCheckedOut())
              .build(); (2)
  return effects().reply(apiCart);
}

private ShoppingCartApi.LineItem convert(ShoppingCartDomain.LineItem item) {
  return ShoppingCartApi.LineItem.newBuilder()
          .setProductId(item.getProductId())
          .setName(item.getName())
          .setQuantity(item.getQuantity())
          .build();
}
1 The current state is passed to the method.
2 We convert the domain representation to the API representation that is sent as a reply by returning an Effect with effects().reply.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
override def getCart(
    currentState: Cart, (1)
    getShoppingCart: shoppingcart.GetShoppingCart): EventSourcedEntity.Effect[shoppingcart.Cart] = {
  val apiItems = currentState.items.map(convertToApi).sortBy(_.productId)
  val apiCart = shoppingcart.Cart(apiItems, currentState.checkedOut) (2)
  effects.reply(apiCart)
}

private def convertToApi(item: LineItem): shoppingcart.LineItem =
  shoppingcart.LineItem(productId = item.productId, name = item.name, quantity = item.quantity)
1 The current state is passed to the method.
2 We convert the domain representation to the API representation that is sent as a reply by returning an Effect with effects.reply.

Registering the Entity

To make Kalix aware of the Event Sourced Entity, we need to register it with the service.

With code generation, the registration is inserted automatically into the generated KalixFactory.withComponents method, which is used from the Main class.

Java
src/main/java/com/example/shoppingcart/Main.java
/* This code was generated by Kalix tooling.
 * As long as this file exists it will not be re-generated.
 * You are free to make changes to this file.
 */
package com.example.shoppingcart;

import kalix.javasdk.Kalix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.example.shoppingcart.domain.*;
import com.example.shoppingcart.view.*;

public final class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  public static Kalix createKalix() {
    Kalix kalix = new Kalix();
    kalix.register(ShoppingCartProvider.of(ShoppingCart::new));
    kalix.register(ShoppingCartViewServiceViewProvider.of(ShoppingCartViewServiceImpl::new));
    return kalix;
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Kalix service");
    createKalix().start();
  }
}
Scala
src/main/scala/com/example/shoppingcart/Main.scala
package com.example.shoppingcart

import kalix.scalasdk.Kalix
import com.example.shoppingcart.domain.ShoppingCart
import org.slf4j.LoggerFactory

// This class was initially generated based on the .proto definition by Kalix tooling.
//
// As long as this file exists it will not be overwritten: you can maintain it yourself,
// or delete it so it is regenerated as needed.

object Main {

  private val log = LoggerFactory.getLogger("com.example.shoppingcart.Main")

  def createKalix(): Kalix = {
    // The KalixFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `Kalix()` instance.
    KalixFactory.withComponents(
      new ShoppingCart(_))
  }

  def main(args: Array[String]): Unit = {
    log.info("starting the Kalix service")
    createKalix().start()
  }
}

By default, the generated constructor has an EventSourcedEntityContext parameter, but you can change this to accept other parameters. If you change the constructor of the ShoppingCart class you will see a compilation error here, and you have to adjust the factory function that is passed to KalixFactory.withComponents.

When more components are added the KalixFactory is regenerated and you have to adjust the registration from the Main class.

Snapshots

Snapshots are an important optimization for Event Sourced Entities that emit many events. Rather than reading the entire journal upon loading or restart, Kalix can restore the Entity from a snapshot.

Snapshots are stored and handled automatically by Kalix without any specific code required. Snapshots are stored after a configured number of events:

src/main/resources/application.conf
kalix.event-sourced-entity.snapshot-every = 100

When the Event Sourced Entity is loaded again, the snapshot is loaded first, and then only the events stored after it are replayed.
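How a "snapshot every N events" setting decides when a snapshot is due can be sketched with a simple counter. This is a hypothetical, SDK-free illustration (`SnapshotPolicySketch` is not a Kalix class, and the real runtime may apply the setting differently); it only shows why at most N - 1 events need to be replayed after the latest snapshot.

```java
// Hypothetical, SDK-free sketch of a "snapshot every N events" policy.
final class SnapshotPolicySketch {
  private final int snapshotEvery;
  private long eventsPersisted = 0;
  private long snapshotsTaken = 0;

  SnapshotPolicySketch(int snapshotEvery) {
    this.snapshotEvery = snapshotEvery;
  }

  // Called once per persisted event; returns true when a snapshot is due.
  boolean onEventPersisted() {
    eventsPersisted++;
    boolean due = eventsPersisted % snapshotEvery == 0;
    if (due) {
      snapshotsTaken++;
    }
    return due;
  }

  long snapshotsTaken() { return snapshotsTaken; }
}
```

With a value of 100, an entity that has stored 250 events would have two snapshots under this policy and would replay the 50 most recent events on load.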

Deleting an Entity

Normally, Event Sourced Entities are not deleted because the history of the events typically provides business value. For certain use cases or for regulatory reasons the entity can be deleted.

Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<Empty> checkout(ShoppingCartDomain.Cart currentState, ShoppingCartApi.CheckoutShoppingCart checkoutShoppingCart) {
  if (currentState.getCheckedOut())
    return effects().error("Cart is already checked out.");
  return effects()
      .emitEvent(ShoppingCartDomain.CheckedOut.getDefaultInstance()) (1)
      .deleteEntity() (2)
      .thenReply(newState -> Empty.getDefaultInstance());
}
1 Emit a final event before deletion, which is handled as any other event.
2 Instruction to delete the entity.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
override def checkout(
    currentState: Cart,
    checkoutShoppingCart: CheckoutShoppingCart): EventSourcedEntity.Effect[Empty] = {
  if (currentState.checkedOut)
    effects.error("Cart is already checked out")
  else
    effects
      .emitEvent(CheckedOut.defaultInstance) (1)
      .deleteEntity() (2)
      .thenReply(_ => Empty.defaultInstance);
}
1 Emit a final event before deletion, which is handled as any other event.
2 Instruction to delete the entity.

When you give the instruction to delete the entity it will still exist for some time, including its events and snapshots. The actual removal of events and snapshots happens later to give downstream consumers time to process all prior events, including the final event that was emitted together with the deleteEntity effect. By default, the existence of the entity is completely cleaned up after a week.

It is not allowed to emit more events after the entity has been "marked" as deleted. You can still handle read requests of the entity until it has been completely removed.

It is best to not reuse the same entity id after deletion, but if that happens after the entity has been completely removed it will be instantiated as a completely new entity without any knowledge of previous state.

Note that deleting View state must be handled explicitly.
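The deletion lifecycle described in this section (marked as deleted, still readable, then fully removed) can be sketched as a small state model. Everything below is a hypothetical, SDK-free illustration: `DeletionSketch` and its methods are not Kalix APIs, and the retention period is passed in explicitly rather than read from any real configuration.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, SDK-free sketch of the deletion lifecycle.
final class DeletionSketch {
  private final Duration retention;   // the text above states one week as the default
  private Instant deletedAt = null;   // null while the entity is live

  DeletionSketch(Duration retention) {
    this.retention = retention;
  }

  // Marks the entity as deleted; repeated calls keep the first timestamp.
  void markDeleted(Instant now) {
    if (deletedAt == null) {
      deletedAt = now;
    }
  }

  // No more events may be emitted once the entity is marked as deleted.
  boolean canEmitEvents() {
    return deletedAt == null;
  }

  // Reads keep working until the retention period has passed.
  boolean canRead(Instant now) {
    return deletedAt == null || now.isBefore(deletedAt.plus(retention));
  }

  // After the retention period, events and snapshots are considered removed.
  boolean isPurged(Instant now) {
    return !canRead(now);
  }
}
```

In this model, an entity marked as deleted on day 0 with a seven-day retention rejects new events immediately, answers reads on day 6, and is gone on day 8.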

Running Side Effects

An Entity may also emit one or more side effects. A side effect is something whose result has no impact on the result of the current command: if it fails, the current command still succeeds. The result of the side effect is therefore ignored. When used from inside an Entity, side effects are only performed after the successful completion of any state actions requested by the command handler.

See the dedicated section on Actions for more details.
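The two rules for side effects, that they run only after the state change has succeeded and that their outcome is ignored, can be sketched in plain Java. The class below is a hypothetical, SDK-free illustration; `SideEffectSketch`, `handle`, and the `persistSucceeds` flag are assumptions made for the example and do not correspond to Kalix APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, SDK-free sketch: side effects run only after the state change
// succeeded, and a failing side effect does not change the reply.
final class SideEffectSketch {
  final List<String> journal = new ArrayList<>();

  String handle(String event, boolean persistSucceeds, List<Runnable> sideEffects) {
    if (!persistSucceeds) {
      return "error";                  // nothing was stored, so no side effects run
    }
    journal.add(event);
    for (Runnable sideEffect : sideEffects) {
      try {
        sideEffect.run();
      } catch (RuntimeException ignored) {
        // The result of a side effect is ignored, including failures.
      }
    }
    return "ok";
  }
}
```

A side effect that throws leaves the reply as "ok", and a command whose event could not be stored triggers no side effects at all.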

Testing the Entity

There are two ways to test an Entity:

  • unit tests, which run the Entity class in the same JVM as the test code itself with the help of a test kit

  • integration tests, with the entire service deployed in a Docker container and the test interacting with it over gRPC.

Each way has its benefits. Unit tests are faster and provide more immediate feedback about success or failure, but can only test a single entity at a time and in isolation. Integration tests, on the other hand, are more realistic and allow many entities to interact with other components inside and outside the service.

Unit tests

The following snippet shows how the ShoppingCartTestKit is used to test the ShoppingCart implementation. Kalix provides two main APIs for unit tests, the ShoppingCartTestKit and the EventSourcedResult. The former gives us the overall state of the entity: its current state and all the events produced by all the calls to the Entity. The latter holds only the effects produced by each individual call to the Entity.

Java
src/test/java/com/example/shoppingcart/domain/ShoppingCartTest.java
package com.example.shoppingcart.domain;

import kalix.javasdk.testkit.EventSourcedResult;
import com.example.shoppingcart.ShoppingCartApi;
import com.google.protobuf.Empty;
import org.junit.jupiter.api.Test;

import java.util.NoSuchElementException;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

public class ShoppingCartTest {

    @Test
    public void addItemTest() {

        ShoppingCartTestKit testKit = ShoppingCartTestKit.of(ShoppingCart::new); (1)

        ShoppingCartApi.AddLineItem apples = ShoppingCartApi.AddLineItem.newBuilder().setProductId("idA")
                .setName("apples").setQuantity(1).build();
        EventSourcedResult<Empty> addingApplesResult = testKit.addItem(apples); (2)

        ShoppingCartApi.AddLineItem bananas = ShoppingCartApi.AddLineItem.newBuilder().setProductId("idB")
                .setName("bananas").setQuantity(2).build();
        testKit.addItem(bananas); (3)

        assertEquals(1, addingApplesResult.getAllEvents().size()); (4)
        assertEquals(2, testKit.getAllEvents().size()); (5)

        ShoppingCartDomain.ItemAdded addedApples = addingApplesResult.getNextEventOfType(ShoppingCartDomain.ItemAdded.class); (6)
        assertEquals("apples", addedApples.getItem().getName());
        assertThrows(NoSuchElementException.class, () ->  addingApplesResult.getNextEventOfType(ShoppingCartDomain.ItemAdded.class)); (7)
        assertEquals(Empty.getDefaultInstance(), addingApplesResult.getReply()); (8)

        ShoppingCartDomain.LineItem expectedApples = ShoppingCartDomain.LineItem.newBuilder().setProductId("idA")
                .setName("apples").setQuantity(1).build();
        ShoppingCartDomain.LineItem expectedBananas = ShoppingCartDomain.LineItem.newBuilder().setProductId("idB")
                .setName("bananas").setQuantity(2).build();
        ShoppingCartDomain.Cart expectedState = ShoppingCartDomain.Cart.newBuilder()
                .addItems(expectedApples)
                .addItems(expectedBananas)
                .build();
        assertEquals(expectedState, testKit.getState()); (9)
    }
}
1 Creating the TestKit passing the constructor of the Entity.
2 Calling the method addItem from the Entity in the ShoppingCartTestKit.
3 Calling the method addItem from the Entity in the ShoppingCartTestKit.
4 Checking the EventSourcedResult of the first call to addItem.
5 Checking the events collected by the ShoppingCartTestKit across all the calls to addItem.
6 Retrieving the first event generated from the first call to addItem.
7 Retrieving the second event generated from the first call to addItem. There is no such event, as our implementation only generates one event when addItem is called.
8 Retrieving the response from the call to addItem.
9 Retrieving the state of the entity after all the calls to addItem.
Scala
src/test/scala/com/example/shoppingcart/domain/ShoppingCartSpec.scala
package com.example.shoppingcart.domain

import kalix.scalasdk.eventsourcedentity.EventSourcedEntity
import kalix.scalasdk.testkit.EventSourcedResult
import com.example.shoppingcart
import com.example.shoppingcart.AddLineItem
import com.google.protobuf.empty.Empty
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

class ShoppingCartSpec extends AnyWordSpec with Matchers {
  "The ShoppingCart" should {

    "correctly process commands of type AddItem" in {
      val testKit = ShoppingCartTestKit(new ShoppingCart(_)) (1)
      val apples = AddLineItem(productId = "idA", name = "apples", quantity = 1)
      val addingApplesResult = testKit.addItem(apples) (2)

      val bananas = AddLineItem(productId = "idB", name = "bananas", quantity = 2)
      testKit.addItem(bananas) (3)

      addingApplesResult.events should have size 1 (4)
      testKit.allEvents should have size 2 (5)

      val addedApples = addingApplesResult.nextEvent[ItemAdded] (6)
      addedApples.getItem.name shouldBe "apples"
      intercept[NoSuchElementException] { (7)
        addingApplesResult.nextEvent[ItemAdded]
      }
      addingApplesResult.reply shouldBe Empty.defaultInstance (8)

      val expectedState = Cart(Seq(
        LineItem(productId = "idA", name = "apples", quantity = 1),
        LineItem(productId = "idB", name = "bananas", quantity = 2)
      ))
      testKit.currentState shouldBe expectedState (9)
    }
  }
}
1 Creating the TestKit passing the constructor of the Entity.
2 Calling the method addItem from the Entity in the ShoppingCartTestKit.
3 Calling the method addItem from the Entity in the ShoppingCartTestKit.
4 Checking the EventSourcedResult of the first call to addItem.
5 Checking the events collected by the ShoppingCartTestKit across all the calls to addItem.
6 Retrieving the first event generated from the first call to addItem.
7 Retrieving the second event generated from the first call to addItem. There is no such event, as our implementation only generates one event when addItem is called.
8 Retrieving the response from the call to addItem.
9 Retrieving the state of the entity after all the calls to addItem.
The ShoppingCartTestKit is stateful: it holds the state of a single entity instance in memory. If you want to test more than one entity in a test, you need to create multiple instances of ShoppingCartTestKit.

By default, the integration and unit tests are both invoked by sbt test. To run only the unit tests, run sbt -DonlyUnitTest test or sbt -DonlyUnitTest=true test, or set the value to true in the sbt session with set onlyUnitTest := true and then run test.

EventSourcedResult

Java

Calling a command handler through the TestKit gives us back an EventSourcedResult. This class has methods that we can use to assert the result of handling the command, such as:

  • getReply() - the response from the command handler if there was one; if not, an exception is thrown, failing the test.

  • getAllEvents() - all the events emitted by handling the command.

  • getState() - the state of the entity after applying any events the command handler emitted.

  • getNextEventOfType(ExpectedEvent.class) - check the next of the emitted events against an event type, return it for inspection if it matches, or fail the test if it does not. The event is consumed once it is inspected, so the next call will look at the subsequent event.
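The consume-on-inspection behaviour of getNextEventOfType can be easy to trip over, so here is a minimal, self-contained sketch of the idea. EventCursorSketch and its ItemAdded class are hypothetical stand-ins written for this illustration, not the Kalix EventSourcedResult implementation. The sketch assumes that the full event list stays available after events are consumed and that a type mismatch is reported with NoSuchElementException; the real class may differ in those details.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for illustration only, NOT the Kalix EventSourcedResult.
final class EventCursorSketch {

    // Hypothetical event type used by the sketch.
    static final class ItemAdded {
        final String name;
        ItemAdded(String name) { this.name = name; }
    }

    private final List<Object> events;        // everything emitted by one command
    private final Iterator<Object> remaining; // events not yet inspected

    EventCursorSketch(List<?> emitted) {
        this.events = List.copyOf(emitted);
        this.remaining = this.events.iterator();
    }

    // Assumption of this sketch: the full list is unaffected by consumption.
    List<Object> getAllEvents() {
        return events;
    }

    // Each call consumes exactly one event, matching or not.
    <E> E getNextEventOfType(Class<E> expected) {
        if (!remaining.hasNext()) {
            throw new NoSuchElementException(
                "No more events, expected " + expected.getSimpleName());
        }
        Object next = remaining.next();
        if (!expected.isInstance(next)) {
            // Assumption: a mismatch is also signalled with NoSuchElementException.
            throw new NoSuchElementException(
                "Expected " + expected.getSimpleName()
                    + " but found " + next.getClass().getSimpleName());
        }
        return expected.cast(next);
    }

    public static void main(String[] args) {
        EventCursorSketch result =
            new EventCursorSketch(List.of(new ItemAdded("apples")));

        // The first call returns the only event and consumes it.
        ItemAdded first = result.getNextEventOfType(ItemAdded.class);
        System.out.println(first.name);

        // The second call has nothing left to inspect.
        try {
            result.getNextEventOfType(ItemAdded.class);
        } catch (NoSuchElementException e) {
            System.out.println("no further events");
        }
    }
}
```

In a test this means the order of assertions matters: asserting on the same event twice needs the value returned by the first call, not a second call.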

Scala

Calling a command handler through the TestKit gives us back an EventSourcedResult. This class has methods that we can use to assert the result of handling the command, such as:

  • reply - the response from the command handler if there was one; if not, an exception is thrown, failing the test.

  • events - all the events emitted by handling the command.

  • state - the state of the entity after applying any events the command handler emitted.

  • nextEvent[ExpectedEvent] - check the next of the emitted events against an event type, return it for inspection if it matches, or fail the test if it does not. The event is consumed once it is inspected, so the next call will look at the subsequent event.

ShoppingCartTestKit

Java

This class is generated by Kalix when the project is compiled and is located in target/generated-test-sources/kalix/java/com/example/shoppingcart/domain/. It provides access to all the command handlers of the ShoppingCart entity for unit testing. In addition, it has the following methods:

  • getState() - the current state of the entity, updated on each method call that emits events.

  • getAllEvents() - all events emitted since the creation of the testkit instance.
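To make the difference between per-call results and the test kit's cumulative view concrete, the following is a small sketch under stated assumptions. CartTestKitSketch is a hypothetical, hand-written miniature, not the generated ShoppingCartTestKit: its addItem signature, its map-based state, and the rule of summing quantities for a repeated product are all choices made for this illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature for illustration only, NOT the generated ShoppingCartTestKit.
final class CartTestKitSketch {

    // Hypothetical event emitted by the pretend command handler.
    static final class ItemAdded {
        final String productId;
        final int quantity;
        ItemAdded(String productId, int quantity) {
            this.productId = productId;
            this.quantity = quantity;
        }
    }

    // State of the single entity instance held by this kit: productId -> quantity.
    private final Map<String, Integer> state = new LinkedHashMap<>();
    // Every event emitted since this kit was created.
    private final List<ItemAdded> allEvents = new ArrayList<>();

    // Pretend command handler: emits one event, applies it to the state,
    // and returns only the events of this call.
    List<ItemAdded> addItem(String productId, int quantity) {
        ItemAdded event = new ItemAdded(productId, quantity);
        // Sketch's own rule: quantities for the same product are summed.
        state.merge(event.productId, event.quantity, Integer::sum);
        allEvents.add(event);
        return List.of(event);
    }

    Map<String, Integer> getState() {
        return new LinkedHashMap<>(state); // defensive copy, keeps insertion order
    }

    List<ItemAdded> getAllEvents() {
        return List.copyOf(allEvents);
    }

    public static void main(String[] args) {
        CartTestKitSketch kit = new CartTestKitSketch();
        List<ItemAdded> applesResult = kit.addItem("idA", 1);
        kit.addItem("idB", 2);

        System.out.println(applesResult.size());       // events of the first call only
        System.out.println(kit.getAllEvents().size()); // events of every call so far
        System.out.println(kit.getState());            // state after both calls

        // A second kit models a different entity instance and starts empty.
        CartTestKitSketch otherKit = new CartTestKitSketch();
        System.out.println(otherKit.getState().isEmpty());
    }
}
```

The second kit at the end mirrors the advice to create one test kit per entity instance: nothing recorded by the first kit leaks into it.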

Scala

This class is generated by Kalix when the project is compiled and is located in target/generated-test-sources/kalix/scala/com/example/shoppingcart/domain/. It provides access to all the command handlers of the ShoppingCart entity for unit testing. In addition, it has the following methods:

  • currentState - the current state of the entity, updated on each method call that emits events.

  • allEvents - all events emitted since the creation of the testkit instance.

Integration tests

An example integration test class to start from is also generated for you. It uses a KalixTestKitExtension (Java) or a KalixTestKit (Scala) to start Docker containers and interacts with the entity through an actual gRPC client.

Java
src/it/java/com/example/shoppingcart/ShoppingCartIntegrationTest.java
import kalix.javasdk.testkit.junit.jupiter.KalixTestKitExtension;
// ...

public class ShoppingCartIntegrationTest {

  /**
   * The test kit starts both the service container and the Kalix Runtime.
   */
  @RegisterExtension
  public static final KalixTestKitExtension testKit =
    new KalixTestKitExtension(Main.createKalix()); (1)

  private final ShoppingCartService client;

  public ShoppingCartIntegrationTest() {
    this.client = testKit.getGrpcClient(ShoppingCartService.class); (2)
  }

  @Test
  public void addItemsToCart() throws Exception { (3)
    addItem("cart2", "a", "Apple", 1);
    addItem("cart2", "b", "Banana", 2);
    addItem("cart2", "c", "Cantaloupe", 3);
    ShoppingCartApi.Cart cart = getCart("cart2");
    assertEquals(3, cart.getItemsCount(), "shopping cart should have 3 items");
    assertEquals(
      List.of(item("a", "Apple", 1), item("b", "Banana", 2), item("c", "Cantaloupe", 3)),
      cart.getItemsList(),
      "shopping cart should have expected items"
    );
  }

  ShoppingCartApi.Cart getCart(String cartId) throws Exception {
    return client
      .getCart(ShoppingCartApi.GetShoppingCart.newBuilder().setCartId(cartId).build())
      .toCompletableFuture()
      .get();
  }

  void addItem(String cartId, String productId, String name, int quantity) throws Exception {
    client
      .addItem(
        ShoppingCartApi.AddLineItem.newBuilder()
          .setCartId(cartId)
          .setProductId(productId)
          .setName(name)
          .setQuantity(quantity)
          .build())
      .toCompletableFuture()
      .get();
  }

  ShoppingCartApi.LineItem item(String productId, String name, int quantity) {
    return ShoppingCartApi.LineItem.newBuilder()
      .setProductId(productId)
      .setName(name)
      .setQuantity(quantity)
      .build();
  }
}
1 Using the TestKit to create the service container and Kalix Runtime.
2 Creating a client for interacting with the gRPC endpoints for ShoppingCartService.
3 Adding items to the cart using the client and asserting on its current list of items.

The integration tests are in a special profile it of the project and can be run using mvn verify -Pit.

Scala
src/test/scala/com/example/shoppingcart/ShoppingCartServiceIntegrationSpec.scala
import kalix.scalasdk.testkit.KalixTestKit
// ...

class ShoppingCartServiceIntegrationSpec
    extends AnyWordSpec
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  implicit val patience: PatienceConfig =
    PatienceConfig(Span(5, Seconds), Span(500, Millis))

  private val testKit = KalixTestKit(Main.createKalix()).start() (1)
  import testKit.executionContext

  "ShoppingCartService" must {
    val client = testKit.getGrpcClient(classOf[ShoppingCartService]) (2)

    "add items to shopping cart" in {
      val cartId = "cart1"

      val updatedCart = for {
        _ <- client.addItem(AddLineItem(cartId, "shirt", "Shirt", 1))
        done <- client.addItem(AddLineItem(cartId, "sweat", "Sweat Shirt", 2))
      } yield done

      updatedCart.futureValue

      client.getCart(GetShoppingCart(cartId)).futureValue.items shouldBe (3)
        Seq(LineItem("shirt", "Shirt", 1), LineItem("sweat", "Sweat Shirt", 2))
    }

  }

  override def afterAll() = { (4)
    testKit.stop()
    super.afterAll()
  }
}
1 Using the TestKit to create the service container and Kalix Runtime.
2 Creating a client for interacting with the gRPC endpoints for ShoppingCartService.
3 Adding items to the cart using the client and asserting on its current list of items.
4 Shutting down TestKit resources after test concludes.

By default, the integration and unit tests are both invoked by sbt test.