Implementing Replicated Entities
Replicated Entities distribute state using a conflict-free replicated data type (CRDT). Data is shared across multiple instances of a Replicated Entity and is eventually consistent to provide high availability with low latency. The underlying CRDT semantics allow Replicated Entity instances to update their state independently and concurrently and without coordination. The state changes will always converge without conflicts, but note that with the state being eventually consistent, reading the current data may return an out-of-date value.
Kalix needs to serialize the data to replicate, and this is done with Protocol Buffers using protobuf
types. While Protocol Buffers are the recommended format for state, we recommend that you do not use your service’s public protobuf
messages in the replicated data. This may introduce some overhead to convert from one type to the other, but allows the service public interface logic to evolve independently of the data format, which should be private.
The steps necessary to implement a Replicated Entity include:
-
Defining the API and domain objects in
.proto
files. -
Implementing behavior in command handlers.
-
Creating and initializing the Replicated Entity.
The sections on this page walk through these steps using a shopping cart service as an example.
Defining the proto
files
Our Replicated Entity example is a shopping cart service. |
The following shoppingcart_domain.proto
file defines our "Shopping Cart" Replicated Entity. The entity manages line items of a cart and stores these as a Replicated Counter Map, mapping from each item’s product details to its quantity. The counter for each item can be incremented independently in separate Replicated Entity instances and will converge to a total quantity.
- Java
-
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// The messages and data that will be replicated for the shopping cart. syntax = "proto3"; package com.example.shoppingcart.domain; (1) option java_outer_classname = "ShoppingCartDomain"; (2) message Product { string id = 1; string name = 2; }
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart.domain
.2 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartDomain
. - Scala
-
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// The messages and data that will be replicated for the shopping cart. syntax = "proto3"; package com.example.shoppingcart.domain; (1) message Product { string id = 1; string name = 2; }
1 Any classes generated from this protobuf file will be in the package com.example.shoppingcart.domain
.
Each Replicated Entity is associated with one underlying Replicated Data type. Replicated Data types that are generic, accepting type parameters for key, value, or element types, are used with protobuf messages and can represent structured data. In this shopping cart example, the keys of the counter map are products that have an id and name.
|
The shoppingcart_api.proto
file defines the commands we can send to the shopping cart service to manipulate or access the cart’s state. They make up the service API:
- Java
-
src/main/proto/com/example/shoppingcart/shoppingcart_api.proto
// This is the public API offered by the Shopping Cart Replicated Entity. syntax = "proto3"; package com.example.shoppingcart; (1) import "google/protobuf/empty.proto"; (2) import "kalix/annotations.proto"; import "google/api/annotations.proto"; option java_outer_classname = "ShoppingCartApi"; (3) message AddLineItem { (4) string cart_id = 1 [(kalix.field).id = true]; (5) string product_id = 2; string name = 3; int32 quantity = 4; } message RemoveLineItem { string cart_id = 1 [(kalix.field).id = true]; string product_id = 2; string name = 3; } message GetShoppingCart { string cart_id = 1 [(kalix.field).id = true]; } message RemoveShoppingCart { string cart_id = 1 [(kalix.field).id = true]; } message LineItem { string product_id = 1; string name = 2; int64 quantity = 3; } message Cart { (6) repeated LineItem items = 1; } service ShoppingCartService { (7) option (kalix.codegen) = { (8) replicated_entity: { (9) name: "com.example.shoppingcart.domain.ShoppingCart" (10) type_id: "shopping-cart" (11) replicated_counter_map: { (12) key: "com.example.shoppingcart.domain.Product" (13) } } }; rpc AddItem (AddLineItem) returns (google.protobuf.Empty) { option (google.api.http) = { post: "/cart/{cart_id}/items/add" body: "*" }; } rpc RemoveItem (RemoveLineItem) returns (google.protobuf.Empty) { option (google.api.http) = { post: "/cart/{cart_id}/items/{product_id}/remove" }; } rpc GetCart (GetShoppingCart) returns (Cart) { option (google.api.http) = { get: "/carts/{cart_id}" additional_bindings: { get: "/carts/{cart_id}/items" response_body: "items" } }; } rpc RemoveCart (RemoveShoppingCart) returns (google.protobuf.Empty) { option (google.api.http).post = "/carts/{cart_id}/remove"; } }
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart
.2 Import the Kalix protobuf annotations, or options. 3 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartApi
.4 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data. 5 Every Command must contain a string
field that contains the entity ID and is marked with the(kalix.field).id
option.6 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty
.7 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity. 8 The protobuf option (kalix.codegen)
is specific to code-generation as provided by the Kalix Maven plugin.9 replicated_entity
indicates that we want the codegen to generate a Replicated Entity for this service.10 name
denotes the base name for the Replicated Entity. The code-generation will create initial sourcesShoppingCart
andShoppingCartIntegrationTest
. Once these files exist, they are not overwritten, so you can freely add logic to them.11 entity_type
is a unique identifier for data replication. The entity name may be changed even after data has been created, theentity_type
can’t be changed.12 replicated_counter_map
describes the Replicated Data type for this entity.13 key
points to the protobuf message representing the counter map’s key type. - Scala
-
src/main/proto/com/example/shoppingcart/shoppingcart_api.proto
// This is the public API offered by the Shopping Cart Replicated Entity. syntax = "proto3"; package com.example.shoppingcart; (1) import "google/protobuf/empty.proto"; (2) import "kalix/annotations.proto"; import "google/api/annotations.proto"; message AddLineItem { (3) string cart_id = 1 [(kalix.field).id = true]; (4) string product_id = 2; string name = 3; int32 quantity = 4; } message RemoveLineItem { string cart_id = 1 [(kalix.field).id = true]; string product_id = 2; string name = 3; } message GetShoppingCart { string cart_id = 1 [(kalix.field).id = true]; } message RemoveShoppingCart { string cart_id = 1 [(kalix.field).id = true]; } message LineItem { string product_id = 1; string name = 2; int64 quantity = 3; } message Cart { (5) repeated LineItem items = 1; } service ShoppingCartService { (6) option (kalix.codegen) = { (7) replicated_entity: { (8) name: "com.example.shoppingcart.domain.ShoppingCart" (9) type_id: "shopping-cart" (10) replicated_counter_map: { (11) key: "com.example.shoppingcart.domain.Product" (12) } } }; rpc AddItem (AddLineItem) returns (google.protobuf.Empty) { option (google.api.http) = { post: "/cart/{cart_id}/items/add" body: "*" }; } rpc RemoveItem (RemoveLineItem) returns (google.protobuf.Empty) { option (google.api.http) = { post: "/cart/{cart_id}/items/{product_id}/remove" }; } rpc GetCart (GetShoppingCart) returns (Cart) { option (google.api.http) = { get: "/carts/{cart_id}" additional_bindings: { get: "/carts/{cart_id}/items" response_body: "items" } }; } rpc RemoveCart (RemoveShoppingCart) returns (google.protobuf.Empty) { option (google.api.http).post = "/carts/{cart_id}/remove"; } }
1 Any classes generated from this protobuf file will be in the package com.example.shoppingcart
.2 Import the Kalix protobuf annotations, or options. 3 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data. 4 Every Command must contain a string
field that contains the entity ID and is marked with the(kalix.field).id
option.5 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty
.6 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity. 7 The protobuf option (kalix.codegen)
is specific to code-generation as provided by the Kalix Maven plugin.8 replicated_entity
indicates that we want the codegen to generate a Replicated Entity for this service.9 name
denotes the base name for the Replicated Entity. The code-generation will create initial sourcesShoppingCart
andShoppingCartIntegrationTest
. Once these files exist, they are not overwritten, so you can freely add logic to them.10 entity_type
is a unique identifier for data replication. The entity name may be changed even after data has been created, theentity_type
can’t be changed.11 replicated_counter_map
describes the Replicated Data type for this entity.12 key
points to the protobuf message representing the counter map’s key type.
Replicated Entity’s Effect API
The Replicated Entity’s Effect defines the operations that Kalix should perform when an incoming command is handled by a Replicated Entity.
A Replicated Entity Effect can either:
-
update the entity state and send a reply to the caller
-
directly reply to the caller if the command is not requesting any state change
-
rejected the command by returning an error
-
instruct Kalix to delete the entity
See also Understanding what an Effect is
Implementing behavior
A Replicated Entity implementation is a Java class where you define how each command is handled. The class ShoppingCart
gets generated for us based on the shoppingcart_api.proto
and shoppingcart_domain.proto
definitions. Once the file ShoppingCart.java
ShoppingCart.scala
exist, it is not overwritten, so you can freely add logic to it. ShoppingCart
extends the generated class AbstractShoppingCart
which we’re not supposed to change as it gets regenerated in case we update the protobuf descriptors. AbstractShoppingCart
contains all method signatures corresponding to the API of the service. If you change the API you will see compilation errors in the ShoppingCart
class and you have to implement the methods required by AbstractShoppingCart
.
- Java
-
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
public class ShoppingCart extends AbstractShoppingCart { (1)
1 Extends the generated AbstractShoppingCart
, which extendsReplicatedCounterMapEntity
. - Scala
-
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
class ShoppingCart(context: ReplicatedEntityContext) extends AbstractShoppingCart { (1)
1 Extends the generated AbstractShoppingCart
, which extendsReplicatedCounterMapEntity
.
We need to implement all methods our Replicated Entity offers as command handlers.
The code-generation will generate an implementation class with an initial empty implementation which we’ll discuss below.
Command handlers are implemented in the ShoppingCart
class as methods that override abstract methods from AbstractShoppingCart
. The methods take the current data value as the first parameter and the request message as the second parameter. They return an Effect
, which describes next processing actions, such as updating state and sending a reply.
When adding or changing the rpc
definitions, including name, parameter and return messages, in the .proto
files the corresponding methods are regenerated in the abstract class (AbstractShoppingCart
). This means that the Java compiler will assist you with such changes. The IDE can typically fill in missing method signatures and such.
Updating state
In the example below, the AddItem
service call uses the request message AddLineItem
. It returns an Effect
to update the underlying data and then send a reply.
The only way for a command handler to modify the underlying data for a Replicated Entity is by returning an update effect with an updated Replicated Data object. Note that Replicated Data objects are immutable, with each modifying method returning a new instance of the Replicated Data type. |
- Java
-
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override public Effect<Empty> addItem( ReplicatedCounterMap<ShoppingCartDomain.Product> cart, ShoppingCartApi.AddLineItem addLineItem) { if (addLineItem.getQuantity() <= 0) { (1) return effects().error("Quantity for item " + addLineItem.getProductId() + " must be greater than zero."); } ShoppingCartDomain.Product product = (2) ShoppingCartDomain.Product.newBuilder() .setId(addLineItem.getProductId()) .setName(addLineItem.getName()) .build(); ReplicatedCounterMap<ShoppingCartDomain.Product> updatedCart = (3) cart.increment(product, addLineItem.getQuantity()); return effects() .update(updatedCart) (4) .thenReply(Empty.getDefaultInstance()); (5) }
1 The validation ensures quantity of items added is greater than zero and it fails calls with illegal values by returning an Effect
witheffects().error
.2 From the current incoming AddLineItem
we create a newProduct
object to represent the item’s key in the counter map.3 We increment the counter for this item in the cart. A new counter will be created if the cart doesn’t contain this item already. 4 We update the underlying data for the Replicated Entity by returning an Effect
witheffects().update
and the updated data object.5 An acknowledgment that the command was successfully processed is sent with a reply message. - Scala
-
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
def addItem( cart: ReplicatedCounterMap[Product], addLineItem: shoppingcart.AddLineItem): ReplicatedEntity.Effect[Empty] = { if (addLineItem.quantity <= 0) { (1) effects.error(s"Quantity for item ${addLineItem.productId} must be greater than zero.") } else { val product = Product(addLineItem.productId, addLineItem.name) (2) val updatedCart = cart.increment(product, addLineItem.quantity) (3) effects .update(updatedCart) (4) .thenReply(Empty.defaultInstance) (5) } }
1 The validation ensures quantity of items added is greater than zero and it fails calls with illegal values by returning an Effect
witheffects.error
.2 From the current incoming AddLineItem
we create a newProduct
object to represent the item’s key in the counter map.3 We increment the counter for this item in the cart. A new counter will be created if the cart doesn’t contain this item already. 4 We update the underlying data for the Replicated Entity by returning an Effect
witheffects.update
and the updated data object.5 An acknowledgment that the command was successfully processed is sent with a reply message.
Retrieving state
The following example shows the implementation of the GetCart
command handler. This command handler is a read-only command handler—it doesn’t update the state, it just returns it.
The state of Replicated Entities is eventually consistent. An individual Replicated Entity instance may have an out-of-date value, if there are concurrent modifications made by another instance. |
- Java
-
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override public Effect<ShoppingCartApi.Cart> getCart( ReplicatedCounterMap<ShoppingCartDomain.Product> cart, (1) ShoppingCartApi.GetShoppingCart getShoppingCart) { List<ShoppingCartApi.LineItem> allItems = cart.keySet().stream() .map( product -> ShoppingCartApi.LineItem.newBuilder() .setProductId(product.getId()) .setName(product.getName()) .setQuantity(cart.get(product)) .build()) .sorted(Comparator.comparing(ShoppingCartApi.LineItem::getProductId)) .collect(Collectors.toList()); ShoppingCartApi.Cart apiCart = (2) ShoppingCartApi.Cart.newBuilder().addAllItems(allItems).build(); return effects().reply(apiCart); }
1 The current data is passed to the method. Note that this may not be the most up-to-date value, with concurrent modifications made by other instances of this Replicated Entity being replicated eventually. 2 We convert the domain representation to the API representation that is sent as a reply by returning an Effect
witheffects().reply
. - Scala
-
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
def getCart( cart: ReplicatedCounterMap[Product], (1) getShoppingCart: shoppingcart.GetShoppingCart): ReplicatedEntity.Effect[shoppingcart.Cart] = { val allItems = cart.keySet .map { product => val quantity = cart.get(product).getOrElse(0L) shoppingcart.LineItem(product.id, product.name, quantity) } .toSeq .sortBy(_.productId) val apiCart = shoppingcart.Cart(allItems) (2) effects.reply(apiCart) }
1 The current data is passed to the method. Note that this may not be the most up-to-date value, with concurrent modifications made by other instances of this Replicated Entity being replicated eventually. 2 We convert the domain representation to the API representation that is sent as a reply by returning an Effect
witheffects.reply
.
Deleting state
The following example shows the implementation of the RemoveCart
command handler. Replicated Entity instances for a particular entity identifier can be deleted, using a delete Effect
. Once deleted, an entity instance cannot be recreated, and all subsequent commands for that entity identifier will be rejected with an error.
Caution should be taken with creating and deleting Replicated Entities, as Kalix maintains the replicated state in memory and also retains tombstones for each deleted entity. Over time, if many Replicated Entities are created and deleted, this will result in hitting memory limits. |
- Java
-
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override public Effect<Empty> removeCart( ReplicatedCounterMap<ShoppingCartDomain.Product> cart, ShoppingCartApi.RemoveShoppingCart removeShoppingCart) { return effects() .delete() (1) .thenReply(Empty.getDefaultInstance()); }
1 The Replicated Entity instances for the associated entity id are deleted by using effects().delete
. - Scala
-
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
def removeCart( cart: ReplicatedCounterMap[Product], removeShoppingCart: shoppingcart.RemoveShoppingCart): ReplicatedEntity.Effect[Empty] = effects.delete (1) .thenReply(Empty.defaultInstance)
1 The Replicated Entity instances for the associated entity id are deleted by using effects.delete
.
Registering the Entity
To make Kalix aware of the Replicated Entity, we need to register it with the service.
From the code-generation, the registration gets automatically inserted in the generated KalixFactory.withComponents
method from the Main
class.
- Java
-
src/main/java/com/example/shoppingcart/Main.java
/* This code was generated by Kalix tooling. * As long as this file exists it will not be re-generated. * You are free to make changes to this file. */ package com.example.shoppingcart; import kalix.javasdk.Kalix; import com.example.shoppingcart.domain.ShoppingCart; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public final class Main { private static final Logger LOG = LoggerFactory.getLogger(Main.class); public static Kalix createKalix() { // The KalixFactory automatically registers any generated Actions, Views or Entities, // and is kept up-to-date with any changes in your protobuf definitions. // If you prefer, you may remove this and manually register these components in a // `new Kalix()` instance. return KalixFactory.withComponents(ShoppingCart::new); } public static void main(String[] args) throws Exception { LOG.info("starting the Kalix service"); createKalix().start(); } }
- Scala
-
src/main/scala/com/example/shoppingcart/Main.scala
package com.example.shoppingcart import kalix.scalasdk.Kalix import com.example.shoppingcart.domain.ShoppingCart import org.slf4j.LoggerFactory // This class was initially generated based on the .proto definition by Kalix tooling. // // As long as this file exists it will not be overwritten: you can maintain it yourself, // or delete it so it is regenerated as needed. object Main { private val log = LoggerFactory.getLogger("com.example.shoppingcart.Main") def createKalix(): Kalix = { // The KalixFactory automatically registers any generated Actions, Views or Entities, // and is kept up-to-date with any changes in your protobuf definitions. // If you prefer, you may remove this and manually register these components in a // `Kalix()` instance. KalixFactory.withComponents( new ShoppingCart(_)) } def main(args: Array[String]): Unit = { log.info("starting the Kalix service") createKalix().start() } }
By default, the generated constructor has a ReplicatedEntityContext
parameter, but you can change this to accept other parameters. If you change the constructor of the ShoppingCart
class you will see a compilation error here, and you have to adjust the factory function that is passed to KalixFactory.withComponents
.
When more components are added, the KalixFactory
is regenerated and you have to adjust the registration from the Main
class.
Replicated Data types
Each Replicated Entity is associated with one underlying Replicated Data type. Counter, Register, Set, and Map data structures are available. This section describes how to configure and implement a Replicated Entity with each of the Replicated Data types.
The only way for a command handler to modify the underlying data for a Replicated Entity is by returning an update effect with an updated Replicated Data object. Note that Replicated Data objects are immutable, with each modifying method returning a new instance of the Replicated Data type. |
Replicated Counter
A ReplicatedCounter
can be incremented and decremented.
To configure a Replicated Entity with a Replicated Counter, use the replicated_counter
option when defining the proto
file:
- Java
-
src/main/proto/com/example/replicated/counter/counter_api.proto
service CounterService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.counter.domain.SomeCounter" (1) type_id: "some-counter" replicated_counter: {} (2) } }; rpc Increase(IncreaseValue) returns (google.protobuf.Empty); rpc Decrease(DecreaseValue) returns (google.protobuf.Empty); rpc Get(GetValue) returns (CurrentValue); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Counter. - Scala
-
src/main/proto/com/example/replicated/counter/counter_api.proto
service CounterService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.counter.domain.SomeCounter" (1) type_id: "some-counter" replicated_counter: {} (2) } }; rpc Increase(IncreaseValue) returns (google.protobuf.Empty); rpc Decrease(DecreaseValue) returns (google.protobuf.Empty); rpc Get(GetValue) returns (CurrentValue); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Counter.
When implementing a Replicated Counter entity, the state can be updated by calling the increment
or decrement
methods on the current data object, and then triggering an update with the modified data object:
- Java
-
src/main/java/com/example/replicated/counter/domain/SomeCounter.java
@Override public Effect<Empty> increase(ReplicatedCounter counter, SomeCounterApi.IncreaseValue command) { return effects() .update(counter.increment(command.getValue())) (1) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> decrease(ReplicatedCounter counter, SomeCounterApi.DecreaseValue command) { return effects() .update(counter.decrement(command.getValue())) (1) .thenReply(Empty.getDefaultInstance()); }
1 Modify the Replicated Counter with increment
ordecrement
and trigger a replicated update by returning anEffect
witheffects().update
. - Scala
-
src/main/scala/com/example/replicated/counter/domain/SomeCounter.scala
def increase(currentData: ReplicatedCounter, increaseValue: counter.IncreaseValue): ReplicatedEntity.Effect[Empty] = effects .update(currentData.increment(increaseValue.value)) (1) .thenReply(Empty.defaultInstance) def decrease(currentData: ReplicatedCounter, decreaseValue: counter.DecreaseValue): ReplicatedEntity.Effect[Empty] = effects .update(currentData.decrement(decreaseValue.value)) (1) .thenReply(Empty.defaultInstance)
1 Modify the Replicated Counter with increment
ordecrement
and trigger a replicated update by returning anEffect
witheffects.update
.
- Java
-
The current value of a Replicated Counter can be retrieved using
getValue
:src/main/java/com/example/replicated/counter/domain/SomeCounter.java@Override public Effect<SomeCounterApi.CurrentValue> get( ReplicatedCounter counter, SomeCounterApi.GetValue command) { long value = counter.getValue(); (1) SomeCounterApi.CurrentValue currentValue = SomeCounterApi.CurrentValue.newBuilder().setValue(value).build(); return effects().reply(currentValue); }
1 Get the current value of a Replicated Counter using getValue
. - Scala
-
The current value of a Replicated Counter can be retrieved using
value
:src/main/scala/com/example/replicated/counter/domain/SomeCounter.scaladef get(currentData: ReplicatedCounter, getValue: counter.GetValue): ReplicatedEntity.Effect[counter.CurrentValue] = effects .reply(counter.CurrentValue(currentData.value)) (1)
1 Get the current value of a Replicated Counter using value
.
The current value may not be the most up-to-date value when there are concurrent modifications. |
Replicated Register
A ReplicatedRegister
ReplicatedRegister
can contain any (serializable) value. Updates to the value are replicated using last-write-wins semantics, where concurrent modifications are resolved by using the update with the highest timestamp.
To configure a Replicated Entity with a Replicated Register, use the replicated_register
option when defining the proto
file:
- Java
-
src/main/proto/com/example/replicated/register/register_api.proto
service RegisterService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.register.domain.SomeRegister" (1) type_id: "some-register" replicated_register: { (2) value: "com.example.replicated.register.domain.SomeValue" (3) } } }; rpc Set(SetValue) returns (google.protobuf.Empty); rpc Get(GetValue) returns (CurrentValue); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Register. 3 Specify the protobuf
type for the value of the Replicated Register. - Scala
-
src/main/proto/com/example/replicated/register/register_api.proto
service RegisterService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.register.domain.SomeRegister" (1) type_id: "some-register" replicated_register: { (2) value: "com.example.replicated.register.domain.SomeValue" (3) } } }; rpc Set(SetValue) returns (google.protobuf.Empty); rpc Get(GetValue) returns (CurrentValue); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Register. 3 Specify the protobuf
type for the value of the Replicated Register.
The type for the value can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. A message type is being used for the value type in this example.
|
When implementing a Replicated Register entity, an initial or empty value needs to be defined by overriding the emptyValue
method:
- Java
-
src/main/java/com/example/replicated/register/domain/SomeRegister.java
@Override public SomeRegisterDomain.SomeValue emptyValue() { return SomeRegisterDomain.SomeValue.getDefaultInstance(); }
- Scala
-
src/main/scala/com/example/replicated/register/domain/SomeRegister.scala
override def emptyValue: SomeValue = SomeValue.defaultInstance
The value can be updated by calling the set
method on the current data object, and then triggering an update with the modified data object:
- Java
-
src/main/java/com/example/replicated/register/domain/SomeRegister.java
@Override public Effect<Empty> set( ReplicatedRegister<SomeRegisterDomain.SomeValue> register, SomeRegisterApi.SetValue command) { SomeRegisterDomain.SomeValue newValue = (1) SomeRegisterDomain.SomeValue.newBuilder().setSomeField(command.getValue()).build(); return effects() .update(register.set(newValue)) (2) .thenReply(Empty.getDefaultInstance()); }
1 Create a domain object for the new value. 2 Update the Replicated Register value with set
and trigger a replicated update by returning anEffect
witheffects().update
. - Scala
-
src/main/scala/com/example/replicated/register/domain/SomeRegister.scala
def set(currentData: ReplicatedRegister[SomeValue], setValue: register.SetValue): ReplicatedEntity.Effect[Empty] = { val someValue = SomeValue(setValue.value) (1) effects .update(currentData.set(someValue)) (2) .thenReply(Empty.defaultInstance) }
1 Create a domain object for the new value. 2 Update the Replicated Register value with set
and trigger a replicated update by returning anEffect
witheffects().update
.
The current value of a Replicated Register can be retrieved using get
:
- Java
-
src/main/java/com/example/replicated/register/domain/SomeRegister.java
@Override public Effect<SomeRegisterApi.CurrentValue> get( ReplicatedRegister<SomeRegisterDomain.SomeValue> register, SomeRegisterApi.GetValue command) { SomeRegisterDomain.SomeValue value = register.get(); (1) SomeRegisterApi.CurrentValue currentValue = (2) SomeRegisterApi.CurrentValue.newBuilder().setValue(value.getSomeField()).build(); return effects().reply(currentValue); }
1 Get the current value of a Replicated Register using get
.2 Convert from the domain object to the API object. - Scala
-
src/main/scala/com/example/replicated/register/domain/SomeRegister.scala
def get(currentData: ReplicatedRegister[SomeValue], getValue: register.GetValue): ReplicatedEntity.Effect[register.CurrentValue] = { val someValue = currentData() (1) effects.reply(register.CurrentValue(someValue.someField)) (2) }
1 Get the current value of a Replicated Register using apply
.2 Convert from the domain object to the API object.
The current value may not be the most up-to-date value when there are concurrent modifications. |
Replicated Set
A ReplicatedSet
ReplicatedSet
is a set of (serializable) values, where elements can be added or removed.
To configure a Replicated Entity with a Replicated Set, use the replicated_set
option when defining the proto
file:
- Java
-
src/main/proto/com/example/replicated/set/set_api.proto
service SetService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.set.domain.SomeSet" (1) type_id: "some-set" replicated_set: { (2) element: "string" (3) } } }; rpc Add(AddElement) returns (google.protobuf.Empty); rpc Remove(RemoveElement) returns (google.protobuf.Empty); rpc Get(GetElements) returns (CurrentElements); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Set. 3 Specify the protobuf
type for the elements of the Replicated Set. In this case, the scalar typestring
. - Scala
-
src/main/proto/com/example/replicated/set/set_api.proto
service SetService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.set.domain.SomeSet" (1) type_id: "some-set" replicated_set: { (2) element: "string" (3) } } }; rpc Add(AddElement) returns (google.protobuf.Empty); rpc Remove(RemoveElement) returns (google.protobuf.Empty); rpc Get(GetElements) returns (CurrentElements); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Set. 3 Specify the protobuf
type for the elements of the Replicated Set. In this case, the scalar typestring
.
The type for the elements can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. The string scalar type is being used for the element type in this example, which corresponds to the Java String class.
|
Care needs to be taken to ensure that the serialized values for elements in the set are stable. |
When implementing a Replicated Set entity, the state can be updated by calling the add
or remove
methods on the current data object, and then triggering an update with the modified data object:
- Java
-
src/main/java/com/example/replicated/set/domain/SomeSet.java
@Override public Effect<Empty> add(ReplicatedSet<String> set, SomeSetApi.AddElement command) { return effects() .update(set.add(command.getElement())) (1) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> remove(ReplicatedSet<String> set, SomeSetApi.RemoveElement command) { return effects() .update(set.remove(command.getElement())) (1) .thenReply(Empty.getDefaultInstance()); }
1 Modify the elements of the Replicated Set with add
orremove
and trigger a replicated update by returning anEffect
witheffects().update
. - Scala
-
src/main/scala/com/example/replicated/set/domain/SomeSet.scala
def add(currentData: ReplicatedSet[String], addElement: set.AddElement): ReplicatedEntity.Effect[Empty] = effects .update(currentData.add(addElement.element)) (1) .thenReply(Empty.defaultInstance) def remove(currentData: ReplicatedSet[String], removeElement: set.RemoveElement): ReplicatedEntity.Effect[Empty] = effects .update(currentData.remove(removeElement.element)) (1) .thenReply(Empty.defaultInstance)
1 Modify the elements of the Replicated Set with add
orremove
and trigger a replicated update by returning anEffect
witheffects.update
.
- Java
-
The
elements
method for Replicated Set returns a regularjava.util.Set
that can be used to iterate over the current elements:src/main/java/com/example/replicated/set/domain/SomeSet.java@Override public Effect<SomeSetApi.CurrentElements> get( ReplicatedSet<String> set, SomeSetApi.GetElements command) { List<String> elements = set.elements().stream() (1) .sorted() .collect(Collectors.toList()); SomeSetApi.CurrentElements currentElements = SomeSetApi.CurrentElements.newBuilder().addAllElements(elements).build(); return effects().reply(currentElements); }
1 Iterate over the current elements of a Replicated Set. - Scala
-
The
elements
method for Replicated Set returns a regular ScalaSet
that contains the current elements:src/main/scala/com/example/replicated/set/domain/SomeSet.scaladef get(currentData: ReplicatedSet[String], getElements: set.GetElements): ReplicatedEntity.Effect[set.CurrentElements] = effects.reply(set.CurrentElements(currentData.elements.toSeq)) (1)
1 Turn the Set
into aSeq
for the response.
The current value may not be the most up-to-date view of the set when there are concurrent modifications. |
Replicated Counter Map
A ReplicatedCounterMap
ReplicatedCounterMap
maps (serializable) keys to replicated counters, where each value can be incremented and decremented.
To configure a Replicated Entity with a Replicated Counter Map, use the replicated_counter_map
option when defining the proto
file.
- Java
-
src/main/proto/com/example/replicated/countermap/counter_map_api.proto
service CounterMapService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.countermap.domain.SomeCounterMap" (1) type_id: "some-counter-map" replicated_counter_map: { (2) key: "string" (3) } } }; rpc Increase(IncreaseValue) returns (google.protobuf.Empty); rpc Decrease(DecreaseValue) returns (google.protobuf.Empty); rpc Remove(RemoveValue) returns (google.protobuf.Empty); rpc Get(GetValue) returns (CurrentValue); rpc GetAll(GetAllValues) returns (CurrentValues); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Counter Map. 3 Specify the protobuf
type for the keys of the Replicated Counter Map. In this case, the scalar typestring
. - Scala
-
src/main/proto/com/example/replicated/countermap/counter_map_api.proto
service CounterMapService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.countermap.domain.SomeCounterMap" (1) type_id: "some-counter-map" replicated_counter_map: { (2) key: "string" (3) } } }; rpc Increase(IncreaseValue) returns (google.protobuf.Empty); rpc Decrease(DecreaseValue) returns (google.protobuf.Empty); rpc Remove(RemoveValue) returns (google.protobuf.Empty); rpc Get(GetValue) returns (CurrentValue); rpc GetAll(GetAllValues) returns (CurrentValues); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Counter Map. 3 Specify the protobuf
type for the keys of the Replicated Counter Map. In this case, the scalar typestring
.
The type for the key can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. The string scalar type is being used for the key type in this example, which corresponds to the Java String class.
|
When implementing a Replicated Counter Map entity, the value of an entry can be updated by calling the increment
or decrement
methods on the current data object, and then triggering an update with the modified data object. Entries can be removed from the map using the remove
method.
- Java
-
src/main/java/com/example/replicated/countermap/domain/SomeCounterMap.java
@Override public Effect<Empty> increase( ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.IncreaseValue command) { return effects() .update(counterMap.increment(command.getKey(), command.getValue())) (1) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> decrease( ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.DecreaseValue command) { return effects() .update(counterMap.decrement(command.getKey(), command.getValue())) (1) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> remove( ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.RemoveValue command) { return effects() .update(counterMap.remove(command.getKey())) (1) .thenReply(Empty.getDefaultInstance()); }
1 Modify the values of the Replicated Counter Map with increment
,decrement
, orremove
and trigger a replicated update by returning anEffect
witheffects().update
. - Scala
-
src/main/scala/com/example/replicated/countermap/domain/SomeCounterMap.scala
def increase(currentData: ReplicatedCounterMap[String], increaseValue: countermap.IncreaseValue): ReplicatedEntity.Effect[Empty] = effects .update(currentData.increment(increaseValue.key, increaseValue.value)) (1) .thenReply(Empty.defaultInstance) def decrease(currentData: ReplicatedCounterMap[String], decreaseValue: countermap.DecreaseValue): ReplicatedEntity.Effect[Empty] = effects .update(currentData.decrement(decreaseValue.key, decreaseValue.value)) (1) .thenReply(Empty.defaultInstance) def remove(currentData: ReplicatedCounterMap[String], removeValue: countermap.RemoveValue): ReplicatedEntity.Effect[Empty] = effects .update(currentData.remove(removeValue.key)) (1) .thenReply(Empty.defaultInstance)
1 Modify the values of the Replicated Counter Map with increment
,decrement
, orremove
and trigger a replicated update by returning anEffect
witheffects.update
.
Individual counters in a Replicated Counter Map can be accessed, or the set of keys can be used to iterate over all counters.
- Java
-
src/main/java/com/example/replicated/countermap/domain/SomeCounterMap.java
@Override public Effect<SomeCounterMapApi.CurrentValue> get( ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.GetValue command) { long value = counterMap.get(command.getKey()); (1) SomeCounterMapApi.CurrentValue currentValue = SomeCounterMapApi.CurrentValue.newBuilder().setValue(value).build(); return effects().reply(currentValue); } @Override public Effect<SomeCounterMapApi.CurrentValues> getAll( ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.GetAllValues command) { Map<String, Long> values = counterMap.keySet().stream() (2) .map(key -> new SimpleEntry<>(key, counterMap.get(key))) .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)); SomeCounterMapApi.CurrentValues currentValues = SomeCounterMapApi.CurrentValues.newBuilder().putAllValues(values).build(); return effects().reply(currentValues); }
1 Get the current counter value for a key using get
.2 Iterate over the current entries of a Replicated Counter Map using keySet
. - Scala
-
src/main/scala/com/example/replicated/countermap/domain/SomeCounterMap.scala
def get(currentData: ReplicatedCounterMap[String], getValue: countermap.GetValue): ReplicatedEntity.Effect[countermap.CurrentValue] = { val value = currentData(getValue.key) (1) effects.reply(countermap.CurrentValue(value)) } def getAll(currentData: ReplicatedCounterMap[String], getAllValues: countermap.GetAllValues): ReplicatedEntity.Effect[countermap.CurrentValues] = { val keyValues = currentData.keySet.map { key => key -> currentData(key) }.toMap (2) effects.reply(countermap.CurrentValues(keyValues)) }
1 Get the current counter value for a key using apply
.2 Iterate over the current entries of a Replicated Counter Map using keySet
.
The get apply method returns a default value of 0L if the map does not contain the key.
|
Entries may not contain the most up-to-date values for counters when there are concurrent modifications. |
Replicated Register Map
A ReplicatedRegisterMap
ReplicatedRegisterMap
maps (serializable) keys to replicated registers of (serializable) values. Updates to values are replicated using last-write-wins semantics, where concurrent modifications are resolved by using the update with the highest timestamp.
To configure a Replicated Entity with a Replicated Register Map, use the replicated_register_map
option when defining the proto
file.
- Java
-
src/main/proto/com/example/replicated/registermap/register_map_api.proto
service RegisterMapService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.registermap.domain.SomeRegisterMap" (1) type_id: "some-register-map" replicated_register_map: { (2) key: "com.example.replicated.registermap.domain.SomeKey" (3) value: "com.example.replicated.registermap.domain.SomeValue" (4) } } }; rpc Set(SetValue) returns (google.protobuf.Empty); rpc Remove(RemoveValue) returns (google.protobuf.Empty); rpc Get(GetValue) returns (CurrentValue); rpc GetAll(GetAllValues) returns (CurrentValues); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Register Map. 3 Specify the protobuf
type for the keys of the Replicated Register Map.4 Specify the protobuf
type for the values of the Replicated Register Map. - Scala
-
src/main/proto/com/example/replicated/registermap/domain/register_map_domain.proto
service RegisterMapService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.registermap.domain.SomeRegisterMap" (1) type_id: "some-register-map" replicated_register_map: { (2) key: "com.example.replicated.registermap.domain.SomeKey" (3) value: "com.example.replicated.registermap.domain.SomeValue" (4) } } }; rpc Set(SetValue) returns (google.protobuf.Empty); rpc Remove(RemoveValue) returns (google.protobuf.Empty); rpc Get(GetValue) returns (CurrentValue); rpc GetAll(GetAllValues) returns (CurrentValues); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Register Map. 3 Specify the protobuf
type for the keys of the Replicated Register Map.4 Specify the protobuf
type for the values of the Replicated Register Map.
The type for the key or value can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. Message types are being used for both the key and value types in this example.
|
When implementing a Replicated Register Map entity, the value of an entry can be updated by calling the setValue
method on the current data object, and then triggering an update with the modified data object. Entries can be removed from the map using the remove
method.
- Java
-
src/main/java/com/example/replicated/registermap/domain/SomeRegisterMap.java
@Override public Effect<Empty> set( ReplicatedRegisterMap<SomeRegisterMapDomain.SomeKey, SomeRegisterMapDomain.SomeValue> registerMap, SomeRegisterMapApi.SetValue command) { SomeRegisterMapDomain.SomeKey key = (1) SomeRegisterMapDomain.SomeKey.newBuilder() .setSomeField(command.getKey().getField()) .build(); SomeRegisterMapDomain.SomeValue value = (2) SomeRegisterMapDomain.SomeValue.newBuilder() .setSomeField(command.getValue().getField()) .build(); return effects() .update(registerMap.setValue(key, value)) (3) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> remove( ReplicatedRegisterMap<SomeRegisterMapDomain.SomeKey, SomeRegisterMapDomain.SomeValue> registerMap, SomeRegisterMapApi.RemoveValue command) { SomeRegisterMapDomain.SomeKey key = (1) SomeRegisterMapDomain.SomeKey.newBuilder() .setSomeField(command.getKey().getField()) .build(); return effects() .update(registerMap.remove(key)) (3) .thenReply(Empty.getDefaultInstance()); }
1 Create a domain object for the key. 2 Create a domain object for the value. 3 Modify the values of the Replicated Register Map with setValue
, orremove
and trigger a replicated update by returning anEffect
witheffects().update
. - Scala
-
src/main/scala/com/example/replicated/registermap/domain/SomeRegisterMap.scala
def set(currentData: ReplicatedRegisterMap[SomeKey, SomeValue], setValue: registermap.SetValue): ReplicatedEntity.Effect[Empty] = { val key = SomeKey(setValue.getKey.field) (1) val value = SomeValue(setValue.getValue.field) (2) effects .update(currentData.setValue(key,value)) (3) .thenReply(Empty.defaultInstance) } def remove(currentData: ReplicatedRegisterMap[SomeKey, SomeValue], removeValue: registermap.RemoveValue): ReplicatedEntity.Effect[Empty] = { val key = SomeKey(removeValue.getKey.field) (1) effects .update(currentData.remove(key)) (3) .thenReply(Empty.defaultInstance) }
1 Create a domain object for the key. 2 Create a domain object for the value. 3 Modify the values of the Replicated Register Map with setValue
, orremove
and trigger a replicated update by returning anEffect
witheffects.update
.
Individual registers in a Replicated Register Map can be accessed, or the set of keys can be used to iterate over all registers.
- Java
-
src/main/java/com/example/replicated/registermap/domain/SomeRegisterMap.java
@Override public Effect<SomeRegisterMapApi.CurrentValue> get( ReplicatedRegisterMap<SomeRegisterMapDomain.SomeKey, SomeRegisterMapDomain.SomeValue> registerMap, SomeRegisterMapApi.GetValue command) { SomeRegisterMapDomain.SomeKey key = (1) SomeRegisterMapDomain.SomeKey.newBuilder() .setSomeField(command.getKey().getField()) .build(); Optional<SomeRegisterMapDomain.SomeValue> maybeValue = registerMap.getValue(key); (2) SomeRegisterMapApi.CurrentValue currentValue = SomeRegisterMapApi.CurrentValue.newBuilder() .setValue( SomeRegisterMapApi.Value.newBuilder() .setField( maybeValue.map(SomeRegisterMapDomain.SomeValue::getSomeField).orElse(""))) .build(); return effects().reply(currentValue); } @Override public Effect<SomeRegisterMapApi.CurrentValues> getAll( ReplicatedRegisterMap<SomeRegisterMapDomain.SomeKey, SomeRegisterMapDomain.SomeValue> registerMap, SomeRegisterMapApi.GetAllValues command) { List<SomeRegisterMapApi.CurrentValue> allValues = registerMap.keySet().stream() (3) .map( key -> { String value = registerMap .getValue(key) .map(SomeRegisterMapDomain.SomeValue::getSomeField) .orElse(""); return SomeRegisterMapApi.CurrentValue.newBuilder() .setKey(SomeRegisterMapApi.Key.newBuilder().setField(key.getSomeField())) .setValue(SomeRegisterMapApi.Value.newBuilder().setField(value)) .build(); }) .collect(Collectors.toList()); SomeRegisterMapApi.CurrentValues currentValues = SomeRegisterMapApi.CurrentValues.newBuilder().addAllValues(allValues).build(); return effects().reply(currentValues); }
1 Create a domain object for the key. 2 Get the current register value for a key using getValue
.3 Iterate over the current entries of a Replicated Register Map using keySet
.The getValue
method returns anOptional
for when the map does not contain the given key. - Scala
-
src/main/scala/com/example/replicated/registermap/domain/SomeRegisterMap.scala
def get(currentData: ReplicatedRegisterMap[SomeKey, SomeValue], getValue: registermap.GetValue): ReplicatedEntity.Effect[registermap.CurrentValue] = { val key = SomeKey(getValue.getKey.field) (1) val maybeValue = currentData.get(key) (2) val currentValue = registermap.CurrentValue(getValue.key, maybeValue.map(v => registermap.Value(v.someField))) effects.reply(currentValue) } def getAll(currentData: ReplicatedRegisterMap[SomeKey, SomeValue], getAllValues: registermap.GetAllValues): ReplicatedEntity.Effect[registermap.CurrentValues] = { val allData = currentData.keySet.map { key => (3) val value = currentData.get(key).map(v => registermap.Value(v.someField)) registermap.CurrentValue(Some(registermap.Key(key.someField)), value) }.toSeq effects.reply(registermap.CurrentValues(allData)) }
1 Create a domain object for the key. 2 Get the current register value for a key using get
.3 Iterate over the current entries of a Replicated Register Map using keySet
.The get
method returns anOption
for when the map does not contain the given key.
Entries may not contain the most up-to-date values for registers when there are concurrent modifications. |
Replicated Multi-Map
A ReplicatedMultiMap
ReplicatedMultiMap
maps (serializable) keys to replicated sets of (serializable) values, providing a multi-map interface that can associate multiple values with each key.
To configure a Replicated Entity with a Replicated Multi-Map, use the replicated_multi_map
option when defining the proto
file.
- Java
-
src/main/proto/com/example/replicated/multimap/multi_map_api.proto
service MultiMapService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.multimap.domain.SomeMultiMap" (1) type_id: "some-multi-map" replicated_multi_map: { (2) key: "string" (3) value: "double" (4) } } }; rpc Put(PutValue) returns (google.protobuf.Empty); rpc PutAll(PutAllValues) returns (google.protobuf.Empty); rpc Remove(RemoveValue) returns (google.protobuf.Empty); rpc RemoveAll(RemoveAllValues) returns (google.protobuf.Empty); rpc Get(GetValues) returns (CurrentValues); rpc GetAll(GetAllValues) returns (AllCurrentValues); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Multi-Map. 3 Specify the protobuf
type for the keys of the Replicated Multi-Map. In this case, the scalar typestring
.4 Specify the protobuf
type for the values of the Replicated Multi-Map. In this case, the scalar typedouble
. - Scala
-
src/main/proto/com/example/replicated/multimap/domain/multi_map_domain.proto
service MultiMapService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.multimap.domain.SomeMultiMap" (1) type_id: "some-multi-map" replicated_multi_map: { (2) key: "string" (3) value: "double" (4) } } }; rpc Put(PutValue) returns (google.protobuf.Empty); rpc PutAll(PutAllValues) returns (google.protobuf.Empty); rpc Remove(RemoveValue) returns (google.protobuf.Empty); rpc RemoveAll(RemoveAllValues) returns (google.protobuf.Empty); rpc Get(GetValues) returns (CurrentValues); rpc GetAll(GetAllValues) returns (AllCurrentValues); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Multi-Map. 3 Specify the protobuf
type for the keys of the Replicated Multi-Map. In this case, the scalar typestring
.4 Specify the protobuf
type for the values of the Replicated Multi-Map. In this case, the scalar typedouble
.
The type for the key or value can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. The string scalar type is being used for the key type and the double scalar type for the value type in this example, which correspond to the Java types for String and Double .
|
When implementing a Replicated Multi-Map entity, the values of an entry can be updated by calling the put
, putAll
, or remove
methods on the current data object, and then triggering an update with the modified data object. Entries can be removed entirely from the map using the removeAll
method.
- Java
-
src/main/java/com/example/replicated/multimap/domain/SomeMultiMap.java
@Override public Effect<Empty> put( ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.PutValue command) { return effects() .update(multiMap.put(command.getKey(), command.getValue())) (1) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> putAll( ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.PutAllValues command) { return effects() .update(multiMap.putAll(command.getKey(), command.getValuesList())) (1) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> remove( ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.RemoveValue command) { return effects() .update(multiMap.remove(command.getKey(), command.getValue())) (1) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> removeAll( ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.RemoveAllValues command) { return effects() .update(multiMap.removeAll(command.getKey())) (1) .thenReply(Empty.getDefaultInstance()); }
1 Modify the values of the Replicated Multi-Map with put
,putAll
,remove
, orremoveAll
and trigger a replicated update by returning anEffect
witheffects().update
. - Scala
-
src/main/scala/com/example/replicated/multimap/domain/SomeMultiMap.scala
def put(currentData: ReplicatedMultiMap[String, Double], putValue: multimap.PutValue): ReplicatedEntity.Effect[Empty] = effects .update(currentData.put(putValue.key, putValue.value)) (1) .thenReply(Empty.defaultInstance) def putAll(currentData: ReplicatedMultiMap[String, Double], putAllValues: multimap.PutAllValues): ReplicatedEntity.Effect[Empty] = effects .update(currentData.putAll(putAllValues.key, putAllValues.values)) (1) .thenReply(Empty.defaultInstance) def remove(currentData: ReplicatedMultiMap[String, Double], removeValue: multimap.RemoveValue): ReplicatedEntity.Effect[Empty] = effects .update(currentData.remove(removeValue.key, removeValue.value)) (1) .thenReply(Empty.defaultInstance) def removeAll(currentData: ReplicatedMultiMap[String, Double], removeAllValues: multimap.RemoveAllValues): ReplicatedEntity.Effect[Empty] = effects .update(currentData.removeAll(removeAllValues.key)) (1) .thenReply(Empty.defaultInstance)
1 Modify the values of the Replicated Multi-Map with put
,putAll
,remove
, orremoveAll
and trigger a replicated update by returning anEffect
witheffects().update
.
Individual entries in a Replicated Multi-Map can be accessed, or the set of keys can be used to iterate over all value sets.
- Java
-
src/main/java/com/example/replicated/multimap/domain/SomeMultiMap.java
@Override public Effect<SomeMultiMapApi.CurrentValues> get( ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.GetValues command) { Set<Double> values = multiMap.get(command.getKey()); (1) SomeMultiMapApi.CurrentValues currentValues = SomeMultiMapApi.CurrentValues.newBuilder() .addAllValues(values.stream().sorted().collect(Collectors.toList())) .build(); return effects().reply(currentValues); } @Override public Effect<SomeMultiMapApi.AllCurrentValues> getAll( ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.GetAllValues command) { List<SomeMultiMapApi.CurrentValues> allValues = multiMap.keySet().stream() (2) .map( key -> { List<Double> values = multiMap.get(key).stream().sorted().collect(Collectors.toList()); return SomeMultiMapApi.CurrentValues.newBuilder() .setKey(key) .addAllValues(values) .build(); }) .collect(Collectors.toList()); SomeMultiMapApi.AllCurrentValues allCurrentValues = SomeMultiMapApi.AllCurrentValues.newBuilder().addAllValues(allValues).build(); return effects().reply(allCurrentValues); }
1 Get the current set of values for a key using get
.2 Iterate over the current entries of a Replicated Multi-Map using keySet
. - Scala
-
src/main/scala/com/example/replicated/multimap/domain/SomeMultiMap.scala
def get(currentData: ReplicatedMultiMap[String, Double], getValues: multimap.GetValues): ReplicatedEntity.Effect[multimap.CurrentValues] = { val values = currentData.get(getValues.key) (1) effects .reply(multimap.CurrentValues(getValues.key, values.toSeq)) } /** Command handler for "GetAll". */ def getAll(currentData: ReplicatedMultiMap[String, Double], getAllValues: multimap.GetAllValues): ReplicatedEntity.Effect[multimap.AllCurrentValues] = { val currentValues = currentData.keySet.map { key => (2) val values = currentData.get(key) multimap.CurrentValues(key, values.toSeq) } effects.reply(multimap.AllCurrentValues(currentValues.toSeq)) }
1 Get the current set of values for a key using get
.2 Iterate over the current entries of a Replicated Multi-Map using keySet
.
Entries may not contain the most up-to-date values when there are concurrent modifications. |
Replicated Map
A ReplicatedMap
ReplicatedMap
maps (serializable) keys to any other Replicated Data types, allowing a heterogeneous map where values can be of any Replicated Data type.
Prefer to use the specialized replicated maps (Replicated Counter Map, Replicated Register Map, or Replicated Multi-Map) whenever the values of the map are of the same type — counters, registers, or sets. |
To configure a Replicated Entity with a (heterogeneous) Replicated Map, use the replicated_map
option when defining the proto
file.
- Java
-
src/main/proto/com/example/replicated/map/map_api.proto
service MapService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.map.domain.SomeMap" (1) type_id: "some-map" replicated_map: { (2) key: "com.example.replicated.map.domain.SomeKey" (3) } } }; rpc IncreaseFoo(IncreaseFooValue) returns (google.protobuf.Empty); rpc DecreaseFoo(DecreaseFooValue) returns (google.protobuf.Empty); rpc SetBar(SetBarValue) returns (google.protobuf.Empty); rpc AddBaz(AddBazValue) returns (google.protobuf.Empty); rpc RemoveBaz(RemoveBazValue) returns (google.protobuf.Empty); rpc Get(GetValues) returns (CurrentValues); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Map. 3 Specify the protobuf
type for the keys of the map. - Scala
-
src/main/proto/com/example/replicated/map/map_api.proto
service MapService { option (kalix.codegen) = { replicated_entity: { name: "com.example.replicated.map.domain.SomeMap" (1) type_id: "some-map" replicated_map: { (2) key: "com.example.replicated.map.domain.SomeKey" (3) } } }; rpc IncreaseFoo(IncreaseFooValue) returns (google.protobuf.Empty); rpc DecreaseFoo(DecreaseFooValue) returns (google.protobuf.Empty); rpc SetBar(SetBarValue) returns (google.protobuf.Empty); rpc AddBaz(AddBazValue) returns (google.protobuf.Empty); rpc RemoveBaz(RemoveBazValue) returns (google.protobuf.Empty); rpc Get(GetValues) returns (CurrentValues); }
1 The fully-qualified class name for this replicated entity. 2 Specify the Replicated Data type as a Replicated Map. 3 Specify the protobuf
type for the keys of the map.
The type for the key can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. A message type is being used for the key type in this example.
|
The value type for a Replicated Map is not specified for code-generation, and will be set to ReplicatedData for a heterogeneous map (a Replicated Map that contains different types of Replicated Data values).
|
When implementing a Replicated Map entity, the replicated data for an entry can be updated by retrieving the data value using the get
or getOrElse
methods, updating values using the update
method, and then triggering an update effect with the modified Replicated Map. Entries can be removed from the map using the remove
method.
There are also accessors for each of the Replicated Data types to make a heterogeneous map easier to use, such as getReplicatedCounter
or getReplicatedRegister
. If a key is not present in the map, these will return an empty value for the associated Replicated Data type.
- Java
-
src/main/java/com/example/replicated/map/domain/SomeMap.java
@Override public Effect<Empty> increaseFoo( ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.IncreaseFooValue command) { ReplicatedCounter foo = map.getReplicatedCounter(FOO_KEY); (1) return effects() .update(map.update(FOO_KEY, foo.increment(command.getValue()))) (2) (3) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> decreaseFoo( ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.DecreaseFooValue command) { ReplicatedCounter foo = map.getReplicatedCounter(FOO_KEY); (1) return effects() .update(map.update(FOO_KEY, foo.decrement(command.getValue()))) (2) (3) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> setBar( ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.SetBarValue command) { ReplicatedRegister<String> bar = map.getReplicatedRegister(BAR_KEY); (1) return effects() .update(map.update(BAR_KEY, bar.set(command.getValue()))) (2) (3) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> addBaz( ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.AddBazValue command) { ReplicatedSet<String> baz = map.getReplicatedSet(BAZ_KEY); (1) return effects() .update(map.update(BAZ_KEY, baz.add(command.getValue()))) (2) (3) .thenReply(Empty.getDefaultInstance()); } @Override public Effect<Empty> removeBaz( ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.RemoveBazValue command) { ReplicatedSet<String> baz = map.getReplicatedSet(BAZ_KEY); (1) return effects() .update(map.update(BAZ_KEY, baz.remove(command.getValue()))) (2) (3) .thenReply(Empty.getDefaultInstance()); }
1 Get the Replicated Data value for a key using an accessor method for the Replicated Data type. 2 Modify the Replicated Data value using its modifying methods and update
on the Replicated Map.3 Trigger a replicated update by returning an Effect
witheffects().update
. - Scala
-
src/main/scala/com/example/replicated/map/domain/SomeMap.scala
def increaseFoo(currentData: ReplicatedMap[SomeKey, ReplicatedData], increaseFooValue: map.IncreaseFooValue): ReplicatedEntity.Effect[Empty] = { val foo = currentData.getReplicatedCounter(FooKey) (1) effects .update(currentData.update(FooKey, foo.increment(increaseFooValue.value)))(2) (3) .thenReply(Empty.defaultInstance) } def decreaseFoo(currentData: ReplicatedMap[SomeKey, ReplicatedData], decreaseFooValue: map.DecreaseFooValue): ReplicatedEntity.Effect[Empty] = { val foo = currentData.getReplicatedCounter(FooKey) (1) effects .update(currentData.update(FooKey, foo.decrement(decreaseFooValue.value))) (2) (3) .thenReply(Empty.defaultInstance) } def setBar(currentData: ReplicatedMap[SomeKey, ReplicatedData], setBarValue: map.SetBarValue): ReplicatedEntity.Effect[Empty] = { val bar: ReplicatedRegister[String] = currentData.getReplicatedRegister(BarKey) effects .update(currentData.update(BarKey, bar.set(setBarValue.value))) .thenReply(Empty.defaultInstance) } def addBaz(currentData: ReplicatedMap[SomeKey, ReplicatedData], addBazValue: map.AddBazValue): ReplicatedEntity.Effect[Empty] = { val baz: ReplicatedSet[String] = currentData.getReplicatedSet(BazKey) (1) effects .update(currentData.update(BazKey, baz.add(addBazValue.value))) (2) (3) .thenReply(Empty.defaultInstance) } def removeBaz(currentData: ReplicatedMap[SomeKey, ReplicatedData], removeBazValue: map.RemoveBazValue): ReplicatedEntity.Effect[Empty] = { val baz: ReplicatedSet[String] = currentData.getReplicatedSet(BazKey) (1) effects .update(currentData.update(BazKey, baz.remove(removeBazValue.value))) (2) (3) .thenReply(Empty.defaultInstance) }
1 Get the Replicated Data value for a key using an accessor method for the Replicated Data type. 2 Modify the Replicated Data value using its modifying methods and update
on the Replicated Map.3 Trigger a replicated update by returning an Effect
witheffects.update
.
Individual Replicated Data objects in the Replicated Map can also be accessed for reading the current values.
- Java
-
src/main/java/com/example/replicated/map/domain/SomeMap.java
@Override public Effect<SomeMapApi.CurrentValues> get( ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.GetValues command) { ReplicatedCounter foo = map.getReplicatedCounter(FOO_KEY); (1) ReplicatedRegister<String> bar = map.getReplicatedRegister(BAR_KEY, () -> ""); (1) ReplicatedSet<String> baz = map.getReplicatedSet(BAZ_KEY); (1) SomeMapApi.CurrentValues currentValues = SomeMapApi.CurrentValues.newBuilder() .setFoo(foo.getValue()) .setBar(bar.get()) .addAllBaz(baz.elements().stream().sorted().collect(Collectors.toList())) .build(); return effects().reply(currentValues); }
1 Get the Replicated Data value for a key using an accessor method for the Replicated Data type. - Scala
-
src/main/scala/com/example/replicated/map/domain/SomeMap.scala
def get(currentData: ReplicatedMap[SomeKey, ReplicatedData], getValues: map.GetValues): ReplicatedEntity.Effect[map.CurrentValues] = { val foo = currentData.getReplicatedCounter(FooKey) (1) val bar: ReplicatedRegister[String] = currentData.getReplicatedRegister(BarKey, () => "") (1) val baz: ReplicatedSet[String] = currentData.getReplicatedSet(BazKey) (1) val resp = map.CurrentValues(foo.value, bar(), baz.elements.toSeq) effects.reply(resp) }
1 Get the Replicated Data value for a key using an accessor method for the Replicated Data type.
Entries may not contain the most up-to-date values when there are concurrent modifications. |
All objects used within Replicated Data types - as keys, values, or elements - must be immutable, and their serialized form must be stable. Kalix uses the serialized form of these values to track changes in Replicated Sets or Maps. If the same value serializes to different bytes on different occasions, they will be treated as different keys, values, or elements in a Replicated Set or Map. This is particularly relevant when using Protocol Buffers ( For the rest of the protobuf specification, while no guarantees are made on the stability by the protobuf specification itself, the Java libraries do produce stable orderings for message fields and repeated fields. But care should be taken when changing the protobuf structure of any types used within Replicated Data objects—many changes that are backwards compatible from a protobuf standpoint do not necessarily translate into stable serializations. |