Implementing Workflows
Workflows make it possible to implement long-running, multi-step business processes while focusing exclusively on domain and business logic. Workflows provide durability, consistency and the ability to call other components and services. Business transactions can be modeled in one central place, and the Workflow will keep them running smoothly, or roll back if something goes wrong.
Identifying the Workflow
In order to interact with a Workflow in Kalix, we need to assign a type id and one or more instance ids:
-
type id is a unique identifier for the workflow type. To define the workflow type id, the workflow class must be annotated with @TypeId and have a unique and stable identifier assigned.
-
id, on the other hand, is unique per instance. In most cases, the workflow id is passed as a path parameter of a REST request. The exception to the rule is when we request Kalix to auto-generate an id for us. In such a case, Kalix won’t try to extract the id from the endpoint path.
The workflow id can be defined in different ways, as detailed below.
Single identifier
The most common approach is to annotate the class with @Id and assign one path variable name to it. For instance, @Id("id") instructs Kalix to look up a matching path variable. For an endpoint defined with @RequestMapping("/transfer/{id}"), Kalix extracts whatever path segment replaces {id} and treats it as the Workflow unique identifier.
Composite identifier
It’s also possible to have a composite identifier. For example, @Id({"groupId", "id"}) defines a composite identifier made of groupId and id. In such a case, the endpoints for this workflow need to have both path variables, e.g. @RequestMapping("/transfer/{groupId}/{id}").
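To make the path-variable binding concrete, here is a minimal plain-Java sketch of how a route template could be matched against a request path. It is an illustration only: the PathTemplate class and its match method are hypothetical helpers, not part of Kalix, and Kalix's own routing is not shown in this document.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative only: a hand-rolled matcher, not the routing code Kalix uses.
final class PathTemplate {

    /**
     * Matches a concrete path against a template such as "/transfer/{groupId}/{id}".
     * Returns the variable bindings in template order, or empty if the path does not match.
     */
    static Optional<Map<String, String>> match(String template, String path) {
        String[] templateParts = template.split("/");
        String[] pathParts = path.split("/");
        if (templateParts.length != pathParts.length) {
            return Optional.empty();
        }
        Map<String, String> variables = new LinkedHashMap<>();
        for (int i = 0; i < templateParts.length; i++) {
            String expected = templateParts[i];
            String actual = pathParts[i];
            if (expected.startsWith("{") && expected.endsWith("}")) {
                if (actual.isEmpty()) {
                    return Optional.empty(); // a variable must bind to a non-empty segment
                }
                // "{id}" binds the variable name "id" to the actual segment
                variables.put(expected.substring(1, expected.length() - 1), actual);
            } else if (!expected.equals(actual)) {
                return Optional.empty(); // literal segments must match exactly
            }
        }
        return Optional.of(variables);
    }
}
```

With a single identifier, PathTemplate.match("/transfer/{id}", "/transfer/42") binds id to 42. With the composite template, both groupId and id must be present in the path for the match to succeed.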
Generated identifier
Finally, you can ask Kalix to generate a unique identifier. This is typically useful when creating a Workflow whose id is a surrogate id. To indicate to Kalix that a Workflow id should be generated rather than extracted from the path, annotate the corresponding command method with @GenerateId. Typically, a Workflow has only one method annotated with @GenerateId: the one that creates the Workflow. All other methods rely on the @Id annotation to extract the surrogate id from the endpoint path.
It will often be necessary to access the generated id from inside the workflow code. This can be done using the WorkflowContext.workflowId method.
Kalix generates UUID version 4 (random) keys. Only version 4 UUIDs are currently supported for generated Workflow identifiers.
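For readers who want to recognize such an id on the client side, the following sketch uses only the JDK. java.util.UUID.randomUUID() produces a version 4 UUID, and UUID.version() reports the version of a parsed value. The GeneratedIdSketch class is a hypothetical helper, and the sketch does not show how Kalix generates ids internally.

```java
import java.util.UUID;

// JDK-only illustration; it does not call Kalix.
final class GeneratedIdSketch {

    /** Creates a random (version 4) UUID string, the same kind of key described above. */
    static String newId() {
        return UUID.randomUUID().toString();
    }

    /** Returns true only if the candidate parses as a UUID and reports version 4. */
    static boolean isVersion4(String candidate) {
        try {
            return UUID.fromString(candidate).version() == 4;
        } catch (IllegalArgumentException e) {
            return false; // not a UUID at all
        }
    }
}
```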
Workflow’s Effect API
The Workflow’s Effect defines the operations that Kalix should perform when an incoming command is handled by a Workflow.
A Workflow Effect can either:
-
update the state of the workflow
-
define the next step to be executed (transition)
-
pause the workflow
-
end the workflow
-
fail the step or reject a command by returning an error
-
reply to incoming commands
See also Understanding what an Effect is
Modeling state
We want to build a simple workflow that transfers funds between two wallets. Before that, we will create a wallet subdomain with some basic functionality that we can use later. For simplicity, WalletEntity is implemented as a ValueEntity, but for a production-ready solution an EventSourcedEntity would be a better choice.
@Id("id")
@TypeId("wallet")
@RequestMapping("/wallet/{id}")
public class WalletEntity extends ValueEntity<WalletEntity.Wallet> {

  public record Wallet(String id, int balance) {
    public Wallet withdraw(int amount) {
      return new Wallet(id, balance - amount);
    }
    public Wallet deposit(int amount) {
      return new Wallet(id, balance + amount);
    }
  }

  @PostMapping("/create/{initBalance}") // (1)
  public Effect<String> create(@PathVariable String id, @PathVariable int initBalance) {
    return effects().updateState(new Wallet(id, initBalance)).thenReply("Ok");
  }

  @PatchMapping("/withdraw/{amount}") // (2)
  public Effect<String> withdraw(@PathVariable int amount) {
    Wallet updatedWallet = currentState().withdraw(amount);
    if (updatedWallet.balance() < 0) {
      return effects().error("Insufficient balance");
    } else {
      return effects().updateState(updatedWallet).thenReply("Ok");
    }
  }

  @PatchMapping("/deposit/{amount}") // (3)
  public Effect<String> deposit(@PathVariable int amount) {
    Wallet updatedWallet = currentState().deposit(amount);
    return effects().updateState(updatedWallet).thenReply("Ok");
  }

  @GetMapping // (4)
  public Effect<Integer> get() {
    return effects().reply(currentState().balance());
  }
}
1 | An endpoint to create a wallet with an initial balance. |
2 | An endpoint to withdraw funds from the wallet. |
3 | An endpoint to deposit funds to the wallet. |
4 | An endpoint to get current wallet balance. |
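Because Wallet is an immutable record, its behavior can be exercised without any Kalix runtime. The sketch below copies the record's shape into a standalone class and adds a hypothetical canWithdraw helper that mirrors the balance check in the withdraw handler. The WalletSketch wrapper and the helper are assumptions made for illustration.

```java
// Standalone copy of the Wallet record's shape, for experimenting outside Kalix.
final class WalletSketch {

    record Wallet(String id, int balance) {
        Wallet withdraw(int amount) {
            return new Wallet(id, balance - amount);
        }
        Wallet deposit(int amount) {
            return new Wallet(id, balance + amount);
        }
    }

    /** Mirrors the handler's rule: a withdrawal is rejected if it would make the balance negative. */
    static boolean canWithdraw(Wallet wallet, int amount) {
        return wallet.withdraw(amount).balance() >= 0;
    }
}
```

Each operation returns a new Wallet and leaves the original untouched, which is why the handler can compute updatedWallet first and only persist it once the check passes.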
Now we can focus on the workflow implementation itself. A workflow has state, which can be updated in command handlers and in step implementations. When modeling the state, consider the information that is required for validation, running the steps, collecting data from steps, or tracking the workflow progress.
public record TransferState(Transfer transfer, TransferStatus status) {

  public record Transfer(String from, String to, int amount) { // (1)
  }

  public enum TransferStatus { // (2)
    STARTED, WITHDRAW_SUCCEED, COMPLETED
  }

  public TransferState(Transfer transfer) {
    this(transfer, STARTED);
  }

  public TransferState withStatus(TransferStatus newStatus) {
    return new TransferState(transfer, newStatus);
  }
}
1 | A Transfer record encapsulates data required to withdraw and deposit funds. |
2 | A TransferStatus is used to track workflow progress. |
Implementing behavior
Now that we have our workflow state defined, the remaining tasks can be summarized as follows:
-
declare your workflow and pick a workflow type and key (it needs to be unique as it will be used for sharding purposes);
-
define an access point (i.e. a route path) to your workflow;
-
implement endpoint(s) to interact with the workflow (e.g. to start a workflow, or provide additional data) or retrieve its current state;
-
provide a workflow definition with all possible steps and transitions between them.
Let’s have a look at what our transfer workflow will look like for the first 2 points from the above list:
@TypeId("transfer") // (2)
@Id("transferId") // (3)
@RequestMapping("/transfer/{transferId}") // (4)
public class TransferWorkflow extends Workflow<TransferState> { // (1)
1 | Create a class that extends Workflow<S>, where S is the state type this workflow will store (i.e. TransferState). |
2 | Make sure to annotate this class with @TypeId and pass a unique identifier for this workflow type. |
3 | Annotate this class with @Id and pass the name of the key that will be used as the workflow instance unique identifier. |
4 | Use Spring’s RequestMapping annotation to define the route to your workflow. |
The @Id value transferId must match a path parameter (i.e. transferId), and its value needs to be unique per workflow instance. The @TypeId value transfer, on the other hand, is shared by all instances of this workflow. It must be stable (it cannot be changed after a production deploy) and unique across the different workflow types.
Starting workflow
Having created the basis of our workflow, we will now define how to launch a workflow with a command handler. In the example below, we define a new endpoint that accepts a command to start the transfer and returns an Effect that starts the workflow by providing a transition to the first step. We also update the state with an initial value.
public record Withdraw(String from, int amount) {
}

@PutMapping
public Effect<Message> startTransfer(@RequestBody Transfer transfer) {
  if (transfer.amount() <= 0) {
    return effects().error("transfer amount should be greater than zero"); // (1)
  } else if (currentState() != null) {
    return effects().error("transfer already started"); // (2)
  } else {
    TransferState initialState = new TransferState(transfer); // (3)
    Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount());
    return effects()
      .updateState(initialState) // (4)
      .transitionTo("withdraw", withdrawInput) // (5)
      .thenReply(new Message("transfer started")); // (6)
  }
}
1 | The validation ensures the transfer amount is greater than zero and it fails for calls with illegal values by returning an Effect with effects().error . |
2 | We should handle the situation when the workflow is already running and return a proper message. Otherwise, we might corrupt the running workflow. |
3 | From the incoming command we create an initial TransferState . |
4 | We instruct Kalix to persist the new state. |
5 | With the transitionTo method, we inform that the name of the first step is "withdraw" and the input for this step is a Withdraw object. |
6 | The last instruction is to inform the caller that the workflow was successfully started. |
For simplicity, we are reusing the internal Transfer record as a request body. In a real application, the request body should be a separate class, so that the domain state model is not exposed as a public API.
Workflow definition
One missing piece of our transfer workflow implementation is a workflow definition method, which composes all steps connected with transitions. A workflow Step has a unique name, an action to perform (e.g. a deferred call to an existing Kalix component, or an asynchronous call to any external service) and a transition to select the next step (or an end transition to finish the workflow, in the case of the last step).
public record Deposit(String to, int amount) {
}

@Override
public WorkflowDef<TransferState> definition() {
  Step withdraw =
    step("withdraw") // (1)
      .call(Withdraw.class, cmd -> {
        return componentClient.forValueEntity(cmd.from)
          .call(WalletEntity::withdraw)
          .params(cmd.amount);
      }) // (2)
      .andThen(String.class, __ -> {
        Deposit depositInput = new Deposit(currentState().transfer().to(), currentState().transfer().amount());
        return effects()
          .updateState(currentState().withStatus(WITHDRAW_SUCCEED))
          .transitionTo("deposit", depositInput); // (3)
      });

  Step deposit =
    step("deposit") // (1)
      .call(Deposit.class, cmd -> {
        return componentClient.forValueEntity(cmd.to)
          .call(WalletEntity::deposit)
          .params(cmd.amount);
      }) // (4)
      .andThen(String.class, __ -> {
        return effects()
          .updateState(currentState().withStatus(COMPLETED))
          .end(); // (5)
      });

  return workflow() // (6)
    .addStep(withdraw)
    .addStep(deposit);
}
1 | Each step should have a unique name. |
2 | We instruct Kalix to run a given deferred call, using the ComponentClient, to withdraw funds from a wallet. |
3 | After successful withdrawal we return an Effect that will update the workflow state and move to the next step called "deposit" . An input parameter for this step is a Deposit record. |
4 | Another workflow step action to deposit funds to a given wallet. |
5 | This time we return an effect that will stop workflow processing, by using the special end method. |
6 | We collect all steps to form a workflow definition. |
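The idea of named steps connected by transitions can be pictured with a toy in-memory model. The sketch below is an assumption-laden illustration, not the Kalix Workflow API: ToyWorkflow, END and demoTransfer are hypothetical names, and the model has no durability, retries, timeouts or state persistence. Each step simply runs an action and returns the name of the next step.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// A toy model of "steps plus transitions"; it is NOT how Kalix executes workflows.
final class ToyWorkflow {
    /** Marker returned by a step to finish the workflow. */
    static final String END = "<end>";

    private final Map<String, Supplier<String>> steps = new LinkedHashMap<>();

    /** Registers a step; the action returns the name of the next step, or END. */
    ToyWorkflow addStep(String name, Supplier<String> actionReturningNextStep) {
        if (steps.putIfAbsent(name, actionReturningNextStep) != null) {
            throw new IllegalArgumentException("duplicate step name: " + name);
        }
        return this;
    }

    /** Runs from firstStep until a step returns END; returns the executed step names in order. */
    List<String> run(String firstStep) {
        List<String> executed = new ArrayList<>();
        String current = firstStep;
        while (!END.equals(current)) {
            Supplier<String> step = steps.get(current);
            if (step == null) {
                throw new IllegalStateException("unknown step: " + current);
            }
            executed.add(current);
            current = step.get();
        }
        return executed;
    }

    /** Demo: balances[0] is the source wallet, balances[1] the target wallet. */
    static List<String> demoTransfer(int[] balances, int amount) {
        return new ToyWorkflow()
            .addStep("withdraw", () -> { balances[0] -= amount; return "deposit"; })
            .addStep("deposit", () -> { balances[1] += amount; return END; })
            .run("withdraw");
    }
}
```

The toy makes one design point visible: step names are the only link between steps, which is why each name must be unique within a definition.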
Retrieving state
To access the current state of the workflow we can use currentState(). However, if this is the first command we are receiving for this workflow, the state will be null. We can change this by overriding the emptyState method. The following example shows the implementation of a read-only command handler (accessed through GET /transfer/{transferId}):
@GetMapping // (1)
public Effect<TransferState> getTransferState() {
  if (currentState() == null) {
    return effects().error("transfer not started");
  } else {
    return effects().reply(currentState()); // (2)
  }
}
1 | Marks this method as a command handler for GET requests. |
2 | Returns the current state as reply for the request. |
For simplicity, we are returning the internal state directly to the requester. In a real-world scenario, it’s usually better to convert this internal domain model into a public model so the internal representation is free to evolve without breaking client code.
A full transfer workflow source code is available here. Follow the README file to run and test it.
Pausing workflow
A long-running workflow can be paused while waiting for some additional information to continue processing. A special pause transition can be used to inform Kalix that the execution of the Workflow should be postponed. The user can then resume the processing by sending a request to a Workflow endpoint. Additionally, a Kalix Timer can be scheduled to automatically inform the Workflow that the expected time for the additional data has passed.
Step waitForAcceptation =
  step("wait-for-acceptation")
    .asyncCall(() -> {
      String transferId = currentState().transferId();
      return timers().startSingleTimer(
        "acceptationTimout-" + transferId,
        ofHours(8),
        componentClient.forWorkflow(transferId)
          .call(TransferWorkflow::acceptationTimeout)); // (1)
    })
    .andThen(Done.class, __ ->
      effects().pause()); // (2)
1 | Schedules a timer as a Workflow step action. Make sure that the timer name is unique for every Workflow instance. |
2 | Pauses the Workflow execution. |
Remember to cancel the timer once the Workflow is resumed. Also, adjust the Workflow timeout to match the timer schedule.
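The pattern of "start a named timer when pausing, cancel it by name when resuming" can be illustrated with a plain-JDK analogy. The NamedTimers class below is hypothetical and built on ScheduledExecutorService. Unlike Kalix timers it is in-memory only and not durable, so treat it as a picture of the bookkeeping rather than a substitute.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Plain-JDK analogy only; Kalix timers are managed by the runtime, not by this class.
final class NamedTimers {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
            Thread thread = new Thread(runnable, "named-timers");
            thread.setDaemon(true); // pending timers should not keep the JVM alive
            return thread;
        });
    private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();

    /** Schedules a one-shot timer; in this sketch a timer with the same name is replaced. */
    void startSingleTimer(String name, Duration delay, Runnable onTimeout) {
        ScheduledFuture<?> timer = scheduler.schedule(onTimeout, delay.toMillis(), TimeUnit.MILLISECONDS);
        ScheduledFuture<?> previous = pending.put(name, timer);
        if (previous != null) {
            previous.cancel(false);
        }
    }

    /** Cancels the timer with the given name; returns true if a pending timer was cancelled. */
    boolean cancel(String name) {
        ScheduledFuture<?> timer = pending.remove(name);
        return timer != null && timer.cancel(false);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

Deriving the timer name from the workflow id (for example a prefix plus the transfer id) keeps timers of different workflow instances from cancelling or replacing each other.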
Exposing additional mutational endpoints from the Workflow implementation should be done with special caution. Accepting a request from such endpoints should only be possible when the Workflow is in the expected state.
@PatchMapping("/accept")
public Effect<Message> accept() {
  if (currentState() == null) {
    return effects().error("transfer not started");
  } else if (currentState().status() == WAITING_FOR_ACCEPTATION) { // (1)
    Transfer transfer = currentState().transfer();
    Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount());
    return effects()
      .transitionTo("withdraw", withdrawInput)
      .thenReply(new Message("transfer accepted"));
  } else { // (2)
    return effects().error("Cannot accept transfer with status: " + currentState().status());
  }
}
1 | Accepts the request only when status is WAITING_FOR_ACCEPTATION . |
2 | Otherwise, rejects the request. |
Error handling
Designing for failure is one of the key attributes of all Kalix components, and Workflow has the richest set of failure-handling configuration options of them all. Using them well is essential to building robust and reliable solutions.
Timeouts
By default, a workflow run has no time limit. It can run forever, which in most cases is not desirable behavior. A workflow step, on the other hand, has a default timeout of 5 seconds. Both settings can be overridden at the workflow definition level or for a specific step configuration.
return workflow()
  .timeout(ofSeconds(5)) // (1)
  .defaultStepTimeout(ofSeconds(2)) // (2)
1 | Sets a workflow global timeout. |
2 | Sets a default timeout for all workflow steps. |
A default step timeout can be overridden in the step builder.
Step failoverHandler =
  step("failover-handler")
    .asyncCall(() -> {
      return CompletableFuture.completedStage("handling failure");
    })
    .andThen(String.class, __ -> effects()
      .updateState(currentState().withStatus(REQUIRES_MANUAL_INTERVENTION))
      .end())
    .timeout(ofSeconds(1)); // (1)
1 | Overrides the step timeout for a specific step. |
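To build intuition for what a step timeout means, the following JDK-only sketch bounds an asynchronous call with CompletableFuture.orTimeout (available since Java 9). StepTimeoutSketch and callWithTimeout are hypothetical names. In Kalix the runtime enforces the timeout and then applies the configured recover strategy; this sketch merely reports the timeout as an empty result.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// JDK-only illustration of bounding an async call; not the Kalix step timeout mechanism.
final class StepTimeoutSketch {

    /**
     * Starts the async action and waits for it, but no longer than the timeout.
     * Returns the result, or empty if the action did not complete in time.
     */
    static Optional<String> callWithTimeout(Supplier<CompletableFuture<String>> action, Duration timeout) {
        try {
            String result = action.get()
                .orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .join();
            return Optional.of(result);
        } catch (CompletionException e) {
            if (e.getCause() instanceof TimeoutException) {
                return Optional.empty(); // timed out: the real outcome of the call is unknown
            }
            throw e; // any other failure is propagated unchanged
        }
    }
}
```

Note that a timeout says nothing about whether the remote operation eventually succeeded, which matters when deciding how to recover.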
Recover strategy
It’s time to define what should happen in case of timeout or any other unhandled error.
return workflow()
  .failoverTo("failover-handler", maxRetries(0)) // (1)
  .defaultStepRecoverStrategy(maxRetries(1).failoverTo("failover-handler")) // (2)
  .addStep(withdraw)
  .addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // (3)
  .addStep(waitForAcceptation)
  .addStep(compensateWithdraw) // (4)
  .addStep(failoverHandler);
1 | Sets a failover transition in case of a workflow timeout. |
2 | Sets a default failover transition for all steps with maximum number of retries. |
3 | Overrides the step recovery strategy for the deposit step. |
4 | Failover steps should be added like any other steps. |
In case of a workflow timeout, one last failover step can be performed. Transitions from that failover step will be ignored.
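The "retry, then fail over" behavior of a recover strategy can be sketched in plain Java. This is a simplified model under a stated assumption: that maxRetries counts re-attempts after the first failed attempt, so maxRetries(2) allows up to three attempts in total. RecoverSketch, Outcome and runStep are hypothetical names and do not reflect how Kalix implements recovery.

```java
import java.util.concurrent.Callable;

// Simplified model of "retry, then fail over"; assumes maxRetries excludes the first attempt.
final class RecoverSketch {

    /** The step to move to next and how many attempts were made. */
    record Outcome(String nextStep, int attempts) {}

    /**
     * Runs the action once and retries it up to maxRetries times on failure.
     * Returns onSuccess as the next step if any attempt succeeds, otherwise failoverTo.
     */
    static Outcome runStep(Callable<String> action, int maxRetries, String onSuccess, String failoverTo) {
        int attempts = 0;
        while (attempts <= maxRetries) {
            attempts++;
            try {
                action.call();
                return new Outcome(onSuccess, attempts);
            } catch (Exception e) {
                // attempt failed: loop again while retries remain
            }
        }
        return new Outcome(failoverTo, attempts);
    }
}
```

Under this model, a deposit step configured with two retries and a failover to "compensate-withdraw" would reach the compensation step only after three failed attempts.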
Compensation
The idea behind the Workflow error handling is that workflows should only fail due to unknown errors during execution. In general, you should always write your workflows so that they do not fail on any known edge cases. If you expect an error, it’s better to be explicit about it, possibly with your domain types. Based on this information and the flexible Workflow API you can define a compensation for any workflow step.
Step deposit =
  step("deposit")
    .call(Deposit.class, cmd -> {
      return componentClient.forValueEntity(cmd.to)
        .call(WalletEntity::deposit)
        .params(cmd.amount);
    })
    .andThen(DepositResult.class, depositResult -> { // (1)
      if (depositResult instanceof DepositSucceed) {
        return effects()
          .updateState(currentState().withStatus(COMPLETED))
          .end(); // (2)
      } else if (depositResult instanceof DepositFailed depositFailed) { // (3)
        return effects()
          .updateState(currentState().withStatus(DEPOSIT_FAILED))
          .transitionTo("compensate-withdraw"); // (4)
      } else {
        throw new IllegalStateException("not supported deposit result: " + depositResult);
      }
    });

Step compensateWithdraw =
  step("compensate-withdraw") // (4)
    .call(() -> {
      var transfer = currentState().transfer();
      return componentClient.forValueEntity(transfer.from())
        .call(WalletEntity::deposit)
        .params(transfer.amount());
    })
    .andThen(DepositResult.class, depositResult -> {
      if (depositResult instanceof DepositSucceed) {
        return effects()
          .updateState(currentState().withStatus(COMPENSATION_COMPLETED))
          .end(); // (5)
      } else {
        throw new IllegalStateException("Expecting succeed operation but received: " + depositResult); // (6)
      }
    });
1 | Explicit deposit call result type DepositResult . |
2 | Finishes workflow as completed, in the case of a successful deposit. |
3 | Launches compensation step to handle deposit failure. The "withdraw" step must be reversed. |
4 | Compensation step is just any other step, with the same set of functionalities. |
5 | Correct compensation can finish the workflow. |
6 | Any other result might be handled by a default recovery strategy. |
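The example relies on a DepositResult type with DepositSucceed and DepositFailed variants, whose definitions are not shown here. One plausible shape, assuming Java 17 or later, is a sealed interface with two records. The definitions below, including the errorMsg field and the nextStep helper, are assumptions for illustration and may differ from the full sample.

```java
// Hypothetical definitions: the names come from the example, their shape is assumed.
final class DepositResults {

    sealed interface DepositResult permits DepositSucceed, DepositFailed {}

    record DepositSucceed() implements DepositResult {}

    record DepositFailed(String errorMsg) implements DepositResult {}

    /** Maps a deposit result to the next step name; "end" stands for finishing the workflow. */
    static String nextStep(DepositResult result) {
        if (result instanceof DepositSucceed) {
            return "end";
        } else if (result instanceof DepositFailed) {
            return "compensate-withdraw";
        } else {
            // unreachable while the interface stays sealed to the two records above
            throw new IllegalStateException("not supported deposit result: " + result);
        }
    }
}
```

Modeling the expected failure as a domain type, rather than as an exception, is what lets the step choose compensation explicitly instead of falling through to the recover strategy.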
Compensating a workflow step (or several steps) might involve multiple logical steps and is thus part of the overall business logic that must be defined within the workflow itself. For simplicity, in the example above, the compensation is applied only to the withdraw step, whereas the deposit step itself might also require compensation. In the case of a step timeout, we can’t be certain whether the step succeeded or failed.
A full error handling and compensation sample is available here. Run TransferWorkflowIntegrationTest and examine the logs from the application.