Implementing Replicated Entities

Replicated Entities distribute state using a conflict-free replicated data type (CRDT). Data is shared across multiple instances of a Replicated Entity and is eventually consistent to provide high availability with low latency. The underlying CRDT semantics allow Replicated Entity instances to update their state independently and concurrently and without coordination. The state changes will always converge without conflicts, but note that with the state being eventually consistent, reading the current data may return an out-of-date value.

Kalix needs to serialize the data to replicate, and this is done with Protocol Buffers using protobuf types. While Protocol Buffers are the recommended format for state, we recommend that you do not use your service’s public protobuf messages in the replicated data. This may introduce some overhead to convert from one type to the other, but allows the service public interface logic to evolve independently of the data format, which should be private.

The steps necessary to implement a Replicated Entity include:

  1. Defining the API and domain objects in .proto files.

  2. Implementing behavior in command handlers.

  3. Creating and initializing the Replicated Entity.

The sections on this page walk through these steps using a shopping cart service as an example.

Defining the proto files

Our Replicated Entity example is a shopping cart service.

The following shoppingcart_domain.proto file defines our "Shopping Cart" Replicated Entity. The entity manages line items of a cart and stores these as a Replicated Counter Map, mapping from each item’s product details to its quantity. The counter for each item can be incremented independently in separate Replicated Entity instances and will converge to a total quantity.

Java
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// The messages and data that will be replicated for the shopping cart.

syntax = "proto3";

package com.example.shoppingcart.domain; (1)

option java_outer_classname = "ShoppingCartDomain"; (2)

message Product {
  string id = 1;
  string name = 2;
}
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart.domain.
2 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartDomain.
Scala
src/main/proto/com/example/shoppingcart/domain/shoppingcart_domain.proto
// The messages and data that will be replicated for the shopping cart.

syntax = "proto3";

package com.example.shoppingcart.domain; (1)

message Product {
  string id = 1;
  string name = 2;
}
1 Any classes generated from this protobuf file will be in the package com.example.shoppingcart.domain.
Each Replicated Entity is associated with one underlying Replicated Data type. Replicated Data types that are generic, accepting type parameters for key, value, or element types, are used with protobuf messages and can represent structured data. In this shopping cart example, the keys of the counter map are products that have an id and name.

The shoppingcart_api.proto file defines the commands we can send to the shopping cart service to manipulate or access the cart’s state. They make up the service API:

Java
src/main/proto/com/example/shoppingcart/shoppingcart_api.proto
// This is the public API offered by the Shopping Cart Replicated Entity.

syntax = "proto3";

package com.example.shoppingcart;  (1)

import "google/protobuf/empty.proto"; (2)
import "kalix/annotations.proto";
import "google/api/annotations.proto";

option java_outer_classname = "ShoppingCartApi";  (3)

message AddLineItem { (4)
  string cart_id = 1 [(kalix.field).id = true];  (5)
  string product_id = 2;
  string name = 3;
  int32 quantity = 4;
}

message RemoveLineItem {
  string cart_id = 1 [(kalix.field).id = true];
  string product_id = 2;
  string name = 3;
}

message GetShoppingCart {
  string cart_id = 1 [(kalix.field).id = true];
}

message RemoveShoppingCart {
  string cart_id = 1 [(kalix.field).id = true];
}

message LineItem {
  string product_id = 1;
  string name = 2;
  int64 quantity = 3;
}

message Cart {  (6)
  repeated LineItem items = 1;
}

service ShoppingCartService {  (7)
  option (kalix.codegen) = { (8)
    replicated_entity: { (9)
      name: "com.example.shoppingcart.domain.ShoppingCart" (10)
      type_id: "shopping-cart" (11)
      replicated_counter_map: { (12)
        key: "com.example.shoppingcart.domain.Product" (13)
      }
    }
  };

  rpc AddItem (AddLineItem) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/items/add"
      body: "*"
    };
  }

  rpc RemoveItem (RemoveLineItem) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/items/{product_id}/remove"
    };
  }

  rpc GetCart (GetShoppingCart) returns (Cart) {
    option (google.api.http) = {
      get: "/carts/{cart_id}"
      additional_bindings: {
        get: "/carts/{cart_id}/items"
        response_body: "items"
      }
    };
  }

  rpc RemoveCart (RemoveShoppingCart) returns (google.protobuf.Empty) {
    option (google.api.http).post = "/carts/{cart_id}/remove";
  }
}
1 Any classes generated from this protobuf file will be in the Java package com.example.shoppingcart.
2 Import the Kalix protobuf annotations, or options.
3 Let the messages declared in this protobuf file be inner classes to the Java class ShoppingCartApi.
4 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
5 Every Command must contain a string field that contains the entity ID and is marked with the (kalix.field).id option.
6 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
7 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
8 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin.
9 replicated_entity indicates that we want the codegen to generate a Replicated Entity for this service.
10 name denotes the base name for the Replicated Entity. The code-generation will create initial sources ShoppingCart and ShoppingCartIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
11 entity_type is a unique identifier for data replication. The entity name may be changed even after data has been created, the entity_type can’t be changed.
12 replicated_counter_map describes the Replicated Data type for this entity.
13 key points to the protobuf message representing the counter map’s key type.
Scala
src/main/proto/com/example/shoppingcart/shoppingcart_api.proto
// This is the public API offered by the Shopping Cart Replicated Entity.

syntax = "proto3";

package com.example.shoppingcart;  (1)

import "google/protobuf/empty.proto"; (2)
import "kalix/annotations.proto";
import "google/api/annotations.proto";

message AddLineItem { (3)
  string cart_id = 1 [(kalix.field).id = true];  (4)
  string product_id = 2;
  string name = 3;
  int32 quantity = 4;
}

message RemoveLineItem {
  string cart_id = 1 [(kalix.field).id = true];
  string product_id = 2;
  string name = 3;
}

message GetShoppingCart {
  string cart_id = 1 [(kalix.field).id = true];
}

message RemoveShoppingCart {
  string cart_id = 1 [(kalix.field).id = true];
}

message LineItem {
  string product_id = 1;
  string name = 2;
  int64 quantity = 3;
}

message Cart {  (5)
  repeated LineItem items = 1;
}

service ShoppingCartService {  (6)
  option (kalix.codegen) = { (7)
    replicated_entity: { (8)
      name: "com.example.shoppingcart.domain.ShoppingCart" (9)
      type_id: "shopping-cart" (10)
      replicated_counter_map: { (11)
        key: "com.example.shoppingcart.domain.Product" (12)
      }
    }
  };

  rpc AddItem (AddLineItem) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/items/add"
      body: "*"
    };
  }

  rpc RemoveItem (RemoveLineItem) returns (google.protobuf.Empty) {
    option (google.api.http) = {
      post: "/cart/{cart_id}/items/{product_id}/remove"
    };
  }

  rpc GetCart (GetShoppingCart) returns (Cart) {
    option (google.api.http) = {
      get: "/carts/{cart_id}"
      additional_bindings: {
        get: "/carts/{cart_id}/items"
        response_body: "items"
      }
    };
  }

  rpc RemoveCart (RemoveShoppingCart) returns (google.protobuf.Empty) {
    option (google.api.http).post = "/carts/{cart_id}/remove";
  }
}
1 Any classes generated from this protobuf file will be in the package com.example.shoppingcart.
2 Import the Kalix protobuf annotations, or options.
3 We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
4 Every Command must contain a string field that contains the entity ID and is marked with the (kalix.field).id option.
5 Messages describe the return value for our API. For methods that don’t have return values, we use google.protobuf.Empty.
6 The service descriptor shows the API of the entity. It lists the methods a client can use to issue Commands to the entity.
7 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin.
8 replicated_entity indicates that we want the codegen to generate a Replicated Entity for this service.
9 name denotes the base name for the Replicated Entity. The code-generation will create initial sources ShoppingCart and ShoppingCartIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
10 entity_type is a unique identifier for data replication. The entity name may be changed even after data has been created, the entity_type can’t be changed.
11 replicated_counter_map describes the Replicated Data type for this entity.
12 key points to the protobuf message representing the counter map’s key type.

