Implementing Actions

Actions are stateless functions that can be used to implement different use cases, such as:

  • a pure function.

  • request conversion - you can use Actions to convert incoming data into a different format before forwarding a call to a different component.

  • as a facade or controller that fans out to multiple calls to different components.

  • publish messages to a Topic.

  • subscribe to events from an Event Sourced Entity.

  • subscribe to state changes from a Value Entity.

  • schedule and cancel Timers.

Actions can be triggered in multiple ways. For example, by:

  • a gRPC service call.

  • an HTTP service call.

  • a forwarded call from another component.

  • a scheduled call from a Timer.

  • an incoming message from a Topic.

  • an incoming event from an Event Sourced Entity, from within the same service or from a different service.

  • a state change notification from a Value Entity on the same service.

  • a service life-cycle event (e.g. on startup).

Defining the proto file

An Action may implement any service method defined in a Protobuf definition. In this first example, we will show how to implement an Action as a pure stateless function. We will define a FibonacciAction that takes a number and returns the next number in the Fibonacci series.

Java
src/main/proto/com/example/fibonacci/fibonacci.proto
syntax = "proto3";
package com.example.fibonacci; (1)

import "kalix/annotations.proto"; (2)

option java_outer_classname = "FibonacciApi"; (3)

message Number {
  int64 value = 1;
}

service Fibonacci {
  option (kalix.codegen) = {
    action: {}  (4)
  };

  rpc NextNumber(Number) returns (Number) {}
}
1 Any classes generated from this protobuf file will be in the com.example.fibonacci package.
2 Import the Kalix protobuf annotations or options.
3 Let the messages declared in this protobuf file be inner classes to the Java class FibonacciApi.
4 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
Scala
src/main/proto/com/example/fibonacci/fibonacci.proto
syntax = "proto3";
package com.example.fibonacci; (1)

import "kalix/annotations.proto"; (2)

message Number {
  int64 value = 1;
}

service Fibonacci {
  option (kalix.codegen) = {
    action: {}  (3)
  };

  rpc NextNumber(Number) returns (Number) {}
}
1 Any classes generated from this protobuf file will be in the com.example.fibonacci package.
2 Import the Kalix protobuf annotations or options.
3 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.

Action’s Effect API

The Action’s Effect defines the operations that Kalix should perform when an incoming message is handled by an Action.

An Action Effect can either:

  • reply with a message to the caller

  • reply with a message to be published to a topic (in case the method is a publisher)

  • forward the message to another component

  • return an error

  • ignore the call

Implementing the Action

An Action implementation is a class where you define how each message is handled. The class FibonacciAction gets generated for us based on the proto file defined above. Once the FibonacciAction.java (or FibonacciAction.scala) file exists, it is not overwritten, so you can freely add logic to it. FibonacciAction extends the generated class AbstractFibonacciAction, which we're not supposed to change because it gets regenerated whenever we update the protobuf descriptors.

AbstractFibonacciAction contains all method signatures corresponding to the API of the service. If you change the API you will see compilation errors in the FibonacciAction class, and you have to implement the methods required by AbstractFibonacciAction.

Java
src/main/java/com/example/fibonacci/FibonacciAction.java
public class FibonacciAction extends AbstractFibonacciAction { (1)

  public FibonacciAction(ActionCreationContext creationContext) {
  }

  /**
   * Handler for "NextNumber".
   */
  @Override
  public Effect<FibonacciApi.Number> nextNumber(FibonacciApi.Number number) { (2)
    throw new RuntimeException("The command handler for `NextNumber` is not implemented, yet");
  }

}
1 Extends the generated AbstractFibonacciAction, which extends Action.
2 A nextNumber method is generated. We will implement it next.
Scala
src/main/scala/com/example/fibonacci/FibonacciAction.scala
class FibonacciAction(creationContext: ActionCreationContext) extends AbstractFibonacciAction {  (1)

  override def nextNumber(number: Number): Action.Effect[Number] = { (2)
    throw new RuntimeException("The command handler for `NextNumber` is not implemented, yet")
  }
}
1 Extends the generated AbstractFibonacciAction, which extends Action.
2 A nextNumber method is generated. We will implement it next.

Next, we can implement the nextNumber method to complete our Action.

Java
src/main/java/com/example/fibonacci/FibonacciAction.java
private boolean isFibonacci(long num) {  (1)
  Predicate<Long> isPerfectSquare = (n) -> {
    long square = (long) Math.sqrt(n);
    return square*square == n;
  };
  return isPerfectSquare.test(5*num*num + 4) || isPerfectSquare.test(5*num*num - 4);
}
private long nextFib(long num) {
  double result = num * (1 + Math.sqrt(5)) / 2.0;
  return Math.round(result);
}

@Override
public Effect<FibonacciApi.Number> nextNumber(FibonacciApi.Number number) {
  long num = number.getValue();
  if (isFibonacci(num)) { (2)
    long nextFib = nextFib(num);
    FibonacciApi.Number response =
        FibonacciApi.Number
            .newBuilder()
            .setValue(nextFib)
            .build();
    return effects().reply(response);
  } else {
    return effects() (3)
             .error("Input number is not a Fibonacci number, received '" + num + "'");
  }
}
1 We add two private methods to support the computation. isFibonacci checks if a number is a Fibonacci number and nextFib calculates the next number.
2 The nextNumber implementation first checks if the input number belongs to the Fibonacci series. If so, it calculates the next number and builds a reply using effects().reply().
3 Otherwise, if the input number doesn’t belong to the Fibonacci series, it builds an Effect reply error.
Scala
src/main/scala/com/example/fibonacci/FibonacciAction.scala
private def isFibonacci(num: Long): Boolean = { (1)
  val isPerfectSquare = (n: Long) => {
    val square = Math.sqrt(n.toDouble).toLong
    square * square == n
  }
  isPerfectSquare(5 * num * num + 4) || isPerfectSquare(5 * num * num - 4)
}

private def nextFib(num: Long): Long = {
  val result = num * (1 + Math.sqrt(5)) / 2.0;
  Math.round(result)
}

override def nextNumber(number: Number): Action.Effect[Number] = {
  val num = number.value
  if (isFibonacci(num)) (2)
    effects.reply(Number(nextFib(num)))
  else
    effects.error(s"Input number is not a Fibonacci number, received '$num'") (3)
}
1 We add two private methods to support the computation. isFibonacci checks if a number is a Fibonacci number and nextFib calculates the next number.
2 The nextNumber implementation first checks if the input number belongs to the Fibonacci series. If so, it calculates the next number and builds a reply using effects.reply().
3 Otherwise, if the input number doesn’t belong to the Fibonacci series, it builds an Effect reply error.
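
The two helper methods can be tried out on their own, outside of any Action. The following is a minimal sketch, assuming a hypothetical standalone class named FibonacciSketch that copies the helper logic from the snippets above. It relies on the fact that a number n is a Fibonacci number exactly when 5n² + 4 or 5n² - 4 is a perfect square, and on rounding n times the golden ratio to find the successor. Because it uses double arithmetic, it should only be trusted for small inputs.

```java
// Standalone sketch of the helpers used by FibonacciAction; no Kalix types are involved.
// FibonacciSketch is a hypothetical class name used only for this illustration.
class FibonacciSketch {

  // True when n is a perfect square (negative values never are).
  static boolean isPerfectSquare(long n) {
    if (n < 0) {
      return false;
    }
    long root = (long) Math.sqrt(n);
    return root * root == n;
  }

  // n is a Fibonacci number iff 5*n^2 + 4 or 5*n^2 - 4 is a perfect square.
  static boolean isFibonacci(long num) {
    return isPerfectSquare(5 * num * num + 4) || isPerfectSquare(5 * num * num - 4);
  }

  // Multiplying by the golden ratio and rounding yields the next number in the series.
  static long nextFib(long num) {
    return Math.round(num * (1 + Math.sqrt(5)) / 2.0);
  }

  public static void main(String[] args) {
    for (long n : new long[] {2, 3, 5, 8, 13}) {
      System.out.println(n + " -> " + nextFib(n));
    }
    System.out.println("4 is a Fibonacci number? " + isFibonacci(4));
  }
}
```

Note that 1 appears twice in the series, so for an input of 1 the rounding approach always answers 2.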

Multiple replies / reply streaming

An Action may return multiple replies by marking the return type as stream in Protobuf. The Java method implementing that service must return an Akka Streams Source to fulfill that contract.

The Source may publish an arbitrary number of replies.

Registering the Action

To make Kalix aware of the Action, we need to register it with the service.

The code generation automatically inserts the registration into the generated KalixFactory.withComponents call in the Main class.

Java
/src/main/java/com/example/Main.java
/* This code was generated by Kalix tooling.
 * As long as this file exists it will not be re-generated.
 * You are free to make changes to this file.
 */

package com.example;

import kalix.javasdk.Kalix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.fibonacci.FibonacciAction;

public final class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  public static Kalix createKalix() {
    // The KalixFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `new Kalix()` instance.
    return KalixFactory.withComponents(
      FibonacciAction::new);
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Kalix service");
    createKalix().start();
  }
}
Scala
/src/main/scala/com/example/fibonacci/Main.scala
package com.example.fibonacci

import kalix.scalasdk.Kalix
import org.slf4j.LoggerFactory

object Main {

  private val log = LoggerFactory.getLogger("com.example.fibonacci.Main")

  def createKalix(): Kalix = {
    // The KalixFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `Kalix()` instance.
    KalixFactory.withComponents(
      new FibonacciAction(_))
  }

  def main(args: Array[String]): Unit = {
    log.info("starting the Kalix service")
    createKalix().start()
  }
}

By default, the generated constructor has an ActionCreationContext parameter, but you can change this to accept other parameters. If you change the constructor of the FibonacciAction class you will see a compilation error here, and you have to adjust the factory function that is passed to KalixFactory.withComponents.

When more components are added the KalixFactory is regenerated, and you have to adjust the registration from the Main class.

Actions as Controllers

Actions can be used to implement MVC Controllers by acting as the external interface of a service: receiving requests, operating on the request values and forwarding the call to other components in the same service.

To illustrate how you can use an Action as a Controller, we will build on top of the Value Entity Shopping Cart example, adding a new Action to the existing shopping cart service.

Forwarding Commands

The forward effect allows us to transform or further validate an incoming request before passing it on to another component and have the response message directly passed back to the client making the request. The response from the forwarded operation must have the same response type as the original request.

In this example we accept the same command as the entity, AddLineItem, but add some additional verification of the request and only conditionally forward the request to the entity if the verification is successful:

Java
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
  @Override
  public Effect<Empty> verifiedAddItem(ShoppingCartApi.AddLineItem addLineItem) {
    if (addLineItem.getName().equalsIgnoreCase("carrot")) { (1)
      return effects().error("Carrots no longer for sale"); (2)
    } else {
      DeferredCall<ShoppingCartApi.AddLineItem, Empty> call =
          components().shoppingCart().addItem(addLineItem); (3)
      return effects().forward(call); (4)
    }
  }
1 Check if the added item is carrots.
2 If it is "carrots" immediately return an error, disallowing adding the item.
3 For allowed requests, use components().shoppingCart().addItem() to get a DeferredCall.
4 The DeferredCall is used with effects().forward() to forward the request to the entity.
Scala
src/main/scala/com/example/shoppingcart/ShoppingCartActionImpl.scala
  override def verifiedAddItem(addLineItem: AddLineItem): Effect[Empty] =
    if (addLineItem.name.equalsIgnoreCase("carrot")) (1)
      effects.error("Carrots no longer for sale") (2)
    else {
      val call = components.shoppingCart.addItem(addLineItem) (3)
      effects.forward(call) (4)
    }
1 Check if the added item is carrots.
2 If it is "carrots" immediately return an error, disallowing adding the item.
3 For allowed requests, use components.shoppingCart.addItem() to get a DeferredCall.
4 The DeferredCall is used with effects.forward() to forward the request to the entity.

Forwarding Headers

By default, Kalix does not forward gRPC/HTTP headers to Kalix components. This can be overridden with component options.

Java
src/main/java/com/example/shoppingcart/Main.java
public final class Main {
  public static Kalix createKalix() {
    Kalix kalix = new Kalix();
    ActionOptions forwardHeaders = ActionOptions.defaults()
        .withForwardHeaders(Set.of("UserRole")); (1)
    return kalix
        .register(ShoppingCartActionProvider.of(ShoppingCartActionImpl::new)
            .withOptions(forwardHeaders)); (2)
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Kalix service");
    createKalix().start();
  }
}
1 Specify the names of the headers that should be forwarded to this component, using the withForwardHeaders options builder method.
2 Register component with custom options.
Scala
src/main/scala/com/example/shoppingcart/Main.scala
object Main {

  def createKalix(): Kalix = {
    val kalix = Kalix()
    val forwardHeaders = ActionOptions.defaults
      .withForwardHeaders(Set("UserRole")) (1)
    kalix
      .register(ShoppingCartActionProvider(new ShoppingCartActionImpl(_))
        .withOptions(forwardHeaders)) (2)
  }

  def main(args: Array[String]): Unit = {
    log.info("starting the Kalix service")
    createKalix().start()
  }
}
1 Specify the names of the headers that should be forwarded to this component, using the withForwardHeaders options builder method.
2 Register component with custom options.

Accessing headers and forwarding them to another component is done via the context Metadata.

Java
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
  @Override
  public Effect<Empty> removeCart(ShoppingCartApi.RemoveShoppingCart removeShoppingCart) {
    var userRole = actionContext().metadata().get("UserRole").get(); (1)
    Metadata metadata = Metadata.EMPTY.add("Role", userRole);
    return effects().forward(
        components().shoppingCart().removeCart(removeShoppingCart)
            .withMetadata(metadata) (2)
    );
  }
1 Get the header value from context Metadata.
2 Forward a different header to the next call by using the withMetadata method of the DeferredCall class.
Scala
src/main/scala/com/example/shoppingcart/ShoppingCartActionImpl.scala
  override def removeCart(removeShoppingCart: RemoveShoppingCart): Effect[Empty] = {
    val userRole = actionContext.metadata.get("UserRole").get (1)
    val metadata = Metadata.empty.add("Role", userRole)
    effects.forward(
      components.shoppingCart
        .removeCart(removeShoppingCart)
        .withMetadata(metadata)) (2)
  }
1 Get the header value from context Metadata.
2 Forward a different header to the next call by using the withMetadata method of the DeferredCall class.
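
The snippets above call get on the result of the metadata lookup, which fails if the header was not forwarded. The following is a minimal sketch of a more defensive lookup, assuming the lookup returns a java.util.Optional as the chained get suggests. HeaderLookup and roleOrDefault are hypothetical names, and a plain Map stands in for the request metadata.

```java
import java.util.Map;
import java.util.Optional;

// Plain-Java stand-in for request metadata; HeaderLookup is a hypothetical helper,
// not a Kalix SDK class.
class HeaderLookup {
  private final Map<String, String> headers;

  HeaderLookup(Map<String, String> headers) {
    this.headers = headers;
  }

  // Mirrors an Optional-returning header lookup.
  Optional<String> get(String name) {
    return Optional.ofNullable(headers.get(name));
  }

  // Returns the role, or the fallback when the header is absent,
  // instead of throwing NoSuchElementException as Optional.get() would.
  String roleOrDefault(String fallback) {
    return get("UserRole").orElse(fallback);
  }

  public static void main(String[] args) {
    HeaderLookup withHeader = new HeaderLookup(Map.of("UserRole", "admin"));
    HeaderLookup withoutHeader = new HeaderLookup(Map.of());
    System.out.println(withHeader.roleOrDefault("anonymous"));
    System.out.println(withoutHeader.roleOrDefault("anonymous"));
  }
}
```

Whether a missing header should fall back to a default or be rejected with an error effect depends on the service; the fallback here is only one option.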

Transform Request and Response to Another Component

The asyncReply and asyncEffect effects allow us to process and transform a request before calling another component and then also transform the response.

As an example, let us look at the problem of creating a new entity with an id generated by another component.

In this example we implement an Initialize command for the controller Action which returns the message NewCartCreated with the entity id that can subsequently be used to interact with the cart.

Java
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
  @Override
  public Effect<ShoppingCartController.NewCartCreated> initializeCart(ShoppingCartController.NewCart newCart) {
    final String cartId = UUID.randomUUID().toString(); (1)
    CompletionStage<Empty> shoppingCartCreated =
        components().shoppingCart().create(ShoppingCartApi.CreateCart.newBuilder().setCartId(cartId).build()) (2)
        .execute(); (3)

    // transform response
    CompletionStage<Effect<ShoppingCartController.NewCartCreated>> effect =
        shoppingCartCreated.handle((empty, error) -> { (4)
          if (error == null) {
            return effects().reply(ShoppingCartController.NewCartCreated.newBuilder().setCartId(cartId).build()); (5)
          } else {
            return effects().error("Failed to create cart, please retry"); (6)
          }
        });

    return effects().asyncEffect(effect); (7)
  }
1 We generate a new UUID.
2 We use components().shoppingCart().create(…​) to create a DeferredCall for create on the shopping cart.
3 execute() on the DeferredCall immediately triggers a call and returns a CompletionStage for the response.
4 Once the call succeeds or fails, the CompletionStage is completed or failed. We can then transform the result from CompletionStage<Empty> to CompletionStage<Effect<NewCartCreated>> using handle.
5 On a successful response, we create a reply effect with a NewCartCreated.
6 If the call leads to an error, we create an error effect asking the client to retry.
7 effects().asyncEffect() allows us to reply with a CompletionStage<Effect<NewCartCreated>>.
Scala
src/main/scala/com/example/shoppingcart/ShoppingCartActionImpl.scala
  override def initializeCart(newCart: NewCart): Effect[NewCartCreated] = {
    val cartId = UUID.randomUUID().toString (1)

    val created: Future[Empty] =
      components.shoppingCart.create(CreateCart(cartId)).execute() (2)

    val effect: Future[Effect[NewCartCreated]] = (3)
      created
        .map(_ => effects.reply(NewCartCreated(cartId))) (4)
        .recover(_ => effects.error("Failed to create cart, please retry")) (5)

    effects.asyncEffect(effect) (6)
  }
1 We generate a new UUID.
2 We use components.shoppingCart.create(…​) to create a DeferredCall for create on the shopping cart.
3 execute() on the DeferredCall immediately triggers a call and returns a Future for the response.
4 On a successful response, we map the Empty reply to a reply effect with the reply NewCartCreated.
5 If the call leads to an error, we recover and return an error effect asking the client to retry.
6 effects.asyncEffect() allows us to reply with a Future[Effect[NewCartCreated]] rather than a reply we already have created.
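
The transformation step in the Java variant can be looked at in isolation. The following is a minimal sketch using only java.util.concurrent, where plain strings stand in for the reply and error effects. AsyncHandleSketch and toOutcome are hypothetical names for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Sketch of the handle() step; the String results stand in for the
// reply and error effects that the real Action would return.
class AsyncHandleSketch {

  // handle() is invoked for both outcomes: exactly one of (value, error) is meaningful.
  static CompletionStage<String> toOutcome(CompletionStage<Void> created, String cartId) {
    return created.handle((ignored, error) -> {
      if (error == null) {
        return "reply:" + cartId;
      } else {
        return "error:Failed to create cart, please retry";
      }
    });
  }

  public static void main(String[] args) {
    CompletionStage<Void> succeeded = CompletableFuture.completedFuture(null);
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("entity unavailable"));

    System.out.println(toOutcome(succeeded, "cart-1").toCompletableFuture().join());
    System.out.println(toOutcome(failed, "cart-2").toCompletableFuture().join());
  }
}
```

Because handle turns a failed stage into a normally completed one, the caller always receives a value, which is what lets the Action answer with an error effect instead of an unhandled failure.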

The action generates a UUID to use as entity id for the shopping cart. UUIDs are extremely unlikely to lead to the same id being generated, but to completely guarantee two calls can never be assigned the same shopping cart we make use of the "boundary of consistency" provided by the entity - the entity will only process a single command at a time and can safely make decisions based on its state - for example to only allow creation once by storing something in its state signifying that it has been created.

In this case we mark that the entity has been created by storing a creation timestamp in the shopping cart state on the first create call (that is, when the timestamp still has the default value of 0). If the cart has already been stored with a timestamp, we return an error effect:

Java
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
  @Override
  public Effect<Empty> create(ShoppingCartDomain.Cart currentState, ShoppingCartApi.CreateCart createCart) {
    if (currentState.getCreationTimestamp() > 0L) {
      return effects().error("Cart was already created");
    } else {
      return effects().updateState(currentState.toBuilder().setCreationTimestamp(Instant.now().toEpochMilli()).build())
          .thenReply(Empty.getDefaultInstance());
    }
  }
Scala
src/main/scala/com/example/shoppingcart/domain/ShoppingCart.scala
  override def create(currentState: Cart, createCart: shoppingcart.CreateCart): ValueEntity.Effect[Empty] =
    if (currentState.creationTimestamp > 0)
      effects.error("Cart was already created")
    else
      effects.updateState(currentState.copy(creationTimestamp = Instant.now().toEpochMilli))
        .thenReply(Empty.defaultInstance)
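
The create-once guard can be modelled without any Kalix types. The following is a minimal sketch, assuming a hypothetical CreateOnce class in which a synchronized method stands in for the entity processing a single command at a time. The string results only mimic the reply and error effects.

```java
// Plain-Java model of the create-once guard. CreateOnce and CartState are hypothetical
// names; they only mimic the shape of the entity's command handler.
class CreateOnce {

  static final class CartState {
    final long creationTimestamp; // 0 means "never created", like the protobuf default

    CartState(long creationTimestamp) {
      this.creationTimestamp = creationTimestamp;
    }
  }

  private CartState state = new CartState(0L);

  // synchronized stands in for the entity handling one command at a time.
  synchronized String create(long nowMillis) {
    if (state.creationTimestamp > 0L) {
      return "error: Cart was already created";
    }
    state = new CartState(nowMillis);
    return "ok";
  }

  public static void main(String[] args) {
    CreateOnce cart = new CreateOnce();
    System.out.println(cart.create(System.currentTimeMillis())); // first call is accepted
    System.out.println(cart.create(System.currentTimeMillis())); // second call is rejected
  }
}
```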

Composing calls

The async call shown in the previous section can also be used to chain or compose multiple calls into a single action response.

In this example we build on the previous cart creation by adding an initial item in the cart once it has been created, but before we return the new id to the client:

Java
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
  @Override
  public Effect<ShoppingCartController.NewCartCreated> createPrePopulated(ShoppingCartController.NewCart newCart) {
    final String cartId = UUID.randomUUID().toString();
    CompletionStage<Empty> shoppingCartCreated =
        components().shoppingCart().create(ShoppingCartApi.CreateCart.newBuilder().setCartId(cartId).build())
            .execute();

    CompletionStage<Empty> cartPopulated = shoppingCartCreated.thenCompose(empty -> { (1)
      ShoppingCartApi.AddLineItem initialItem = (2)
          ShoppingCartApi.AddLineItem.newBuilder()
              .setCartId(cartId)
              .setProductId("e")
              .setName("eggplant")
              .setQuantity(1)
              .build();

      return components().shoppingCart().addItem(initialItem).execute(); (3)
    });

    CompletionStage<ShoppingCartController.NewCartCreated> reply =
        cartPopulated.thenApply(empty -> (4)
          ShoppingCartController.NewCartCreated.newBuilder().setCartId(cartId).build()
        );

    return effects().asyncReply(reply); (5)
  }
1 CompletionStage#thenCompose allows us to perform an additional async operation, returning a CompletionStage, once the current one completes successfully.
2 Create a request to add an initial item to the cart.
3 Executing the addItem call returns a CompletionStage<Empty> once it succeeds.
4 thenApply allows us to transform the successful completion of addItem with Empty to the response type of this method - NewCartCreated.
5 effects().asyncReply() lets us reply once the CompletionStage<NewCartCreated> completes.
Scala
src/main/scala/com/example/shoppingcart/ShoppingCartActionImpl.scala
  def createPrePopulated(newCart: NewCart): Effect[NewCartCreated] = {
    val cartId = UUID.randomUUID().toString

    val reply: Future[NewCartCreated] =
      for { (1)
        created <- components.shoppingCart.create(CreateCart(cartId)).execute()
        populated <- components.shoppingCart.addItem(AddLineItem(cartId, "e", "eggplant", 1)).execute()
      } yield NewCartCreated(cartId) (2)

    effects.asyncReply(reply) (3)
  }
1 For comprehensions (or directly using flatMap) allow us to compose the individual async steps returning Future.
2 Once both steps have completed, create a NewCartCreated leading to a Future[NewCartCreated] coming out of the for-comprehension.
3 effects.asyncReply lets us reply once the Future[NewCartCreated] completes.

In this sample it is safe to base a subsequent call to the entity on the reply of the previous one, because no client will know of the cart id until createPrePopulated replies.

For many other use cases it is important to understand that there is no transaction or consistency boundary outside of the entity, so for a sequence of calls from an action to an entity, the state of the entity could be updated by other calls it receives in-between.

For example, imagine an action that, for a given cart id, retrieves the state using getCart to verify whether too many items are already in the cart and, once that has been verified, adds the item to the cart.

Java
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
  @Override
  public Effect<Empty> unsafeValidation(ShoppingCartApi.AddLineItem addLineItem) {
    // NOTE: This is an example of an anti-pattern, do not copy this
    CompletionStage<ShoppingCartApi.Cart> cartReply = components().shoppingCart().getCart(
            ShoppingCartApi.GetShoppingCart.newBuilder()
                .setCartId(addLineItem.getCartId())
                .build())
        .execute(); (1)

    CompletionStage<Effect<Empty>> effect = cartReply.thenApply(cart -> {
      int totalCount = cart.getItemsList().stream()
          .mapToInt(ShoppingCartApi.LineItem::getQuantity)
          .sum();

      if (totalCount >= 10) {
        return effects().error("Max 10 items in a cart");
      } else {
        return effects().forward(components().shoppingCart().addItem(addLineItem)); (2)
      }
    });

    return effects().asyncEffect(effect);
  }
1 Between this call returning ...
2 ... and this next call to the same entity, the entity could accept other commands that change the total count of items in the cart.
Scala
src/main/scala/com/example/shoppingcart/ShoppingCartActionImpl.scala
  override def unsafeValidation(addLineItem: AddLineItem): Effect[Empty] = {
    // NOTE: This is an example of an anti-pattern, do not copy this
    val cartReply = components.shoppingCart
      .getCart(GetShoppingCart(addLineItem.cartId))
      .execute() (1)

    val effect =
      cartReply.map { cart =>
        val totalCount = cart.items.map(_.quantity).sum

        if (totalCount >= 10)
          effects.error("Max 10 items in a cart")
        else
          effects.forward(components.shoppingCart.addItem(addLineItem)) (2)
      }

    effects.asyncEffect(effect)
  }
1 Between this call returning ...
2 ... and this next call to the same entity, the entity could accept other commands that change the total count of items in the cart.

The problem with this is that an addItem call made directly to the entity, between the getCart call returning and the addItem call from the action, would lead to more items in the cart than the allowed limit.

Such validation that depends on state can only be done safely while handling the command inside the entity.
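
The difference between the two approaches can be shown with a deterministic model. The following is a minimal sketch, assuming a hypothetical CartModel class that stands in for the entity and a limit of 10 items. The interleaving is written out sequentially so that the outcome does not depend on thread timing.

```java
// Deterministic model of the check-then-act race. CartModel is a hypothetical
// stand-in for the entity; no Kalix types are involved.
class CheckThenActRace {
  static final int MAX_ITEMS = 10;

  static final class CartModel {
    private int total;

    CartModel(int initial) {
      total = initial;
    }

    synchronized int getTotal() {
      return total;
    }

    // Unvalidated add, like the addItem call that the anti-pattern forwards to.
    synchronized void addItem(int quantity) {
      total += quantity;
    }

    // Validation and update happen in one command, as inside an entity command handler.
    synchronized boolean addItemValidated(int quantity) {
      if (total + quantity > MAX_ITEMS) {
        return false;
      }
      total += quantity;
      return true;
    }
  }

  // Two callers both read the total before either of them adds an item.
  static int unsafeInterleaving() {
    CartModel cart = new CartModel(9);
    int seenByA = cart.getTotal();
    int seenByB = cart.getTotal();
    if (seenByA < MAX_ITEMS) cart.addItem(1);
    if (seenByB < MAX_ITEMS) cart.addItem(1); // decision based on a stale read
    return cart.getTotal();
  }

  // The same two requests, validated where the state lives.
  static int validatedInsideEntity() {
    CartModel cart = new CartModel(9);
    cart.addItemValidated(1);
    cart.addItemValidated(1); // rejected, the cart is already full
    return cart.getTotal();
  }

  public static void main(String[] args) {
    System.out.println("check-then-act total: " + unsafeInterleaving());
    System.out.println("validated total:      " + validatedInsideEntity());
  }
}
```

In the first variant both callers see 9 items and both add one, so the cart ends up over the limit. In the second variant the check and the update cannot be separated by another command.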

Actions as Life-cycle Hooks

An Action’s gRPC method can be triggered automatically when some predefined service life-cycle event happens (currently, only on startup is available), serving as a custom hook. For such use, the method needs to be annotated with a specific flag and its input type must be google.protobuf.Empty as shown below.

The on startup hook is called every time a service instance boots up. This can happen for very different reasons: restarting / redeploying the service, scaling up to more instances or even without any user-driven action (e.g. Kalix Runtime versions being rolled out, infrastructure-related incidents, etc.). Therefore, you should carefully consider how you use this hook and its implementation.
service OnStartup {
  option (kalix.codegen) = {
    action: {} (1)
  };

  rpc Init (google.protobuf.Empty) returns (google.protobuf.Empty) { (2)
    option (kalix.method).trigger = {
      on: STARTUP, (3)
      max_retries: 3 (4)
    };
  }
}
1 Only methods belonging to an Action can be configured as a hook.
2 The method must receive google.protobuf.Empty as input type.
3 This hook will be triggered once the instance startup is concluded (i.e. will be called 3 times if 3 instances are running).
4 Optionally, set the max amount of retries to 3.
If the call to the hook returns a failure and max_retries is set to a value greater than the default value (0), retry calls will be executed with a fixed delay until the configured amount is reached.
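
The retry behaviour described above can be pictured as an initial call followed by up to max_retries further attempts. The following is a minimal sketch of that reading, not the Kalix runtime's implementation. StartupRetry, runWithRetries and the fixedDelayMillis parameter are hypothetical, and the actual delay used by the runtime is not stated here.

```java
import java.util.function.BooleanSupplier;

// Sketch of "initial call plus up to maxRetries retries with a fixed delay".
// All names are hypothetical; this is not the Kalix runtime code.
class StartupRetry {

  // Returns the number of attempts made; stops at the first success.
  static int runWithRetries(BooleanSupplier hook, int maxRetries, long fixedDelayMillis) {
    int attempts = 0;
    while (true) {
      attempts++;
      if (hook.getAsBoolean()) {
        return attempts; // the hook succeeded
      }
      if (attempts > maxRetries) {
        return attempts; // initial call and all retries have failed
      }
      try {
        Thread.sleep(fixedDelayMillis); // the same delay before every retry
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return attempts;
      }
    }
  }

  public static void main(String[] args) {
    // A hook that always fails: with maxRetries = 3 it is attempted 4 times in total.
    System.out.println(runWithRetries(() -> false, 3, 10L));
    // With the default of 0 retries the hook is attempted once.
    System.out.println(runWithRetries(() -> false, 0, 10L));
  }
}
```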

Running Side Effects

Emitting effects on another component

An Entity or an Action may also emit one or more side effects. A side effect is something whose result has no impact on the result of the current command—​if it fails, the current command still succeeds. The result of the side effect is therefore ignored. When used from inside an Entity, side effects are only performed after the successful completion of any state actions requested by the command handler.

There is no guarantee that a side effect will be executed successfully. If a failure occurs after the command is fully handled, effects might not be executed. Side effects are not retried in case of failures.

Side effects may be declared as synchronous or asynchronous. Asynchronous commands run in a "fire and forget" fashion. The code flow of the caller (the command handler of the entity which emitted the asynchronous command) continues while the command is being asynchronously processed. Meanwhile, synchronous commands run sequentially, that is, the commands are processed in order, one at a time. The final result of the command handler, either a reply or a forward, is not sent until all synchronous commands are completed.

Use case: mobile notification

You might want to run side effects to notify interested parties of a change in state. For example, after a withdrawal is made from a bank account, an account entity could send a notification to the account owner’s mobile phone.

Emitting a side effect

To illustrate how you can emit a side effect, we can build on top of the Action as a Controller example. In that previous example, we built a controller around the Value Entity Counter and forwarded the incoming request after modifying it.

This time, instead of using a forward, we will call the entity using a side effect.

Java
src/main/proto/com/example/actions/double-counter.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/counter_api.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "DoubleCounterApi";

service DoubleCounter {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc Increase (com.example.IncreaseValue) returns (google.protobuf.Empty); (3)

  rpc IncreaseWithSideEffect (com.example.IncreaseValue) returns (google.protobuf.Empty); (4)

  rpc forwardWithGrpcApi (com.example.IncreaseValue) returns (google.protobuf.Empty);
  rpc sequentialComposition (com.example.IncreaseValue) returns (com.example.CurrentCounter);
  rpc sumOfMy3FavouriteCounterValues (google.protobuf.Empty) returns (com.example.CurrentCounter);

}
1 Import the Counter API definition.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The Action mimics the Counter API with a forward.
4 The Action mimics the Counter API with a side effect.
Scala
src/main/proto/com/example/actions/double-counter.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/counter_api.proto"; (1)
import "google/protobuf/empty.proto";

service DoubleCounter {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc Increase (com.example.IncreaseValue) returns (google.protobuf.Empty); (3)

  rpc IncreaseWithSideEffect (com.example.IncreaseValue) returns (google.protobuf.Empty); (4)

}
1 Import the Counter API definition.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The Action mimics the Counter API with a forward.
4 The Action mimics the Counter API with a side effect.

Implementing the Action

The class DoubleCounterAction gets generated for us based on the same proto file defined in Action as a Controller.

src/main/java/com/example/actions/DoubleCounterAction.java
/**
 * An action.
 */
public class DoubleCounterAction extends AbstractDoubleCounterAction {

  public DoubleCounterAction(ActionCreationContext creationContext) {
  }


  // Handler for "Increase" not shown in this snippet

  /**
   * Handler for "IncreaseWithSideEffect".
   */
  @Override
  public Effect<Empty> increaseWithSideEffect(CounterApi.IncreaseValue increaseValue) {
    int doubled = increaseValue.getValue() * 2;
    CounterApi.IncreaseValue increaseValueDoubled =
        increaseValue.toBuilder().setValue(doubled).build(); (1)

    return effects()
            .reply(Empty.getDefaultInstance()) (2)
            .addSideEffect( (3)
                SideEffect.of(components().counter().increase(increaseValueDoubled)));
  }
}
1 On incoming requests, we double the value of IncreaseValue.
2 We build a reply using Empty.getDefaultInstance().
3 And we attach a side effect to it. The side effect is the call to the Counter entity.

Note that the result of a side effect is ignored by the current command: even if the call to the Counter entity fails, the Action reply will succeed.
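The "reply succeeds even if the side effect fails" behavior can be pictured with a small plain-Java model. This is only a sketch under assumptions: it does not use the Kalix SDK, `SideEffectSketch` and `replyWithSideEffect` are hypothetical names, and a `CompletableFuture` merely stands in for the deferred call to the Counter entity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Plain-Java model of "reply + side effect"; NOT the Kalix API.
final class SideEffectSketch {

  // Triggers the side effect and returns the reply without ever inspecting
  // the outcome of the side effect (fire and forget).
  static String replyWithSideEffect(String reply, Supplier<CompletableFuture<Void>> sideEffect) {
    sideEffect.get(); // the returned future is deliberately ignored
    return reply;
  }

  public static void main(String[] args) {
    // Hypothetical failing call standing in for components().counter().increase(...)
    Supplier<CompletableFuture<Void>> failingCounterCall =
        () -> CompletableFuture.failedFuture(new IllegalStateException("counter unavailable"));

    // The reply is still produced although the side effect failed.
    System.out.println(replyWithSideEffect("ok", failingCounterCall));
  }
}
```

If the caller must know whether the Counter call succeeded, a side effect is the wrong tool; a forward or an asynchronous reply that depends on the call would surface the failure instead.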

Adding tracing spans

To create spans in your actions, you need the tracer, which is available through the actionContext() method.

Java

actionContext() gives you the ActionContext, which exposes methods that apply to the context of the action’s request.

src/main/java/com/example/ControllerAction.java
Tracer tracer = actionContext().getTracer();
Scala

actionContext gives you the ActionContext, which exposes methods that apply to the context of the action’s request.

src/main/scala/com/example/ControllerAction.scala
val tracer = actionContext.getTracer
If tracing is enabled, you get a tracer that creates spans and exports data as expected. If tracing is not enabled, you get a no-op tracer instead, which does not create traces.

Trace generation is disabled by default. To enable it in a service deployed to Kalix, see here. To enable it in a service running on your local machine, add the following JAVA_TOOL_OPTIONS to the docker-compose.yml in the base of your project. You also need somewhere to collect the traces, for example an extra container such as the jaeger one below.

docker-compose.yml
---
services:
  kalix-runtime:
    image: ...
    ...
    environment:
      JAVA_TOOL_OPTIONS: >
        -Dkalix.proxy.telemetry.tracing.enabled=true
        -Dkalix.proxy.telemetry.tracing.collector-endpoint=http://jaeger:4317
    ...
  jaeger:
    image: jaegertracing/all-in-one:1.54
    ports:
      - 4317:4317
      - 16686:16686
---

Here the traces are pushed to the jaeger container on port 4317, and you can inspect them at http://localhost:16686.

To create a span and end it over an asynchronous call, you can do the following:

Java
src/main/java/com/example/ControllerAction.java
@Override
public Effect<ControllerActionApi.MessageResponse> callAsyncEndpoint(Empty empty) {
  Tracer tracer = actionContext().getTracer();

  Span span = tracer
            .spanBuilder(url+"/{}")
            .setParent(actionContext().metadata().traceContext().asOpenTelemetryContext())(1)
            .startSpan() (2)
            .setAttribute("post", "1");(3)

  CompletableFuture<ControllerActionApi.MessageResponse> asyncComputation = callAsyncService()
    .whenComplete((response, ex) -> {
      if (ex != null) {
        span
            .setStatus(StatusCode.ERROR, ex.getMessage())(4)
            .end();(5)
      } else {
        span
            .setAttribute("result", response.body().title)(3)
            .end();(5)
      }
    })
    .thenApply(response ->
        ControllerActionApi.MessageResponse.newBuilder().setMessage(response.body().title).build()
    );
  return effects().asyncReply(asyncComputation);
}
1 Sets the action’s TraceContext as the parent of this span, linking the action’s trace to this span.
2 Creates and starts the span.
3 Adds an attribute.
4 Sets the status of the span to error.
5 Closes the span.
Scala
src/main/scala/com/example/ControllerAction.scala
override def callAsyncEndpoint(empty: Empty): Action.Effect[MessageResponse] = {
  val tracer = actionContext.getTracer
  val span = tracer.spanBuilder(s"$url/{}")
    .setParent(actionContext.metadata.traceContext.asOpenTelemetryContext)(1)
    .startSpan()(2)
    .setAttribute("post", "1")(3)

  val responseBody: Future[MessageResponse] = callAsync()
  responseBody.onComplete {
    case Failure(exception) =>
      span
        .setStatus(StatusCode.ERROR, exception.getMessage)(4)
        .end()(5)
    case Success(response) =>
      span
        .setAttribute("result", response.message)(3)
        .end()(5)
  }
  effects.asyncReply(responseBody)
}
1 Sets the action’s TraceContext as the parent of this span, linking the action’s trace to this span.
2 Creates and starts the span.
3 Adds an attribute.
4 Sets the status of the span to error.
5 Closes the span.

NOTE: You can find how tracing is enabled and more info here.
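The key point of the snippets above is that the span is ended on both the success and the failure path of the asynchronous call. The following plain-Java sketch shows that control flow in isolation. It makes several assumptions: `RecordingSpan` is a hypothetical stand-in that only records what was done to it (it is not the OpenTelemetry Span), and `traced` is an illustrative helper name, not part of the Kalix SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a tracing span; it only records the calls made on it.
final class RecordingSpan {
  final List<String> events = new ArrayList<>();

  RecordingSpan setStatusError(String description) {
    events.add("error:" + description);
    return this;
  }

  RecordingSpan setAttribute(String key, String value) {
    events.add(key + "=" + value);
    return this;
  }

  void end() {
    events.add("end");
  }
}

final class SpanLifecycleSketch {

  // Ends the span whatever the outcome, then maps the successful value.
  // whenComplete does not swallow the exception, so a failure still propagates.
  static CompletableFuture<String> traced(CompletableFuture<String> call, RecordingSpan span) {
    return call
        .whenComplete((response, ex) -> {
          if (ex != null) {
            span.setStatusError(String.valueOf(ex.getMessage())).end();
          } else {
            span.setAttribute("result", response).end();
          }
        })
        .thenApply(response -> "message: " + response);
  }

  public static void main(String[] args) {
    RecordingSpan okSpan = new RecordingSpan();
    System.out.println(traced(CompletableFuture.completedFuture("hello"), okSpan).join());
    System.out.println(okSpan.events);

    RecordingSpan failedSpan = new RecordingSpan();
    CompletableFuture<String> failed =
        traced(CompletableFuture.failedFuture(new IllegalStateException("boom")), failedSpan);
    System.out.println(failed.isCompletedExceptionally());
    System.out.println(failedSpan.events);
  }
}
```

Ending the span inside whenComplete, rather than after thenApply, keeps the span duration tied to the remote call itself and guarantees that a failed call does not leave the span open.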

Unit testing the side effects

The side effects of an Action can be tested in isolation. To test the side effects of DoubleCounterAction, shown in the previous snippet, we can leverage ActionResult. This class has the method getSideEffects(), which returns the list of side effects added to the Action.

Java
src/test/java/com/example/actions/DoubleCounterActionTest.java
  @Test
  public void increaseWithSideEffectTest() {
    DoubleCounterActionTestKit testKit = DoubleCounterActionTestKit.of(DoubleCounterAction::new);
    int increase = 3;
    CounterApi.IncreaseValue increaseValueCommand = CounterApi.IncreaseValue.newBuilder()
        .setValue(increase)
        .build();
    ActionResult<Empty> result1 = testKit.increaseWithSideEffect(increaseValueCommand);(1)
    DeferredCallDetails<?, ?> sideEffect = result1.getSideEffects().get(0);(2)
    assertEquals("com.example.CounterService", sideEffect.getServiceName());(3)
    assertEquals("Increase", sideEffect.getMethodName());(4)
    CounterApi.IncreaseValue doubledIncreased =  CounterApi.IncreaseValue.newBuilder()
        .setValue(increase * 2)
        .build();
    assertEquals(doubledIncreased, sideEffect.getMessage());(5)
  }
}
1 Executing the DoubleCounterAction.increaseWithSideEffect RPC call through the test kit.
2 Retrieving the first side effect. There is only one in DoubleCounterAction.increaseWithSideEffect. It’s worth noting that the side effects are DeferredCall objects that represent Kalix RPC services. DeferredCallDetails is the representation of a DeferredCall in the Kalix test kit.
3 Retrieving and asserting the name of the service.
4 Retrieving and asserting the name of the RPC method.
5 Retrieving and asserting the RPC’s input.
Scala
src/test/scala/com/example/actions/DoubleCounterActionSpec.scala
  "DoubleCounterAction" must {
    "handle command IncreaseWithSideEffect" in {
      val testKit = DoubleCounterActionTestKit(new DoubleCounterAction(_))

      val result: ActionResult[Empty] = testKit.increaseWithSideEffect(IncreaseValue(value = 1))(1)
      result.reply shouldBe Empty.defaultInstance

      val sideEffect = result.sideEffects.head (2)
      sideEffect.serviceName shouldBe "com.example.CounterService" (3)
      sideEffect.methodName shouldBe "Increase" (4)
      sideEffect.message shouldBe IncreaseValue(value = 2) (5)
    }
  }
}
1 Executing the DoubleCounterAction.increaseWithSideEffect RPC call through the test kit.
2 Retrieving the first side effect. There is only one in the DoubleCounterAction.increaseWithSideEffect implementation. It’s worth noting that the side effects are DeferredCall objects that represent Kalix RPC services. DeferredCallDetails is the representation of a DeferredCall in the Kalix test kit.
3 Retrieving and asserting the name of the service.
4 Retrieving and asserting the name of the RPC method.
5 Retrieving and asserting the RPC’s input.

Testing the Action

Unit tests

The following snippet shows how the FibonacciActionTestKit is used to test the FibonacciAction implementation.

Kalix generates the FibonacciActionTestKit, which allows us to call the methods of FibonacciAction. For each Action, Kalix generates a specific test kit named [ActionName]TestKit. Each call we make on the test kit returns an ActionResult that holds the effect produced by the underlying action method.

Apart from the test kit, Kalix also generates test classes based on the Action defined in the .proto files. This is shown in the snippet below.

Actions are independent units of computation: no local state is shared with previous or subsequent calls. The framework does not reuse an Action instance but creates a new one for each command it handles, and the test kit behaves the same way.
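To see why instance state cannot leak between calls, consider the following plain-Java model. It is a sketch under assumptions, not the generated test kit: `CountingHandler` and `PerCallKitSketch` are hypothetical names, and the only idea carried over from the text is that the kit holds a factory and creates a fresh instance for every call.

```java
import java.util.function.Supplier;

// Hypothetical handler with an instance field, used to show that
// per-call instances never observe state from an earlier call.
final class CountingHandler {
  private int calls = 0;

  int handle() {
    return ++calls;
  }
}

// Minimal stand-in for a test kit: it stores the factory, not an instance.
final class PerCallKitSketch {
  private final Supplier<CountingHandler> factory;

  PerCallKitSketch(Supplier<CountingHandler> factory) {
    this.factory = factory;
  }

  // A new handler is created for each call, so its counter always starts at zero.
  int call() {
    return factory.get().handle();
  }

  public static void main(String[] args) {
    PerCallKitSketch kit = new PerCallKitSketch(CountingHandler::new);
    System.out.println(kit.call()); // first call on a fresh instance
    System.out.println(kit.call()); // second call, again on a fresh instance
  }
}
```

Both calls return the same value because each one runs against a brand-new handler. This is also why the real test kits are created from a constructor reference such as FibonacciAction::new rather than from an existing instance.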
Java
src/test/java/com/example/actions/FibonacciActionTest.java
public class FibonacciActionTest {

  @Test
  public void nextNumberTest() {
    FibonacciActionTestKit testKit = FibonacciActionTestKit.of(FibonacciAction::new); (1)
    ActionResult<FibonacciApi.Number> result = testKit.nextNumber(FibonacciApi.Number.newBuilder().setValue(5).build()); (2)
    assertEquals(8, result.getReply().getValue()); (3)
  }

}
1 The test kit is created to allow us to test the Action’s method.
2 We call the nextNumber method with some value.
3 The reply message from the result is retrieved using getReply().
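The expectation in the test (5 is followed by 8) can be checked independently of Kalix. The helper below is a hypothetical stand-alone function written for this illustration; it is not the FibonacciAction implementation, and returning -1 for inputs that are not Fibonacci numbers is an assumption of the sketch.

```java
// Stand-alone arithmetic check; NOT the FibonacciAction implementation.
final class FibonacciSketch {

  // Returns the Fibonacci number that follows `value`.
  // Expects value >= 1; returns -1 when `value` is not a Fibonacci number.
  // No overflow handling: intended for small illustrative inputs only.
  static long nextNumber(long value) {
    long previous = 0;
    long current = 1;
    // Walk the series until we reach or pass the given value.
    while (current < value) {
      long next = previous + current;
      previous = current;
      current = next;
    }
    return current == value ? previous + current : -1;
  }

  public static void main(String[] args) {
    System.out.println(nextNumber(5)); // the same input as in the test above
    System.out.println(nextNumber(4)); // 4 is not in the series
  }
}
```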

ActionResult

Calling an action method through the test kit gives us back an ActionResult. This class has methods that we can use to assert our expectations, such as:

  • getReply() returns the reply message passed to effects().reply() or, if the effect returned was not a reply, throws an exception that fails the test.

  • getError() returns the error description when effects().error() was returned to signal an error.

  • getForward() returns details about what message was forwarded and where the call was forwarded (since it is a unit test the forward is not actually executed).

Scala
src/test/scala/com/example/actions/FibonacciActionSpec.scala
class FibonacciActionSpec
  extends AnyWordSpec
    with ScalaFutures
    with Matchers {

  "FibonacciAction" must {

    "handle command NextNumber" in {
      val testKit = FibonacciActionTestKit(new FibonacciAction(_)) (1)
      val result = testKit.nextNumber(Number(5)) (2)
      result.reply shouldBe (Number(8)) (3)
    }
  }
}
1 The test kit is created to allow us to test the Action’s method.
2 We call the nextNumber method with some value.
3 The reply message from the result is retrieved using reply.

ActionResult

Calling an action method through the test kit gives us back an ActionResult. This class has methods that we can use to assert our expectations, such as:

  • reply returns the reply message passed to effects.reply() or, if the effect returned was not a reply, throws an exception that fails the test.

  • errorDescription returns the error description when effects.error() was returned to signal an error.

  • forwardedTo returns details about what message was forwarded and where the call was forwarded (since it is a unit test the forward is not actually executed).

    By default, both the integration tests and the unit tests are run by sbt test. To run only the unit tests, run sbt -DonlyUnitTest test or sbt -DonlyUnitTest=true test, or set the value to true in the sbt session with set onlyUnitTest := true and then run test.

Unit tests (with cross-component calls)

Testing an Action that serves as a controller or, more generally, one that depends on calling other components, requires providing the TestKit with a mock registry containing the mocks to be used. Later, at runtime, the TestKit looks up the mock object it needs by matching the registered mocks against the class type of the dependency component.

So, let’s say we want to test the previous example, where we rely on two external calls to create and populate the shopping cart before replying. A unit test for such an action method would look like:

Java
src/test/java/com/example/shoppingcart/ShoppingCartActionImplTest.java
public class ShoppingCartActionImplTest {

  @Mock
  private ShoppingCartService shoppingCartService; (1)

  @Test
  public void prePopulatedCartTest() throws ExecutionException, InterruptedException, TimeoutException {
    when(shoppingCartService.create(notNull())) (2)
        .thenReturn(CompletableFuture.completedFuture(Empty.getDefaultInstance()));
    when(shoppingCartService.addItem(any()))
        .thenReturn(CompletableFuture.completedFuture(Empty.getDefaultInstance()));
    var mockRegistry = MockRegistry.create().withMock(ShoppingCartService.class, shoppingCartService); (3)

    var service = ShoppingCartActionImplTestKit.of(ShoppingCartActionImpl::new, mockRegistry); (4)
    var result = service.createPrePopulated(NewCart.getDefaultInstance()).getAsyncResult();
    var reply = ((CompletableFuture<ActionResult<NewCartCreated>>) result)
        .get(1, TimeUnit.SECONDS)
        .getReply();

    // assertions go here
  }
}
1 The first step is to declare our mock object. In this example, shoppingCartService is a @Mock object provided by the Mockito framework.
2 We start by configuring how our mock service replies to the two calls: create and addItem.
3 Then we use the TestKit-provided MockRegistry to register shoppingCartService as the mock for class type ShoppingCartService.
4 Finally, we pass the mockRegistry when initializing the ShoppingCartActionImplTestKit, and the TestKit will look up our mock object when it needs it.
Scala
src/test/scala/com/example/shoppingcart/ShoppingCartActionImplSpec.scala
class ShoppingCartActionImplSpec
    extends AsyncWordSpec
    with Matchers
    with AsyncMockFactory {

  "ShoppingCartActionImpl" must {

    "create a prepopulated cart" in {
      val mockShoppingCart = stub[ShoppingCartService] (1)
      (mockShoppingCart.create _) (2)
        .when(*)
        .returns(Future.successful(Empty.defaultInstance))
      (mockShoppingCart.addItem _)
        .when(where { (li: AddLineItem) => li.name == "eggplant"})
        .returns(Future.successful(Empty.defaultInstance))
      val mockRegistry = MockRegistry.withMock(mockShoppingCart) (3)

      val service = ShoppingCartActionImplTestKit(new ShoppingCartActionImpl(_), mockRegistry) (4)
      val cartId = service.createPrePopulated(NewCart.defaultInstance).asyncResult

      // assertions go here
    }
  }
}
1 The first step is to declare our mock object. In this case, mockShoppingCart is a stub object provided by the ScalaMock framework.
2 Then we configure how our mock service replies to the two calls: create and addItem.
3 We use the TestKit-provided MockRegistry to register mockShoppingCart as the mock for class type ShoppingCartService.
4 Finally, we pass the mockRegistry when initializing the ShoppingCartActionImplTestKit, and the TestKit will look up our mock object when it needs it.
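
The lookup "by class type" that the TestKit performs against the mock registry can be pictured with a small plain-Java model. This is a sketch under assumptions and not the Kalix MockRegistry implementation: `TypeKeyedRegistry`, `GreetingService` and `RegistryDemo` are hypothetical names introduced only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Plain-Java model of a registry keyed by class type; NOT the Kalix MockRegistry.
final class TypeKeyedRegistry {
  private final Map<Class<?>, Object> mocks = new HashMap<>();

  // Registers `instance` as the mock to use for dependencies of type `type`.
  <T> TypeKeyedRegistry withMock(Class<T> type, T instance) {
    mocks.put(type, instance);
    return this;
  }

  // Looks a mock up by the exact class type it was registered under.
  <T> Optional<T> get(Class<T> type) {
    return Optional.ofNullable(mocks.get(type)).map(type::cast);
  }
}

// Hypothetical dependency interface used only for this illustration.
interface GreetingService {
  String greet(String name);
}

final class RegistryDemo {
  public static void main(String[] args) {
    GreetingService stub = name -> "hello " + name;
    TypeKeyedRegistry registry = new TypeKeyedRegistry().withMock(GreetingService.class, stub);

    // A registered type resolves to its mock...
    System.out.println(
        registry.get(GreetingService.class).map(s -> s.greet("cart")).orElse("no mock"));
    // ...while a type that was never registered resolves to nothing.
    System.out.println(registry.get(Runnable.class).isPresent());
  }
}
```

In this model a mock is only found when it is registered under the same class type that is later requested, which mirrors why the examples above register the mock explicitly for ShoppingCartService.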