Implementing Actions
Actions are stateless functions that can be used to implement different use cases, such as:
- a pure function.
- request conversion: you can use Actions to convert incoming data into a different format before forwarding a call to a different component.
- a facade or controller that fans out to multiple calls to different components.
- publishing messages to a Topic.
- subscribing to events from an Event Sourced Entity.
- subscribing to state changes from a Value Entity.
- scheduling and canceling Timers.
Actions can be triggered in multiple ways. For example, by:
- a gRPC service call.
- an HTTP service call.
- a forwarded call from another component.
- a scheduled call from a Timer.
- an incoming message from a Topic.
- an incoming event from an Event Sourced Entity, from within the same service or from a different service.
- a state change notification from a Value Entity on the same service.
- a service life-cycle event (e.g. on startup).
Action’s Effect API
The Action’s Effect defines the operations that Kalix should perform when an incoming message is handled by an Action.
An Action Effect can either:
- reply with a message to the caller
- reply with a message to be published to a topic (in case the method is a publisher)
- forward the message to another component
- return an error
- ignore the call
See also Understanding what an Effect is
Actions as Pure Functions
In this first example, you will learn how to implement an Action as a pure stateless function. You will create a FibonacciAction that takes a number and returns the next number in the Fibonacci series.
To implement this action you need the following:
- Extend your class from kalix.javasdk.action.Action. This is generic: no matter what action you want to create, you always need to extend from Action.
- Add the Spring annotation @RequestMapping to provide a REST endpoint for the function. Here the stateless function should be reachable via HTTP.
- Add the Spring annotations @GetMapping and @PostMapping to provide paths for GET and POST to calculate the next Fibonacci number for a given input. Both functions do the same thing; implementation-wise, the function exposed with GET calls the function exposed with POST.
import kalix.javasdk.action.Action;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.PathVariable;
import java.util.function.Predicate;
import static io.grpc.Status.Code.INVALID_ARGUMENT;
@RequestMapping("/fibonacci")
public class FibonacciAction extends Action {
private boolean isFibonacci(long num) { (1)
Predicate<Long> isPerfectSquare = (n) -> {
long square = (long) Math.sqrt(n);
return square * square == n;
};
return isPerfectSquare.test(5*num*num + 4) || isPerfectSquare.test(5*num*num - 4);
}
private long nextFib(long num) { (2)
double result = num * (1 + Math.sqrt(5)) / 2.0;
return Math.round(result);
}
@GetMapping("/{number}/next")
public Effect<Number> getNumber(@PathVariable Long number) { (3)
return nextNumber(new Number(number));
}
@PostMapping("/next")
public Effect<Number> nextNumber(@RequestBody Number number) {
long num = number.value();
if (isFibonacci(num)) { (4)
return effects().reply(new Number(nextFib(num)));
} else {
return effects() (5)
.error("Input number is not a Fibonacci number, received '" + num + "'", INVALID_ARGUMENT);
}
}
}
1 | isFibonacci checks if a number is a Fibonacci number. |
2 | nextFib calculates the next number. |
3 | This getNumber implementation calls the nextNumber implementation below. |
4 | The nextNumber implementation first checks if the input number belongs to the Fibonacci series. If so, it calculates the next number and builds a reply using effects().reply() . |
5 | Otherwise, if the input number doesn’t belong to the Fibonacci series, it builds an Effect reply error. |
Actions return effects (i.e. Action.Effect) and there are different types of effects: a reply, an error, or a forward call to another component. To all of those you can add side effects. Here you want only the result of the calculation or an error. Therefore, you are using .reply and .error.
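The arithmetic behind isFibonacci and nextFib can be tried outside Kalix. The following is a minimal standalone sketch of the same two checks, not part of the sample project; the class name FibonacciSketch is hypothetical and no Kalix or Spring types are involved.

```java
import java.util.function.LongPredicate;

// Standalone sketch (no Kalix dependencies); the class name is hypothetical.
final class FibonacciSketch {

  // A non-negative number n is a Fibonacci number when
  // 5*n^2 + 4 or 5*n^2 - 4 is a perfect square.
  static boolean isFibonacci(long num) {
    LongPredicate isPerfectSquare = n -> {
      if (n < 0) {
        return false;
      }
      long root = (long) Math.sqrt(n);
      return root * root == n;
    };
    return isPerfectSquare.test(5 * num * num + 4)
        || isPerfectSquare.test(5 * num * num - 4);
  }

  // For a Fibonacci number, multiplying by the golden ratio and rounding
  // gives the next one (subject to double precision for very large inputs).
  static long nextFib(long num) {
    return Math.round(num * (1 + Math.sqrt(5)) / 2.0);
  }

  public static void main(String[] args) {
    System.out.println(isFibonacci(8)); // true
    System.out.println(isFibonacci(4)); // false
    System.out.println(nextFib(5));     // 8
  }
}
```

In the Action itself, the boolean result decides between effects().reply() and effects().error().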
Actions as Controllers
Actions can be used to implement MVC Controllers by acting as the external interface of a service, receiving requests, operating on the request values and forwarding the call to other components in the same service.
To illustrate how you can use an Action as a Controller, we will build on top of a Value Entity used to implement a Shopping Cart example, adding a new Action to the existing shopping cart service.
If you are hearing about ValueEntity for the first time, be sure to visit Implementing Value Entities before continuing.
Below you can find a summary of the shopping cart value entity we will use in this chapter. For brevity, it contains only the signatures of the available endpoints:
@Id("cartId")
@TypeId("shopping-cart")
@RequestMapping("/cart/{cartId}") (1)
public class ShoppingCartEntity extends ValueEntity<ShoppingCart> {
@PostMapping("/create") (2)
public ValueEntity.Effect<ShoppingCartDTO> create() {
//...
@PostMapping("/items/add") (3)
public ValueEntity.Effect<ShoppingCartDTO> addItem(@RequestBody LineItemDTO addLineItem) {
//...
@GetMapping (4)
public ValueEntity.Effect<ShoppingCartDTO> getCart() {
//...
}
1 | Common path being used: /cart/ suffixed with a cartId . |
2 | POST endpoint exposed at (…)/create used to create a new cart with cartId . |
3 | POST endpoint exposed at (…)/items/add used to add an item to a cart. |
4 | GET endpoint for retrieving the state of a cart. |
Forwarding Commands
The forward effect allows you to transform or further validate an incoming request before passing it on to another component and have the response message directly passed back to the client making the request. The response from the forwarded operation must have the same response type as the original request.
In this example, the Action accepts requests with the same structure as the addItem endpoint listed above, by receiving a LineItemDTO, but adds some additional verification of the request and only forwards the request to the entity if the verification is successful:
import kalix.javasdk.Metadata;
import kalix.javasdk.action.Action;
import kalix.javasdk.action.Action.Effect;
import kalix.javasdk.annotations.ForwardHeaders;
import kalix.javasdk.client.ComponentClient;
@RequestMapping("/carts")
public class ShoppingCartController extends Action {
private final ComponentClient componentClient;
public ShoppingCartController(ComponentClient componentClient) {
this.componentClient = componentClient; (1)
}
@PostMapping("/{cartId}/items/add") (2)
public Action.Effect<ShoppingCartDTO> verifiedAddItem(@PathVariable String cartId,
@RequestBody LineItemDTO addLineItem) {
if (addLineItem.name().equalsIgnoreCase("carrot")) { (3)
return effects().error("Carrots no longer for sale"); (4)
} else {
var deferredCall = componentClient.forValueEntity(cartId)
.call(ShoppingCartEntity::addItem)
.params(addLineItem); (5)
return effects().forward(deferredCall); (6)
}
}
}
1 | ComponentClient is injected on the constructor. It will be used to build calls to the underlying Entity. |
2 | Expose the command handler as a POST endpoint at specified path. |
3 | Check if the added item is carrots. |
4 | If it is "carrots" immediately return an error, disallowing adding the item. |
5 | For allowed requests, use componentClient to get a deferred call to the entity. |
6 | The deferredCall is then used with effects().forward() to forward the request to the entity. |
You might be wondering what the componentClient is about. For now, think of it as a lightweight, type-safe HTTP client allowing you to reach out to other Kalix services. All details can be found in the Component and Service Calls chapter.
Forwarding Headers
By default, Kalix does not forward gRPC/HTTP headers to Kalix components. This can be overridden with the @ForwardHeaders annotation.
@RequestMapping("/carts")
@ForwardHeaders("UserRole") (1)
public class ShoppingCartController extends Action {
@DeleteMapping("/{cartId}")
public Effect<String> removeCart(@PathVariable String cartId,
@RequestHeader("UserRole") String userRole) { (2)
var userRoleFromMeta = actionContext().metadata().get("UserRole").get(); (3)
Metadata metadata = Metadata.EMPTY.add("Role", userRole);
return effects().forward(
componentClient.forValueEntity(cartId)
.call(ShoppingCartEntity::removeCart)
.withMetadata(metadata)); (4)
}
}
1 | Specify headers names that should be forwarded to this component. |
2 | Access the header value with the @RequestHeader annotation. |
3 | Alternatively, the header value can be retrieved from the Metadata associated with this call. |
4 | Forward different headers to the next call by using the withMetadata method of the DeferredCall class. |
Transform Request and Response to Another Component
The asyncReply and asyncEffect effects allow you to process and transform a request before calling another component and then also transform the response.
As an example, let us look at the problem of creating a new entity with an id generated by another component.
This example implements an initializeCart command for the controller Action which returns the generated id that can subsequently be used to interact with the cart.
@PostMapping("/create")
public Effect<String> initializeCart() {
final String cartId = UUID.randomUUID().toString(); (1)
CompletionStage<ShoppingCartDTO> shoppingCartCreated =
componentClient.forValueEntity(cartId)
.call(ShoppingCartEntity::create) (2)
.execute(); (3)
// transform response
CompletionStage<Effect<String>> effect =
shoppingCartCreated.handle((empty, error) -> { (4)
if (error == null) {
return effects().reply(cartId); (5)
} else {
return effects().error("Failed to create cart, please retry"); (6)
}
});
return effects().asyncEffect(effect); (7)
}
1 | Generate a new UUID. |
2 | Use the componentClient to create a call to endpoint create on the shopping cart. |
3 | execute() on the deferred call immediately triggers a call and returns a CompletionStage for the response. |
4 | Once the call succeeds or fails, the CompletionStage is completed or failed. We can transform the result from CompletionStage<ShoppingCartDTO> to CompletionStage<Effect<String>> using handle . |
5 | On a successful response, create a reply effect passing back the cartId . |
6 | If the call leads to an error, create an error effect asking the client to retry. |
7 | effects().asyncEffect() allows us to reply with a CompletionStage<Effect<String>> . |
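The handle step is plain java.util.concurrent API and can be explored without Kalix. Below is a minimal standalone sketch, assuming a CompletableFuture as a stand-in for the result of execute() and plain strings as stand-ins for the reply and error effects; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Standalone sketch: CompletableFuture stands in for the deferred call's result,
// and Strings stand in for the reply and error effects.
final class HandleSketch {

  // handle() receives either a response or an error (the other one is null)
  // and always produces a normally completed stage.
  static CompletionStage<String> toOutcome(CompletionStage<String> upstream, String cartId) {
    return upstream.handle((response, error) ->
        error == null
            ? "reply:" + cartId
            : "error:Failed to create cart, please retry");
  }

  public static void main(String[] args) {
    CompletionStage<String> succeeded = CompletableFuture.completedFuture("created");

    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IllegalStateException("entity unavailable"));

    System.out.println(toOutcome(succeeded, "cart-1").toCompletableFuture().join());
    System.out.println(toOutcome(failed, "cart-1").toCompletableFuture().join());
  }
}
```

Because handle covers both outcomes, the stage handed to effects().asyncEffect() in the Action never completes exceptionally from the upstream call.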
The action generates a UUID to use as entity id for the shopping cart. UUIDs are extremely unlikely to lead to the same id being generated, but to completely guarantee two calls can never be assigned the same shopping cart we make use of the "boundary of consistency" provided by the entity - the entity will only process a single command at a time and can safely make decisions based on its state - for example to only allow creation once by storing something in its state signifying that it has been created.
In this case you mark that the entity has been created using a creation timestamp in the shopping cart state, stored on the first create call, when the timestamp still has the default value of 0. If the cart has already been stored with a timestamp, it returns an error effect:
@PostMapping("/create") (2)
public ValueEntity.Effect<ShoppingCartDTO> create() {
//...
if (currentState().creationTimestamp() > 0L) {
return effects().error("Cart was already created");
} else {
var newState = currentState().withCreationTimestamp(Instant.now().toEpochMilli());
return effects()
.updateState(newState)
.thenReply(ShoppingCartDTO.of(newState));
}
}
Composing calls
The async call shown in the previous section can also be used to chain or compose multiple calls into a single action response.
This example builds on the previous cart creation by adding an initial item in the cart once it has been created, but before it returns the new id to the client:
@PostMapping("/prepopulated")
public Action.Effect<String> createPrePopulated() {
final String cartId = UUID.randomUUID().toString();
CompletionStage<ShoppingCartDTO> shoppingCartCreated =
componentClient.forValueEntity(cartId).call(ShoppingCartEntity::create).execute();
CompletionStage<ShoppingCartDTO> cartPopulated =
shoppingCartCreated.thenCompose(empty -> { (1)
var initialItem = new LineItemDTO("e", "eggplant", 1);
return componentClient.forValueEntity(cartId)
.call(ShoppingCartEntity::addItem)
.params(initialItem) (2)
.execute(); (3)
});
CompletionStage<String> reply = cartPopulated.thenApply(ShoppingCartDTO::cartId); (4)
return effects()
.asyncReply(reply); (5)
}
1 | CompletionStage#thenCompose allows you to perform an additional async operation, returning a CompletionStage once the current one completes successfully. |
2 | Create a request to add an initial item to the cart. |
3 | Executing the addItem call returns a CompletionStage<ShoppingCartDTO> that completes once the call succeeds. |
4 | Transform the successful completion of addItem with ShoppingCartDTO to the response type of this method - String . |
5 | effects().asyncReply() lets us reply once the CompletionStage<String> completes. |
In this sample it is safe to base a subsequent call to the entity on the reply of the previous one, because no client will know the cart id until createPrePopulated replies.
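The chaining itself relies only on CompletionStage. Here is a minimal standalone sketch of the thenCompose and thenApply steps, assuming two hypothetical methods, create and addItem, that stand in for the entity calls and return plain strings instead of a ShoppingCartDTO.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Standalone sketch; create() and addItem() are hypothetical stand-ins for entity calls.
final class ComposeSketch {

  static CompletionStage<String> create(String cartId) {
    return CompletableFuture.supplyAsync(() -> cartId + ":created");
  }

  static CompletionStage<String> addItem(String cartId, String item) {
    return CompletableFuture.supplyAsync(() -> cartId + ":" + item);
  }

  // Runs addItem only after create has succeeded, then maps the final
  // result to the cart id, the value returned to the caller.
  static CompletionStage<String> createPrePopulated(String cartId) {
    return create(cartId)
        .thenCompose(created -> addItem(cartId, "eggplant"))
        .thenApply(result -> result.substring(0, result.indexOf(':')));
  }

  public static void main(String[] args) {
    System.out.println(createPrePopulated("cart-1").toCompletableFuture().join()); // cart-1
  }
}
```

If create fails, thenCompose and thenApply are skipped and the failure propagates to the final stage.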
There is no transaction or consistency boundary outside of the entity, so for a sequence of calls from an action to an entity, the state of the entity could be updated by other calls it receives in-between.
For example, imagine an action that for a cart id retrieves the state using getCart to verify if too many items are already in the cart, and once that has been verified, it adds the item to the cart.
@PostMapping("/{cartId}/unsafeAddItem")
public Action.Effect<String> unsafeValidation(@PathVariable String cartId,
@RequestBody LineItemDTO addLineItem) {
// NOTE: This is an example of an anti-pattern, do not copy this
CompletionStage<ShoppingCartDTO> cartReply =
componentClient.forValueEntity(cartId).call(ShoppingCartEntity::getCart).execute(); (1)
CompletionStage<Action.Effect<String>> effect = cartReply.thenApply(cart -> {
int totalCount = cart.items().stream()
.mapToInt(LineItemDTO::quantity)
.sum();
if (totalCount >= 10) { // reject when the cart already holds the maximum number of items
return effects().error("Max 10 items in a cart");
} else {
CompletionStage<String> addItemReply = componentClient.forValueEntity(cartId).call(ShoppingCartEntity::addItem).params(addLineItem)
.execute().thenApply(ShoppingCartDTO::cartId);
return effects()
.asyncReply(addItemReply); (2)
}
});
return effects().asyncEffect(effect);
}
1 | Between this call returning. |
2 | And this next call to the same entity, the entity could accept other commands that change the total count of items in the cart. |
The problem with this is that a POST /cart/my-cart/items/add call made directly to the entity, between the GET /cart/my-cart call returning to the action and the subsequent addItem call from the action, would lead to more items in the cart than the allowed limit.
Such validation depending on state can only safely be done handling the command inside of the entity.
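To make the contrast concrete, here is a minimal standalone sketch of the safe variant, in which the limit check and the state update happen in a single step. It is not Kalix API: the class name is hypothetical, and synchronized is used only as a stand-in for the entity processing one command at a time.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone sketch, not Kalix API: "synchronized" stands in for the entity
// handling a single command at a time.
final class CartLimitSketch {
  private static final int MAX_ITEMS = 10;
  private int totalCount = 0;

  // Check and update happen together, so no other command can slip in between.
  synchronized String addItem(int quantity) {
    if (totalCount + quantity > MAX_ITEMS) {
      return "error: Max 10 items in a cart";
    }
    totalCount += quantity;
    return "ok";
  }

  synchronized int totalCount() {
    return totalCount;
  }

  public static void main(String[] args) throws InterruptedException {
    CartLimitSketch cart = new CartLimitSketch();
    List<Thread> callers = new ArrayList<>();
    // 50 concurrent callers each try to add one item.
    for (int i = 0; i < 50; i++) {
      Thread caller = new Thread(() -> cart.addItem(1));
      callers.add(caller);
      caller.start();
    }
    for (Thread caller : callers) {
      caller.join();
    }
    System.out.println(cart.totalCount()); // 10
  }
}
```

In a Value Entity, the equivalent check would live in the addItem command handler and use currentState() before calling updateState.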
Actions as Life-cycle Hooks
An Action method can be triggered automatically when some predefined service life-cycle event happens (currently, only on startup is available), serving as a custom hook. For such use, the method needs to be public, annotated with @Trigger.OnStartup, and cannot receive any parameters, as shown below.
The on startup hook is called every time a service instance boots up. This can happen for very different reasons: restarting / redeploying the service, scaling up to more instances or even without any user-driven action (e.g. Kalix Runtime versions being rolled out, infrastructure-related incidents, etc.). Therefore, you should carefully consider how you use this hook and its implementation.
public class OnStartupAction extends Action { (1)
@PostMapping("/init")
@Trigger.OnStartup( (2)
maxRetries = 3) (3)
public Action.Effect<String> init() { (4)
// Do some initial operations here
return effects().reply("Done");
}
}
1 | Only methods belonging to an Action can be configured as a hook. |
2 | This hook will be triggered once the instance startup is concluded (i.e. will be called 3 times if 3 instances are running). |
3 | Optionally, set the max amount of retries to 3. |
4 | The method must be public and receive no parameters. |
If the call to the hook returns a failure and maxRetries is set to a value greater than the default value (0), retry calls will be executed with a fixed delay until the configured number of retries is reached.
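The retry behaviour described above can be pictured with a small standalone sketch. This is an illustration of "retry with a fixed delay up to maxRetries", not the Kalix Runtime's implementation; the class name, method name, and the delay value are assumptions.

```java
import java.util.function.BooleanSupplier;

// Illustrative sketch only; the Kalix Runtime's actual retry logic is not shown in this document.
final class RetrySketch {

  // Calls the hook once, then retries up to maxRetries times with a fixed delay
  // while the hook keeps failing. Returns the total number of attempts made.
  static int runWithRetries(BooleanSupplier hook, int maxRetries, long delayMillis) {
    int attempts = 0;
    while (true) {
      attempts++;
      boolean succeeded = hook.getAsBoolean();
      if (succeeded || attempts > maxRetries) {
        return attempts;
      }
      try {
        Thread.sleep(delayMillis); // fixed delay between attempts
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return attempts;
      }
    }
  }

  public static void main(String[] args) {
    // A hook that always fails: one initial call plus three retries.
    System.out.println(runWithRetries(() -> false, 3, 10)); // 4
    // With the default of 0 retries, a failing hook is called once.
    System.out.println(runWithRetries(() -> false, 0, 10)); // 1
  }
}
```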
Running Side Effects
Emitting effects on another component
An Entity or an Action may also emit one or more side effects. A side effect is something whose result has no impact on the result of the current command—if it fails, the current command still succeeds. The result of the side effect is therefore ignored. When used from inside an Entity, side effects are only performed after the successful completion of any state actions requested by the command handler.
There is no guarantee that a side effect will be executed successfully. If a failure occurs after the command is fully handled, effects might not be executed. Side effects are not retried in case of failures.
Side effects may be declared as synchronous or asynchronous. Asynchronous commands run in a "fire and forget" fashion. The code flow of the caller (the command handler of the entity which emitted the asynchronous command) continues while the command is being asynchronously processed. Meanwhile, synchronous commands run sequentially, that is, the commands are processed in order, one at a time. The final result of the command handler, either a reply or a forward, is not sent until all synchronous commands are completed.
Emitting a side effect
To illustrate how you can emit a side effect, you can build on top of the Action as a Controller example. In that previous example, you built a controller around the Value Entity Counter and forwarded the incoming request after modifying it.
This time, instead of using a forward, you will call the entity using a side effect.
Implementing the Action
The class DoubleCounterAction listens to the counter state changes. When the counter value changes this action doubles it.
@PostMapping("/counter/{counterId}/double-increase/{value}") (1)
public Action.Effect<String> increaseWithSideEffect(@PathVariable String counterId, @PathVariable int value) {
var doubleIncrease = value * 2; (2)
var deferredCall = componentClient.forValueEntity(counterId)
.call(CounterEntity::increaseBy)
.params(new Number(doubleIncrease));
return effects().reply("ok").addSideEffect(SideEffect.of(deferredCall)); (3)
}
1 | Exposing dedicated endpoint. |
2 | On incoming request, doubling the value of increase . |
3 | Building a reply and attaching a side effect, i.e. calling to the Counter to increase double the previous amount. |
The response of the side effect is ignored by the command, meaning that even if the deferred call to the Counter entity fails, the Action reply will succeed.
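The "reply succeeds even if the side effect fails" behaviour can be mimicked in plain Java. The sketch below is only an analogy and does not use the Kalix SideEffect API: a hypothetical replyWithSideEffect method runs a task asynchronously, discards its outcome, and returns the reply regardless.

```java
import java.util.concurrent.CompletableFuture;

// Analogy only, not Kalix API: the side effect runs separately and its outcome is dropped.
final class SideEffectSketch {

  static String replyWithSideEffect(Runnable sideEffect) {
    CompletableFuture.runAsync(sideEffect)
        .exceptionally(error -> null); // failures are swallowed: no retry, no impact on the reply
    return "ok";
  }

  public static void main(String[] args) {
    String reply = replyWithSideEffect(() -> {
      throw new IllegalStateException("counter unavailable");
    });
    System.out.println(reply); // ok
  }
}
```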
Testing the Action
Unit tests
The following snippet shows how the ActionTestkit is used to test the FibonacciAction implementation.
With the ActionTestkit you can call the methods of FibonacciAction. Each call you pass over to the test kit returns an ActionResult that contains the effect produced by the underlying action method.
Actions are unique units of computation where no local state is shared with previous or subsequent calls. The framework does not reuse an Action instance but creates a new one for each command handled and therefore this is also how the test kit behaves.
src/test/java/com/example/actions/FibonacciActionTest.java
import kalix.javasdk.testkit.ActionResult;
import kalix.javasdk.testkit.ActionTestkit;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class FibonacciActionTest {

  @Test
  public void testNextFib() {
    ActionTestkit<FibonacciAction> testkit = ActionTestkit.of(FibonacciAction::new); (1)
    ActionResult<Number> result = testkit.call(a -> a.getNumber(3L)); (2)
    assertTrue(result.isReply());
    assertEquals(5L, result.getReply().value());
  }

  @Test
  public void testNextFibError() {
    ActionTestkit<FibonacciAction> testkit = ActionTestkit.of(FibonacciAction::new); (1)
    ActionResult<Number> result = testkit.call(a -> a.getNumber(4L)); (2)
    assertTrue(result.isError());
    assertTrue(result.getError().startsWith("Input number is not a Fibonacci number"));
  }
}
1 | The test kit is created to allow us to test the Action’s method. |
2 | Calling the getNumber method with some value. |
ActionResult
Calling an action method through the test kit gives us back an ActionResult. This class has methods that you can use to assert your expectations, such as:
- getReply() returns the reply message passed to effects().reply() or throws an exception failing the test, if the effect returned was not a reply.
- getError() returns the error description when effects().error() was returned to signal an error.
- getForward() returns details about what message was forwarded and where the call was forwarded (since it is a unit test the forward is not actually executed).
The side effects of an Action can NOT be tested in isolation at the moment.
Integration tests
Actions (like any other Kalix component) can be verified with integration tests. The Spring WebClient utility can be used to run any HTTP call to test Kalix components.
src/it/java/com/example/fibonacci/FibonacciActionIntegrationTest.java
@SpringBootTest(classes = Main.class) (1)
public class FibonacciActionIntegrationTest extends KalixIntegrationTestKitSupport { (2)

  @Autowired
  private WebClient webClient;

  private Duration timeout = Duration.of(5, SECONDS);

  @Test
  public void calculateNextNumber() {
    ResponseEntity<Number> response = webClient.get()
      .uri("/fibonacci/5/next")
      .retrieve()
      .toEntity(Number.class)
      .block(timeout); (3)

    Assertions.assertEquals(HttpStatus.OK, response.getStatusCode());
    Assertions.assertEquals(8, response.getBody().value());
  }
}
1 | Mark the test as a Spring integration test. |
2 | Set up the Kalix infrastructure by extending KalixIntegrationTestKitSupport . |
3 | Use WebClient to call the Action component endpoint. |
In cases where detailed assertions on the HTTP response are not required, the Kalix ComponentClient can be used in integration tests.
src/it/java/com/example/fibonacci/FibonacciActionComponentClientIntegrationTest.java
@SpringBootTest(classes = Main.class)
public class FibonacciActionComponentClientIntegrationTest extends KalixIntegrationTestKitSupport {

  private Duration timeout = Duration.of(5, SECONDS);

  @Test
  public void calculateNextNumber() throws ExecutionException, InterruptedException, TimeoutException {
    Number response = componentClient.forAction() (1)
      .call(FibonacciAction::nextNumber)
      .params(new Number(5))
      .execute() (2)
      .toCompletableFuture()
      .get(timeout.toMillis(), MILLISECONDS);

    Assertions.assertEquals(8, response.value());
  }
}
1 | Use the ComponentClient to call the Action component endpoint. |
2 | Transform the DeferredCall to a CompletionStage and wait for the response. |
The integration tests in the samples are in a dedicated project profile, it, and can be run using mvn verify -Pit .