Implementing Value Entities

Value Entities persist their state on every change. Kalix needs to serialize that state to send it to the underlying data store, which is done with Protocol Buffers using protobuf types.

Kalix Value Entities have nothing in common with the domain-driven design concept of Value Objects. The Value in the name refers to directly modifying a value for the entity’s state, in contrast to Event-sourced Entities, which persist events and derive the entity state from them.
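
As a rough illustration of that difference, the following plain-Java sketch contrasts the two persistence styles. ValueStyleCounter and EventStyleCounter are hypothetical classes invented for this example. They use no Kalix SDK types and only mimic the idea of what gets stored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: the stored data is the current value itself.
final class ValueStyleCounter {
  private int value;

  void increase(int amount) {
    value = value + amount; // the new value replaces the old one
  }

  int value() {
    return value;
  }
}

// Hypothetical stand-in: the stored data is the list of events.
final class EventStyleCounter {
  private final List<Integer> increments = new ArrayList<>();

  void increase(int amount) {
    increments.add(amount); // only the event is stored
  }

  int value() {
    int sum = 0;
    for (int increment : increments) {
      sum += increment; // the state is derived by replaying the events
    }
    return sum;
  }
}

final class ValueVsEventsDemo {
  public static void main(String[] args) {
    ValueStyleCounter byValue = new ValueStyleCounter();
    EventStyleCounter byEvents = new EventStyleCounter();
    byValue.increase(2);
    byValue.increase(3);
    byEvents.increase(2);
    byEvents.increase(3);
    System.out.println(byValue.value() + " " + byEvents.value()); // prints "5 5"
  }
}
```

Both counters report the same value. They differ only in what would have to be persisted: a single value, or every increment.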

While Protocol Buffers are the recommended format for persisting state, we recommend that you do not persist your service’s public protobuf messages. This may introduce some overhead to convert from one type to the other but allows the service public interface logic to evolve independently of the data storage format, which should be private.
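
A minimal sketch of that separation, written with plain Java classes instead of generated protobuf classes. ApiCurrentCounter, StoredCounterState, CounterStateConverter and the updatedAtEpochMillis field are hypothetical names chosen for illustration. In a real service both shapes would be protobuf messages.

```java
// Hypothetical stand-ins for generated protobuf classes (illustrative names only).

// Shape exposed through the public API.
final class ApiCurrentCounter {
  final int value;

  ApiCurrentCounter(int value) {
    this.value = value;
  }
}

// Shape kept private to storage; it can gain fields without touching the API.
final class StoredCounterState {
  final int value;
  final long updatedAtEpochMillis; // storage-only detail, never exposed to clients

  StoredCounterState(int value, long updatedAtEpochMillis) {
    this.value = value;
    this.updatedAtEpochMillis = updatedAtEpochMillis;
  }
}

final class CounterStateConverter {
  // The explicit conversion is the price paid for keeping the two shapes independent.
  static ApiCurrentCounter toApi(StoredCounterState state) {
    return new ApiCurrentCounter(state.value);
  }

  public static void main(String[] args) {
    StoredCounterState stored = new StoredCounterState(7, 1_700_000_000_000L);
    System.out.println(toApi(stored).value); // prints 7
  }
}
```

Because clients only ever see ApiCurrentCounter, adding or renaming storage-only fields in this sketch would not change the public interface.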

The steps necessary to implement a Value Entity include:

  1. Defining the API and domain objects in .proto files.

  2. Implementing behavior in command handlers.

  3. Creating and initializing the Entity.

The sections on this page walk through these steps using a counter service as an example.

Defining the proto files

Our Value Entity example starts with the "Counter" service as included in the project template.

The following counter_domain.proto file defines the state the Entity will hold. The entity stores an integer value, as represented in the message CounterState. Real-world entities store much more data, often structured data, and represent an Entity in the domain-driven design sense of the term.

Java
src/main/proto/com/example/domain/counter_domain.proto
syntax = "proto3";

package com.example.domain; (1)

option java_outer_classname = "CounterDomain"; (2)

message CounterState { (3)
  int32 value = 1;
}
1 Any classes generated from this protobuf file will be in the Java package com.example.domain.
2 Let the messages declared in this protobuf file be inner classes to the Java class CounterDomain.
3 The CounterState protobuf message is what Kalix stores for this entity.
Scala
src/main/proto/com/example/domain/counter_domain.proto
syntax = "proto3";

package com.example.domain; (1)

message CounterState { (2)
  int32 value = 1;
}
1 Any classes generated from this protobuf file will be in the Scala package com.example.domain.
2 The CounterState protobuf message is what Kalix stores for this entity.

The counter_api.proto file defines the commands we can send to the Counter service to manipulate or access the Counter’s state.

In this file we instruct the Kalix code generation tooling (codegen) which kind of component we want to create. The codegen will generate all stubs for your entity/service and corresponding tests, as well as an abstract class for your implementation to extend.

These commands make up the service API:

Java
src/main/proto/com/example/counter_api.proto
// This is the public API offered by your entity.
syntax = "proto3";

package com.example; (1)

import "kalix/annotations.proto"; (2)
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "CounterApi"; (3)

message IncreaseValue { (4)
  string counter_id = 1 [(kalix.field).id = true]; (5)
  int32 value = 2;
}

message DecreaseValue {
  string counter_id = 1 [(kalix.field).id = true];
  int32 value = 2;
}

message ResetValue {
  string counter_id = 1 [(kalix.field).id = true];
}

message GetCounter {
  string counter_id = 1 [(kalix.field).id = true];
}

message DeleteCounter {
  string counter_id = 1 [(kalix.field).id = true];
}

message CurrentCounter { (6)
  int32 value = 1;
}

service CounterService { (7)
  option (kalix.codegen) = { (8)
    value_entity: { (9)
      name: "com.example.domain.Counter" (10)
      type_id: "counter" (11)
      state: "com.example.domain.CounterState" (12)
    }
  };

  rpc Increase (IncreaseValue) returns (google.protobuf.Empty);
  rpc IncreaseWithConditional (IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease (DecreaseValue) returns (google.protobuf.Empty);
  rpc Reset (ResetValue) returns (google.protobuf.Empty);
  rpc GetCurrentCounter (GetCounter) returns (CurrentCounter);
  rpc Delete (DeleteCounter) returns (google.protobuf.Empty);
}
1 Any classes generated from this protobuf file will be in the Java package com.example.
2 Import the Kalix protobuf annotations, or options.
3 Let the messages declared in this protobuf file be inner classes to the class CounterApi.
4 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
5 Every Command must contain a string field that contains the entity ID and is marked with the (kalix.field).id option.
6 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
7 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
8 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin.
9 value_entity indicates that we want the codegen to generate a Value Entity for this service.
10 name denotes the base name for the Value Entity. The code-generation will create initial sources Counter, CounterTest and CounterIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
11 type_id is a unique identifier of the "state storage." The entity name may be changed even after data has been created, but the type_id can’t.
12 state points to the protobuf message representing the Value Entity’s state which is kept by Kalix. Note, the package and name follow the definition in the counter_domain.proto file.
Scala
src/main/proto/com/example/counter_api.proto
// This is the public API offered by your entity.
syntax = "proto3";

package com.example; (1)

import "kalix/annotations.proto"; (2)
import "google/api/annotations.proto";
import "google/protobuf/empty.proto";

message IncreaseValue { (3)
  string counter_id = 1 [(kalix.field).id = true]; (4)
  int32 value = 2;
}

message DecreaseValue {
  string counter_id = 1 [(kalix.field).id = true];
  int32 value = 2;
}

message ResetValue {
  string counter_id = 1 [(kalix.field).id = true];
}

message GetCounter {
  string counter_id = 1 [(kalix.field).id = true];
}

message DeleteCounter {
  string counter_id = 1 [(kalix.field).id = true];
}

message CurrentCounter { (5)
  int32 value = 1;
}

service CounterService { (6)
  option (kalix.codegen) = { (7)
    value_entity: { (8)
      name: "com.example.domain.Counter" (9)
      type_id: "counter" (10)
      state: "com.example.domain.CounterState" (11)
    }
  };

  rpc Increase (IncreaseValue) returns (google.protobuf.Empty);
  rpc IncreaseWithConditional (IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease (DecreaseValue) returns (google.protobuf.Empty);
  rpc Reset (ResetValue) returns (google.protobuf.Empty);
  rpc GetCurrentCounter (GetCounter) returns (CurrentCounter);
  rpc Delete (DeleteCounter) returns (google.protobuf.Empty);
}
1 Any classes generated from this protobuf file will be in the Scala package com.example.
2 Import the Kalix protobuf annotations or options.
3 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
4 Every Command must contain a string field that contains the entity ID and is marked with the (kalix.field).id option.
5 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
6 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
7 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin.
8 value_entity indicates that we want the codegen to generate a Value Entity for this service.
9 name denotes the base name for the Value Entity. The code-generation will create initial sources Counter, CounterSpec and CounterServiceIntegrationSpec. Once these files exist, they are not overwritten, so you can freely add logic to them.
10 type_id is a unique identifier of the "state storage." The entity name may be changed even after data has been created, but the type_id can’t.
11 state points to the protobuf message representing the Value Entity’s state which is kept by Kalix. Note, the package and name follow the definition in the counter_domain.proto file.
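
To see why type_id has to stay fixed while the entity name may change, consider the following conceptual sketch. It assumes, purely for illustration, that stored state is addressed by the type_id together with the entity ID. ConceptualStateStore is a hypothetical class and does not describe how Kalix actually lays out its storage.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Conceptual model only: this is NOT how Kalix lays out its storage.
final class ConceptualStateStore {
  private final Map<String, Integer> rows = new HashMap<>();

  // Assumption for this sketch: state is addressed by type_id plus entity ID.
  private static String key(String typeId, String entityId) {
    return typeId + "|" + entityId;
  }

  void save(String typeId, String entityId, int value) {
    rows.put(key(typeId, entityId), value);
  }

  Optional<Integer> load(String typeId, String entityId) {
    return Optional.ofNullable(rows.get(key(typeId, entityId)));
  }

  public static void main(String[] args) {
    ConceptualStateStore store = new ConceptualStateStore();
    store.save("counter", "counter-1", 5);
    // The implementing class can be renamed freely; it is not part of the key.
    System.out.println(store.load("counter", "counter-1").isPresent());    // prints true
    // A different type_id no longer finds the previously stored state.
    System.out.println(store.load("counter-v2", "counter-1").isPresent()); // prints false
  }
}
```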

Value Entity’s Effect API

The Value Entity’s Effect defines the operations that Kalix should perform when an incoming command is handled by a Value Entity.

A Value Entity Effect can either:

  • update the entity state and send a reply to the caller

  • directly reply to the caller if the command is not requesting any state change

  • reject the command by returning an error

  • instruct Kalix to delete the entity
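
The four outcomes listed above can be modelled in a few lines of plain Java. This is only a sketch: SketchEffect and SketchCounterHandlers are hypothetical names, state is reduced to an int, and replies are plain strings. The real SDK builds effects through its own effects() API.

```java
// Hypothetical model of the four kinds of effect; not an SDK type.
final class SketchEffect {
  enum Kind { UPDATE_STATE_AND_REPLY, REPLY, ERROR, DELETE_ENTITY }

  final Kind kind;
  final Integer newState; // set only for UPDATE_STATE_AND_REPLY
  final String message;   // reply text or error description

  private SketchEffect(Kind kind, Integer newState, String message) {
    this.kind = kind;
    this.newState = newState;
    this.message = message;
  }

  static SketchEffect updateStateAndReply(int newState, String reply) {
    return new SketchEffect(Kind.UPDATE_STATE_AND_REPLY, newState, reply);
  }

  static SketchEffect reply(String reply) {
    return new SketchEffect(Kind.REPLY, null, reply);
  }

  static SketchEffect error(String description) {
    return new SketchEffect(Kind.ERROR, null, description);
  }

  static SketchEffect deleteEntity(String reply) {
    return new SketchEffect(Kind.DELETE_ENTITY, null, reply);
  }
}

// Each handler receives the current state and returns exactly one effect.
final class SketchCounterHandlers {
  static SketchEffect increase(int currentValue, int amount) {
    if (amount < 0) {
      return SketchEffect.error("Increase requires a positive value. It was [" + amount + "].");
    }
    return SketchEffect.updateStateAndReply(currentValue + amount, "ok");
  }

  static SketchEffect getCurrentCounter(int currentValue) {
    return SketchEffect.reply(String.valueOf(currentValue));
  }

  static SketchEffect delete() {
    return SketchEffect.deleteEntity("ok");
  }

  public static void main(String[] args) {
    System.out.println(increase(1, 2).kind); // prints UPDATE_STATE_AND_REPLY
  }
}
```

The handlers only describe what should happen. Applying the description, for example saving the new state, is left to whoever interprets the returned effect.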

Implementing behavior

A Value Entity implementation is a class where you define how each command is handled. The class Counter gets generated for us based on the counter_api.proto and counter_domain.proto definitions. Once the generated file exists, it is not overwritten, so you can freely add logic to it. Counter extends the generated class AbstractCounter, which we should not change because it is regenerated whenever we update the protobuf descriptors. AbstractCounter contains all method signatures corresponding to the API of the service. If you change the API, you will see compilation errors in the Counter class and you have to implement the methods required by AbstractCounter.

Java
src/main/java/com/example/domain/Counter.java
/**
 * A Counter represented as a value entity.
 */
public class Counter extends AbstractCounter { (1)

  private final String entityId;

  public Counter(ValueEntityContext context) {
    this.entityId = context.entityId();
  }

  @Override
  public CounterDomain.CounterState emptyState() { (2)
    return CounterDomain.CounterState.getDefaultInstance();
  }
1 Extends the generated AbstractCounter, which extends ValueEntity.
2 Defines the initial, empty, state that is used before any updates.
Scala
src/main/scala/com/example/domain/Counter.scala
class Counter(context: ValueEntityContext) extends AbstractCounter { (1)

  override def emptyState: CounterState = CounterState() (2)
1 Extends the generated AbstractCounter, which extends ValueEntity.
2 Defines the initial, empty, state that is used before any updates.

We need to implement all methods our Value Entity offers as command handlers.

The code-generation will generate an implementation class with an initial empty implementation which we’ll discuss below.

Command handlers are implemented in the Counter class as methods that override abstract methods from AbstractCounter. The methods take the current state as the first parameter and the request message as the second parameter. They return an Effect, which describes the next processing actions, such as updating state and sending a reply.

When adding or changing the rpc definitions, including name, parameter, and return messages, in the .proto files the corresponding methods are regenerated in the abstract class (AbstractCounter). This means that the compiler will assist you with such changes. The IDE can typically fill in missing method signatures and such.

Updating state

In the example below, the increase service call uses the value from the request message IncreaseValue. It returns an Effect to update the entity state and send a reply.

For Value Entities, modify the state and then trigger a save of that state in the returned Effect. The Java/Protobuf SDK has an effects().updateState(newState) method for this purpose. If you change the state but do not call updateState in the returned Effect, that state change is lost.
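
The "lost change" pitfall can be reproduced with a small stand-alone model. SketchResult and SketchRuntime below are hypothetical classes, with state reduced to an int. They only imitate the rule that nothing is saved unless the returned result carries the new state.

```java
import java.util.function.IntFunction;

// Hypothetical result type: newState == null means "no state change requested".
final class SketchResult {
  final Integer newState;
  final String reply;

  private SketchResult(Integer newState, String reply) {
    this.newState = newState;
    this.reply = reply;
  }

  static SketchResult updateState(int newState, String reply) {
    return new SketchResult(newState, reply);
  }

  static SketchResult replyOnly(String reply) {
    return new SketchResult(null, reply);
  }
}

// Hypothetical miniature runtime that saves only what the returned result carries.
final class SketchRuntime {
  private int stored = 0;

  int stored() {
    return stored;
  }

  String handle(IntFunction<SketchResult> handler) {
    SketchResult result = handler.apply(stored);
    if (result.newState != null) {
      stored = result.newState; // the only place where state is saved
    }
    return result.reply;
  }

  public static void main(String[] args) {
    SketchRuntime runtime = new SketchRuntime();

    // Computes a new value but never returns it, so the change is lost.
    runtime.handle(current -> {
      int computed = current + 1;
      return SketchResult.replyOnly("computed " + computed);
    });
    System.out.println(runtime.stored()); // prints 0

    // Returns the new value in the result, so it is saved.
    runtime.handle(current -> SketchResult.updateState(current + 1, "ok"));
    System.out.println(runtime.stored()); // prints 1
  }
}
```
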
Java
/src/main/java/com/example/domain/Counter.java
@Override
public Effect<Empty> increase(
    CounterDomain.CounterState currentState, CounterApi.IncreaseValue command) {
  if (command.getValue() < 0) { (1)
    return effects().error("Increase requires a positive value. It was [" +
        command.getValue() + "].");
  }
  CounterDomain.CounterState newState =  (2)
      currentState.toBuilder().setValue(currentState.getValue() +
          command.getValue()).build();
  return effects()
      .updateState(newState) (3)
      .thenReply(Empty.getDefaultInstance());  (4)
}
1 The validation rejects negative values by returning an Effect with effects().error.
2 From the current state we create a new state with the increased value.
3 We store the new state by returning an Effect with effects().updateState.
4 The acknowledgment that the command was successfully processed is only sent if the state update was successful. Otherwise there will be an error reply.
Scala
/src/main/scala/com/example/domain/Counter.scala
override def increase(currentState: CounterState, command: example.IncreaseValue): ValueEntity.Effect[Empty] =
  if (command.value < 0) (1)
    effects.error(s"Increase requires a positive value. It was [${command.value}].")
  else {
    val newState = currentState.copy(value = currentState.value + command.value) (2)
    effects
      .updateState(newState) (3)
      .thenReply(Empty.defaultInstance) (4)
  }
1 The validation rejects negative values by returning an Effect with effects.error.
2 From the current state we create a new state with the increased value.
3 We store the new state by returning an Effect with effects.updateState.
4 The acknowledgment that the command was successfully processed is only sent if the state update was successful. Otherwise there will be an error reply.

Retrieving state

The following example shows the implementation of the GetCurrentCounter command handler. This command handler is read-only, meaning it doesn’t update the state but just returns it:

Java
src/main/java/com/example/domain/Counter.java
@Override
public Effect<CounterApi.CurrentCounter> getCurrentCounter(
    CounterDomain.CounterState currentState, (1)
    CounterApi.GetCounter command) {
  CounterApi.CurrentCounter current =
      CounterApi.CurrentCounter.newBuilder()
          .setValue(currentState.getValue()).build(); (2)
  return effects().reply(current);
}
1 The current state is passed to the method.
2 We use its value to create the CurrentCounter value that is sent as a reply by returning an Effect with effects().reply.
Scala
src/main/scala/com/example/domain/Counter.scala
override def getCurrentCounter(
    currentState: CounterState, (1)
    command: example.GetCounter): ValueEntity.Effect[example.CurrentCounter] =
  effects.reply(CurrentCounter(currentState.value)) (2)
1 The current state is passed to the method.
2 We use its value to create the CurrentCounter value that is sent as a reply by returning an Effect with effects.reply.

Registering the Entity

To make Kalix aware of the Value Entity, we need to register it with the service.

The code-generation automatically inserts the registration into the generated KalixFactory.withComponents method, which is called from the Main class.

Java
/src/main/java/com/example/Main.java
public final class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);
  public static Kalix createKalix() {
    // The KalixFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `new Kalix()` instance.
    return KalixFactory.withComponents(
            Counter::new);
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Kalix service");
    createKalix().start();
  }
}
Scala
/src/main/scala/com/example/Main.scala
object Main {

  private val log = LoggerFactory.getLogger("com.example.Main")

  def createKalix(): Kalix = {
    // The KalixFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `Kalix()` instance.
    KalixFactory.withComponents(
      new Counter(_))
  }

  def main(args: Array[String]): Unit = {
    log.info("starting the Kalix service")
    createKalix().start()
  }
}

By default, the generated constructor has a ValueEntityContext parameter, but you can change this to accept other parameters. If you change the constructor of the Counter class you will see a compilation error here, and you have to adjust the factory function that is passed to KalixFactory.withComponents.
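
The adjustment amounts to replacing a constructor reference with a lambda that supplies the extra argument. The sketch below shows the idea with hypothetical stand-ins: SketchContext plays the role of ValueEntityContext, ConfiguredCounter the role of the entity class, and FactoryDemo.create the role of a registration method that accepts a factory function. None of these names come from the SDK.

```java
import java.util.function.Function;

// Hypothetical stand-in for the context handed to the entity factory.
final class SketchContext {
  private final String entityId;

  SketchContext(String entityId) {
    this.entityId = entityId;
  }

  String entityId() {
    return entityId;
  }
}

// Hypothetical entity whose constructor takes one parameter beyond the context.
final class ConfiguredCounter {
  final String entityId;
  final int maxValue; // extra dependency, not supplied by the context

  ConfiguredCounter(SketchContext context, int maxValue) {
    this.entityId = context.entityId();
    this.maxValue = maxValue;
  }
}

final class FactoryDemo {
  // Hypothetical registration: all it needs is a function from context to entity.
  static ConfiguredCounter create(
      Function<SketchContext, ConfiguredCounter> factory, String entityId) {
    return factory.apply(new SketchContext(entityId));
  }

  public static void main(String[] args) {
    int maxValue = 100;
    // A constructor reference no longer fits, so a lambda passes the extra argument.
    ConfiguredCounter counter =
        create(context -> new ConfiguredCounter(context, maxValue), "counter-1");
    System.out.println(counter.entityId + " " + counter.maxValue); // prints "counter-1 100"
  }
}
```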

When more components are added the KalixFactory is regenerated, and you have to adjust the registration from the Main class.

Deleting state

The next example shows how to delete a Value Entity state by returning the special deleteEntity() effect.

Java
/src/main/java/com/example/domain/Counter.java
@Override
public Effect<Empty> delete(CounterDomain.CounterState currentState,
                            CounterApi.DeleteCounter deleteCounter) {
  return effects()
      .deleteEntity() (1)
      .thenReply(Empty.getDefaultInstance());
}
1 We delete the state by returning an Effect with effects().deleteEntity().
Scala
/src/main/scala/com/example/domain/Counter.scala
override def delete(currentState: CounterState, deleteCounter: DeleteCounter): ValueEntity.Effect[Empty] =
  effects
    .deleteEntity() (1)
    .thenReply(Empty.defaultInstance)
1 We delete the state by returning an Effect with effects.deleteEntity().

When you give the instruction to delete the entity it will still exist with an empty state for some time. The actual removal happens later to give downstream consumers time to process the change. By default, the existence of the entity is completely cleaned up after a week.

It is not allowed to make further changes after the entity has been "marked" as deleted. You can still handle read requests of the entity until it has been completely removed, but the current state will be empty.

If you want to make changes after deleting the state you should use the updateState effect with an empty state, as a logical delete, instead of using deleteEntity.

It is best to not reuse the same entity id after deletion, but if that happens after the entity has been completely removed it will be instantiated as a completely new entity without any knowledge of previous state.
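
The lifecycle described in the paragraphs above can be summarized as a small in-memory model. DeletionLifecycleSketch is a hypothetical class. State is reduced to an int with 0 as the empty state, rejected changes are modelled as an IllegalStateException (an assumption of this sketch), and cleanUp stands for the removal that happens later.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the deletion rules; not SDK code.
final class DeletionLifecycleSketch {
  private final Map<String, Integer> states = new HashMap<>();
  private final Set<String> markedDeleted = new HashSet<>();

  void update(String id, int value) {
    if (markedDeleted.contains(id)) {
      // Assumption: a rejected change is modelled as an exception.
      throw new IllegalStateException("Entity [" + id + "] is marked as deleted");
    }
    states.put(id, value);
  }

  int read(String id) {
    return states.getOrDefault(id, 0); // reads still work and see the empty state
  }

  // Corresponds to the deleteEntity effect: state is emptied and the entity is marked.
  void delete(String id) {
    states.remove(id);
    markedDeleted.add(id);
  }

  // Corresponds to updateState with an empty state: further changes stay possible.
  void logicalDelete(String id) {
    update(id, 0);
  }

  // Stands for the final removal that happens later.
  void cleanUp(String id) {
    markedDeleted.remove(id);
  }

  public static void main(String[] args) {
    DeletionLifecycleSketch entities = new DeletionLifecycleSketch();
    entities.update("a", 5);
    entities.delete("a");
    System.out.println(entities.read("a")); // prints 0
  }
}
```

After cleanUp, an update for the same ID starts from scratch, which mirrors the "completely new entity" behavior. Until then, only the logical delete leaves room for further changes.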

Note that deleting View state must be handled explicitly.

Running Side Effects

An Entity may also emit one or more side effects. A side effect is something whose result has no impact on the result of the current command: if it fails, the current command still succeeds. The result of the side effect is therefore ignored. When used from inside an Entity, side effects are only performed after the successful completion of any state actions requested by the command handler.
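
These two rules, "only after the state action succeeded" and "failures are ignored", can be sketched without any SDK types. SideEffectOrderingSketch is a hypothetical class in which side effects are plain Runnable values and the outcome of the state update is passed in as a flag.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the ordering rules for side effects; not SDK code.
final class SideEffectOrderingSketch {
  final List<String> log = new ArrayList<>();

  // Returns true when the command itself succeeds.
  boolean handle(boolean stateUpdateSucceeds, List<Runnable> sideEffects) {
    if (!stateUpdateSucceeds) {
      log.add("state update failed");
      return false; // side effects are skipped entirely
    }
    log.add("state updated");
    for (Runnable sideEffect : sideEffects) {
      try {
        sideEffect.run();
        log.add("side effect ran");
      } catch (RuntimeException e) {
        // The failure does not change the result of the command.
        log.add("side effect failed (ignored)");
      }
    }
    return true;
  }

  public static void main(String[] args) {
    SideEffectOrderingSketch sketch = new SideEffectOrderingSketch();
    List<Runnable> effects = new ArrayList<>();
    effects.add(() -> { throw new IllegalStateException("boom"); });
    System.out.println(sketch.handle(true, effects)); // prints true
  }
}
```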

See the dedicated section on Actions for more details.

Testing the Entity

There are two ways to test an Entity:

  • unit tests, which run the Entity class in the same JVM as the test code itself with the help of a test kit

  • integration tests, with the service deployed in a Docker container running the entire service and the test interacting with it over gRPC.

Each way has its benefits. Unit tests are faster and provide more immediate feedback about success or failure, but can only test a single entity at a time and in isolation. Integration tests, on the other hand, are more realistic and allow many entities to interact with other components inside and outside the service.

Unit tests

To unit test the Entity, a test kit class is generated, as well as an example unit test class to start from. Test cases use the test kit to execute commands in the entity, get a ValueEntityResult back, and assert the effects that the command led to: both the reply itself and the update to the state of the Entity.

Java
/src/test/java/com/example/domain/CounterTest.java
import kalix.javasdk.Metadata;
import kalix.javasdk.testkit.ValueEntityResult;
import com.example.CounterApi;
import com.google.protobuf.Empty;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

public class CounterTest {

  @Test
  public void increaseCounterTest() {
    CounterTestKit testKit = CounterTestKit.of(Counter::new);

    CounterApi.IncreaseValue increaseValueCommand = CounterApi.IncreaseValue.newBuilder()
        .setValue(1)
        .build();
    ValueEntityResult<Empty> result1 = testKit.increase(increaseValueCommand);
    assertEquals(Empty.getDefaultInstance(), result1.getReply());
    assertEquals(1, testKit.getState().getValue());

    // one more time
    ValueEntityResult<Empty> result2 = testKit.increase(increaseValueCommand);
    assertEquals(Empty.getDefaultInstance(), result2.getReply());
    assertEquals(2, testKit.getState().getValue());
  }
}

The unit tests can be run from Maven using mvn test or, if you prefer, from inside your IDE the same way you usually run tests.

Scala
/src/test/scala/com/example/domain/CounterSpec.scala
class CounterSpec extends AnyWordSpec with Matchers {

  "Counter" must {

    "handle command Increase" in {
      val testKit = CounterTestKit(new Counter(_))

      val result1 = testKit.increase(IncreaseValue(value = 1))
      result1.reply shouldBe Empty.defaultInstance

      // one more time
      val result2 = testKit.increase(IncreaseValue(value = 1))
      result2.reply shouldBe Empty.defaultInstance
      testKit.currentState().value shouldBe 2
    }
  }
}
The CounterTestKit is stateful, and it holds the state of a single entity instance in memory. If you want to test more than one entity in a test, you need to create multiple instances of CounterTestKit.
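
A stripped-down model shows what "stateful, one entity per test kit" means in practice. MiniCounterTestKit is a hypothetical class with a much smaller surface than the generated CounterTestKit. It only keeps an int in memory.

```java
// Hypothetical miniature of a stateful test kit; the generated test kit has a richer API.
final class MiniCounterTestKit {
  private int state = 0; // state of exactly one entity instance

  int increase(int amount) {
    state = state + amount;
    return state;
  }

  int currentState() {
    return state;
  }

  public static void main(String[] args) {
    // Two entities under test need two separate test kit instances.
    MiniCounterTestKit first = new MiniCounterTestKit();
    MiniCounterTestKit second = new MiniCounterTestKit();
    first.increase(1);
    first.increase(1);
    second.increase(10);
    System.out.println(first.currentState() + " " + second.currentState()); // prints "2 10"
  }
}
```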

By default, the integration and unit tests are both invoked by sbt test. To run only the unit tests, run sbt -DonlyUnitTest test or sbt -DonlyUnitTest=true test, or set the value to true in the sbt session with set onlyUnitTest := true and then run test.

Integration tests

An example integration test class to start from is also generated for you. It uses a KalixTestKitExtension (Java) or a KalixTestKit (Scala) to start Docker containers and interacts with the entity through an actual gRPC client.

Java
/src/it/java/com/example/CounterIntegrationTest.java
import kalix.javasdk.testkit.junit.jupiter.KalixTestKitExtension;
// ...

public class CounterIntegrationTest {

  /**
   * The test kit starts both the service container and the Kalix Runtime.
   */
  @RegisterExtension
  public static final KalixTestKitExtension testKit =
      new KalixTestKitExtension(Main.createKalix()); (1)

  /**
   * Use the generated gRPC client to call the service through the Kalix Runtime.
   */
  private final CounterService client;

  public CounterIntegrationTest() {
    client = testKit.getGrpcClient(CounterService.class); (2)
  }

  @Test
  public void increaseOnNonExistingEntity() throws Exception {
    String entityId = "new-id";
    client.increase(CounterApi.IncreaseValue.newBuilder().setCounterId(entityId).setValue(42).build()) (3)
        .toCompletableFuture().get(5, SECONDS);
    CounterApi.CurrentCounter reply = client.getCurrentCounter(CounterApi.GetCounter.newBuilder().setCounterId(entityId).build())
        .toCompletableFuture().get(5, SECONDS);
    assertThat(reply.getValue(), is(42));
  }
}
1 Using the TestKit to create the service container and Kalix Runtime.
2 Creating a client for interacting with the gRPC endpoints for CounterService.
3 Increasing counter and asserting on its current status.

The integration tests are in a special profile of the project, named it, and can be run using mvn verify -Pit.
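
The Java integration test waits for each asynchronous reply with toCompletableFuture().get(5, SECONDS). The stand-alone sketch below isolates that JDK pattern. AwaitReplySketch and fetchCounterValue are hypothetical names, and the asynchronous call is simulated with CompletableFuture.supplyAsync instead of a gRPC client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AwaitReplySketch {
  // Hypothetical stand-in for an asynchronous client call that returns a CompletionStage.
  static CompletionStage<Integer> fetchCounterValue() {
    return CompletableFuture.supplyAsync(() -> 42);
  }

  static int awaitValue() {
    try {
      // Block the calling thread until the reply arrives, or give up after 5 seconds.
      return fetchCounterValue().toCompletableFuture().get(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } catch (ExecutionException | TimeoutException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(awaitValue()); // prints 42
  }
}
```

The timeout keeps a test from hanging indefinitely when the service never answers. In this sketch a timeout surfaces as an IllegalStateException.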

Scala
/src/test/scala/com/example/CounterServiceIntegrationSpec.scala
import kalix.scalasdk.testkit.KalixTestKit
// ...

class CounterServiceIntegrationSpec
    extends AnyWordSpec
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures {

  implicit private val patience: PatienceConfig =
    PatienceConfig(Span(5, Seconds), Span(500, Millis))

  private val testKit = KalixTestKit(Main.createKalix()).start() (1)
  import testKit.executionContext

  private val client = testKit.getGrpcClient(classOf[CounterService]) (2)

  "CounterService" must {

    "Increase and decrease a counter" in {
      val counterId = "42"

      val updateResult =
        for {
          _ <- client.increase(IncreaseValue(counterId, 42))
          done <- client.decrease(DecreaseValue(counterId, 32))
        } yield done

      updateResult.futureValue

      val getResult = client.getCurrentCounter(GetCounter(counterId)) (3)
      getResult.futureValue.value shouldBe(42-32)
    }

  }

  override def afterAll(): Unit = { (4)
    testKit.stop()
    super.afterAll()
  }
}
1 Using the TestKit to create the service container and Kalix Runtime.
2 Creating a client for interacting with the gRPC endpoints for CounterService.
3 Asserting on current value of counter after being increased.
4 Shutting down TestKit resources after test concludes.

By default, the integration and unit tests are both invoked by sbt test.