Implementing Actions
Actions are stateless functions that can be used to implement different use cases, such as:
- a pure function.
- request conversion: you can use Actions to convert incoming data into a different format before forwarding a call to a different component.
- a facade or controller that fans out to multiple calls to different components.
- publishing messages to a Topic.
- subscribing to events from an Event Sourced Entity.
- subscribing to state changes from a Value Entity.
- scheduling and canceling Timers.
Actions can be triggered in multiple ways. For example, by:
- a gRPC service call.
- an HTTP service call.
- a forwarded call from another component.
- a scheduled call from a Timer.
- an incoming message from a Topic.
- an incoming event from an Event Sourced Entity, from within the same service or from a different service.
- a state change notification from a Value Entity on the same service.
- a service life-cycle event (e.g. on startup).
Defining the proto file
An Action may implement any service method defined in a Protobuf definition. In this first example, we will show how to implement an Action as a pure stateless function. We will define a FibonacciAction that takes a number and returns the next number in the Fibonacci series.
- Java
-
src/main/proto/com/example/fibonacci/fibonacci.proto
syntax = "proto3";

package com.example.fibonacci; (1)

import "kalix/annotations.proto"; (2)

option java_outer_classname = "FibonacciApi"; (3)

message Number {
  int64 value = 1;
}

service Fibonacci {
  option (kalix.codegen) = {
    action: {} (4)
  };

  rpc NextNumber(Number) returns (Number) {}
}
1 Any classes generated from this protobuf file will be in the com.example.fibonacci package.
2 Import the Kalix protobuf annotations or options.
3 Let the messages declared in this protobuf file be inner classes to the Java class FibonacciApi.
4 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
- Scala
-
src/main/proto/com/example/fibonacci/fibonacci.proto
syntax = "proto3";

package com.example.fibonacci; (1)

import "kalix/annotations.proto"; (2)

message Number {
  int64 value = 1;
}

service Fibonacci {
  option (kalix.codegen) = {
    action: {} (3)
  };

  rpc NextNumber(Number) returns (Number) {}
}
1 Any classes generated from this protobuf file will be in the com.example.fibonacci package.
2 Import the Kalix protobuf annotations or options.
3 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
Action’s Effect API
The Action’s Effect defines the operations that Kalix should perform when an incoming message is handled by an Action.
An Action Effect can either:
- reply with a message to the caller
- reply with a message to be published to a topic (in case the method is a publisher)
- forward the message to another component
- return an error
- ignore the call
See also Understanding what an Effect is
Implementing the Action
An Action implementation is a class where you define how each message is handled. The class FibonacciAction gets generated for us based on the proto file defined above. Once the FibonacciAction.java (Java) or FibonacciAction.scala (Scala) file exists, it is not overwritten, so you can freely add logic to it. FibonacciAction extends the generated class AbstractFibonacciAction, which we are not supposed to change because it is regenerated whenever we update the protobuf descriptors. AbstractFibonacciAction contains all method signatures corresponding to the API of the service. If you change the API you will see compilation errors in the FibonacciAction class, and you have to implement the methods required by AbstractFibonacciAction.
- Java
-
src/main/java/com/example/fibonacci/FibonacciAction.java
public class FibonacciAction extends AbstractFibonacciAction { (1)

  public FibonacciAction(ActionCreationContext creationContext) {
  }

  /**
   * Handler for "NextNumber".
   */
  @Override
  public Effect<FibonacciApi.Number> nextNumber(FibonacciApi.Number number) { (2)
    throw new RuntimeException("The command handler for `NextNumber` is not implemented, yet");
  }
}
1 Extends the generated AbstractFibonacciAction, which extends Action.
2 A nextNumber method is generated. We will implement it next.
- Scala
-
src/main/scala/com/example/fibonacci/FibonacciAction.scala
class FibonacciAction(creationContext: ActionCreationContext) extends AbstractFibonacciAction { (1)

  override def nextNumber(number: Number): Action.Effect[Number] = { (2)
    throw new RuntimeException("The command handler for `NextNumber` is not implemented, yet")
  }
}
1 Extends the generated AbstractFibonacciAction, which extends Action.
2 A nextNumber method is generated. We will implement it next.
Next, we can implement the nextNumber method to complete our Action.
- Java
-
src/main/java/com/example/fibonacci/FibonacciAction.java
private boolean isFibonacci(long num) { (1)
  Predicate<Long> isPerfectSquare = (n) -> {
    long square = (long) Math.sqrt(n);
    return square*square == n;
  };
  return isPerfectSquare.test(5*num*num + 4) || isPerfectSquare.test(5*num*num - 4);
}

private long nextFib(long num) {
  double result = num * (1 + Math.sqrt(5)) / 2.0;
  return Math.round(result);
}

@Override
public Effect<FibonacciApi.Number> nextNumber(FibonacciApi.Number number) {
  long num = number.getValue();
  if (isFibonacci(num)) { (2)
    long nextFib = nextFib(num);
    FibonacciApi.Number response =
        FibonacciApi.Number
            .newBuilder()
            .setValue(nextFib)
            .build();
    return effects().reply(response);
  } else {
    return effects() (3)
        .error("Input number is not a Fibonacci number, received '" + num + "'");
  }
}
1 We add two private methods to support the computation. isFibonacci checks if a number is a Fibonacci number and nextFib calculates the next number.
2 The nextNumber implementation first checks if the input number belongs to the Fibonacci series. If so, it calculates the next number and builds a reply using effects().reply().
3 Otherwise, if the input number doesn’t belong to the Fibonacci series, it builds an error Effect.
- Scala
-
src/main/scala/com/example/fibonacci/FibonacciAction.scala
private def isFibonacci(num: Long): Boolean = { (1)
  val isPerfectSquare = (n: Long) => {
    val square = Math.sqrt(n.toDouble).toLong
    square * square == n
  }
  isPerfectSquare(5 * num * num + 4) || isPerfectSquare(5 * num * num - 4)
}

private def nextFib(num: Long): Long = {
  val result = num * (1 + Math.sqrt(5)) / 2.0
  Math.round(result)
}

override def nextNumber(number: Number): Action.Effect[Number] = {
  val num = number.value
  if (isFibonacci(num)) (2)
    effects.reply(Number(nextFib(num)))
  else
    effects.error(s"Input number is not a Fibonacci number, received '$num'") (3)
}
1 We add two private methods to support the computation. isFibonacci checks if a number is a Fibonacci number and nextFib calculates the next number.
2 The nextNumber implementation first checks if the input number belongs to the Fibonacci series. If so, it calculates the next number and builds a reply using effects.reply().
3 Otherwise, if the input number doesn’t belong to the Fibonacci series, it builds an error Effect.
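The arithmetic behind the two helper methods can be checked in isolation. The following is a minimal, standalone sketch in plain Java with no Kalix dependencies; the class name FibonacciCheck is hypothetical. It relies on two known facts: a non-negative integer n is a Fibonacci number exactly when 5n² + 4 or 5n² - 4 is a perfect square, and multiplying a Fibonacci number (from 1 upwards) by the golden ratio (1 + √5)/2 and rounding yields a following Fibonacci number. It assumes small inputs, since 5 * num * num overflows a long for large values and double precision limits the rounding step.

```java
import java.util.function.LongPredicate;

// Hypothetical standalone class; mirrors the helper logic above without any Kalix types.
class FibonacciCheck {

  // n is a Fibonacci number iff 5*n^2 + 4 or 5*n^2 - 4 is a perfect square.
  static boolean isFibonacci(long num) {
    LongPredicate isPerfectSquare = n -> {
      if (n < 0) return false;
      long root = (long) Math.sqrt((double) n);
      return root * root == n;
    };
    return isPerfectSquare.test(5 * num * num + 4) || isPerfectSquare.test(5 * num * num - 4);
  }

  // Multiply by the golden ratio and round to the nearest integer.
  static long nextFib(long num) {
    return Math.round(num * (1 + Math.sqrt(5)) / 2.0);
  }

  public static void main(String[] args) {
    for (long n : new long[] {1, 2, 3, 4, 5, 8, 13}) {
      String result = isFibonacci(n) ? "next is " + nextFib(n) : "not a Fibonacci number";
      System.out.println(n + " -> " + result);
    }
  }
}
```

Note that for the input 0 the rounding formula returns 0 rather than 1, so a production implementation may want to treat that value separately.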
Multiple replies / reply streaming
An Action may return multiple replies to a single request by marking the return type as stream in Protobuf. The Java method implementing that service must return an Akka Streams Source to fulfill that contract.
The Source may publish an arbitrary number of replies.
Registering the Action
To make Kalix aware of the Action, we need to register it with the service.
From the code-generation, the registration gets automatically inserted in the generated KalixFactory.withComponents method from the Main class.
- Java
-
/src/main/java/com/example/Main.java
/* This code was generated by Kalix tooling.
 * As long as this file exists it will not be re-generated.
 * You are free to make changes to this file.
 */
package com.example;

import kalix.javasdk.Kalix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.fibonacci.FibonacciAction;

public final class Main {

  private static final Logger LOG = LoggerFactory.getLogger(Main.class);

  public static Kalix createKalix() {
    // The KalixFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `new Kalix()` instance.
    return KalixFactory.withComponents(
      FibonacciAction::new);
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Kalix service");
    createKalix().start();
  }
}
- Scala
-
/src/main/scala/com/example/fibonacci/Main.scala
package com.example.fibonacci

import kalix.scalasdk.Kalix
import org.slf4j.LoggerFactory

object Main {

  private val log = LoggerFactory.getLogger("com.example.fibonacci.Main")

  def createKalix(): Kalix = {
    // The KalixFactory automatically registers any generated Actions, Views or Entities,
    // and is kept up-to-date with any changes in your protobuf definitions.
    // If you prefer, you may remove this and manually register these components in a
    // `Kalix()` instance.
    KalixFactory.withComponents(
      new FibonacciAction(_))
  }

  def main(args: Array[String]): Unit = {
    log.info("starting the Kalix service")
    createKalix().start()
  }
}
By default, the generated constructor has an ActionCreationContext parameter, but you can change this to accept other parameters.
If you change the constructor of the FibonacciAction class you will see a compilation error here, and you have to adjust the factory function that is passed to KalixFactory.withComponents.
When more components are added the KalixFactory is regenerated, and you have to adjust the registration from the Main class.
Actions as Controllers
Actions can be used to implement MVC Controllers by acting as the external interface of a service: receiving requests, operating on the request values and forwarding the call to other components in the same service.
To illustrate how you can use an Action as a Controller, we will build on top of the Value Entity Shopping Cart example, adding a new Action to the existing shopping cart service.
Forwarding Commands
The forward effect allows us to transform or further validate an incoming request before passing it on to another component and have the response message directly passed back to the client making the request. The response from the forwarded operation must have the same response type as the original request.
In this example we accept the same command as the entity, AddLineItem, but add some additional verification of the request and only conditionally forward the request to the entity if the verification is successful:
- Java
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
@Override
public Effect<Empty> verifiedAddItem(ShoppingCartApi.AddLineItem addLineItem) {
  if (addLineItem.getName().equalsIgnoreCase("carrot")) { (1)
    return effects().error("Carrots no longer for sale"); (2)
  } else {
    DeferredCall<ShoppingCartApi.AddLineItem, Empty> call =
        components().shoppingCart().addItem(addLineItem); (3)
    return effects().forward(call); (4)
  }
}
1 Check if the added item is carrots.
2 If it is "carrots" immediately return an error, disallowing adding the item.
3 For allowed requests, use components().shoppingCart().addItem() to get a DeferredCall.
4 The DeferredCall is used with effects().forward() to forward the request to the entity.
- Scala
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.scala
override def verifiedAddItem(addLineItem: AddLineItem): Effect[Empty] =
  if (addLineItem.name.equalsIgnoreCase("carrot")) (1)
    effects.error("Carrots no longer for sale") (2)
  else {
    val call = components.shoppingCart.addItem(addLineItem) (3)
    effects.forward(call) (4)
  }
1 Check if the added item is carrots.
2 If it is "carrots" immediately return an error, disallowing adding the item.
3 For allowed requests, use components.shoppingCart.addItem() to get a DeferredCall.
4 The DeferredCall is used with effects.forward() to forward the request to the entity.
Forwarding Headers
By default, Kalix does not forward gRPC/HTTP headers to Kalix components. This can be overridden with component options.
- Java
-
src/main/java/com/example/shoppingcart/Main.java
public final class Main {

  public static Kalix createKalix() {
    Kalix kalix = new Kalix();
    ActionOptions forwardHeaders = ActionOptions.defaults()
        .withForwardHeaders(Set.of("UserRole")); (1)
    return kalix
        .register(ShoppingCartActionProvider.of(ShoppingCartActionImpl::new)
            .withOptions(forwardHeaders)); (2)
  }

  public static void main(String[] args) throws Exception {
    LOG.info("starting the Kalix service");
    createKalix().start();
  }
}
1 Specify the names of the headers that should be forwarded to this component, using the withForwardHeaders options builder method.
2 Register the component with custom options.
- Scala
-
src/main/java/com/example/shoppingcart/Main.scala
object Main {

  def createKalix(): Kalix = {
    val kalix = Kalix()
    val forwardHeaders = ActionOptions.defaults
      .withForwardHeaders(Set("UserRole")) (1)
    kalix
      .register(ShoppingCartActionProvider(new ShoppingCartActionImpl(_))
        .withOptions(forwardHeaders)) (2)
  }

  def main(args: Array[String]): Unit = {
    log.info("starting the Kalix service")
    createKalix().start()
  }
}
1 Specify the names of the headers that should be forwarded to this component, using the withForwardHeaders options builder method.
2 Register the component with custom options.
Accessing headers and forwarding them to another component is done via the context Metadata.
- Java
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
@Override
public Effect<Empty> removeCart(ShoppingCartApi.RemoveShoppingCart removeShoppingCart) {
  var userRole = actionContext().metadata().get("UserRole").get(); (1)
  Metadata metadata = Metadata.EMPTY.add("Role", userRole);
  return effects().forward(
      components().shoppingCart().removeCart(removeShoppingCart)
          .withMetadata(metadata) (2)
  );
}
1 Get the header value from the context Metadata.
2 Forward a different header to the next call by using the withMetadata method of the DeferredCall class.
- Scala
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.scala
override def removeCart(removeShoppingCart: RemoveShoppingCart): Effect[Empty] = {
  val userRole = actionContext.metadata.get("UserRole").get (1)
  val metadata = Metadata.empty.add("Role", userRole)
  effects.forward(
    components.shoppingCart
      .removeCart(removeShoppingCart)
      .withMetadata(metadata)) (2)
}
1 Get the header value from the context Metadata.
2 Forward a different header to the next call by using the withMetadata method of the DeferredCall class.
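Both snippets call get() directly on the looked-up header value, which throws if the header is absent. A minimal sketch of a more defensive lookup follows, assuming the lookup yields a java.util.Optional<String> as the Java snippet suggests. The Map-based headers, the class HeaderLookupSketch and the fallback role "anonymous" are hypothetical stand-ins, not Kalix APIs.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a metadata lookup; only java.util types are used.
class HeaderLookupSketch {

  // Mimics a lookup that yields an Optional, empty when the header is missing.
  static Optional<String> header(Map<String, String> headers, String key) {
    return Optional.ofNullable(headers.get(key));
  }

  // Falls back to a default role instead of throwing NoSuchElementException.
  static String roleOrDefault(Map<String, String> headers) {
    return header(headers, "UserRole").orElse("anonymous");
  }

  public static void main(String[] args) {
    System.out.println(roleOrDefault(Map.of("UserRole", "admin")));
    System.out.println(roleOrDefault(Map.of()));
  }
}
```

Whether a missing header should map to a default or to an error effect depends on the service; the point is only that the absent case is handled explicitly.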
Transform Request and Response to Another Component
The asyncReply and asyncEffect effects allow us to process and transform a request before calling another component and then also transform the response.
As an example, let us look at the problem of creating a new entity with an id generated by another component.
In this example we implement an Initialize command for the controller Action which returns the message NewCartCreated with the entity id that can subsequently be used to interact with the cart.
- Java
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
@Override
public Effect<ShoppingCartController.NewCartCreated> initializeCart(ShoppingCartController.NewCart newCart) {
  final String cartId = UUID.randomUUID().toString(); (1)
  CompletionStage<Empty> shoppingCartCreated =
      components().shoppingCart().create(ShoppingCartApi.CreateCart.newBuilder().setCartId(cartId).build()) (2)
          .execute(); (3)

  // transform response
  CompletionStage<Effect<ShoppingCartController.NewCartCreated>> effect =
      shoppingCartCreated.handle((empty, error) -> { (4)
        if (error == null) {
          return effects().reply(ShoppingCartController.NewCartCreated.newBuilder().setCartId(cartId).build()); (5)
        } else {
          return effects().error("Failed to create cart, please retry"); (6)
        }
      });

  return effects().asyncEffect(effect); (7)
}
1 We generate a new UUID.
2 We use components().shoppingCart().create(…) to create a DeferredCall for create on the shopping cart.
3 execute() on the DeferredCall immediately triggers a call and returns a CompletionStage for the response.
4 Once the call succeeds or fails, the CompletionStage is completed or failed, and we can transform the result from CompletionStage<Empty> to CompletionStage<Effect<NewCartCreated>> using handle.
5 On a successful response, we create a reply effect with a NewCartCreated.
6 If the call leads to an error, we create an error effect asking the client to retry.
7 effects().asyncEffect() allows us to reply with a CompletionStage<Effect<NewCartCreated>>.
- Scala
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.scala
override def initializeCart(newCart: NewCart): Effect[NewCartCreated] = {
  val cartId = UUID.randomUUID().toString (1)
  val created: Future[Empty] =
    components.shoppingCart.create(CreateCart(cartId)).execute() (2)
  val effect: Future[Effect[NewCartCreated]] = (3)
    created
      .map(_ => effects.reply(NewCartCreated(cartId))) (4)
      .recover(_ => effects.error("Failed to create cart, please retry")) (5)
  effects.asyncEffect(effect) (6)
}
1 We generate a new UUID.
2 We use components.shoppingCart.create(…) to create a DeferredCall for create on the shopping cart.
3 execute() on the DeferredCall immediately triggers a call and returns a Future for the response.
4 On a successful response, we map the Empty reply to a reply effect with the reply NewCartCreated.
5 If the call leads to an error, we recover and return an error effect asking the client to retry.
6 effects.asyncEffect() allows us to reply with a Future[Effect[NewCartCreated]] rather than a reply we already have created.
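The success/failure mapping done with handle can be tried out with plain CompletableFuture values. The following is a minimal sketch that assumes nothing from Kalix: the class HandleSketch is hypothetical, and plain strings stand in for the reply and error effects.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical sketch: Strings stand in for the reply and error effects.
class HandleSketch {

  // Maps a completed or failed stage to exactly one outcome, like the handle callback above.
  static CompletionStage<String> toOutcome(CompletionStage<Void> created, String cartId) {
    return created.handle((ignored, error) -> {
      if (error == null) {
        return "reply: created cart " + cartId;
      } else {
        return "error: Failed to create cart, please retry";
      }
    });
  }

  public static void main(String[] args) {
    CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
    CompletableFuture<Void> failed = new CompletableFuture<>();
    failed.completeExceptionally(new RuntimeException("entity unavailable"));

    System.out.println(toOutcome(ok, "cart-1").toCompletableFuture().join());
    System.out.println(toOutcome(failed, "cart-2").toCompletableFuture().join());
  }
}
```

Because handle receives both the value and the error, the resulting stage always completes normally, which is why the failed call still produces a (negative) outcome instead of an exception.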
The action generates a UUID to use as entity id for the shopping cart. UUIDs are extremely unlikely to collide, but to completely guarantee that two calls can never be assigned the same shopping cart we make use of the "boundary of consistency" provided by the entity: the entity only processes a single command at a time and can safely make decisions based on its state, for example allowing creation only once by storing something in its state signifying that it has been created.
In this case we mark that the entity has been created by storing a creation timestamp in the shopping cart state on the first create call (when the timestamp still has the default value of 0). If the cart has already been stored with a timestamp, we return an error effect:
- Java
-
src/main/java/com/example/shoppingcart/domain/ShoppingCart.java
@Override
public Effect<Empty> create(ShoppingCartDomain.Cart currentState, ShoppingCartApi.CreateCart createCart) {
  if (currentState.getCreationTimestamp() > 0L) {
    return effects().error("Cart was already created");
  } else {
    return effects().updateState(currentState.toBuilder().setCreationTimestamp(Instant.now().toEpochMilli()).build())
        .thenReply(Empty.getDefaultInstance());
  }
}
- Scala
-
src/main/java/com/example/shoppingcart/domain/ShoppingCart.scala
override def create(currentState: Cart, createCart: shoppingcart.CreateCart): ValueEntity.Effect[Empty] =
  if (currentState.creationTimestamp > 0) effects.error("Cart was already created")
  else
    effects.updateState(currentState.copy(creationTimestamp = Instant.now().toEpochMilli))
      .thenReply(Empty.defaultInstance)
Composing calls
The async call shown in the previous section can also be used to chain or compose multiple calls into a single action response.
In this example we build on the previous cart creation by adding an initial item in the cart once it has been created, but before we return the new id to the client:
- Java
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
@Override
public Effect<ShoppingCartController.NewCartCreated> createPrePopulated(ShoppingCartController.NewCart newCart) {
  final String cartId = UUID.randomUUID().toString();
  CompletionStage<Empty> shoppingCartCreated =
      components().shoppingCart().create(ShoppingCartApi.CreateCart.newBuilder().setCartId(cartId).build())
          .execute();

  CompletionStage<Empty> cartPopulated = shoppingCartCreated.thenCompose(empty -> { (1)
    ShoppingCartApi.AddLineItem initialItem = (2)
        ShoppingCartApi.AddLineItem.newBuilder()
            .setCartId(cartId)
            .setProductId("e")
            .setName("eggplant")
            .setQuantity(1)
            .build();
    return components().shoppingCart().addItem(initialItem).execute(); (3)
  });

  CompletionStage<ShoppingCartController.NewCartCreated> reply =
      cartPopulated.thenApply(empty -> (4)
          ShoppingCartController.NewCartCreated.newBuilder().setCartId(cartId).build()
      );

  return effects().asyncReply(reply); (5)
}
1 CompletionStage#thenCompose allows us to perform an additional async operation, returning a CompletionStage, once the current one completes successfully.
2 Create a request to add an initial item to the cart.
3 Executing the addItem call returns a CompletionStage<Empty> that completes once the call succeeds.
4 thenApply allows us to transform the successful completion of addItem with Empty to the response type of this method, NewCartCreated.
5 effects().asyncReply() lets us reply once the CompletionStage<NewCartCreated> completes.
- Scala
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.scala
def createPrePopulated(newCart: NewCart): Effect[NewCartCreated] = {
  val cartId = UUID.randomUUID().toString
  val reply: Future[NewCartCreated] =
    for { (1)
      created <- components.shoppingCart.create(CreateCart(cartId)).execute()
      populated <- components.shoppingCart.addItem(AddLineItem(cartId, "e", "eggplant", 1)).execute()
    } yield NewCartCreated(cartId) (2)
  effects.asyncReply(reply) (3)
}
1 For comprehensions (or directly using flatMap) allow us to compose the individual async steps returning Future.
2 Once both steps have completed, create a NewCartCreated leading to a Future[NewCartCreated] coming out of the for-comprehension.
3 effects.asyncReply lets us reply once the Future[NewCartCreated] completes.
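The same chaining can be exercised without Kalix. The following is a minimal sketch with hypothetical stand-ins: createCart and addItem simply return completed futures and record the order in which they ran. thenCompose sequences the second async step after the first, and thenApply maps the final result to the reply value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical sketch: createCart and addItem stand in for calls to the entity.
class ComposeSketch {
  final List<String> log = new ArrayList<>();

  CompletionStage<Void> createCart(String cartId) {
    log.add("create " + cartId);
    return CompletableFuture.completedFuture(null);
  }

  CompletionStage<Void> addItem(String cartId, String item) {
    log.add("add " + item + " to " + cartId);
    return CompletableFuture.completedFuture(null);
  }

  // Create the cart, then add the initial item, then produce the reply (the cart id).
  CompletionStage<String> createPrePopulated(String cartId) {
    return createCart(cartId)
        .thenCompose(ignored -> addItem(cartId, "eggplant"))
        .thenApply(ignored -> cartId);
  }

  public static void main(String[] args) {
    ComposeSketch sketch = new ComposeSketch();
    String reply = sketch.createPrePopulated("cart-1").toCompletableFuture().join();
    System.out.println(reply + " " + sketch.log);
  }
}
```

If the first stage fails, thenCompose and thenApply are skipped and the failure propagates to the final stage, so no item is added to a cart that was never created.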
In this sample it is safe to base a subsequent call to the entity on the reply of the previous one, because no client will know the cart id until createPrePopulated replies.
For many other use cases it is important to understand that there is no transaction or consistency boundary outside of the entity, so for a sequence of calls from an action to an entity, the state of the entity could be updated by other calls it receives in-between.
For example, imagine an action that, for a given cart id, retrieves the state using getCart to verify whether too many items are already in the cart and, once that has been verified, adds the item to the cart.
- Java
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.java
@Override
public Effect<Empty> unsafeValidation(ShoppingCartApi.AddLineItem addLineItem) {
  // NOTE: This is an example of an anti-pattern, do not copy this
  CompletionStage<ShoppingCartApi.Cart> cartReply =
      components().shoppingCart().getCart(
          ShoppingCartApi.GetShoppingCart.newBuilder()
              .setCartId(addLineItem.getCartId())
              .build())
          .execute(); (1)

  CompletionStage<Effect<Empty>> effect = cartReply.thenApply(cart -> {
    int totalCount = cart.getItemsList().stream()
        .mapToInt(ShoppingCartApi.LineItem::getQuantity)
        .sum();

    // Reject the request once the cart has reached the limit
    if (totalCount >= 10) {
      return effects().error("Max 10 items in a cart");
    } else {
      return effects().forward(components().shoppingCart().addItem(addLineItem)); (2)
    }
  });

  return effects().asyncEffect(effect);
}
1 Between this call returning.
2 And this next call to the same entity, the entity could accept other commands that change the total count of items in the cart.
- Scala
-
src/main/java/com/example/shoppingcart/ShoppingCartActionImpl.scala
override def unsafeValidation(addLineItem: AddLineItem): Effect[Empty] = {
  // NOTE: This is an example of an anti-pattern, do not copy this
  val cartReply = components.shoppingCart
    .getCart(GetShoppingCart(addLineItem.cartId))
    .execute() (1)

  val effect = cartReply.map { cart =>
    val totalCount = cart.items.map(_.quantity).sum

    // Reject the request once the cart has reached the limit
    if (totalCount >= 10)
      effects.error("Max 10 items in a cart")
    else
      effects.forward(components.shoppingCart.addItem(addLineItem)) (2)
  }

  effects.asyncEffect(effect)
}
1 Between this call returning.
2 And this next call to the same entity, the entity could accept other commands that change the total count of items in the cart.
The problem with this is that an addItem call made directly to the entity between the getCart call returning and the addItem call from the action could lead to more items in the cart than the allowed limit.
Such validation that depends on state can only be done safely while handling the command inside the entity.
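The difference can be made concrete with a small, deterministic sketch in plain Java. CartSketch is a hypothetical stand-in for the entity, not Kalix code: on the unsafe path the count is read first and the item is added later, so two interleaved callers both pass the check, while on the safe path the validation happens inside the same method that changes the state.

```java
// Hypothetical stand-in for an entity that handles one command at a time.
class CartSketch {
  static final int MAX_ITEMS = 10;
  private int totalCount;

  CartSketch(int initialCount) {
    this.totalCount = initialCount;
  }

  synchronized int getTotalCount() {
    return totalCount;
  }

  // Adds without validating; the caller is trusted to have checked the limit.
  synchronized void addUnchecked(int quantity) {
    totalCount += quantity;
  }

  // Validates and updates in one step, inside the consistency boundary.
  synchronized boolean addValidated(int quantity) {
    if (totalCount + quantity > MAX_ITEMS) {
      return false; // rejected, state unchanged
    }
    totalCount += quantity;
    return true;
  }

  public static void main(String[] args) {
    // Unsafe: both callers read the count before either of them adds.
    CartSketch unsafe = new CartSketch(9);
    int seenByA = unsafe.getTotalCount();
    int seenByB = unsafe.getTotalCount();
    if (seenByA < MAX_ITEMS) unsafe.addUnchecked(1);
    if (seenByB < MAX_ITEMS) unsafe.addUnchecked(1);
    System.out.println("unsafe total: " + unsafe.getTotalCount());

    // Safe: the second add is rejected by the entity itself.
    CartSketch safe = new CartSketch(9);
    boolean first = safe.addValidated(1);
    boolean second = safe.addValidated(1);
    System.out.println("safe total: " + safe.getTotalCount() + " " + first + " " + second);
  }
}
```

The interleaving is written out sequentially so that the outcome is reproducible; in a running service the same ordering can arise from two concurrent requests.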
Actions as Life-cycle Hooks
An Action’s gRPC method can be triggered automatically when some predefined service life-cycle event happens (currently, only on startup is available), serving as a custom hook. For such use, the method needs to be annotated with a specific flag and its input type must be google.protobuf.Empty as shown below.
The on startup hook is called every time a service instance boots up. This can happen for very different reasons: restarting / redeploying the service, scaling up to more instances or even without any user-driven action (e.g. Kalix Runtime versions being rolled out, infrastructure-related incidents, etc.). Therefore, you should carefully consider how you use this hook and its implementation.
service OnStartup {
  option (kalix.codegen) = {
    action: {} (1)
  };
  rpc Init (google.protobuf.Empty) returns (google.protobuf.Empty) { (2)
    option (kalix.method).trigger = {
      on: STARTUP, (3)
      max_retries: 3 (4)
    };
  }
}
1 | Only methods belonging to an Action can be configured as a hook. |
2 | The method must receive google.protobuf.Empty as input type. |
3 | This hook will be triggered once the instance startup is concluded (i.e. will be called 3 times if 3 instances are running). |
4 | Optionally, set the max amount of retries to 3. |
If the call to the hook returns a failure and max_retries is set to a value greater than the default value (0), a number of retry calls will be executed with a fixed delay until the configured amount is reached.
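To make the retry semantics concrete, here is a minimal sketch of one plausible reading of the description above: one initial call, followed by at most max_retries further attempts separated by a fixed delay. This is an illustration only, not the Kalix runtime's implementation; the class RetrySketch, the method callWithRetries and the delay value are hypothetical.

```java
import java.util.function.BooleanSupplier;

// Hypothetical illustration of "retry up to maxRetries times with a fixed delay".
class RetrySketch {

  // Returns the number of attempts made; stops at the first success.
  static int callWithRetries(BooleanSupplier hook, int maxRetries, long fixedDelayMillis) {
    int attempts = 0;
    while (true) {
      attempts++;
      if (hook.getAsBoolean()) {
        return attempts; // success
      }
      if (attempts > maxRetries) {
        return attempts; // the initial call and all retries failed
      }
      try {
        Thread.sleep(fixedDelayMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return attempts; // give up if interrupted
      }
    }
  }

  public static void main(String[] args) {
    System.out.println(callWithRetries(() -> false, 3, 10));
    System.out.println(callWithRetries(() -> false, 0, 10));
  }
}
```

Since the hook may run several times per instance under this reading, and once per instance in any case, it is worth keeping its implementation idempotent.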
Running Side Effects
Emitting effects on another component
An Entity or an Action may also emit one or more side effects. A side effect is something whose result has no impact on the result of the current command—if it fails, the current command still succeeds. The result of the side effect is therefore ignored. When used from inside an Entity, side effects are only performed after the successful completion of any state actions requested by the command handler.
There is no guarantee that a side effect will be executed successfully. If a failure occurs after the command is fully handled, effects might not be executed. Side effects are not retried in case of failures.
Side effects may be declared as synchronous or asynchronous. Asynchronous commands run in a "fire and forget" fashion. The code flow of the caller (the command handler of the entity which emitted the asynchronous command) continues while the command is being asynchronously processed. Meanwhile, synchronous commands run sequentially, that is, the commands are processed in order, one at a time. The final result of the command handler, either a reply or a forward, is not sent until all synchronous commands are completed.
Emitting a side effect
To illustrate how you can emit a side effect, we can build on top of the Action as a Controller example. In that previous example, we built a controller around the Value Entity Counter and forwarded the incoming request after modifying it.
This time, instead of using a forward, we will call the entity using a side effect.
- Java
-
src/main/proto/com/example/actions/double-counter.proto
syntax = "proto3";

package com.example.actions;

import "kalix/annotations.proto";
import "com/example/counter_api.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "DoubleCounterApi";

service DoubleCounter {
  option (kalix.codegen) = {
    action: {} (2)
  };

  rpc Increase (com.example.IncreaseValue) returns (google.protobuf.Empty); (3)
  rpc IncreaseWithSideEffect (com.example.IncreaseValue) returns (google.protobuf.Empty); (4)
  rpc forwardWithGrpcApi (com.example.IncreaseValue) returns (google.protobuf.Empty);
  rpc sequentialComposition (com.example.IncreaseValue) returns (com.example.CurrentCounter);
  rpc sumOfMy3FavouriteCounterValues (google.protobuf.Empty) returns (com.example.CurrentCounter);
}
1 Import the Counter API definition.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The Action mimics the Counter API with a forward.
4 The Action mimics the Counter API with a side effect.
- Scala
-
src/main/proto/com/example/actions/double-counter.proto
syntax = "proto3";

package com.example.actions;

import "kalix/annotations.proto";
import "com/example/counter_api.proto"; (1)
import "google/protobuf/empty.proto";

service DoubleCounter {
  option (kalix.codegen) = {
    action: {} (2)
  };

  rpc Increase (com.example.IncreaseValue) returns (google.protobuf.Empty); (3)
  rpc IncreaseWithSideEffect (com.example.IncreaseValue) returns (google.protobuf.Empty); (4)
}
1 Import the Counter API definition.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The Action mimics the Counter API with a forward.
4 The Action mimics the Counter API with a side effect.
Implementing the Action
The class DoubleCounterAction gets generated for us based on the same proto file defined in Action as a Controller.
/**
 * An action.
 */
public class DoubleCounterAction extends AbstractDoubleCounterAction {

  public DoubleCounterAction(ActionCreationContext creationContext) {
  }

  // Handler for "Increase" not shown in this snippet

  /**
   * Handler for "IncreaseWithSideEffect".
   */
  @Override
  public Effect<Empty> increaseWithSideEffect(CounterApi.IncreaseValue increaseValue) {
    int doubled = increaseValue.getValue() * 2;
    CounterApi.IncreaseValue increaseValueDoubled =
        increaseValue.toBuilder().setValue(doubled).build(); (1)

    return effects()
        .reply(Empty.getDefaultInstance()) (2)
        .addSideEffect( (3)
            SideEffect.of(components().counter().increase(increaseValueDoubled)));
  }
}
1 | On incoming requests, we double the value of IncreaseValue . |
2 | We build a reply using Empty.getDefaultInstance() . |
3 | And we attach a side effect to it. The side effect is the call to the Counter entity. |
Please note that the result of a side effect is ignored by the current command, meaning that even if the call to the Counter entity fails, the Action reply will succeed.
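This "reply regardless of the side effect" behavior can be pictured with plain CompletableFuture values. The sketch below is a hypothetical model, not the Kalix implementation: the reply is produced independently of the side effect, whose failure is only observed and otherwise ignored. The class SideEffectSketch and the reply string are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of a reply with an attached fire-and-forget side effect.
class SideEffectSketch {
  static final AtomicBoolean sideEffectFailed = new AtomicBoolean(false);

  static String increaseWithSideEffect(int value, CompletableFuture<Void> sideEffectCall) {
    // Observe the side effect's outcome, but never let it influence the reply.
    sideEffectCall.whenComplete((ignored, error) -> {
      if (error != null) {
        sideEffectFailed.set(true);
      }
    });
    return "ok, requested increase by " + (value * 2);
  }

  public static void main(String[] args) {
    CompletableFuture<Void> failing = new CompletableFuture<>();
    failing.completeExceptionally(new RuntimeException("counter unavailable"));
    System.out.println(increaseWithSideEffect(5, failing));
    System.out.println("side effect failed: " + sideEffectFailed.get());
  }
}
```

If the caller needs to know whether the Counter was actually updated, a forward or an asynchronous reply built from the call's result is the better fit than a side effect.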
Adding tracing spans
- Java
-
actionContext() gives you the ActionContext, that is, some methods that apply to the context of the action’s request.
src/main/java/com/example/ControllerAction.java
Tracer tracer = actionContext().getTracer();
- Scala
-
actionContext() gives you the ActionContext, that is, some methods that apply to the context of the action’s request.
src/main/scala/com/example/ControllerAction.java
val tracer = actionContext.getTracer
If tracing is enabled, you will get a tracer that actually creates spans and exports data as expected. If tracing is not enabled, you will get a no-op tracer instead, which will not create traces.
Trace generation is disabled by default. To enable it in a service deployed to Kalix, see here.
To enable it in a service running on your local machine you need to add the following JAVA_TOOL_OPTIONS to your docker-compose.yml in the base of your project.
You also need somewhere to collect the traces, for example an extra container such as the jaeger one below.
---
services:
  kalix-runtime:
    image: ...
    ...
    environment:
      JAVA_TOOL_OPTIONS: >
        -Dkalix.proxy.telemetry.tracing.enabled=true
        -Dkalix.proxy.telemetry.tracing.collector-endpoint=http://jaeger:4317
      ...
  jaeger:
    image: jaegertracing/all-in-one:1.54
    ports:
      - 4317:4317
      - 16686:16686
---
Here the traces are pushed to the jaeger container on port 4317, and you can inspect them at http://localhost:16686.
To create a span and end it over an asynchronous call, you can do the following:
- Java
-
src/main/java/com/example/ControllerAction.java
@Override
public Effect<ControllerActionApi.MessageResponse> callAsyncEndpoint(Empty empty) {
  Tracer tracer = actionContext().getTracer();
  Span span = tracer
      .spanBuilder(url + "/{}")
      .setParent(actionContext().metadata().traceContext().asOpenTelemetryContext()) (1)
      .startSpan() (2)
      .setAttribute("post", "1"); (3)

  CompletableFuture<ControllerActionApi.MessageResponse> asyncComputation = callAsyncService()
      .whenComplete((response, ex) -> {
        if (ex != null) {
          span
              .setStatus(StatusCode.ERROR, ex.getMessage()) (4)
              .end(); (5)
        } else {
          span
              .setAttribute("result", response.body().title) (3)
              .end(); (5)
        }
      })
      .thenApply(response ->
          ControllerActionApi.MessageResponse.newBuilder().setMessage(response.body().title).build()
      );

  return effects().asyncReply(asyncComputation);
}
1 Sets the action’s TraceContext as the parent of this span, linking the action’s trace to this span.
2 Creates and starts the span.
3 Adds an attribute.
4 Sets the status of the span to error.
5 Closes the span.
- Scala
-
src/main/scala/com/example/ControllerAction.java
override def callAsyncEndpoint(empty: Empty): Action.Effect[MessageResponse] = {
  val tracer = actionContext.getTracer
  val span = tracer.spanBuilder(s"$url/{}")
    .setParent(actionContext.metadata.traceContext.asOpenTelemetryContext) (1)
    .startSpan() (2)
    .setAttribute("post", "1") (3)

  val responseBody: Future[MessageResponse] = callAsync()
  responseBody.onComplete {
    case Failure(exception) =>
      span
        .setStatus(StatusCode.ERROR, exception.getMessage) (4)
        .end() (5)
    case Success(response) =>
      span
        .setAttribute("result", response.message) (3)
        .end() (5)
  }
  effects.asyncReply(responseBody)
}
1 Sets the action’s TraceContext as the parent of this span, linking the action’s trace to this span.
2 Creates and starts the span.
3 Adds an attribute.
4 Sets the status of the span to error.
5 Closes the span.
NOTE: You can find how tracing is enabled and more info here.
Unit testing the side effects
The side effects of an Action can be tested in isolation. To test the side effects of DoubleCounterAction, shown in the previous snippet, we can leverage ActionResult. This class has the method getSideEffects(), which returns the list of side effects added to the Action.
- Java
-
src/test/java/com/example/actions/DoubleCounterActionTest.java
  @Test
  public void increaseWithSideEffectTest() {
    DoubleCounterActionTestKit testKit = DoubleCounterActionTestKit.of(DoubleCounterAction::new);
    int increase = 3;
    CounterApi.IncreaseValue increaseValueCommand = CounterApi.IncreaseValue.newBuilder()
        .setValue(increase)
        .build();
    ActionResult<Empty> result1 = testKit.increaseWithSideEffect(increaseValueCommand); (1)
    DeferredCallDetails<?, ?> sideEffect = result1.getSideEffects().get(0); (2)
    assertEquals("com.example.CounterService", sideEffect.getServiceName()); (3)
    assertEquals("Increase", sideEffect.getMethodName()); (4)
    CounterApi.IncreaseValue doubledIncreased = CounterApi.IncreaseValue.newBuilder()
        .setValue(increase * 2)
        .build();
    assertEquals(doubledIncreased, sideEffect.getMessage()); (5)
  }
}
1 Executing the DoubleCounterAction.increaseWithSideEffect RPC call through the test kit.
2 Retrieving the first side effect. There is only one in DoubleCounterAction.increaseWithSideEffect. It’s worth noting that the side effects are DeferredCall objects that represent Kalix RPC services. DeferredCallDetails is the representation of a DeferredCall in the Kalix test kit framework.
3 Retrieving and asserting the name of the service.
4 Retrieving and asserting the name of the service’s RPC method.
5 Retrieving and asserting the RPC’s input.
- Scala
-
src/test/scala/com/example/actions/DoubleCounterActionSpec.java
  "DoubleCounterAction" must {
    "handle command IncreaseWithSideEffect" in {
      val testKit = DoubleCounterActionTestKit(new DoubleCounterAction(_))
      val result: ActionResult[Empty] = testKit.increaseWithSideEffect(IncreaseValue(value = 1)) (1)
      result.reply shouldBe Empty.defaultInstance
      val sideEffect = result.sideEffects.head (2)
      sideEffect.serviceName shouldBe "com.example.CounterService" (3)
      sideEffect.methodName shouldBe "Increase" (4)
      sideEffect.message shouldBe IncreaseValue(value = 2) (5)
    }
  }
}
1 Executing the DoubleCounterAction.increaseWithSideEffect RPC call through the test kit.
2 Retrieving the first side effect. There is only one in the DoubleCounterAction.increaseWithSideEffect implementation. It’s worth noting that the side effects are DeferredCall objects that represent Kalix RPC services. DeferredCallDetails is the representation of a DeferredCall in the Kalix test kit framework.
3 Retrieving and asserting the name of the service.
4 Retrieving and asserting the name of the service’s RPC method.
5 Retrieving and asserting the RPC’s input.
Testing the Action
Unit tests
The following snippet shows how the FibonacciActionTestKit is used to test the FibonacciAction implementation.
Kalix generates the FibonacciActionTestKit, which allows us to call the methods of FibonacciAction. For each Action, Kalix generates a specific test kit named [ActionName]TestKit. Each call we make through the test kit returns an ActionResult that holds the effect produced by the underlying action method.
Apart from the test kit, Kalix generates test classes based on the Action defined in the .proto files. The test class for FibonacciAction is shown further below.
Actions are unique units of computation where no local state is shared with previous or subsequent calls. The framework does not reuse an Action instance; it creates a new one for each command handled, and the test kit behaves the same way.
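To make the one-instance-per-command behavior concrete, here is a minimal plain-Java sketch. It assumes nothing about the generated code: `CountingAction` and `MiniTestKit` are hypothetical names, and the factory plays the role of the constructor reference passed to a generated test kit.

```java
import java.util.function.Supplier;

// Plain-Java illustration; these classes are hypothetical, not generated Kalix code.
final class FreshInstanceSketch {

  // An action that (incorrectly) tries to keep state in an instance field.
  static final class CountingAction {
    private int calls = 0;

    int handle() {
      return ++calls;
    }
  }

  // A tiny test kit that asks the factory for a new instance on every call.
  static final class MiniTestKit {
    private final Supplier<CountingAction> factory;

    MiniTestKit(Supplier<CountingAction> factory) {
      this.factory = factory;
    }

    int handle() {
      return factory.get().handle();
    }
  }

  public static void main(String[] args) {
    MiniTestKit testKit = new MiniTestKit(CountingAction::new);
    System.out.println(testKit.handle()); // prints 1
    System.out.println(testKit.handle()); // prints 1 again: the field did not survive
  }
}
```

Any state that must outlive a single call therefore belongs in an entity or another component, not in a field of the Action.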
- Java
-
src/test/java/com/example/actions/FibonacciActionTest.java
public class FibonacciActionTest {

  @Test
  public void nextNumberTest() {
    FibonacciActionTestKit testKit = FibonacciActionTestKit.of(FibonacciAction::new); (1)
    ActionResult<FibonacciApi.Number> result =
        testKit.nextNumber(FibonacciApi.Number.newBuilder().setValue(5).build()); (2)
    assertEquals(8, result.getReply().getValue()); (3)
  }
}
1 The test kit is created to allow us to test the Action’s method.
2 We call the nextNumber method with some value.
3 The reply message from the result is retrieved using getReply().
ActionResult
Calling an action method through the test kit gives us back an ActionResult. This class has methods that we can use to assert our expectations, such as:
-
getReply() returns the reply message passed to effects().reply() or throws an exception failing the test, if the effect returned was not a reply.
-
getError() returns the error description when effects().error() was returned to signal an error.
-
getForward() returns details about what message was forwarded and where the call was forwarded (since it is a unit test the forward is not actually executed).
- Scala
-
src/test/java/com/example/actions/FibonacciActionSpec.scala
class FibonacciActionSpec
    extends AnyWordSpec
    with ScalaFutures
    with Matchers {

  "FibonacciAction" must {
    "handle command NextNumber" in {
      val testKit = FibonacciActionTestKit(new FibonacciAction(_)) (1)
      val result = testKit.nextNumber(Number(5)) (2)
      result.reply shouldBe (Number(8)) (3)
    }
  }
}
1 The test kit is created to allow us to test the Action’s method.
2 We call the nextNumber method with some value.
3 The reply message from the result is retrieved using reply.
ActionResult
Calling an action method through the test kit gives us back an ActionResult. This class has methods that we can use to assert our expectations, such as:
-
reply returns the reply message passed to effects.reply() or throws an exception failing the test, if the effect returned was not a reply.
-
errorDescription returns the error description when effects().error() was returned to signal an error.
-
forwardedTo returns details about what message was forwarded and where the call was forwarded (since it is a unit test the forward is not actually executed).
By default the integration and unit tests are both invoked by sbt test. To run only the unit tests, run sbt -DonlyUnitTest test, or sbt -DonlyUnitTest=true test, or set that value to true in the sbt session with set onlyUnitTest := true and then run test.
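The accessor behavior listed above (a reply accessor that fails the test when the effect was not a reply) can be pictured with a small plain-Java model. This is a sketch under stated assumptions, not the test kit's implementation: `MiniResult` is a hypothetical class that models only the reply and error cases, and the choice of `IllegalStateException` is an assumption made for the example.

```java
// Hypothetical model of a result holder; not the Kalix test kit's ActionResult.
final class ActionResultSketch {

  static final class MiniResult<T> {
    private final T reply;
    private final String error;

    private MiniResult(T reply, String error) {
      this.reply = reply;
      this.error = error;
    }

    static <T> MiniResult<T> reply(T message) {
      return new MiniResult<>(message, null);
    }

    static <T> MiniResult<T> error(String description) {
      return new MiniResult<>(null, description);
    }

    boolean isReply() {
      return error == null;
    }

    // Throws when the effect was not a reply, which fails the calling test.
    T getReply() {
      if (!isReply()) {
        throw new IllegalStateException("The effect was not a reply but an error: " + error);
      }
      return reply;
    }

    // Throws when the effect was not an error (an assumption of this sketch).
    String getError() {
      if (isReply()) {
        throw new IllegalStateException("The effect was not an error but a reply");
      }
      return error;
    }
  }

  public static void main(String[] args) {
    MiniResult<Long> ok = MiniResult.reply(8L);
    System.out.println(ok.getReply()); // prints 8

    MiniResult<Long> failed = MiniResult.error("negative input");
    try {
      failed.getReply();
    } catch (IllegalStateException e) {
      System.out.println("not a reply: " + failed.getError()); // prints "not a reply: negative input"
    }
  }
}
```

Because the accessor throws on a mismatched effect, a test that expects a reply does not need a separate assertion on the effect type.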
Unit tests (with cross-component calls)
Testing an Action that serves as a controller or, more generally, one that depends on calling other components, requires providing the TestKit with a mock registry containing the mocks to be used. At runtime, the TestKit looks up the mock object it needs by matching the registered mocks against the class type of the dependency component.
So, let’s say we want to test the previous example where we rely on 2 external calls to create and populate the shopping cart before replying. A unit test for such an action method would look like:
- Java
-
src/test/java/com/example/shoppingcart/ShoppingCartActionImplTest.java
public class ShoppingCartActionImplTest {

  @Mock
  private ShoppingCartService shoppingCartService; (1)

  @Test
  public void prePopulatedCartTest() throws ExecutionException, InterruptedException, TimeoutException {
    when(shoppingCartService.create(notNull())) (2)
        .thenReturn(CompletableFuture.completedFuture(Empty.getDefaultInstance()));
    when(shoppingCartService.addItem(any()))
        .thenReturn(CompletableFuture.completedFuture(Empty.getDefaultInstance()));

    var mockRegistry = MockRegistry.create().withMock(ShoppingCartService.class, shoppingCartService); (3)

    var service = ShoppingCartActionImplTestKit.of(ShoppingCartActionImpl::new, mockRegistry); (4)
    var result = service.createPrePopulated(NewCart.getDefaultInstance()).getAsyncResult();
    var reply = ((CompletableFuture<ActionResult<NewCartCreated>>) result)
        .get(1, TimeUnit.SECONDS)
        .getReply();

    // assertions go here
  }
}
1 First step is to declare our mock object. In this example, shoppingCartService is a @Mock object provided by the Mockito framework.
2 We start by configuring how our mock service replies to the two calls: create and addItem.
3 Then we use the TestKit-provided MockRegistry to initialize and add shoppingCartService to serve as a mock for class type ShoppingCartService.
4 Finally, we just need to pass the mockRegistry while initializing the ShoppingCartActionImplTestKit, and the TestKit will try to find our mock object when it needs it.
- Scala
-
src/main/test/com/example/shoppingcart/ShoppingCartActionImplSpec.scala
class ShoppingCartActionImplSpec
    extends AsyncWordSpec
    with Matchers
    with AsyncMockFactory {

  "ShoppingCartActionImpl" must {
    "create a prepopulated cart" in {
      val mockShoppingCart = stub[ShoppingCartService] (1)
      (mockShoppingCart.create _) (2)
        .when(*)
        .returns(Future.successful(Empty.defaultInstance))
      (mockShoppingCart.addItem _)
        .when(where { (li: AddLineItem) => li.name == "eggplant" })
        .returns(Future.successful(Empty.defaultInstance))

      val mockRegistry = MockRegistry.withMock(mockShoppingCart) (3)

      val service = ShoppingCartActionImplTestKit(new ShoppingCartActionImpl(_), mockRegistry) (4)
      val cartId = service.createPrePopulated(NewCart.defaultInstance).asyncResult

      // assertions go here
    }
  }
}
1 First step is to declare our mock object. In this case, mockShoppingCart is a stub object provided by the ScalaMock framework.
2 Then we configure how our mock service replies to the two calls: create and addItem.
3 We use the TestKit-provided MockRegistry to initialize and add mockShoppingCart to serve as a mock for class type ShoppingCartService.
4 Finally, we just need to pass the mockRegistry while initializing the ShoppingCartActionImplTestKit, and the TestKit will try to find our mock object when it needs it.