Replicated Entity’s Effect API

The Replicated Entity’s Effect defines the operations that Kalix should perform when an incoming command is handled by a Replicated Entity.

A Replicated Entity Effect can either:

  • update the entity state and send a reply to the caller

  • directly reply to the caller if the command is not requesting any state change

  • rejected the command by returning an error

  • instruct Kalix to delete the entity

Implementing behavior

A Replicated Entity implementation is a Java class where you define how each command is handled. The class ShoppingCart gets generated for us based on the shoppingcart_api.proto and shoppingcart_domain.proto definitions. Once the file ShoppingCart.java ShoppingCart.scala exist, it is not overwritten, so you can freely add logic to it. ShoppingCart extends the generated class AbstractShoppingCart which we’re not supposed to change as it gets regenerated in case we update the protobuf descriptors. AbstractShoppingCart contains all method signatures corresponding to the API of the service. If you change the API you will see compilation errors in the ShoppingCart class and you have to implement the methods required by AbstractShoppingCart.

Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
public class ShoppingCart extends AbstractShoppingCart { (1)
1 Extends the generated AbstractShoppingCart, which extends ReplicatedCounterMapEntity new tab.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
class ShoppingCart(context: ReplicatedEntityContext) extends AbstractShoppingCart { (1)
1 Extends the generated AbstractShoppingCart, which extends ReplicatedCounterMapEntity new tab.

We need to implement all methods our Replicated Entity offers as command handlers.

The code-generation will generate an implementation class with an initial empty implementation which we’ll discuss below.

Command handlers are implemented in the ShoppingCart class as methods that override abstract methods from AbstractShoppingCart. The methods take the current data value as the first parameter and the request message as the second parameter. They return an Effect, which describes next processing actions, such as updating state and sending a reply.

When adding or changing the rpc definitions, including name, parameter and return messages, in the .proto files the corresponding methods are regenerated in the abstract class (AbstractShoppingCart). This means that the Java compiler will assist you with such changes. The IDE can typically fill in missing method signatures and such.

Updating state

In the example below, the AddItem service call uses the request message AddLineItem. It returns an Effect to update the underlying data and then send a reply.

The only way for a command handler to modify the underlying data for a Replicated Entity is by returning an update effect with an updated Replicated Data object. Note that Replicated Data objects are immutable, with each modifying method returning a new instance of the Replicated Data type.
Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<Empty> addItem(
    ReplicatedCounterMap<ShoppingCartDomain.Product> cart,
    ShoppingCartApi.AddLineItem addLineItem) {

  if (addLineItem.getQuantity() <= 0) { (1)
    return effects().error("Quantity for item " + addLineItem.getProductId() + " must be greater than zero.");
  }

  ShoppingCartDomain.Product product = (2)
      ShoppingCartDomain.Product.newBuilder()
          .setId(addLineItem.getProductId())
          .setName(addLineItem.getName())
          .build();

  ReplicatedCounterMap<ShoppingCartDomain.Product> updatedCart = (3)
      cart.increment(product, addLineItem.getQuantity());

  return effects()
      .update(updatedCart) (4)
      .thenReply(Empty.getDefaultInstance()); (5)
}
1 The validation ensures quantity of items added is greater than zero and it fails calls with illegal values by returning an Effect with effects().error.
2 From the current incoming AddLineItem we create a new Product object to represent the item’s key in the counter map.
3 We increment the counter for this item in the cart. A new counter will be created if the cart doesn’t contain this item already.
4 We update the underlying data for the Replicated Entity by returning an Effect with effects().update and the updated data object.
5 An acknowledgment that the command was successfully processed is sent with a reply message.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
def addItem(
    cart: ReplicatedCounterMap[Product],
    addLineItem: shoppingcart.AddLineItem): ReplicatedEntity.Effect[Empty] = {
  if (addLineItem.quantity <= 0) { (1)
    effects.error(s"Quantity for item ${addLineItem.productId} must be greater than zero.")
  } else {
    val product = Product(addLineItem.productId, addLineItem.name) (2)
    val updatedCart = cart.increment(product, addLineItem.quantity) (3)

    effects
      .update(updatedCart) (4)
      .thenReply(Empty.defaultInstance) (5)
  }
}
1 The validation ensures quantity of items added is greater than zero and it fails calls with illegal values by returning an Effect with effects.error.
2 From the current incoming AddLineItem we create a new Product object to represent the item’s key in the counter map.
3 We increment the counter for this item in the cart. A new counter will be created if the cart doesn’t contain this item already.
4 We update the underlying data for the Replicated Entity by returning an Effect with effects.update and the updated data object.
5 An acknowledgment that the command was successfully processed is sent with a reply message.

Retrieving state

The following example shows the implementation of the GetCart command handler. This command handler is a read-only command handler—​it doesn’t update the state, it just returns it.

The state of Replicated Entities is eventually consistent. An individual Replicated Entity instance may have an out-of-date value, if there are concurrent modifications made by another instance.
Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<ShoppingCartApi.Cart> getCart(
    ReplicatedCounterMap<ShoppingCartDomain.Product> cart, (1)
    ShoppingCartApi.GetShoppingCart getShoppingCart) {

  List<ShoppingCartApi.LineItem> allItems =
      cart.keySet().stream()
          .map(
              product ->
                  ShoppingCartApi.LineItem.newBuilder()
                      .setProductId(product.getId())
                      .setName(product.getName())
                      .setQuantity(cart.get(product))
                      .build())
          .sorted(Comparator.comparing(ShoppingCartApi.LineItem::getProductId))
          .collect(Collectors.toList());

  ShoppingCartApi.Cart apiCart = (2)
      ShoppingCartApi.Cart.newBuilder().addAllItems(allItems).build();

  return effects().reply(apiCart);
}
1 The current data is passed to the method. Note that this may not be the most up-to-date value, with concurrent modifications made by other instances of this Replicated Entity being replicated eventually.
2 We convert the domain representation to the API representation that is sent as a reply by returning an Effect with effects().reply.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
def getCart(
    cart: ReplicatedCounterMap[Product], (1)
    getShoppingCart: shoppingcart.GetShoppingCart): ReplicatedEntity.Effect[shoppingcart.Cart] = {

  val allItems =
    cart.keySet
      .map { product =>
        val quantity = cart.get(product).getOrElse(0L)
        shoppingcart.LineItem(product.id, product.name, quantity)
      }
      .toSeq
      .sortBy(_.productId)

  val apiCart = shoppingcart.Cart(allItems) (2)
  effects.reply(apiCart)
}
1 The current data is passed to the method. Note that this may not be the most up-to-date value, with concurrent modifications made by other instances of this Replicated Entity being replicated eventually.
2 We convert the domain representation to the API representation that is sent as a reply by returning an Effect with effects.reply.

Deleting state

The following example shows the implementation of the RemoveCart command handler. Replicated Entity instances for a particular entity identifier can be deleted, using a delete Effect. Once deleted, an entity instance cannot be recreated, and all subsequent commands for that entity identifier will be rejected with an error.

Caution should be taken with creating and deleting Replicated Entities, as Kalix maintains the replicated state in memory and also retains tombstones for each deleted entity. Over time, if many Replicated Entities are created and deleted, this will result in hitting memory limits.
Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<Empty> removeCart(
    ReplicatedCounterMap<ShoppingCartDomain.Product> cart,
    ShoppingCartApi.RemoveShoppingCart removeShoppingCart) {

  return effects()
      .delete() (1)
      .thenReply(Empty.getDefaultInstance());
}
1 The Replicated Entity instances for the associated entity id are deleted by using effects().delete.
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
def removeCart(
    cart: ReplicatedCounterMap[Product],
    removeShoppingCart: shoppingcart.RemoveShoppingCart): ReplicatedEntity.Effect[Empty] =
  effects.delete (1)
    .thenReply(Empty.defaultInstance)
1 The Replicated Entity instances for the associated entity id are deleted by using effects.delete.

Registering the Entity

To make Kalix aware of the Replicated Entity, we need to register it with the service.

From the code-generation, the registration gets automatically inserted in the generated KalixFactory.withComponents method from the Main class.

Java
src/main/java/com/example/shoppingcart/Main.java
/* This code was generated by Kalix tooling.
 * As long as this file exists it will not be re-generated.
 * You are free to make changes to this file.
 */

package com.example.shoppingcart;

import kalix.javasdk.Kalix;
import com.example.shoppingcart.domain.ShoppingCart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  public static Kalix createKalix() {
    // The KalixFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `new Kalix()` instance.
    return KalixFactory.withComponents(ShoppingCart::new);
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Kalix service");
    createKalix().start();
  }
}
Scala
src/main/scala/com/example/shoppingcart/Main.scala
package com.example.shoppingcart

import kalix.scalasdk.Kalix
import com.example.shoppingcart.domain.ShoppingCart
import org.slf4j.LoggerFactory

// This class was initially generated based on the .proto definition by Kalix tooling.
//
// As long as this file exists it will not be overwritten: you can maintain it yourself,
// or delete it so it is regenerated as needed.

object Main {

