Implementing Value Entities
Value Entities persist their state on every change, so Kalix needs to serialize that state to send it to the underlying data store. Serialization is done with Protocol Buffers, using protobuf types.
Kalix Value Entities have nothing in common with the domain-driven design concept of Value Objects. The Value in the name refers to directly modifying a value for the entity’s state, in contrast to Event-sourced Entities, which persist events and derive the entity state from them.
While Protocol Buffers are the recommended format for persisting state, we recommend that you do not persist your service’s public protobuf messages. Converting from one type to the other may introduce some overhead, but it allows the service’s public interface to evolve independently of the data storage format, which should be private.
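As a minimal sketch of that separation, the plain-Java example below keeps a public API type and a private storage type apart and converts between them at the entity boundary. The names `ApiCounter`, `StoredCounter` and `CounterMapping` are hypothetical stand-ins for generated protobuf classes; they are not part of the Kalix SDK.

```java
// Hypothetical stand-ins for generated protobuf classes; not Kalix SDK types.
public class CounterMapping {

  /** Public API shape, free to evolve with the service interface. */
  static final class ApiCounter {
    final int value;
    ApiCounter(int value) { this.value = value; }
  }

  /** Private storage shape, free to evolve with the persistence format. */
  static final class StoredCounter {
    final int value;
    StoredCounter(int value) { this.value = value; }
  }

  /** Converts an incoming API message to the stored representation. */
  static StoredCounter toStored(ApiCounter api) {
    return new StoredCounter(api.value);
  }

  /** Converts the stored representation to the outgoing API message. */
  static ApiCounter toApi(StoredCounter stored) {
    return new ApiCounter(stored.value);
  }

  public static void main(String[] args) {
    StoredCounter stored = toStored(new ApiCounter(5));
    System.out.println(toApi(stored).value);
  }
}
```

Because only the two mapping methods know about both shapes, a field can be renamed or added on one side without touching the other.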
The steps necessary to implement a Value Entity include:
- Defining the API and domain objects in .proto files.
- Implementing behavior in command handlers.
- Creating and initializing the Entity.
The sections on this page walk through these steps using a counter service as an example.
Defining the proto files
Our Value Entity example starts with the "Counter" service as included in the project template.
The following counter_domain.proto file defines the state the Entity will hold. The entity stores an integer value, as represented in the message CounterState. Real-world entities store much more data, often structured data; they represent an Entity in the domain-driven design sense of the term.
- Java
src/main/proto/com/example/domain/counter_domain.proto
    syntax = "proto3";

    package com.example.domain; // (1)

    option java_outer_classname = "CounterDomain"; // (2)

    message CounterState { // (3)
      int32 value = 1;
    }
(1) Any classes generated from this protobuf file will be in the Java package com.example.domain.
(2) Let the messages declared in this protobuf file be inner classes to the Java class CounterDomain.
(3) The CounterState protobuf message is what Kalix stores for this entity.
- Scala
src/main/proto/com/example/domain/counter_domain.proto
    syntax = "proto3";

    package com.example.domain; // (1)

    message CounterState { // (2)
      int32 value = 1;
    }
(1) Any classes generated from this protobuf file will be in the Scala package com.example.domain.
(2) The CounterState protobuf message is what Kalix stores for this entity.
The counter_api.proto file defines the commands we can send to the Counter service to manipulate or access the Counter’s state. Together, these commands make up the service API.
In this file we also instruct the Kalix code generation tooling (codegen) which kind of component we want to create. The codegen generates all stubs for your entity/service and the corresponding tests, as well as an abstract class for your implementation to extend.
- Java
src/main/proto/com/example/counter_api.proto
    // This is the public API offered by your entity.
    syntax = "proto3";

    package com.example; // (1)

    import "kalix/annotations.proto"; // (2)
    import "google/api/annotations.proto";
    import "google/protobuf/empty.proto";

    option java_outer_classname = "CounterApi"; // (3)

    message IncreaseValue { // (4)
      string counter_id = 1 [(kalix.field).id = true]; // (5)
      int32 value = 2;
    }

    message DecreaseValue {
      string counter_id = 1 [(kalix.field).id = true];
      int32 value = 2;
    }

    message ResetValue {
      string counter_id = 1 [(kalix.field).id = true];
    }

    message GetCounter {
      string counter_id = 1 [(kalix.field).id = true];
    }

    message DeleteCounter {
      string counter_id = 1 [(kalix.field).id = true];
    }

    message CurrentCounter { // (6)
      int32 value = 1;
    }

    service CounterService { // (7)
      option (kalix.codegen) = { // (8)
        value_entity: { // (9)
          name: "com.example.domain.Counter" // (10)
          type_id: "counter" // (11)
          state: "com.example.domain.CounterState" // (12)
        }
      };

      rpc Increase (IncreaseValue) returns (google.protobuf.Empty);
      rpc IncreaseWithConditional (IncreaseValue) returns (google.protobuf.Empty);
      rpc Decrease (DecreaseValue) returns (google.protobuf.Empty);
      rpc Reset (ResetValue) returns (google.protobuf.Empty);
      rpc GetCurrentCounter (GetCounter) returns (CurrentCounter);
      rpc Delete (DeleteCounter) returns (google.protobuf.Empty);
    }
(1) Any classes generated from this protobuf file will be in the Java package com.example.
(2) Import the Kalix protobuf annotations, or options.
(3) Let the messages declared in this protobuf file be inner classes to the class CounterApi.
(4) We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
(5) Every Command must contain a string field that contains the entity ID and is marked with the (kalix.field).id option.
(6) Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
(7) The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
(8) The protobuf option (kalix.codegen) is specific to code generation as provided by the Kalix Maven plugin.
(9) value_entity indicates that we want the codegen to generate a Value Entity for this service.
(10) name denotes the base name for the Value Entity. The code generation will create the initial sources Counter, CounterTest and CounterIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
(11) type_id is a unique identifier of the "state storage." The entity name may be changed even after data has been created, but the type_id can’t.
(12) state points to the protobuf message representing the Value Entity’s state, which is kept by Kalix. Note that the package and name follow the definition in the counter_domain.proto file.
- Scala
src/main/proto/com/example/counter_api.proto
    // This is the public API offered by your entity.
    syntax = "proto3";

    package com.example; // (1)

    import "kalix/annotations.proto"; // (2)
    import "google/api/annotations.proto";
    import "google/protobuf/empty.proto";

    message IncreaseValue { // (3)
      string counter_id = 1 [(kalix.field).id = true]; // (4)
      int32 value = 2;
    }

    message DecreaseValue {
      string counter_id = 1 [(kalix.field).id = true];
      int32 value = 2;
    }

    message ResetValue {
      string counter_id = 1 [(kalix.field).id = true];
    }

    message GetCounter {
      string counter_id = 1 [(kalix.field).id = true];
    }

    message DeleteCounter {
      string counter_id = 1 [(kalix.field).id = true];
    }

    message CurrentCounter { // (5)
      int32 value = 1;
    }

    service CounterService { // (6)
      option (kalix.codegen) = { // (7)
        value_entity: { // (8)
          name: "com.example.domain.Counter" // (9)
          type_id: "counter" // (10)
          state: "com.example.domain.CounterState" // (11)
        }
      };

      rpc Increase (IncreaseValue) returns (google.protobuf.Empty);
      rpc IncreaseWithConditional (IncreaseValue) returns (google.protobuf.Empty);
      rpc Decrease (DecreaseValue) returns (google.protobuf.Empty);
      rpc Reset (ResetValue) returns (google.protobuf.Empty);
      rpc GetCurrentCounter (GetCounter) returns (CurrentCounter);
      rpc Delete (DeleteCounter) returns (google.protobuf.Empty);
    }
(1) Any classes generated from this protobuf file will be in the Scala package com.example.
(2) Import the Kalix protobuf annotations, or options.
(3) We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
(4) Every Command must contain a string field that contains the entity ID and is marked with the (kalix.field).id option.
(5) Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
(6) The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
(7) The protobuf option (kalix.codegen) is specific to code generation as provided by the Kalix build plugin.
(8) value_entity indicates that we want the codegen to generate a Value Entity for this service.
(9) name denotes the base name for the Value Entity. The code generation will create the initial sources Counter, CounterTest and CounterIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
(10) type_id is a unique identifier of the "state storage." The entity name may be changed even after data has been created, but the type_id can’t.
(11) state points to the protobuf message representing the Value Entity’s state, which is kept by Kalix. Note that the package and name follow the definition in the counter_domain.proto file.
Value Entity’s Effect API
The Value Entity’s Effect defines the operations that Kalix should perform when an incoming command is handled by a Value Entity.
A Value Entity Effect can either:
- update the entity state and send a reply to the caller
- directly reply to the caller if the command is not requesting any state change
- reject the command by returning an error
- instruct Kalix to delete the entity
See also Understanding what an Effect is
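To make the four outcomes concrete, here is a simplified plain-Java model of them. It is only a sketch: `EffectSketch`, `Kind`, `Effect` and `apply` are hypothetical names invented for illustration and are not the Kalix Effect API, which is shown in the handler examples on this page.

```java
import java.util.Optional;

// Simplified, hypothetical model of the four outcomes listed above.
// These types are NOT the Kalix Effect API.
public class EffectSketch {

  enum Kind { UPDATE_AND_REPLY, REPLY_ONLY, ERROR, DELETE }

  static final class Effect {
    final Kind kind;
    final Integer newState; // non-null only for UPDATE_AND_REPLY
    final String message;   // reply text or error description

    private Effect(Kind kind, Integer newState, String message) {
      this.kind = kind;
      this.newState = newState;
      this.message = message;
    }

    static Effect update(int newState, String reply) {
      return new Effect(Kind.UPDATE_AND_REPLY, newState, reply);
    }
    static Effect reply(String reply) { return new Effect(Kind.REPLY_ONLY, null, reply); }
    static Effect error(String description) { return new Effect(Kind.ERROR, null, description); }
    static Effect delete(String reply) { return new Effect(Kind.DELETE, null, reply); }
  }

  /** Returns the state that would be kept after the effect; empty means deleted. */
  static Optional<Integer> apply(Optional<Integer> stored, Effect effect) {
    switch (effect.kind) {
      case UPDATE_AND_REPLY:
        return Optional.of(effect.newState);
      case DELETE:
        return Optional.empty();
      default:
        return stored; // REPLY_ONLY and ERROR leave the stored state untouched
    }
  }

  /** A command handler: current state and command data in, effect out. */
  static Effect increase(int currentState, int amount) {
    if (amount < 0) {
      return Effect.error("Increase requires a non-negative value.");
    }
    return Effect.update(currentState + amount, "ok");
  }

  public static void main(String[] args) {
    Optional<Integer> stored = Optional.of(1);
    stored = apply(stored, increase(stored.get(), 2));  // accepted: state is updated
    stored = apply(stored, increase(stored.get(), -1)); // rejected: state is unchanged
    System.out.println(stored.get());
  }
}
```

The point of the model is that a handler only describes what should happen; applying the description to the stored state is a separate step performed outside the handler.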
Implementing behavior
A Value Entity implementation is a class where you define how each command is handled. The class Counter gets generated for us based on the counter_api.proto and counter_domain.proto definitions. Once the generated file exists, it is not overwritten, so you can freely add logic to it. Counter extends the generated class AbstractCounter, which we’re not supposed to change as it gets regenerated whenever we update the protobuf descriptors. AbstractCounter contains all method signatures corresponding to the API of the service. If you change the API you will see compilation errors in the Counter class, and you have to implement the methods required by AbstractCounter.
- Java
src/main/java/com/example/domain/Counter.java
    /**
     * A Counter represented as a value entity.
     */
    public class Counter extends AbstractCounter { // (1)

      private final String entityId;

      public Counter(ValueEntityContext context) {
        this.entityId = context.entityId();
      }

      @Override
      public CounterDomain.CounterState emptyState() { // (2)
        return CounterDomain.CounterState.getDefaultInstance();
      }

      // Command handlers, shown in the following sections, go here.
    }
(1) Extends the generated AbstractCounter, which extends ValueEntity.
(2) Defines the initial, empty state that is used before any updates.
- Scala
src/main/scala/com/example/domain/Counter.scala
    class Counter(context: ValueEntityContext) extends AbstractCounter { // (1)

      override def emptyState: CounterState = CounterState() // (2)

      // Command handlers, shown in the following sections, go here.
    }
(1) Extends the generated AbstractCounter, which extends ValueEntity.
(2) Defines the initial, empty state that is used before any updates.
We need to implement all methods our Value Entity offers as command handlers.
The code-generation will generate an implementation class with an initial empty implementation which we’ll discuss below.
Command handlers are implemented in the Counter class as methods that override abstract methods from AbstractCounter. The methods take the current state as the first parameter and the request message as the second parameter. They return an Effect, which describes the next processing actions, such as updating state and sending a reply.
When you add or change the rpc definitions in the .proto files, including name, parameter, and return messages, the corresponding methods are regenerated in the abstract class (AbstractCounter). This means that the compiler will assist you with such changes. The IDE can typically fill in missing method signatures and such.
Updating state
In the example below, the increase service call uses the value from the request message IncreaseValue. It returns an Effect to update the entity state and send a reply.
For Value Entities, modify the state and then trigger a save of that state in the returned Effect. The Java/Protobuf SDK has an effects().updateState(newState) method for this purpose. If you change the state but do not call updateState in the returned Effect, that state change is lost.
- Java
src/main/java/com/example/domain/Counter.java
    @Override
    public Effect<Empty> increase(
        CounterDomain.CounterState currentState, CounterApi.IncreaseValue command) {
      if (command.getValue() < 0) { // (1)
        return effects().error("Increase requires a positive value. It was [" + command.getValue() + "].");
      }
      CounterDomain.CounterState newState = // (2)
          currentState.toBuilder().setValue(currentState.getValue() + command.getValue()).build();
      return effects()
          .updateState(newState) // (3)
          .thenReply(Empty.getDefaultInstance()); // (4)
    }
(1) The validation accepts only non-negative values and fails calls with illegal values by returning an Effect with effects().error.
(2) From the current state we create a new state with the increased value.
(3) We store the new state by returning an Effect with effects().updateState.
(4) The acknowledgment that the command was successfully processed is only sent if the state update was successful; otherwise there will be an error reply.
- Scala
src/main/scala/com/example/domain/Counter.scala
    override def increase(currentState: CounterState, command: example.IncreaseValue): ValueEntity.Effect[Empty] =
      if (command.value < 0) // (1)
        effects.error(s"Increase requires a positive value. It was [${command.value}].")
      else {
        val newState = currentState.copy(value = currentState.value + command.value) // (2)
        effects
          .updateState(newState) // (3)
          .thenReply(Empty.defaultInstance) // (4)
      }
(1) The validation accepts only non-negative values and fails calls with illegal values by returning an Effect with effects.error.
(2) From the current state we create a new state with the increased value.
(3) We store the new state by returning an Effect with effects.updateState.
(4) The acknowledgment that the command was successfully processed is only sent if the state update was successful; otherwise there will be an error reply.
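The warning about lost state changes can be illustrated with a small plain-Java sketch. It uses a hypothetical in-memory map in place of the Kalix state store, and the names `LostUpdateSketch`, `Result` and `persist` are invented for illustration; only a state that the handler explicitly hands back is saved.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the state store; not Kalix SDK code.
public class LostUpdateSketch {

  /** What a handler hands back: the state to persist, or null if no update was requested. */
  static final class Result {
    final Integer stateToPersist;
    Result(Integer stateToPersist) { this.stateToPersist = stateToPersist; }
  }

  /** Computes a new value but never returns it for persistence. */
  static Result increaseWithoutUpdate(int current, int amount) {
    int changed = current + amount; // local change only
    return new Result(null);        // analogous to replying without updateState
  }

  /** Computes a new value and returns it for persistence. */
  static Result increaseWithUpdate(int current, int amount) {
    return new Result(current + amount); // analogous to updateState(newState)
  }

  /** Persists only what the handler explicitly returned. */
  static void persist(Map<String, Integer> store, String id, Result result) {
    if (result.stateToPersist != null) {
      store.put(id, result.stateToPersist);
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> store = new HashMap<>();
    store.put("c1", 0);
    persist(store, "c1", increaseWithoutUpdate(store.get("c1"), 5));
    System.out.println(store.get("c1")); // the change was lost
    persist(store, "c1", increaseWithUpdate(store.get("c1"), 5));
    System.out.println(store.get("c1")); // the change was saved
  }
}
```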
Retrieving state
The following example shows the implementation of the GetCurrentCounter command handler. This command handler is read-only: it doesn’t update the state, it just returns it.
- Java
src/main/java/com/example/domain/Counter.java
    @Override
    public Effect<CounterApi.CurrentCounter> getCurrentCounter(
        CounterDomain.CounterState currentState, // (1)
        CounterApi.GetCounter command) {
      CounterApi.CurrentCounter current =
          CounterApi.CurrentCounter.newBuilder()
              .setValue(currentState.getValue()).build(); // (2)
      return effects().reply(current);
    }
(1) The current state is passed to the method.
(2) We use its value to create the CurrentCounter value that is sent as a reply by returning an Effect with effects().reply.
- Scala
src/main/scala/com/example/domain/Counter.scala
    override def getCurrentCounter(
        currentState: CounterState, // (1)
        command: example.GetCounter): ValueEntity.Effect[example.CurrentCounter] =
      effects.reply(CurrentCounter(currentState.value)) // (2)
(1) The current state is passed to the method.
(2) We use its value to create the CurrentCounter value that is sent as a reply by returning an Effect with effects.reply.
Registering the Entity
To make Kalix aware of the Value Entity, we need to register it with the service.
The code generation automatically inserts the registration into the generated KalixFactory.withComponents call in the Main class.
- Java
src/main/java/com/example/Main.java
    public final class Main {

      private static final Logger LOG = LoggerFactory.getLogger(Main.class);

      public static Kalix createKalix() {
        // The KalixFactory automatically registers any generated Actions, Views or Entities,
        // and is kept up-to-date with any changes in your protobuf definitions.
        // If you prefer, you may remove this and manually register these components in a
        // `new Kalix()` instance.
        return KalixFactory.withComponents(
            Counter::new);
      }

      public static void main(String[] args) throws Exception {
        LOG.info("starting the Kalix service");
        createKalix().start();
      }
    }
- Scala
src/main/scala/com/example/Main.scala
    object Main {

      private val log = LoggerFactory.getLogger("com.example.Main")

      def createKalix(): Kalix = {
        // The KalixFactory automatically registers any generated Actions, Views or Entities,
        // and is kept up-to-date with any changes in your protobuf definitions.
        // If you prefer, you may remove this and manually register these components in a
        // `Kalix()` instance.
        KalixFactory.withComponents(
          new Counter(_))
      }

      def main(args: Array[String]): Unit = {
        log.info("starting the Kalix service")
        createKalix().start()
      }
    }
By default, the generated constructor has a ValueEntityContext parameter, but you can change this to accept other parameters. If you change the constructor of the Counter class you will see a compilation error here, and you have to adjust the factory function that is passed to KalixFactory.withComponents.
When more components are added, the KalixFactory is regenerated, and you have to adjust the registration in the Main class.
Deleting state
The next example shows how to delete a Value Entity’s state by returning the special deleteEntity() effect.
- Java
src/main/java/com/example/domain/Counter.java
    @Override
    public Effect<Empty> delete(CounterDomain.CounterState currentState, CounterApi.DeleteCounter deleteCounter) {
      return effects()
          .deleteEntity() // (1)
          .thenReply(Empty.getDefaultInstance());
    }
(1) We delete the state by returning an Effect with effects().deleteEntity().
- Scala
src/main/scala/com/example/domain/Counter.scala
    override def delete(currentState: CounterState, deleteCounter: DeleteCounter): ValueEntity.Effect[Empty] =
      effects
        .deleteEntity() // (1)
        .thenReply(Empty.defaultInstance)
(1) We delete the state by returning an Effect with effects.deleteEntity().
When you give the instruction to delete the entity it will still exist with an empty state for some time. The actual removal happens later to give downstream consumers time to process the change. By default, the existence of the entity is completely cleaned up after a week.
It is not allowed to make further changes after the entity has been "marked" as deleted. You can still handle read requests for the entity until it has been completely removed, but the current state will be empty.
If you want to make changes after deleting the state, you should use the updateState effect with an empty state, as a logical delete, instead of using deleteEntity.
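The difference between the two deletion styles can be sketched in plain Java. The class below is a hypothetical model, not Kalix SDK code: `delete` stands in for deleteEntity, `logicalDelete` stands in for updateState with an empty state, and rejecting later changes by throwing an exception is an assumption made only for this illustration.

```java
// Hypothetical model of the two deletion styles; not Kalix SDK code.
public class LogicalDeleteSketch {

  private int value = 0;                 // 0 is the empty state, as in the counter example
  private boolean markedDeleted = false;

  /** Analogous to deleteEntity(): no further changes are allowed afterwards. */
  void delete() {
    value = 0;
    markedDeleted = true;
  }

  /** Analogous to updateState(emptyState): a logical delete that keeps the entity usable. */
  void logicalDelete() {
    value = 0;
  }

  /** Rejects changes once the entity is marked as deleted (the exception is an assumption). */
  void increase(int amount) {
    if (markedDeleted) {
      throw new IllegalStateException("entity is marked as deleted");
    }
    value += amount;
  }

  int value() {
    return value;
  }

  public static void main(String[] args) {
    LogicalDeleteSketch counter = new LogicalDeleteSketch();
    counter.increase(3);
    counter.logicalDelete();
    counter.increase(2); // still allowed after a logical delete
    System.out.println(counter.value());
  }
}
```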
It is best to not reuse the same entity id after deletion, but if that happens after the entity has been completely removed it will be instantiated as a completely new entity without any knowledge of previous state.
Note that deleting View state must be handled explicitly.
Running Side Effects
An Entity may also emit one or more side effects. A side effect is something whose result has no impact on the result of the current command—if it fails, the current command still succeeds. The result of the side effect is therefore ignored. When used from inside an Entity, side effects are only performed after the successful completion of any state actions requested by the command handler.
See this dedicated section regarding Actions, for more details.
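The ordering and failure semantics described above can be modeled in a few lines of plain Java. This is a sketch under stated assumptions: `SideEffectSketch` and `handle` are hypothetical names, side effects are represented as simple Runnable callbacks rather than calls to other components, and none of this is Kalix SDK code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the side-effect semantics described above; not Kalix SDK code.
public class SideEffectSketch {

  /**
   * Runs the state action first. Side effects run only if it succeeds,
   * and a failing side effect does not change the command's result.
   */
  static String handle(Runnable stateAction, List<Runnable> sideEffects) {
    try {
      stateAction.run();
    } catch (RuntimeException e) {
      return "error"; // state action failed: side effects are skipped
    }
    for (Runnable sideEffect : sideEffects) {
      try {
        sideEffect.run();
      } catch (RuntimeException ignored) {
        // the result of a side effect is ignored
      }
    }
    return "ok";
  }

  public static void main(String[] args) {
    Runnable failing = () -> { throw new RuntimeException("side effect failed"); };
    String result = handle(() -> { }, Arrays.asList(failing));
    System.out.println(result);
  }
}
```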
Testing the Entity
There are two ways to test an Entity:
- unit tests, which run the Entity class in the same JVM as the test code itself with the help of a test kit
- integration tests, with the service deployed in a Docker container running the entire service and the test interacting with it over gRPC
Each way has its benefits. Unit tests are faster and provide more immediate feedback about success or failure, but they can only test a single entity at a time and in isolation. Integration tests, on the other hand, are more realistic and allow many entities to interact with other components inside and outside the service.
Unit tests
To unit test the Entity, the code generation provides a test kit class as well as an example unit test class to start from. Test cases use the test kit to execute commands in the entity, get a ValueEntityResult back, and assert the effects that the command led to, both the reply itself and the update to the state of the Entity.
- Java
src/test/java/com/example/domain/CounterTest.java
    import kalix.javasdk.Metadata;
    import kalix.javasdk.testkit.ValueEntityResult;
    import com.example.CounterApi;
    import com.google.protobuf.Empty;
    import org.junit.jupiter.api.Test;

    import static org.junit.jupiter.api.Assertions.*;

    public class CounterTest {

      @Test
      public void increaseCounterTest() {
        CounterTestKit testKit = CounterTestKit.of(Counter::new);
        CounterApi.IncreaseValue increaseValueCommand = CounterApi.IncreaseValue.newBuilder()
            .setValue(1)
            .build();
        ValueEntityResult<Empty> result1 = testKit.increase(increaseValueCommand);
        assertEquals(Empty.getDefaultInstance(), result1.getReply());
        assertEquals(1, testKit.getState().getValue());

        // one more time
        ValueEntityResult<Empty> result2 = testKit.increase(increaseValueCommand);
        assertEquals(Empty.getDefaultInstance(), result2.getReply());
        assertEquals(2, testKit.getState().getValue());
      }
    }
The unit tests can be run from Maven using mvn test or, if you prefer, from inside your IDE the same way you usually run tests.
- Scala
src/test/scala/com/example/domain/CounterSpec.scala
    class CounterSpec extends AnyWordSpec with Matchers {
      "Counter" must {
        "handle command Increase" in {
          val testKit = CounterTestKit(new Counter(_))
          val result1 = testKit.increase(IncreaseValue(value = 1))
          result1.reply shouldBe Empty.defaultInstance
          // one more time
          val result2 = testKit.increase(IncreaseValue(value = 1))
          result2.reply shouldBe Empty.defaultInstance
          testKit.currentState().value shouldBe 2
        }
      }
    }
The CounterTestKit is stateful, and it holds the state of a single entity instance in memory. If you want to test more than one entity in a test, you need to create multiple instances of CounterTestKit.
By default, the integration and unit tests are both invoked by sbt test. To run only the unit tests, run sbt -DonlyUnitTest test or sbt -DonlyUnitTest=true test, or set the value to true in the sbt session with set onlyUnitTest := true and then run test.
Integration tests
An example integration test class to start from is also generated for you. It uses a KalixTestKitExtension (Java) or a KalixTestKit (Scala) to start Docker containers and interacts with the entity through an actual gRPC client.
- Java
src/it/java/com/example/CounterIntegrationTest.java
    import kalix.javasdk.testkit.junit.jupiter.KalixTestKitExtension;
    // ...

    public class CounterIntegrationTest {

      /**
       * The test kit starts both the service container and the Kalix Runtime.
       */
      @RegisterExtension
      public static final KalixTestKitExtension testKit =
          new KalixTestKitExtension(Main.createKalix()); // (1)

      /**
       * Use the generated gRPC client to call the service through the Kalix Runtime.
       */
      private final CounterService client;

      public CounterIntegrationTest() {
        client = testKit.getGrpcClient(CounterService.class); // (2)
      }

      @Test
      public void increaseOnNonExistingEntity() throws Exception {
        String entityId = "new-id";
        client.increase(CounterApi.IncreaseValue.newBuilder().setCounterId(entityId).setValue(42).build()) // (3)
            .toCompletableFuture().get(5, SECONDS);
        CounterApi.CurrentCounter reply =
            client.getCurrentCounter(CounterApi.GetCounter.newBuilder().setCounterId(entityId).build())
                .toCompletableFuture().get(5, SECONDS);
        assertThat(reply.getValue(), is(42));
      }
    }
(1) Using the TestKit to create the service container and Kalix Runtime.
(2) Creating a client for interacting with the gRPC endpoints for CounterService.
(3) Increasing counter and asserting on its current status.
The integration tests are in a special profile, it, of the project and can be run using mvn verify -Pit.
- Scala
. - Scala
-
/src/test/scala/com/example/CounterServiceIntegrationSpec.scala
import kalix.scalasdk.testkit.KalixTestKit // ... class CounterServiceIntegrationSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with ScalaFutures { implicit private val patience: PatienceConfig = PatienceConfig(Span(5, Seconds), Span(500, Millis)) private val testKit = KalixTestKit(Main.createKalix()).start() (1) import testKit.executionContext private val client = testKit.getGrpcClient(classOf[CounterService]) (2) "CounterService" must { "Increase and decrease a counter" in { val counterId = "42" val updateResult = for { _ <- client.increase(IncreaseValue(counterId, 42)) done <- client.decrease(DecreaseValue(counterId, 32)) } yield done updateResult.futureValue val getResult = client.getCurrentCounter(GetCounter(counterId)) (3) getResult.futureValue.value shouldBe(42-32) } } override def afterAll(): Unit = { (4) testKit.stop() super.afterAll() } }
1 Using the TestKit to create the service container and Kalix Runtime. 2 Creating a client for interacting with the gRPC endpoints for CounterService
.3 Asserting on current value of counter after being increased. 4 Shutting down TestKit resources after test concludes. By default, the integration and unit test are both invoked by
sbt test
.