Implementing Workflows
Workflows make it possible to implement long-running, multi-step business processes while focusing exclusively on domain and business logic. Workflows provide durability, consistency and the ability to call other components and services. Business transactions can be modeled in one central place, and the Workflow will keep them running smoothly, or roll back if something goes wrong.
Workflow’s Effect API
The Workflow’s Effect defines the operations that Kalix should perform when an incoming command is handled by a Workflow.
A Workflow Effect can:
- update the state of the workflow
- define the next step to be executed (transition)
- pause the workflow
- end the workflow
- fail the step or reject a command by returning an error
- reply to incoming commands
See also Understanding what an Effect is.
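To make the list above concrete, here is a minimal, framework-free Java sketch of what an Effect records. The EffectSketch class, its Next enum and all method names are hypothetical stand-ins that only mimic the shape of the effects() calls used later on this page; they are not the Kalix SDK types.

```java
import java.util.Objects;

// Hypothetical, framework-free model of a workflow Effect (NOT the Kalix SDK).
// It only records what a handler asks for, mirroring the list of effect kinds.
final class EffectSketch<S, R> {
  enum Next { STAY, TRANSITION, PAUSE, END }

  private S newState;              // null: keep the current state
  private Next next = Next.STAY;   // what the workflow does afterwards
  private String nextStep;         // step name, only for TRANSITION
  private Object stepInput;        // input for that step, may be null
  private R reply;                 // reply to the caller, may be null
  private String error;            // non-null: step failed or command rejected

  static <T, U> EffectSketch<T, U> create() {
    return new EffectSketch<>();
  }

  static <T, U> EffectSketch<T, U> error(String message) {
    EffectSketch<T, U> effect = new EffectSketch<>();
    effect.error = Objects.requireNonNull(message);
    return effect;
  }

  EffectSketch<S, R> updateState(S state) { this.newState = state; return this; }

  EffectSketch<S, R> transitionTo(String step, Object input) {
    this.next = Next.TRANSITION;
    this.nextStep = step;
    this.stepInput = input;
    return this;
  }

  EffectSketch<S, R> pause() { this.next = Next.PAUSE; return this; }

  EffectSketch<S, R> end() { this.next = Next.END; return this; }

  EffectSketch<S, R> thenReply(R reply) { this.reply = reply; return this; }

  S newState() { return newState; }
  Next next() { return next; }
  String nextStep() { return nextStep; }
  Object stepInput() { return stepInput; }
  R reply() { return reply; }
  boolean isError() { return error != null; }
  String errorMessage() { return error; }
}
```

In this sketch an Effect is only a description of intent; something else (in Kalix, the runtime) has to act on it. A chain such as `create().updateState(...).transitionTo("withdraw", ...).thenReply(...)` has the same shape as the Start handler shown later.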
Defining the proto files
We want to build a simple workflow that transfers funds between two wallets. Before that, we will create a wallet subdomain with some basic functionality that we can use later. For simplicity, a WalletEntity is implemented as a ValueEntity, but for a production-ready solution an EventSourcedEntity would be a better choice.
service WalletService {
  option (kalix.codegen) = {
    value_entity: {
      name: "com.example.wallet.api.WalletEntity"
      type_id: "wallet"
      state: "com.example.wallet.domain.WalletState"
    }
  };

  rpc Create(InitialBalance) returns (google.protobuf.Empty) {} // (1)
  rpc Withdraw(WithdrawRequest) returns (google.protobuf.Empty) {} // (2)
  rpc Deposit(DepositRequest) returns (google.protobuf.Empty) {} // (3)
  rpc GetWalletState(GetRequest) returns (com.example.wallet.domain.WalletState) {} // (4)
}
(1) An endpoint to create a wallet with an initial balance.
(2) An endpoint to withdraw funds from the wallet.
(3) An endpoint to deposit funds into the wallet.
(4) An endpoint to get the current wallet balance.
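For local experiments, the four WalletService operations can be mimicked with a plain in-memory class. This is a hypothetical sketch, not the generated WalletEntity: the InMemoryWallets name is made up, balances are plain int values, and rejecting overdrafts is an assumed business rule that the proto above does not specify.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for Create, Withdraw, Deposit and
// GetWalletState. The "no overdraft" rule is an illustrative assumption.
final class InMemoryWallets {
  private final Map<String, Integer> balances = new HashMap<>();

  void create(String walletId, int initialBalance) {
    if (balances.containsKey(walletId)) {
      throw new IllegalStateException("wallet already exists: " + walletId);
    }
    balances.put(walletId, initialBalance);
  }

  void withdraw(String walletId, int amount) {
    int balance = balance(walletId);
    if (amount <= 0 || amount > balance) {
      throw new IllegalArgumentException("cannot withdraw " + amount + " from " + walletId);
    }
    balances.put(walletId, balance - amount);
  }

  void deposit(String walletId, int amount) {
    int balance = balance(walletId);
    if (amount <= 0) {
      throw new IllegalArgumentException("deposit amount must be positive");
    }
    balances.put(walletId, balance + amount);
  }

  // Equivalent of GetWalletState: fails for unknown wallets.
  int balance(String walletId) {
    Integer balance = balances.get(walletId);
    if (balance == null) {
      throw new IllegalStateException("wallet not found: " + walletId);
    }
    return balance;
  }
}
```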
Now we can focus on the workflow implementation itself. A workflow has state, which can be updated in command handlers and step implementations. When modeling the state, consider the information that is required for validation, running the steps, collecting data from steps, or tracking the workflow progress.
message TransferState { // (1)
  string from = 1;
  string to = 2;
  int32 amount = 3;
  TransferStatus status = 4;
}

enum TransferStatus { // (2)
  STARTED = 0;
  WITHDRAW_SUCCEED = 1;
  COMPLETED = 2;
}
(1) A TransferState encapsulates the data required to withdraw and deposit funds.
(2) A TransferStatus is used to track the workflow progress.
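The progress tracked by TransferStatus can be made explicit with a small transition table. The following Java sketch mirrors the two messages above with a hand-written enum and record (in the real project these classes are generated from the proto files); the forward-only progression STARTED, WITHDRAW_SUCCEED, COMPLETED and the withStatus guard are assumptions for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hand-written mirror of the TransferStatus enum, for illustration only.
enum TransferStatus {
  STARTED, WITHDRAW_SUCCEED, COMPLETED;

  // Assumed forward-only progression: STARTED -> WITHDRAW_SUCCEED -> COMPLETED.
  Set<TransferStatus> allowedNext() {
    switch (this) {
      case STARTED:
        return EnumSet.of(WITHDRAW_SUCCEED);
      case WITHDRAW_SUCCEED:
        return EnumSet.of(COMPLETED);
      default:
        return EnumSet.noneOf(TransferStatus.class);
    }
  }
}

// Hypothetical immutable mirror of the TransferState message.
record TransferStateSketch(String from, String to, int amount, TransferStatus status) {
  // Returns a copy with the new status, refusing out-of-order progress.
  TransferStateSketch withStatus(TransferStatus next) {
    if (!status.allowedNext().contains(next)) {
      throw new IllegalStateException(status + " -> " + next + " is not allowed");
    }
    return new TransferStateSketch(from, to, amount, next);
  }
}
```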
The following transfer_api.proto file defines our TransferWorkflow component. In this file, we instruct the Kalix code generation tooling (codegen) to generate all stubs for your workflow service and the corresponding tests, as well as an abstract class for your implementation to extend.
Java: src/main/proto/com/example/transfer/transfer_api.proto
syntax = "proto3";

package com.example.transfer.api; // (1)

import "google/protobuf/empty.proto"; // (2)
import "kalix/annotations.proto";
import "com/example/transfer/transfer_domain.proto";

option java_outer_classname = "TransferApi"; // (3)

message Transfer { // (4)
  string transfer_id = 1 [(kalix.field).id = true]; // (5)
  string from = 2;
  string to = 3;
  int32 amount = 4;
}

message GetRequest {
  string transfer_id = 1 [(kalix.field).id = true];
}

service TransferWorkflowService { // (6)
  option (kalix.codegen) = { // (7)
    workflow: { // (8)
      name: "com.example.transfer.api.TransferWorkflow" // (9)
      type_id: "transfer" // (10)
      state: "com.example.transfer.domain.TransferState" // (11)
    }
  };

  rpc Start(Transfer) returns (google.protobuf.Empty) {}
  rpc GetTransferState(GetRequest) returns (com.example.transfer.domain.TransferState) {}
}
(1) Any classes generated from this protobuf file will be in the Java package com.example.transfer.api.
(2) Import the Kalix protobuf annotations or options.
(3) Let the messages declared in this protobuf file be inner classes of the Java class TransferApi.
(4) We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
(5) Every Command must contain a string field that contains the workflow ID and is marked with the (kalix.field).id option.
(6) The service descriptor shows the API of the workflow. It lists the methods a client can use to issue Commands to the workflow.
(7) The protobuf option (kalix.codegen) is specific to code generation as provided by the Kalix Maven plugin.
(8) workflow indicates that we want the codegen to generate a Workflow component for this service.
(9) name denotes the base name for the Workflow component; the code generation will create the initial sources TransferWorkflow and TransferWorkflowIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
(10) type_id is a unique identifier of the "state storage". The workflow name may be changed even after data has been created, but the type_id can’t.
(11) state points to the protobuf message representing the workflow’s state, which is kept by Kalix.

Scala: src/main/proto/com/example/transfer/transfer_api.proto
syntax = "proto3";

package com.example.transfer.api; // (1)

import "google/protobuf/empty.proto"; // (2)
import "kalix/annotations.proto";
import "com/example/transfer/transfer_domain.proto";

message Transfer { // (3)
  string transfer_id = 1 [(kalix.field).id = true]; // (4)
  string from = 2;
  string to = 3;
  int32 amount = 4;
}

message GetRequest {
  string transfer_id = 1 [(kalix.field).id = true];
}

service TransferWorkflowService { // (5)
  option (kalix.codegen) = { // (6)
    workflow: { // (7)
      name: "com.example.transfer.api.TransferWorkflow" // (8)
      type_id: "transfer" // (9)
      state: "com.example.transfer.domain.TransferState" // (10)
    }
  };

  rpc Start(Transfer) returns (google.protobuf.Empty) {}
  rpc GetTransferState(GetRequest) returns (com.example.transfer.domain.TransferState) {}
}
(1) Any classes generated from this protobuf file will be in the Scala package com.example.transfer.api.
(2) Import the Kalix protobuf annotations or options.
(3) We use protobuf messages to describe the Commands that our service handles. They may contain other messages to represent structured data.
(4) Every Command must contain a string field that contains the workflow ID and is marked with the (kalix.field).id option.
(5) The service descriptor shows the API of the workflow. It lists the methods a client can use to issue Commands to the workflow.
(6) The protobuf option (kalix.codegen) is specific to code generation as provided by the Kalix Maven plugin.
(7) workflow indicates that we want the codegen to generate a Workflow component for this service.
(8) name denotes the base name for the Workflow component; the code generation will create the initial sources TransferWorkflow and TransferWorkflowIntegrationTest. Once these files exist, they are not overwritten, so you can freely add logic to them.
(9) type_id is a unique identifier of the "state storage". The workflow name may be changed even after data has been created, but the type_id can’t.
(10) state points to the protobuf message representing the workflow’s state, which is kept by Kalix.
Implementing behavior
Now that we have our workflow component defined, the remaining tasks can be summarized as follows:
- implement endpoint(s) to interact with the workflow (e.g. to start a workflow, or provide additional data) or retrieve its current state;
- provide a workflow definition with all possible steps and transitions between them.
Starting a workflow
Having created the basis of our workflow, we will now define how to launch a workflow with a command handler. In the example below, we implement the Start endpoint, which accepts a Transfer command and returns an Effect that starts the workflow by providing a transition to the first step. It also updates the state with an initial value.
Java: src/main/java/com/example/transfer/api/TransferWorkflow.java
@Override
public Effect<Empty> start(TransferState currentState, Transfer transfer) {
  if (transfer.getAmount() <= 0) {
    return effects().error("transfer amount should be greater than zero"); // (1)
  } else if (currentState != null) {
    return effects().error("transfer already started"); // (2)
  } else {
    TransferState initialState = TransferState.newBuilder() // (3)
        .setFrom(transfer.getFrom())
        .setTo(transfer.getTo())
        .setAmount(transfer.getAmount())
        .setStatus(TransferDomain.TransferStatus.STARTED)
        .build();

    Withdraw withdrawInput = Withdraw.newBuilder()
        .setFrom(transfer.getFrom())
        .setAmount(transfer.getAmount())
        .build();

    return effects()
        .updateState(initialState) // (4)
        .transitionTo("withdraw", withdrawInput) // (5)
        .thenReply(Empty.getDefaultInstance()); // (6)
  }
}
(1) The validation ensures the transfer amount is greater than zero; calls with illegal values fail by returning an Effect created with effects().error.
(2) We should handle the situation where the workflow is already running and return a proper message. Otherwise, we might corrupt the running workflow.
(3) From the incoming command, we create an initial TransferState.
(4) We instruct Kalix to persist the new state.
(5) With the transitionTo method, we specify that the first step is named "withdraw" and that its input is a Withdraw message.
(6) The last instruction informs the caller that the workflow was successfully started.

Scala: src/main/scala/com/example/transfer/api/TransferWorkflow.scala
override def start(currentState: TransferState, transfer: Transfer): Effect[Empty] = {
  if (transfer.amount <= 0) {
    effects.error("transfer amount should be greater than zero") // (1)
  } else if (currentState != null) {
    effects.error("transfer already started") // (2)
  } else {
    val initialState = TransferState(transfer.from, transfer.to, transfer.amount, TransferStatus.STARTED) // (3)
    val withdrawInput = Withdraw(transfer.from, transfer.amount)

    effects
      .updateState(initialState) // (4)
      .transitionTo("withdraw", withdrawInput) // (5)
      .thenReply(Empty()) // (6)
  }
}
(1) The validation ensures the transfer amount is greater than zero; calls with illegal values fail by returning an Effect created with effects.error.
(2) We should handle the situation where the workflow is already running and return a proper message. Otherwise, we might corrupt the running workflow.
(3) From the incoming command, we create an initial TransferState.
(4) We instruct Kalix to persist the new state.
(5) With the transitionTo method, we specify that the first step is named "withdraw" and that its input is a Withdraw message.
(6) The last instruction informs the caller that the workflow was successfully started.
Workflow definition
One missing piece of our transfer workflow implementation is the workflow definition method, which composes all steps connected with transitions. A workflow Step has a unique name, an action to perform (e.g. a deferred call to an existing Kalix component, or an asynchronous call to any external service), and a transition that selects the next step (or an end transition that finishes the workflow, in the case of the last step).
Java: src/main/java/com/example/transfer/api/TransferWorkflow.java
@Override
public WorkflowDef<TransferState> definition() {
  Step withdraw =
      step("withdraw") // (1)
          .call(Withdraw.class, cmd -> {
            WithdrawRequest withdrawRequest = WithdrawRequest.newBuilder()
                .setWalletId(cmd.getFrom())
                .setAmount(cmd.getAmount())
                .build();
            return components().walletEntity().withdraw(withdrawRequest);
          }) // (2)
          .andThen(Empty.class, __ -> {
            Deposit depositInput = Deposit.newBuilder()
                .setTo(currentState().getTo())
                .setAmount(currentState().getAmount())
                .build();

            return effects()
                .updateState(currentState().toBuilder().setStatus(WITHDRAW_SUCCEED).build())
                .transitionTo("deposit", depositInput); // (3)
          });

  Step deposit =
      step("deposit") // (1)
          .call(Deposit.class, cmd -> {
            DepositRequest depositRequest = DepositRequest.newBuilder()
                .setWalletId(cmd.getTo())
                .setAmount(cmd.getAmount())
                .build();
            return components().walletEntity().deposit(depositRequest);
          }) // (4)
          .andThen(Empty.class, __ -> {
            return effects()
                .updateState(currentState().toBuilder().setStatus(COMPLETED).build())
                .end(); // (5)
          });

  return workflow() // (6)
      .addStep(withdraw)
      .addStep(deposit);
}
(1) Each step should have a unique name.
(2) We instruct Kalix to run a given deferred call to withdraw funds from a wallet.
(3) After a successful withdrawal, we return an Effect that updates the workflow state and moves to the next step, called "deposit". The input parameter for this step is a Deposit message.
(4) Another workflow step action, to deposit funds into a given wallet.
(5) This time we return an Effect that stops the workflow processing, using the special end method.
(6) We collect all steps to form a workflow definition.

Scala: src/main/scala/com/example/transfer/api/TransferWorkflow.scala
override def definition: AbstractWorkflow.WorkflowDef[TransferState] = {
  val withdraw =
    step("withdraw") // (1)
      .call { (withdraw: Withdraw) =>
        val withdrawRequest = WithdrawRequest(withdraw.from, withdraw.amount)
        components.walletEntity.withdraw(withdrawRequest)
      } // (2)
      .andThen { _ =>
        val deposit = Deposit(currentState().to, currentState().amount)
        effects
          .updateState(currentState().copy(status = WITHDRAW_SUCCEED))
          .transitionTo("deposit", deposit) // (3)
      }

  val deposit =
    step("deposit") // (1)
      .call { (deposit: Deposit) =>
        val depositRequest = DepositRequest(deposit.to, deposit.amount)
        components.walletEntity.deposit(depositRequest)
      } // (4)
      .andThen { _ =>
        effects
          .updateState(currentState().copy(status = COMPLETED))
          .end // (5)
      }

  workflow // (6)
    .addStep(withdraw)
    .addStep(deposit)
}
(1) Each step should have a unique name.
(2) We instruct Kalix to run a given deferred call to withdraw funds from a wallet.
(3) After a successful withdrawal, we return an Effect that updates the workflow state and moves to the next step, called "deposit". The input parameter for this step is a Deposit message.
(4) Another workflow step action, to deposit funds into a given wallet.
(5) This time we return an Effect that stops the workflow processing, using the special end method.
(6) We collect all steps to form a workflow definition.
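To see how steps and transitions fit together, here is a toy, synchronous interpreter in plain Java. It is a sketch under heavy assumptions: ToyWorkflow and its Step record are hypothetical, the andThen function returns the next step name (or null to end), and none of the persistence, retries or timeouts that Kalix provides are modeled. ToyWorkflowDemo wires up a withdraw and a deposit step over an in-memory balance map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy, synchronous interpreter loosely shaped like step(...).call(...).andThen(...).
// NOT the Kalix runtime: no persistence, no retries and no timeouts.
final class ToyWorkflow {
  // andThen receives the call result and returns the next step name,
  // or null to end the workflow (the equivalent of end()).
  record Step(String name, Supplier<Object> call, Function<Object, String> andThen) {}

  private final Map<String, Step> steps = new LinkedHashMap<>();

  ToyWorkflow addStep(Step step) {
    // Step names must be unique, as in the definition above.
    if (steps.putIfAbsent(step.name(), step) != null) {
      throw new IllegalArgumentException("duplicate step name: " + step.name());
    }
    return this;
  }

  // Runs from firstStep until a step ends the workflow; returns the visited steps.
  List<String> run(String firstStep) {
    List<String> visited = new ArrayList<>();
    String current = firstStep;
    while (current != null) {
      Step step = steps.get(current);
      if (step == null) {
        throw new IllegalStateException("unknown step: " + current);
      }
      visited.add(current);
      Object result = step.call().get();       // the step action
      current = step.andThen().apply(result);  // the transition
    }
    return visited;
  }
}

final class ToyWorkflowDemo {
  public static void main(String[] args) {
    Map<String, Integer> balances = new HashMap<>(Map.of("alice", 100, "bob", 0));
    ToyWorkflow transfer = new ToyWorkflow()
        .addStep(new ToyWorkflow.Step("withdraw",
            () -> balances.merge("alice", -30, Integer::sum),
            result -> "deposit"))  // like transitionTo("deposit", ...)
        .addStep(new ToyWorkflow.Step("deposit",
            () -> balances.merge("bob", 30, Integer::sum),
            result -> null));      // like end()
    System.out.println(transfer.run("withdraw") + " " + balances);
  }
}
```

Because the toy keeps its state in a local map, a crash between the two steps would lose all progress; keeping that state durable between steps is what the Workflow component adds.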
Retrieving state
To access the current state of the workflow, we can use currentState() or the command handler parameter. However, if this is the first command we are receiving for this workflow, the state will be null. To start from a different initial value, override the emptyState method. The following example shows the implementation of the read-only command handler:
Java: src/main/java/com/example/transfer/api/TransferWorkflow.java
@Override
public Effect<TransferState> getTransferState(TransferState currentState, GetRequest getRequest) {
  if (currentState == null) {
    return effects().error("transfer not found", NOT_FOUND);
  } else {
    return effects().reply(currentState); // (1)
  }
}
(1) Returns the current state as the reply to the request.

Scala: src/main/scala/com/example/transfer/api/TransferWorkflow.scala
override def getTransferState(currentState: TransferState, getRequest: GetRequest): Effect[TransferState] = {
  if (currentState == null) {
    effects.error("Transfer does not exist", Code.NOT_FOUND)
  } else {
    effects.reply(currentState) // (1)
  }
}
(1) Returns the current state as the reply to the request.
For simplicity, we return the internal state directly to the requester. In a real-world scenario, it’s usually better to convert this internal domain model into a public model, so that the internal representation is free to evolve without breaking client code.
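The following hypothetical Java sketch illustrates two points from this section: falling back to emptyState() when nothing has been stored yet, and mapping internal state to a public model before replying. WorkflowBaseSketch, InternalTransfer, TransferView, TransferReaderSketch and the NOT_STARTED placeholder are made-up names; they are not part of the Kalix SDK.

```java
// Hypothetical sketch, NOT Kalix SDK classes.
abstract class WorkflowBaseSketch<S> {
  private S stored;  // persisted state; null before the first update

  // Default: no initial state, so handlers see null on the first command.
  protected S emptyState() {
    return null;
  }

  final S currentState() {
    return stored != null ? stored : emptyState();
  }

  final void updateState(S newState) {
    stored = newState;
  }
}

// Internal domain state, including wallet IDs.
record InternalTransfer(String from, String to, int amount, String status) {}

// Public model returned to callers; free to differ from the internal one.
record TransferView(int amount, String status) {}

final class TransferReaderSketch extends WorkflowBaseSketch<InternalTransfer> {
  @Override
  protected InternalTransfer emptyState() {
    // Hypothetical "not started" placeholder instead of null.
    return new InternalTransfer("", "", 0, "NOT_STARTED");
  }

  // Read-only handler: maps internal state to the public view.
  TransferView getTransferState() {
    InternalTransfer state = currentState();
    return new TransferView(state.amount(), state.status());
  }
}
```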
The full source code of the transfer workflow is available here (Java Protobuf) or here (Scala Protobuf). Follow the README file to run and test it.
Pausing a workflow
A long-running workflow can be paused while waiting for additional information to continue processing. A special pause transition informs Kalix that the execution of the Workflow should be postponed. The processing can then be resumed by sending a request to a Workflow endpoint. Additionally, a Kalix Timer can be scheduled to automatically notify the Workflow that the expected time for the additional data has passed. The snippets in this and the following sections assume a TransferState that also carries the transfer ID and additional statuses such as WAITING_FOR_ACCEPTATION.
Java: src/main/java/com/example/transfer/api/TransferWorkflow.java
Step waitForAcceptation =
    step("wait-for-acceptation")
        .asyncCall(() -> {
          AcceptationTimeoutRequest timeoutRequest = AcceptationTimeoutRequest.newBuilder()
              .setTransferId(currentState().getTransferId())
              .build();
          return timers().startSingleTimer( // (1)
              "acceptationTimout-" + currentState().getTransferId(),
              ofHours(8),
              components().transferWorkflow().acceptationTimeout(timeoutRequest)
          ).thenApply(__ -> Empty.getDefaultInstance());
        })
        .andThen(Empty.class, __ -> effects().pause()); // (2)
(1) Schedules a timer as a Workflow step action. Make sure that the timer name is unique for every Workflow instance.
(2) Pauses the Workflow execution.

Scala: src/main/scala/com/example/transfer/api/TransferWorkflow.scala
val waitForAcceptation =
  step("wait-for-acceptation")
    .asyncCall { () =>
      val timeoutRequest = AcceptationTimeoutRequest(currentState().transferId)
      timers
        .startSingleTimer( // (1)
          "acceptationTimout-" + currentState().transferId,
          8.hours,
          components.transferWorkflow.acceptationTimeout(timeoutRequest))
        .map(_ => Empty())
    }
    .andThen { _ => effects.pause } // (2)
(1) Schedules a timer as a Workflow step action. Make sure that the timer name is unique for every Workflow instance.
(2) Pauses the Workflow execution.
Remember to cancel the timer once the Workflow is resumed. Also, adjust the Workflow timeout to match the timer schedule.
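The timer advice above can be illustrated with a deterministic, in-memory registry. This is a hypothetical sketch, not the Kalix timers() API: FakeTimerRegistry, its methods and the timerName helper are invented, time is passed in explicitly instead of being read from a clock, and the replace-on-same-name behavior is a property of this sketch only.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory timer registry (NOT the Kalix timers() API).
final class FakeTimerRegistry {
  private final Map<String, Instant> deadlines = new HashMap<>();

  // In this sketch, a second timer with the same name replaces the first one,
  // which is why a name shared by two workflow instances would be a bug.
  void startSingleTimer(String name, Instant now, Duration delay) {
    deadlines.put(name, now.plus(delay));
  }

  // Called when the workflow resumes; returns false if nothing was scheduled.
  boolean cancel(String name) {
    return deadlines.remove(name) != null;
  }

  boolean isDue(String name, Instant now) {
    Instant deadline = deadlines.get(name);
    return deadline != null && !now.isBefore(deadline);
  }

  // Combines a purpose with the workflow instance ID to keep names unique.
  static String timerName(String purpose, String transferId) {
    return purpose + "-" + transferId;
  }
}
```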
Exposing additional mutating endpoints from the Workflow implementation should be done with special caution. Requests to such endpoints should only be accepted when the Workflow is in the expected state.
Java: src/main/java/com/example/transfer/api/TransferWorkflow.java
@Override
public Effect<Empty> accept(TransferState currentState, TransferApi.AcceptRequest acceptRequest) {
  if (currentState == null) {
    return effects().error("transfer not started");
  } else if (currentState.getStatus().equals(WAITING_FOR_ACCEPTATION)) { // (1)
    Withdraw withdrawInput = Withdraw.newBuilder()
        .setFrom(currentState.getFrom())
        .setAmount(currentState.getAmount())
        .build();

    return effects()
        .transitionTo("withdraw", withdrawInput)
        .thenReply(Empty.getDefaultInstance());
  } else { // (2)
    return effects().error("Cannot accept transfer with status: " + currentState.getStatus());
  }
}
(1) Accepts the request only when the status is WAITING_FOR_ACCEPTATION.
(2) Otherwise, rejects the request.

Scala: src/main/scala/com/example/transfer/api/TransferWorkflow.scala
override def accept(currentState: TransferState, acceptRequest: AcceptRequest): AbstractWorkflow.Effect[Empty] = {
  if (currentState == null) {
    effects.error("Transfer not started")
  } else if (currentState.status == WAITING_FOR_ACCEPTATION) { // (1)
    val withdrawInput = Withdraw(currentState.from, currentState.amount)
    effects
      .transitionTo("withdraw", withdrawInput)
      .thenReply(Empty())
  } else { // (2)
    effects.error(s"Cannot accept transfer with status: ${currentState.status}")
  }
}
(1) Accepts the request only when the status is WAITING_FOR_ACCEPTATION.
(2) Otherwise, rejects the request.
Error handling
Designing for failure is one of the key attributes of all Kalix components, and Workflow offers the richest set of failure-related configuration options among them. These options are essential for building robust and reliable solutions.
Timeouts
By default, a workflow run has no time limit. It can run forever, which in most cases is not the desired behavior. A workflow step, on the other hand, has a default timeout of 5 seconds. Both settings can be overridden at the workflow definition level or for a specific step.
Java: src/main/java/com/example/transfer/api/TransferWorkflow.java
return workflow()
    .timeout(ofSeconds(5)) // (1)
    .defaultStepTimeout(ofSeconds(2)) // (2)
(1) Sets a global timeout for the workflow.
(2) Sets a default timeout for all workflow steps.

Scala: src/main/scala/com/example/transfer/api/TransferWorkflow.scala
workflow
  .timeout(5.seconds) // (1)
  .defaultStepTimeout(2.seconds) // (2)
(1) Sets a global timeout for the workflow.
(2) Sets a default timeout for all workflow steps.
The default step timeout can be overridden in the step builder.
Java: src/main/java/com/example/transfer/api/TransferWorkflow.java
Step failoverHandler =
    step("failover-handler")
        .asyncCall(() -> {
          return CompletableFuture.completedStage(Done.getInstance())
              .thenApply(__ -> Empty.getDefaultInstance());
        })
        .andThen(Empty.class, __ -> effects()
            .updateState(currentState().toBuilder().setStatus(REQUIRES_MANUAL_INTERVENTION).build())
            .end())
        .timeout(ofSeconds(1)); // (1)
(1) Overrides the step timeout for a specific step.

Scala: src/main/scala/com/example/transfer/api/TransferWorkflow.scala
val failoverHandler =
  step("failover-handler")
    .asyncCall { () =>
      Future.successful("handling failure").map(_ => Empty())
    }
    .andThen { _ =>
      effects
        .updateState(currentState().withStatus(REQUIRES_MANUAL_INTERVENTION))
        .end
    }
    .timeout(1.second) // (1)
(1) Overrides the step timeout for a specific step.
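The relationship between a default step timeout and a per-step override can be sketched with the JDK alone. The example below wraps each asynchronous step call with CompletableFuture.orTimeout (available since Java 9); StepTimeouts and its methods are hypothetical names, and in a real workflow Kalix enforces these timeouts for you.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper (NOT a Kalix API): a workflow-wide default step timeout
// that a single step can override.
final class StepTimeouts {
  private final Duration defaultStepTimeout;

  StepTimeouts(Duration defaultStepTimeout) {
    this.defaultStepTimeout = defaultStepTimeout;
  }

  // Starts the step's async call and fails it with a TimeoutException if it
  // does not complete in time. A null override means "use the default".
  <T> CompletableFuture<T> runStep(Supplier<CompletableFuture<T>> call, Duration override) {
    Duration timeout = (override != null) ? override : defaultStepTimeout;
    return call.get().orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS);
  }

  // True if the future failed because of a timeout (blocks until it completes).
  static boolean timedOut(CompletableFuture<?> future) {
    try {
      future.join();
      return false;
    } catch (CompletionException e) {
      return e.getCause() instanceof TimeoutException;
    }
  }
}
```

With a 2-second default, passing `Duration.ofSeconds(1)` to runStep plays the role of the per-step `.timeout(ofSeconds(1))` shown above, while passing null keeps the default.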
Recover strategy
It’s time to define what should happen in case of timeout or any other unhandled error.
Java: src/main/java/com/example/transfer/api/TransferWorkflow.java
return workflow()
    .failoverTo("failover-handler", maxRetries(0)) // (1)
    .defaultStepRecoverStrategy(maxRetries(1).failoverTo("failover-handler")) // (2)
    .addStep(withdraw)
    .addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // (3)
    .addStep(waitForAcceptation)
    .addStep(compensateWithdraw) // (4)
    .addStep(failoverHandler);
(1) Sets a failover transition in case of a workflow timeout.
(2) Sets a default failover transition for all steps, with a maximum number of retries.
(3) Overrides the step recovery strategy for the deposit step.
(4) Failover steps should be added like any other steps.

Scala: src/main/scala/com/example/transfer/api/TransferWorkflow.scala
workflow
  .failoverTo("failover-handler", maxRetries(0)) // (1)
  .defaultStepRecoverStrategy(maxRetries(1).failoverTo("failover-handler")) // (2)
  .addStep(withdraw)
  .addStep(deposit, maxRetries(2).failoverTo("compensate-withdraw")) // (3)
  .addStep(waitForAcceptation)
  .addStep(compensateWithdraw) // (4)
  .addStep(failoverHandler)
(1) Sets a failover transition in case of a workflow timeout.
(2) Sets a default failover transition for all steps, with a maximum number of retries.
(3) Overrides the step recovery strategy for the deposit step.
(4) Failover steps should be added like any other steps.
In case of a workflow timeout, one last failover step can be performed. Transitions from that failover step will be ignored.
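The retry-then-failover behavior configured above can be sketched in plain Java. This sketch assumes that maxRetries(n) means n retries after the first attempt (so up to n + 1 attempts in total) and that a step failure is signaled by an exception; RecoverStrategySketch and Outcome are hypothetical names, not Kalix types.

```java
import java.util.function.Supplier;

// Hypothetical sketch (NOT Kalix code) of maxRetries(n).failoverTo(step):
// try the step up to 1 + n times, then fall back to the failover step.
final class RecoverStrategySketch {
  // nextStep: where the workflow goes next; attempts: how many tries were made.
  record Outcome(String nextStep, int attempts) {}

  // step returns the normal next step name, or throws to signal a failure.
  static Outcome runWithRecovery(Supplier<String> step, int maxRetries, String failoverStep) {
    int attempts = 0;
    while (attempts <= maxRetries) {
      attempts++;
      try {
        return new Outcome(step.get(), attempts);
      } catch (RuntimeException e) {
        // Failed attempt: loop again while retries remain. A real runtime
        // would also log the error and typically wait before retrying.
      }
    }
    return new Outcome(failoverStep, attempts);
  }
}
```

Under this assumption, a deposit step configured with `maxRetries(2).failoverTo("compensate-withdraw")` that keeps failing would be attempted three times before the workflow moves to compensate-withdraw.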
Compensation
The idea behind Workflow error handling is that workflows should only fail due to unknown errors during execution. In general, you should always write your workflows so that they do not fail on any known edge cases. If you expect an error, it’s better to be explicit about it, possibly with your domain types. Based on this information and the flexible Workflow API, you can define a compensation for any workflow step.
Java: src/main/java/com/example/transfer/api/TransferWorkflow.java
Step deposit =
    step("deposit")
        .call(Deposit.class, cmd -> {
          DepositRequest depositRequest = DepositRequest.newBuilder()
              .setWalletId(cmd.getTo())
              .setAmount(cmd.getAmount())
              .build();
          return components().walletEntity().deposit(depositRequest);
        })
        .andThen(DepositResult.class, depositResult -> { // (1)
          if (depositResult.hasSucceed()) {
            return effects()
                .updateState(currentState().toBuilder().setStatus(COMPLETED).build())
                .end(); // (2)
          } else {
            return effects()
                .updateState(currentState().toBuilder().setStatus(DEPOSIT_FAILED).build())
                .transitionTo("compensate-withdraw"); // (3)
          }
        });

Step compensateWithdraw =
    step("compensate-withdraw") // (3)
        .call(() -> {
          DepositRequest refund = DepositRequest.newBuilder()
              .setAmount(currentState().getAmount())
              .setWalletId(currentState().getFrom())
              .build();
          return components().walletEntity().deposit(refund);
        })
        .andThen(DepositResult.class, depositResult -> {
          if (depositResult.hasSucceed()) {
            return effects()
                .updateState(currentState().toBuilder().setStatus(COMPENSATION_COMPLETED).build())
                .end(); // (4)
          } else {
            throw new IllegalStateException("Expecting succeed operation but received: " + depositResult); // (5)
          }
        });
(1) An explicit deposit call result type, DepositResult.
(2) Finishes the workflow as completed, in the case of a successful deposit.
(3) Launches the compensation step to handle the deposit failure. The "withdraw" step must be reversed. A compensation step is just like any other step, with the same set of functionalities.
(4) A successful compensation finishes the workflow.
(5) Any other result is left to the default recovery strategy.

Scala: src/main/scala/com/example/transfer/api/TransferWorkflow.scala
val deposit =
  step("deposit")
    .call { (deposit: Deposit) =>
      val depositRequest = DepositRequest(deposit.to, deposit.amount)
      components.walletEntity.deposit(depositRequest)
    }
    .andThen { depositResult => // (1)
      depositResult.result match {
        case DepositResult.Result.Succeed(_) =>
          effects
            .updateState(currentState().withStatus(COMPLETED))
            .end // (2)
        case DepositResult.Result.Failed(msg) =>
          effects
            .updateState(currentState().withStatus(DEPOSIT_FAILED))
            .transitionTo("compensate-withdraw") // (3)
        case DepositResult.Result.Empty =>
          throw new IllegalStateException(s"not supported deposit result: $depositResult")
      }
    }

val compensateWithdraw =
  step("compensate-withdraw") // (3)
    .call { () =>
      val depositRequest = DepositRequest(currentState().from, currentState().amount)
      components.walletEntity.deposit(depositRequest)
    }
    .andThen { depositResult =>
      depositResult.result match {
        case DepositResult.Result.Succeed(_) =>
          effects
            .updateState(currentState().withStatus(COMPENSATION_COMPLETED))
            .end // (4)
        case _ =>
          throw new IllegalStateException("Expecting succeed operation but received: " + depositResult) // (5)
      }
    }
(1) An explicit deposit call result type, DepositResult.
(2) Finishes the workflow as completed, in the case of a successful deposit.
(3) Launches the compensation step to handle the deposit failure. The "withdraw" step must be reversed. A compensation step is just like any other step, with the same set of functionalities.
(4) A successful compensation finishes the workflow.
(5) Any other result is left to the default recovery strategy.
Compensating one or more workflow steps might involve multiple logical steps, and is thus part of the overall business logic that must be defined within the workflow itself. For simplicity, the example above applies compensation only to the withdraw step, although the deposit step itself might also require compensation: in case of a step timeout, we cannot be certain whether the step succeeded or failed.
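The deposit-or-compensate decision can be traced end to end in a plain-Java sketch. Everything here is hypothetical: CompensationSketch keeps balances in an in-memory map, the withdraw step is assumed to succeed, a missing target wallet stands in for a known deposit failure, and the status strings reuse the names from the example. A failed refund throws, which in the real workflow would be left to the recovery strategy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, in-memory sketch (NOT Kalix code) of the deposit step and its
// "compensate-withdraw" compensation.
final class CompensationSketch {
  // Known outcome of a deposit: succeeded, or failed with a message.
  record DepositResult(boolean succeeded, String failureMessage) {}

  private final Map<String, Integer> balances = new HashMap<>();
  String status = "STARTED";

  CompensationSketch(Map<String, Integer> initialBalances) {
    balances.putAll(initialBalances);
  }

  // Expected failures are returned as values instead of thrown.
  DepositResult deposit(String walletId, int amount) {
    if (!balances.containsKey(walletId)) {
      return new DepositResult(false, "wallet not found: " + walletId);
    }
    balances.merge(walletId, amount, Integer::sum);
    return new DepositResult(true, null);
  }

  void transfer(String from, String to, int amount) {
    balances.merge(from, -amount, Integer::sum);  // "withdraw" (assumed to succeed)
    status = "WITHDRAW_SUCCEED";

    DepositResult result = deposit(to, amount);   // "deposit"
    if (result.succeeded()) {
      status = "COMPLETED";
      return;
    }

    status = "DEPOSIT_FAILED";
    DepositResult refund = deposit(from, amount); // "compensate-withdraw"
    if (!refund.succeeded()) {
      // Unexpected: in the real workflow, left to the recovery strategy.
      throw new IllegalStateException("compensation failed: " + refund.failureMessage());
    }
    status = "COMPENSATION_COMPLETED";
  }

  int balance(String walletId) {
    return balances.get(walletId);
  }
}
```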
A full error handling and compensation sample is available here (Java Protobuf) or here (Scala Protobuf). Run TransferWorkflowIntegrationTest or TransferWorkflowIntegrationSpec and examine the logs from the application.