  private val log = LoggerFactory.getLogger("com.example.shoppingcart.Main")

  def createKalix(): Kalix = {
    // The KalixFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `Kalix()` instance.
    KalixFactory.withComponents(
      new ShoppingCart(_))
  }

  def main(args: Array[String]): Unit = {
    log.info("starting the Kalix service")
    createKalix().start()
  }
}

By default, the generated constructor has a ReplicatedEntityContext parameter, but you can change this to accept other parameters. If you change the constructor of the ShoppingCart class you will see a compilation error here, and you have to adjust the factory function that is passed to KalixFactory.withComponents.

When more components are added, the KalixFactory is regenerated and you have to adjust the registration from the Main class.

Replicated Data types

Each Replicated Entity is associated with one underlying Replicated Data type. Counter, Register, Set, and Map data structures are available. This section describes how to configure and implement a Replicated Entity with each of the Replicated Data types.

The only way for a command handler to modify the underlying data for a Replicated Entity is by returning an update effect with an updated Replicated Data object. Note that Replicated Data objects are immutable, with each modifying method returning a new instance of the Replicated Data type.

Replicated Counter

A ReplicatedCounter new tab can be incremented and decremented.

To configure a Replicated Entity with a Replicated Counter, use the replicated_counter option when defining the proto file:

Java
src/main/proto/com/example/replicated/counter/counter_api.proto
service CounterService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.counter.domain.SomeCounter" (1)
      type_id: "some-counter"
      replicated_counter: {} (2)
    }
  };

  rpc Increase(IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease(DecreaseValue) returns (google.protobuf.Empty);
  rpc Get(GetValue) returns (CurrentValue);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Counter.
Scala
src/main/proto/com/example/replicated/counter/counter_api.proto
service CounterService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.counter.domain.SomeCounter" (1)
      type_id: "some-counter"
      replicated_counter: {} (2)
    }
  };

  rpc Increase(IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease(DecreaseValue) returns (google.protobuf.Empty);
  rpc Get(GetValue) returns (CurrentValue);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Counter.

When implementing a Replicated Counter entity, the state can be updated by calling the increment or decrement methods on the current data object, and then triggering an update with the modified data object:

Java
src/main/java/com/example/replicated/counter/domain/SomeCounter.java
@Override
public Effect<Empty> increase(ReplicatedCounter counter, SomeCounterApi.IncreaseValue command) {
  return effects()
      .update(counter.increment(command.getValue())) (1)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> decrease(ReplicatedCounter counter, SomeCounterApi.DecreaseValue command) {
  return effects()
      .update(counter.decrement(command.getValue())) (1)
      .thenReply(Empty.getDefaultInstance());
}
1 Modify the Replicated Counter with increment or decrement and trigger a replicated update by returning an Effect with effects().update.
Scala
src/main/scala/com/example/replicated/counter/domain/SomeCounter.scala
def increase(currentData: ReplicatedCounter, increaseValue: counter.IncreaseValue): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.increment(increaseValue.value)) (1)
    .thenReply(Empty.defaultInstance)

def decrease(currentData: ReplicatedCounter, decreaseValue: counter.DecreaseValue): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.decrement(decreaseValue.value)) (1)
    .thenReply(Empty.defaultInstance)
1 Modify the Replicated Counter with increment or decrement and trigger a replicated update by returning an Effect with effects.update.
Java

The current value of a Replicated Counter can be retrieved using getValue:

src/main/java/com/example/replicated/counter/domain/SomeCounter.java
@Override
public Effect<SomeCounterApi.CurrentValue> get(
    ReplicatedCounter counter, SomeCounterApi.GetValue command) {
  long value = counter.getValue(); (1)
  SomeCounterApi.CurrentValue currentValue =
      SomeCounterApi.CurrentValue.newBuilder().setValue(value).build();
  return effects().reply(currentValue);
}
1 Get the current value of a Replicated Counter using getValue.
Scala

The current value of a Replicated Counter can be retrieved using value:

src/main/scala/com/example/replicated/counter/domain/SomeCounter.scala
def get(currentData: ReplicatedCounter, getValue: counter.GetValue): ReplicatedEntity.Effect[counter.CurrentValue] =
  effects
    .reply(counter.CurrentValue(currentData.value)) (1)
1 Get the current value of a Replicated Counter using value.
The current value may not be the most up-to-date value when there are concurrent modifications.

Replicated Register

A ReplicatedRegister new tab ReplicatedRegister new tab can contain any (serializable) value. Updates to the value are replicated using last-write-wins semantics, where concurrent modifications are resolved by using the update with the highest timestamp.

To configure a Replicated Entity with a Replicated Register, use the replicated_register option when defining the proto file:

Java
src/main/proto/com/example/replicated/register/register_api.proto
service RegisterService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.register.domain.SomeRegister" (1)
      type_id: "some-register"
      replicated_register: { (2)
        value: "com.example.replicated.register.domain.SomeValue" (3)
      }
    }
  };

  rpc Set(SetValue) returns (google.protobuf.Empty);
  rpc Get(GetValue) returns (CurrentValue);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Register.
3 Specify the protobuf type for the value of the Replicated Register.
Scala
src/main/proto/com/example/replicated/register/register_api.proto
service RegisterService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.register.domain.SomeRegister" (1)
      type_id: "some-register"
      replicated_register: { (2)
        value: "com.example.replicated.register.domain.SomeValue" (3)
      }
    }
  };

  rpc Set(SetValue) returns (google.protobuf.Empty);
  rpc Get(GetValue) returns (CurrentValue);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Register.
3 Specify the protobuf type for the value of the Replicated Register.
The type for the value can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. A message type is being used for the value type in this example.

When implementing a Replicated Register entity, an initial or empty value needs to be defined by overriding the emptyValue method:

Java
src/main/java/com/example/replicated/register/domain/SomeRegister.java
@Override
public SomeRegisterDomain.SomeValue emptyValue() {
  return SomeRegisterDomain.SomeValue.getDefaultInstance();
}
Scala
src/main/scala/com/example/replicated/register/domain/SomeRegister.scala
override def emptyValue: SomeValue = SomeValue.defaultInstance

The value can be updated by calling the set method on the current data object, and then triggering an update with the modified data object:

Java
src/main/java/com/example/replicated/register/domain/SomeRegister.java
@Override
public Effect<Empty> set(
    ReplicatedRegister<SomeRegisterDomain.SomeValue> register, SomeRegisterApi.SetValue command) {
  SomeRegisterDomain.SomeValue newValue = (1)
      SomeRegisterDomain.SomeValue.newBuilder().setSomeField(command.getValue()).build();
  return effects()
      .update(register.set(newValue)) (2)
      .thenReply(Empty.getDefaultInstance());
}
1 Create a domain object for the new value.
2 Update the Replicated Register value with set and trigger a replicated update by returning an Effect with effects().update.
Scala
src/main/scala/com/example/replicated/register/domain/SomeRegister.scala
def set(currentData: ReplicatedRegister[SomeValue], setValue: register.SetValue): ReplicatedEntity.Effect[Empty] = {
  val someValue = SomeValue(setValue.value) (1)
  effects
  .update(currentData.set(someValue)) (2)
  .thenReply(Empty.defaultInstance)
}
1 Create a domain object for the new value.
2 Update the Replicated Register value with set and trigger a replicated update by returning an Effect with effects().update.

The current value of a Replicated Register can be retrieved using get:

Java
src/main/java/com/example/replicated/register/domain/SomeRegister.java
@Override
public Effect<SomeRegisterApi.CurrentValue> get(
    ReplicatedRegister<SomeRegisterDomain.SomeValue> register, SomeRegisterApi.GetValue command) {
  SomeRegisterDomain.SomeValue value = register.get(); (1)
  SomeRegisterApi.CurrentValue currentValue = (2)
      SomeRegisterApi.CurrentValue.newBuilder().setValue(value.getSomeField()).build();
  return effects().reply(currentValue);
}
1 Get the current value of a Replicated Register using get.
2 Convert from the domain object to the API object.
Scala
src/main/scala/com/example/replicated/register/domain/SomeRegister.scala
def get(currentData: ReplicatedRegister[SomeValue], getValue: register.GetValue): ReplicatedEntity.Effect[register.CurrentValue] = {
  val someValue = currentData() (1)
  effects.reply(register.CurrentValue(someValue.someField)) (2)
}
1 Get the current value of a Replicated Register using apply.
2 Convert from the domain object to the API object.
The current value may not be the most up-to-date value when there are concurrent modifications.

Replicated Set

A ReplicatedSet new tab ReplicatedSet new tab is a set of (serializable) values, where elements can be added or removed.

To configure a Replicated Entity with a Replicated Set, use the replicated_set option when defining the proto file:

Java
src/main/proto/com/example/replicated/set/set_api.proto
service SetService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.set.domain.SomeSet" (1)
      type_id: "some-set"
      replicated_set: { (2)
        element: "string" (3)
      }
    }
  };

  rpc Add(AddElement) returns (google.protobuf.Empty);
  rpc Remove(RemoveElement) returns (google.protobuf.Empty);
  rpc Get(GetElements) returns (CurrentElements);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Set.
3 Specify the protobuf type for the elements of the Replicated Set. In this case, the scalar type string.
Scala
src/main/proto/com/example/replicated/set/set_api.proto
service SetService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.set.domain.SomeSet" (1)
      type_id: "some-set"
      replicated_set: { (2)
        element: "string" (3)
      }
    }
  };

  rpc Add(AddElement) returns (google.protobuf.Empty);
  rpc Remove(RemoveElement) returns (google.protobuf.Empty);
  rpc Get(GetElements) returns (CurrentElements);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Set.
3 Specify the protobuf type for the elements of the Replicated Set. In this case, the scalar type string.
The type for the elements can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. The string scalar type is being used for the element type in this example, which corresponds to the Java String class.
Care needs to be taken to ensure that the serialized values for elements in the set are stable.

When implementing a Replicated Set entity, the state can be updated by calling the add or remove methods on the current data object, and then triggering an update with the modified data object:

Java
src/main/java/com/example/replicated/set/domain/SomeSet.java
@Override
public Effect<Empty> add(ReplicatedSet<String> set, SomeSetApi.AddElement command) {
  return effects()
      .update(set.add(command.getElement())) (1)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> remove(ReplicatedSet<String> set, SomeSetApi.RemoveElement command) {
  return effects()
      .update(set.remove(command.getElement())) (1)
      .thenReply(Empty.getDefaultInstance());
}
1 Modify the elements of the Replicated Set with add or remove and trigger a replicated update by returning an Effect with effects().update.
Scala
src/main/scala/com/example/replicated/set/domain/SomeSet.scala
def add(currentData: ReplicatedSet[String], addElement: set.AddElement): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.add(addElement.element)) (1)
    .thenReply(Empty.defaultInstance)

def remove(currentData: ReplicatedSet[String], removeElement: set.RemoveElement): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.remove(removeElement.element)) (1)
    .thenReply(Empty.defaultInstance)
1 Modify the elements of the Replicated Set with add or remove and trigger a replicated update by returning an Effect with effects.update.
Java

The elements method for Replicated Set returns a regular java.util.Set that can be used to iterate over the current elements:

src/main/java/com/example/replicated/set/domain/SomeSet.java
@Override
public Effect<SomeSetApi.CurrentElements> get(
    ReplicatedSet<String> set, SomeSetApi.GetElements command) {
  List<String> elements =
      set.elements().stream() (1)
          .sorted()
          .collect(Collectors.toList());

  SomeSetApi.CurrentElements currentElements =
      SomeSetApi.CurrentElements.newBuilder().addAllElements(elements).build();

  return effects().reply(currentElements);
}
1 Iterate over the current elements of a Replicated Set.
Scala

The elements method for Replicated Set returns a regular Scala Set that contains the current elements:

src/main/scala/com/example/replicated/set/domain/SomeSet.scala
def get(currentData: ReplicatedSet[String], getElements: set.GetElements): ReplicatedEntity.Effect[set.CurrentElements] =
  effects.reply(set.CurrentElements(currentData.elements.toSeq)) (1)
1 Turn the Set into a Seq for the response.
The current value may not be the most up-to-date view of the set when there are concurrent modifications.

Replicated Counter Map

A ReplicatedCounterMap new tab ReplicatedCounterMap new tab maps (serializable) keys to replicated counters, where each value can be incremented and decremented.

To configure a Replicated Entity with a Replicated Counter Map, use the replicated_counter_map option when defining the proto file.

Java
src/main/proto/com/example/replicated/countermap/counter_map_api.proto
service CounterMapService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.countermap.domain.SomeCounterMap" (1)
      type_id: "some-counter-map"
      replicated_counter_map: { (2)
        key: "string" (3)
      }
    }
  };

  rpc Increase(IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease(DecreaseValue) returns (google.protobuf.Empty);
  rpc Remove(RemoveValue) returns (google.protobuf.Empty);
  rpc Get(GetValue) returns (CurrentValue);
  rpc GetAll(GetAllValues) returns (CurrentValues);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Counter Map.
3 Specify the protobuf type for the keys of the Replicated Counter Map. In this case, the scalar type string.
Scala
src/main/proto/com/example/replicated/countermap/counter_map_api.proto
service CounterMapService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.countermap.domain.SomeCounterMap" (1)
      type_id: "some-counter-map"
      replicated_counter_map: { (2)
        key: "string" (3)
      }
    }
  };

  rpc Increase(IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease(DecreaseValue) returns (google.protobuf.Empty);
  rpc Remove(RemoveValue) returns (google.protobuf.Empty);
  rpc Get(GetValue) returns (CurrentValue);
  rpc GetAll(GetAllValues) returns (CurrentValues);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Counter Map.
3 Specify the protobuf type for the keys of the Replicated Counter Map. In this case, the scalar type string.
The type for the key can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. The string scalar type is being used for the key type in this example, which corresponds to the Java String class.

When implementing a Replicated Counter Map entity, the value of an entry can be updated by calling the increment or decrement methods on the current data object, and then triggering an update with the modified data object. Entries can be removed from the map using the remove method.

Java
src/main/java/com/example/replicated/countermap/domain/SomeCounterMap.java
@Override
public Effect<Empty> increase(
    ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.IncreaseValue command) {
  return effects()
      .update(counterMap.increment(command.getKey(), command.getValue())) (1)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> decrease(
    ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.DecreaseValue command) {
  return effects()
      .update(counterMap.decrement(command.getKey(), command.getValue())) (1)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> remove(
    ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.RemoveValue command) {
  return effects()
      .update(counterMap.remove(command.getKey())) (1)
      .thenReply(Empty.getDefaultInstance());
}
1 Modify the values of the Replicated Counter Map with increment, decrement, or remove and trigger a replicated update by returning an Effect with effects().update.
Scala
src/main/scala/com/example/replicated/countermap/domain/SomeCounterMap.scala
def increase(currentData: ReplicatedCounterMap[String], increaseValue: countermap.IncreaseValue): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.increment(increaseValue.key, increaseValue.value)) (1)
    .thenReply(Empty.defaultInstance)

def decrease(currentData: ReplicatedCounterMap[String], decreaseValue: countermap.DecreaseValue): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.decrement(decreaseValue.key, decreaseValue.value)) (1)
    .thenReply(Empty.defaultInstance)

def remove(currentData: ReplicatedCounterMap[String], removeValue: countermap.RemoveValue): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.remove(removeValue.key)) (1)
    .thenReply(Empty.defaultInstance)
1 Modify the values of the Replicated Counter Map with increment, decrement, or remove and trigger a replicated update by returning an Effect with effects.update.

Individual counters in a Replicated Counter Map can be accessed, or the set of keys can be used to iterate over all counters.

Java
src/main/java/com/example/replicated/countermap/domain/SomeCounterMap.java
@Override
public Effect<SomeCounterMapApi.CurrentValue> get(
    ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.GetValue command) {
  long value = counterMap.get(command.getKey()); (1)
  SomeCounterMapApi.CurrentValue currentValue =
      SomeCounterMapApi.CurrentValue.newBuilder().setValue(value).build();
  return effects().reply(currentValue);
}

@Override
public Effect<SomeCounterMapApi.CurrentValues> getAll(
    ReplicatedCounterMap<String> counterMap, SomeCounterMapApi.GetAllValues command) {
  Map<String, Long> values =
      counterMap.keySet().stream() (2)
          .map(key -> new SimpleEntry<>(key, counterMap.get(key)))
          .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
  SomeCounterMapApi.CurrentValues currentValues =
      SomeCounterMapApi.CurrentValues.newBuilder().putAllValues(values).build();
  return effects().reply(currentValues);
}
1 Get the current counter value for a key using get.
2 Iterate over the current entries of a Replicated Counter Map using keySet.
Scala
src/main/scala/com/example/replicated/countermap/domain/SomeCounterMap.scala
def get(currentData: ReplicatedCounterMap[String], getValue: countermap.GetValue): ReplicatedEntity.Effect[countermap.CurrentValue] = {
  val value = currentData(getValue.key) (1)
  effects.reply(countermap.CurrentValue(value))
}

def getAll(currentData: ReplicatedCounterMap[String], getAllValues: countermap.GetAllValues): ReplicatedEntity.Effect[countermap.CurrentValues] = {
  val keyValues = currentData.keySet.map { key => key -> currentData(key) }.toMap (2)
  effects.reply(countermap.CurrentValues(keyValues))
}
1 Get the current counter value for a key using apply.
2 Iterate over the current entries of a Replicated Counter Map using keySet.
The get apply method returns a default value of 0L if the map does not contain the key.
Entries may not contain the most up-to-date values for counters when there are concurrent modifications.

Replicated Register Map

A ReplicatedRegisterMap new tab ReplicatedRegisterMap new tab maps (serializable) keys to replicated registers of (serializable) values. Updates to values are replicated using last-write-wins semantics, where concurrent modifications are resolved by using the update with the highest timestamp.

To configure a Replicated Entity with a Replicated Register Map, use the replicated_register_map option when defining the proto file.

Java
src/main/proto/com/example/replicated/registermap/register_map_api.proto
service RegisterMapService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.registermap.domain.SomeRegisterMap" (1)
      type_id: "some-register-map"
      replicated_register_map: { (2)
        key: "com.example.replicated.registermap.domain.SomeKey" (3)
        value: "com.example.replicated.registermap.domain.SomeValue" (4)
      }
    }
  };

  rpc Set(SetValue) returns (google.protobuf.Empty);
  rpc Remove(RemoveValue) returns (google.protobuf.Empty);
  rpc Get(GetValue) returns (CurrentValue);
  rpc GetAll(GetAllValues) returns (CurrentValues);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Register Map.
3 Specify the protobuf type for the keys of the Replicated Register Map.
4 Specify the protobuf type for the values of the Replicated Register Map.
Scala
src/main/proto/com/example/replicated/registermap/domain/register_map_domain.proto
service RegisterMapService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.registermap.domain.SomeRegisterMap" (1)
      type_id: "some-register-map"
      replicated_register_map: { (2)
        key: "com.example.replicated.registermap.domain.SomeKey" (3)
        value: "com.example.replicated.registermap.domain.SomeValue" (4)
      }
    }
  };

  rpc Set(SetValue) returns (google.protobuf.Empty);
  rpc Remove(RemoveValue) returns (google.protobuf.Empty);
  rpc Get(GetValue) returns (CurrentValue);
  rpc GetAll(GetAllValues) returns (CurrentValues);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Register Map.
3 Specify the protobuf type for the keys of the Replicated Register Map.
4 Specify the protobuf type for the values of the Replicated Register Map.
The type for the key or value can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. Message types are being used for both the key and value types in this example.

When implementing a Replicated Register Map entity, the value of an entry can be updated by calling the setValue method on the current data object, and then triggering an update with the modified data object. Entries can be removed from the map using the remove method.

Java
src/main/java/com/example/replicated/registermap/domain/SomeRegisterMap.java
@Override
public Effect<Empty> set(
    ReplicatedRegisterMap<SomeRegisterMapDomain.SomeKey, SomeRegisterMapDomain.SomeValue>
        registerMap,
    SomeRegisterMapApi.SetValue command) {
  SomeRegisterMapDomain.SomeKey key = (1)
      SomeRegisterMapDomain.SomeKey.newBuilder()
          .setSomeField(command.getKey().getField())
          .build();
  SomeRegisterMapDomain.SomeValue value = (2)
      SomeRegisterMapDomain.SomeValue.newBuilder()
          .setSomeField(command.getValue().getField())
          .build();
  return effects()
      .update(registerMap.setValue(key, value)) (3)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> remove(
    ReplicatedRegisterMap<SomeRegisterMapDomain.SomeKey, SomeRegisterMapDomain.SomeValue>
        registerMap,
    SomeRegisterMapApi.RemoveValue command) {
  SomeRegisterMapDomain.SomeKey key = (1)
      SomeRegisterMapDomain.SomeKey.newBuilder()
          .setSomeField(command.getKey().getField())
          .build();
  return effects()
      .update(registerMap.remove(key)) (3)
      .thenReply(Empty.getDefaultInstance());
}
1 Create a domain object for the key.
2 Create a domain object for the value.
3 Modify the values of the Replicated Register Map with setValue, or remove and trigger a replicated update by returning an Effect with effects().update.
Scala
src/main/scala/com/example/replicated/registermap/domain/SomeRegisterMap.scala
def set(currentData: ReplicatedRegisterMap[SomeKey, SomeValue], setValue: registermap.SetValue): ReplicatedEntity.Effect[Empty] = {
  val key = SomeKey(setValue.getKey.field) (1)
  val value = SomeValue(setValue.getValue.field) (2)
  effects
    .update(currentData.setValue(key,value)) (3)
    .thenReply(Empty.defaultInstance)
}

def remove(currentData: ReplicatedRegisterMap[SomeKey, SomeValue], removeValue: registermap.RemoveValue): ReplicatedEntity.Effect[Empty] = {
  val key = SomeKey(removeValue.getKey.field) (1)
  effects
    .update(currentData.remove(key)) (3)
    .thenReply(Empty.defaultInstance)
}
1 Create a domain object for the key.
2 Create a domain object for the value.
3 Modify the values of the Replicated Register Map with setValue, or remove and trigger a replicated update by returning an Effect with effects.update.

Individual registers in a Replicated Register Map can be accessed, or the set of keys can be used to iterate over all registers.

Java
src/main/java/com/example/replicated/registermap/domain/SomeRegisterMap.java
@Override
public Effect<SomeRegisterMapApi.CurrentValue> get(
    ReplicatedRegisterMap<SomeRegisterMapDomain.SomeKey, SomeRegisterMapDomain.SomeValue>
        registerMap,
    SomeRegisterMapApi.GetValue command) {
  SomeRegisterMapDomain.SomeKey key = (1)
      SomeRegisterMapDomain.SomeKey.newBuilder()
          .setSomeField(command.getKey().getField())
          .build();
  Optional<SomeRegisterMapDomain.SomeValue> maybeValue = registerMap.getValue(key); (2)
  SomeRegisterMapApi.CurrentValue currentValue =
      SomeRegisterMapApi.CurrentValue.newBuilder()
          .setValue(
              SomeRegisterMapApi.Value.newBuilder()
                  .setField(
                      maybeValue.map(SomeRegisterMapDomain.SomeValue::getSomeField).orElse("")))
          .build();
  return effects().reply(currentValue);
}

@Override
public Effect<SomeRegisterMapApi.CurrentValues> getAll(
    ReplicatedRegisterMap<SomeRegisterMapDomain.SomeKey, SomeRegisterMapDomain.SomeValue>
        registerMap,
    SomeRegisterMapApi.GetAllValues command) {
  List<SomeRegisterMapApi.CurrentValue> allValues =
      registerMap.keySet().stream() (3)
          .map(
              key -> {
                String value =
                    registerMap
                        .getValue(key)
                        .map(SomeRegisterMapDomain.SomeValue::getSomeField)
                        .orElse("");
                return SomeRegisterMapApi.CurrentValue.newBuilder()
                    .setKey(SomeRegisterMapApi.Key.newBuilder().setField(key.getSomeField()))
                    .setValue(SomeRegisterMapApi.Value.newBuilder().setField(value))
                    .build();
              })
          .collect(Collectors.toList());
  SomeRegisterMapApi.CurrentValues currentValues =
      SomeRegisterMapApi.CurrentValues.newBuilder().addAllValues(allValues).build();
  return effects().reply(currentValues);
}
1 Create a domain object for the key.
2 Get the current register value for a key using getValue.
3 Iterate over the current entries of a Replicated Register Map using keySet.
The getValue method returns an Optional for when the map does not contain the given key.
Scala
src/main/scala/com/example/replicated/registermap/domain/SomeRegisterMap.scala
def get(currentData: ReplicatedRegisterMap[SomeKey, SomeValue], getValue: registermap.GetValue): ReplicatedEntity.Effect[registermap.CurrentValue] = {
  val key = SomeKey(getValue.getKey.field) (1)
  val maybeValue = currentData.get(key) (2)
  val currentValue = registermap.CurrentValue(getValue.key, maybeValue.map(v => registermap.Value(v.someField)))
  effects.reply(currentValue)
}

def getAll(currentData: ReplicatedRegisterMap[SomeKey, SomeValue], getAllValues: registermap.GetAllValues): ReplicatedEntity.Effect[registermap.CurrentValues] = {

  val allData =
    currentData.keySet.map { key => (3)
      val value = currentData.get(key).map(v => registermap.Value(v.someField))
      registermap.CurrentValue(Some(registermap.Key(key.someField)), value)
    }.toSeq

  effects.reply(registermap.CurrentValues(allData))
}
1 Create a domain object for the key.
2 Get the current register value for a key using get.
3 Iterate over the current entries of a Replicated Register Map using keySet.
The get method returns an Option for when the map does not contain the given key.
Entries may not contain the most up-to-date values for registers when there are concurrent modifications.

Replicated Multi-Map

A ReplicatedMultiMap new tab ReplicatedMultiMap new tab maps (serializable) keys to replicated sets of (serializable) values, providing a multi-map interface that can associate multiple values with each key.

To configure a Replicated Entity with a Replicated Multi-Map, use the replicated_multi_map option when defining the proto file.

Java
src/main/proto/com/example/replicated/multimap/multi_map_api.proto
service MultiMapService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.multimap.domain.SomeMultiMap" (1)
      type_id: "some-multi-map"
      replicated_multi_map: { (2)
        key: "string" (3)
        value: "double" (4)
      }
    }
  };

  rpc Put(PutValue) returns (google.protobuf.Empty);
  rpc PutAll(PutAllValues) returns (google.protobuf.Empty);
  rpc Remove(RemoveValue) returns (google.protobuf.Empty);
  rpc RemoveAll(RemoveAllValues) returns (google.protobuf.Empty);
  rpc Get(GetValues) returns (CurrentValues);
  rpc GetAll(GetAllValues) returns (AllCurrentValues);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Multi-Map.
3 Specify the protobuf type for the keys of the Replicated Multi-Map. In this case, the scalar type string.
4 Specify the protobuf type for the values of the Replicated Multi-Map. In this case, the scalar type double.
Scala
src/main/proto/com/example/replicated/multimap/domain/multi_map_domain.proto
service MultiMapService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.multimap.domain.SomeMultiMap" (1)
      type_id: "some-multi-map"
      replicated_multi_map: { (2)
        key: "string" (3)
        value: "double" (4)
      }
    }
  };

  rpc Put(PutValue) returns (google.protobuf.Empty);
  rpc PutAll(PutAllValues) returns (google.protobuf.Empty);
  rpc Remove(RemoveValue) returns (google.protobuf.Empty);
  rpc RemoveAll(RemoveAllValues) returns (google.protobuf.Empty);
  rpc Get(GetValues) returns (CurrentValues);
  rpc GetAll(GetAllValues) returns (AllCurrentValues);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Multi-Map.
3 Specify the protobuf type for the keys of the Replicated Multi-Map. In this case, the scalar type string.
4 Specify the protobuf type for the values of the Replicated Multi-Map. In this case, the scalar type double.
The type for the key or value can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. The string scalar type is being used for the key type and the double scalar type for the value type in this example, which correspond to the Java types for String and Double.

When implementing a Replicated Multi-Map entity, the values of an entry can be updated by calling the put, putAll, or remove methods on the current data object, and then triggering an update with the modified data object. Entries can be removed entirely from the map using the removeAll method.

Java
src/main/java/com/example/replicated/multimap/domain/SomeMultiMap.java
@Override
public Effect<Empty> put(
    ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.PutValue command) {
  return effects()
      .update(multiMap.put(command.getKey(), command.getValue())) (1)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> putAll(
    ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.PutAllValues command) {
  return effects()
      .update(multiMap.putAll(command.getKey(), command.getValuesList())) (1)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> remove(
    ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.RemoveValue command) {
  return effects()
      .update(multiMap.remove(command.getKey(), command.getValue())) (1)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> removeAll(
    ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.RemoveAllValues command) {
  return effects()
      .update(multiMap.removeAll(command.getKey())) (1)
      .thenReply(Empty.getDefaultInstance());
}
1 Modify the values of the Replicated Multi-Map with put, putAll, remove, or removeAll and trigger a replicated update by returning an Effect with effects().update.
Scala
src/main/scala/com/example/replicated/multimap/domain/SomeMultiMap.scala
def put(currentData: ReplicatedMultiMap[String, Double], putValue: multimap.PutValue): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.put(putValue.key, putValue.value)) (1)
    .thenReply(Empty.defaultInstance)

def putAll(currentData: ReplicatedMultiMap[String, Double], putAllValues: multimap.PutAllValues): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.putAll(putAllValues.key, putAllValues.values)) (1)
    .thenReply(Empty.defaultInstance)

def remove(currentData: ReplicatedMultiMap[String, Double], removeValue: multimap.RemoveValue): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.remove(removeValue.key, removeValue.value)) (1)
    .thenReply(Empty.defaultInstance)

def removeAll(currentData: ReplicatedMultiMap[String, Double], removeAllValues: multimap.RemoveAllValues): ReplicatedEntity.Effect[Empty] =
  effects
    .update(currentData.removeAll(removeAllValues.key)) (1)
    .thenReply(Empty.defaultInstance)
1 Modify the values of the Replicated Multi-Map with put, putAll, remove, or removeAll and trigger a replicated update by returning an Effect with effects().update.

Individual entries in a Replicated Multi-Map can be accessed, or the set of keys can be used to iterate over all value sets.

Java
src/main/java/com/example/replicated/multimap/domain/SomeMultiMap.java
@Override
public Effect<SomeMultiMapApi.CurrentValues> get(
    ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.GetValues command) {
  Set<Double> values = multiMap.get(command.getKey()); (1)
  SomeMultiMapApi.CurrentValues currentValues =
      SomeMultiMapApi.CurrentValues.newBuilder()
          .addAllValues(values.stream().sorted().collect(Collectors.toList()))
          .build();
  return effects().reply(currentValues);
}

@Override
public Effect<SomeMultiMapApi.AllCurrentValues> getAll(
    ReplicatedMultiMap<String, Double> multiMap, SomeMultiMapApi.GetAllValues command) {
  List<SomeMultiMapApi.CurrentValues> allValues =
      multiMap.keySet().stream() (2)
          .map(
              key -> {
                List<Double> values =
                    multiMap.get(key).stream().sorted().collect(Collectors.toList());
                return SomeMultiMapApi.CurrentValues.newBuilder()
                    .setKey(key)
                    .addAllValues(values)
                    .build();
              })
          .collect(Collectors.toList());
  SomeMultiMapApi.AllCurrentValues allCurrentValues =
      SomeMultiMapApi.AllCurrentValues.newBuilder().addAllValues(allValues).build();
  return effects().reply(allCurrentValues);
}
1 Get the current set of values for a key using get.
2 Iterate over the current entries of a Replicated Multi-Map using keySet.
Scala
src/main/scala/com/example/replicated/multimap/domain/SomeMultiMap.scala
def get(currentData: ReplicatedMultiMap[String, Double], getValues: multimap.GetValues): ReplicatedEntity.Effect[multimap.CurrentValues] = {
  val values = currentData.get(getValues.key) (1)
  effects
    .reply(multimap.CurrentValues(getValues.key, values.toSeq))
}

/** Command handler for "GetAll". */
def getAll(currentData: ReplicatedMultiMap[String, Double], getAllValues: multimap.GetAllValues): ReplicatedEntity.Effect[multimap.AllCurrentValues] = {
  val currentValues =
    currentData.keySet.map { key => (2)
      val values = currentData.get(key)
      multimap.CurrentValues(key, values.toSeq)
    }

  effects.reply(multimap.AllCurrentValues(currentValues.toSeq))
}
1 Get the current set of values for a key using get.
2 Iterate over the current entries of a Replicated Multi-Map using keySet.
Entries may not contain the most up-to-date values when there are concurrent modifications.

Replicated Map

A ReplicatedMap new tab ReplicatedMap new tab maps (serializable) keys to any other Replicated Data types, allowing a heterogeneous map where values can be of any Replicated Data type.

Prefer to use the specialized replicated maps (Replicated Counter Map, Replicated Register Map, or Replicated Multi-Map) whenever the values of the map are of the same type — counters, registers, or sets.

To configure a Replicated Entity with a (heterogeneous) Replicated Map, use the replicated_map option when defining the proto file.

Java
src/main/proto/com/example/replicated/map/map_api.proto
service MapService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.map.domain.SomeMap" (1)
      type_id: "some-map"
      replicated_map: { (2)
        key: "com.example.replicated.map.domain.SomeKey" (3)
      }
    }
  };

  rpc IncreaseFoo(IncreaseFooValue) returns (google.protobuf.Empty);
  rpc DecreaseFoo(DecreaseFooValue) returns (google.protobuf.Empty);
  rpc SetBar(SetBarValue) returns (google.protobuf.Empty);
  rpc AddBaz(AddBazValue) returns (google.protobuf.Empty);
  rpc RemoveBaz(RemoveBazValue) returns (google.protobuf.Empty);
  rpc Get(GetValues) returns (CurrentValues);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Map.
3 Specify the protobuf type for the keys of the map.
Scala
src/main/proto/com/example/replicated/map/map_api.proto
service MapService {
  option (kalix.codegen) = {
    replicated_entity: {
      name: "com.example.replicated.map.domain.SomeMap" (1)
      type_id: "some-map"
      replicated_map: { (2)
        key: "com.example.replicated.map.domain.SomeKey" (3)
      }
    }
  };

  rpc IncreaseFoo(IncreaseFooValue) returns (google.protobuf.Empty);
  rpc DecreaseFoo(DecreaseFooValue) returns (google.protobuf.Empty);
  rpc SetBar(SetBarValue) returns (google.protobuf.Empty);
  rpc AddBaz(AddBazValue) returns (google.protobuf.Empty);
  rpc RemoveBaz(RemoveBazValue) returns (google.protobuf.Empty);
  rpc Get(GetValues) returns (CurrentValues);
}
1 The fully-qualified class name for this replicated entity.
2 Specify the Replicated Data type as a Replicated Map.
3 Specify the protobuf type for the keys of the map.
The type for the key can be a protobuf message type or scalar value type. The generated code will use the corresponding Java type. A message type is being used for the key type in this example.
The value type for a Replicated Map is not specified for code-generation, and will be set to ReplicatedData for a heterogeneous map (a Replicated Map that contains different types of Replicated Data values).

When implementing a Replicated Map entity, the replicated data for an entry can be updated by retrieving the data value using the get or getOrElse methods, updating values using the update method, and then triggering an update effect with the modified Replicated Map. Entries can be removed from the map using the remove method.

There are also accessors for each of the Replicated Data types to make a heterogeneous map easier to use, such as getReplicatedCounter or getReplicatedRegister. If a key is not present in the map, these will return an empty value for the associated Replicated Data type.

Java
src/main/java/com/example/replicated/map/domain/SomeMap.java
@Override
public Effect<Empty> increaseFoo(
    ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map,
    SomeMapApi.IncreaseFooValue command) {
  ReplicatedCounter foo = map.getReplicatedCounter(FOO_KEY); (1)
  return effects()
      .update(map.update(FOO_KEY, foo.increment(command.getValue()))) (2) (3)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> decreaseFoo(
    ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map,
    SomeMapApi.DecreaseFooValue command) {
  ReplicatedCounter foo = map.getReplicatedCounter(FOO_KEY); (1)
  return effects()
      .update(map.update(FOO_KEY, foo.decrement(command.getValue()))) (2) (3)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> setBar(
    ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.SetBarValue command) {
  ReplicatedRegister<String> bar = map.getReplicatedRegister(BAR_KEY); (1)
  return effects()
      .update(map.update(BAR_KEY, bar.set(command.getValue()))) (2) (3)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> addBaz(
    ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.AddBazValue command) {
  ReplicatedSet<String> baz = map.getReplicatedSet(BAZ_KEY); (1)
  return effects()
      .update(map.update(BAZ_KEY, baz.add(command.getValue()))) (2) (3)
      .thenReply(Empty.getDefaultInstance());
}

@Override
public Effect<Empty> removeBaz(
    ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.RemoveBazValue command) {
  ReplicatedSet<String> baz = map.getReplicatedSet(BAZ_KEY); (1)
  return effects()
      .update(map.update(BAZ_KEY, baz.remove(command.getValue()))) (2) (3)
      .thenReply(Empty.getDefaultInstance());
}
1 Get the Replicated Data value for a key using an accessor method for the Replicated Data type.
2 Modify the Replicated Data value using its modifying methods and update on the Replicated Map.
3 Trigger a replicated update by returning an Effect with effects().update.
Scala
src/main/scala/com/example/replicated/map/domain/SomeMap.scala
def increaseFoo(currentData: ReplicatedMap[SomeKey, ReplicatedData], increaseFooValue: map.IncreaseFooValue): ReplicatedEntity.Effect[Empty] = {
  val foo = currentData.getReplicatedCounter(FooKey) (1)
  effects
    .update(currentData.update(FooKey, foo.increment(increaseFooValue.value)))(2) (3)
    .thenReply(Empty.defaultInstance)
}

def decreaseFoo(currentData: ReplicatedMap[SomeKey, ReplicatedData], decreaseFooValue: map.DecreaseFooValue): ReplicatedEntity.Effect[Empty] = {
  val foo = currentData.getReplicatedCounter(FooKey) (1)
  effects
    .update(currentData.update(FooKey, foo.decrement(decreaseFooValue.value))) (2) (3)
    .thenReply(Empty.defaultInstance)
}

def setBar(currentData: ReplicatedMap[SomeKey, ReplicatedData], setBarValue: map.SetBarValue): ReplicatedEntity.Effect[Empty] = {
  val bar: ReplicatedRegister[String] = currentData.getReplicatedRegister(BarKey)
  effects
    .update(currentData.update(BarKey, bar.set(setBarValue.value)))
    .thenReply(Empty.defaultInstance)
}

def addBaz(currentData: ReplicatedMap[SomeKey, ReplicatedData], addBazValue: map.AddBazValue): ReplicatedEntity.Effect[Empty] = {
  val baz: ReplicatedSet[String] = currentData.getReplicatedSet(BazKey) (1)
  effects
    .update(currentData.update(BazKey, baz.add(addBazValue.value))) (2) (3)
    .thenReply(Empty.defaultInstance)
}

def removeBaz(currentData: ReplicatedMap[SomeKey, ReplicatedData], removeBazValue: map.RemoveBazValue): ReplicatedEntity.Effect[Empty] = {
  val baz: ReplicatedSet[String] = currentData.getReplicatedSet(BazKey) (1)
  effects
    .update(currentData.update(BazKey, baz.remove(removeBazValue.value))) (2) (3)
    .thenReply(Empty.defaultInstance)
}
1 Get the Replicated Data value for a key using an accessor method for the Replicated Data type.
2 Modify the Replicated Data value using its modifying methods and update on the Replicated Map.
3 Trigger a replicated update by returning an Effect with effects.update.

Individual Replicated Data objects in the Replicated Map can also be accessed for reading the current values.

Java
src/main/java/com/example/replicated/map/domain/SomeMap.java
@Override
public Effect<SomeMapApi.CurrentValues> get(
    ReplicatedMap<SomeMapDomain.SomeKey, ReplicatedData> map, SomeMapApi.GetValues command) {
  ReplicatedCounter foo = map.getReplicatedCounter(FOO_KEY); (1)
  ReplicatedRegister<String> bar = map.getReplicatedRegister(BAR_KEY, () -> ""); (1)
  ReplicatedSet<String> baz = map.getReplicatedSet(BAZ_KEY); (1)
  SomeMapApi.CurrentValues currentValues =
      SomeMapApi.CurrentValues.newBuilder()
          .setFoo(foo.getValue())
          .setBar(bar.get())
          .addAllBaz(baz.elements().stream().sorted().collect(Collectors.toList()))
          .build();
  return effects().reply(currentValues);
}
1 Get the Replicated Data value for a key using an accessor method for the Replicated Data type.
Scala
src/main/scala/com/example/replicated/map/domain/SomeMap.scala
def get(currentData: ReplicatedMap[SomeKey, ReplicatedData], getValues: map.GetValues): ReplicatedEntity.Effect[map.CurrentValues] = {

  val foo = currentData.getReplicatedCounter(FooKey) (1)
  val bar: ReplicatedRegister[String] = currentData.getReplicatedRegister(BarKey, () => "") (1)
  val baz: ReplicatedSet[String] = currentData.getReplicatedSet(BazKey) (1)

  val resp = map.CurrentValues(foo.value, bar(), baz.elements.toSeq)
  effects.reply(resp)
}
1 Get the Replicated Data value for a key using an accessor method for the Replicated Data type.
Entries may not contain the most up-to-date values when there are concurrent modifications.

All objects used within Replicated Data types - as keys, values, or elements - must be immutable, and their serialized form must be stable.

Kalix uses the serialized form of these values to track changes in Replicated Sets or Maps. If the same value serializes to different bytes on different occasions, they will be treated as different keys, values, or elements in a Replicated Set or Map.

This is particularly relevant when using Protocol Buffers (protobuf) for serialization. The serialized ordering for the entries of a protobuf map type is undefined, so protobuf map types should not be used within protobuf messages that are keys, values, or elements in Replicated Data objects.

For the rest of the protobuf specification, while no guarantees are made on the stability by the protobuf specification itself, the Java libraries do produce stable orderings for message fields and repeated fields. But care should be taken when changing the protobuf structure of any types used within Replicated Data objects—​many changes that are backwards compatible from a protobuf standpoint do not necessarily translate into stable serializations.