Implementing Workflow Entities

Implementing a business transaction that spans multiple services is one of the major challenges in distributed systems. Fortunately, with the Kalix ecosystem and concepts like Subscription, it’s very easy to build an event-driven choreography that covers a complex business process. The Saga pattern, which generalizes this solution, has never been easier to implement. However, for some use cases a different flavor of the Saga pattern, an orchestration, might be more suitable. That was the main driver behind a Kalix component type called a Workflow Entity. From a high-level perspective, it combines the power of the Kalix Entities state model (durability, consistency guarantees) with the ability of Kalix Actions to call other components and services. You can model your business process in a single place, and the Workflow Entity will keep it running or roll it back in case of a failure.

Modeling state

We want to build a simple workflow that transfers funds between two wallets. Before that, we will create a wallet subdomain with some basic functionalities that we could use later. For simplicity a WalletEntity is implemented as a ValueEntity, but for a production-ready solution an EventSourcedEntity would be a better choice.

src/main/java/com/example/wallet/WalletEntity.java
@EntityKey("id")
@EntityType("wallet")
@RequestMapping("/wallet/{id}")
public class WalletEntity extends ValueEntity<WalletEntity.Wallet> {

  public record Wallet(String id, int balance) {
    public Wallet withdraw(int amount) {
      return new Wallet(id, balance - amount);
    }
    public Wallet deposit(int amount) {
      return new Wallet(id, balance + amount);
    }
  }

  public record Balance(int value) {
  }

  @PostMapping("/create/{initBalance}") (1)
  public Effect<String> create(@PathVariable String id, @PathVariable int initBalance) {
    return effects().updateState(new Wallet(id, initBalance)).thenReply("Ok");
  }

  @PatchMapping("/withdraw/{amount}") (2)
  public Effect<String> withdraw(@PathVariable int amount) {
    var newBalance = currentState().balance() - amount;
    if (newBalance < 0) {
      return effects().error("Insufficient balance");
    } else {
      Wallet updatedWallet = currentState().withdraw(amount);
      return effects().updateState(updatedWallet).thenReply("Ok");
    }
  }

  @PatchMapping("/deposit/{amount}") (3)
  public Effect<String> deposit(@PathVariable int amount) {
    Wallet updatedWallet = currentState().deposit(amount);
    return effects().updateState(updatedWallet).thenReply("Ok");
  }

  @GetMapping (4)
  public Effect<Balance> get() {
    return effects().reply(new Balance(currentState().balance()));
  }
}
1 An endpoint to create a wallet with an initial balance.
2 An endpoint to withdraw funds from the wallet.
3 An endpoint to deposit funds to the wallet.
4 An endpoint to get current wallet balance.
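The balance rules in the entity above can be tried out without running Kalix. The following is a minimal plain-Java sketch, not part of the sample project: WalletSketch and tryWithdraw are hypothetical names, and the Optional return value only stands in for the effects().error and effects().updateState branches of the real command handler.

```java
import java.util.Optional;

// Plain-Java sketch of the wallet rules; WalletSketch and tryWithdraw are
// hypothetical names used for illustration and are not Kalix API.
final class WalletSketch {

  // Same shape as the Wallet record nested in WalletEntity.
  record Wallet(String id, int balance) {
    Wallet withdraw(int amount) {
      return new Wallet(id, balance - amount);
    }

    Wallet deposit(int amount) {
      return new Wallet(id, balance + amount);
    }
  }

  // Mirrors the guard in the withdraw command handler: an empty result stands
  // for the "Insufficient balance" error, a present one for the updated state.
  static Optional<Wallet> tryWithdraw(Wallet wallet, int amount) {
    if (wallet.balance() - amount < 0) {
      return Optional.empty();
    }
    return Optional.of(wallet.withdraw(amount));
  }

  public static void main(String[] args) {
    Wallet wallet = new Wallet("a", 100);
    System.out.println(tryWithdraw(wallet, 30));  // enough funds
    System.out.println(tryWithdraw(wallet, 130)); // would overdraw the wallet
    System.out.println(wallet.deposit(20));
  }
}
```

Because Wallet is an immutable record, each operation returns a new instance, which is what the entity passes to updateState.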

Now we can focus on the workflow implementation itself. A workflow has state, which can be updated in command handlers and transition methods. When modeling the state, we should consider what information is required for validation, for running the steps, for collecting data from steps, and for tracking the workflow progress.

src/main/java/com/example/transfer/TransferState.java
public record TransferState(Transfer transfer, TransferStatus status) {

  public record Transfer(String from, String to, int amount) { (1)
  }

  public enum TransferStatus { (2)
    STARTED, SUCCESSFUL_WITHDRAWAL, COMPLETED
  }

  public TransferState(Transfer transfer) {
    this(transfer, STARTED);
  }

  public TransferState withStatus(TransferStatus newStatus){
    return new TransferState(transfer, newStatus);
  }
}
1 A Transfer record encapsulates data required to withdraw and deposit funds.
2 A TransferStatus is used to track workflow progress.
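As a quick illustration of how this state is meant to evolve, here is a small plain-Java sketch. It repeats the records from above inside a hypothetical wrapper class (TransferStateSketch) so that it compiles on its own; the wallet ids "a" and "b" are made-up sample values.

```java
// Plain-Java sketch; TransferStateSketch is a hypothetical wrapper class that
// only exists to make the example self-contained.
final class TransferStateSketch {

  record Transfer(String from, String to, int amount) {
  }

  enum TransferStatus {
    STARTED, SUCCESSFUL_WITHDRAWAL, COMPLETED
  }

  record TransferState(Transfer transfer, TransferStatus status) {
    // A new transfer always begins in the STARTED status.
    TransferState(Transfer transfer) {
      this(transfer, TransferStatus.STARTED);
    }

    // Returns a copy with the new status; the receiver is left unchanged.
    TransferState withStatus(TransferStatus newStatus) {
      return new TransferState(transfer, newStatus);
    }
  }

  public static void main(String[] args) {
    TransferState started = new TransferState(new Transfer("a", "b", 10));
    TransferState withdrawn = started.withStatus(TransferStatus.SUCCESSFUL_WITHDRAWAL);
    TransferState completed = withdrawn.withStatus(TransferStatus.COMPLETED);
    System.out.println(started.status() + " -> " + withdrawn.status() + " -> " + completed.status());
  }
}
```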

Implementing behavior

Now that we have our workflow state defined, the remaining tasks can be summarized as follows:

  • declare your entity and pick an entity type and key (the key needs to be unique as it will be used for sharding purposes);

  • define an access point (i.e. a route path) to your entity;

  • implement endpoint(s) to interact with the workflow (e.g. to start a workflow, or provide additional data) or retrieve its current state;

  • provide a workflow definition with all possible steps and transitions between them.

Let’s have a look at what our transfer workflow entity will look like for the first 2 points from the above list:

src/main/java/com/example/transfer/TransferWorkflowEntity.java
@EntityType("transfer") (2)
@EntityKey("transferId") (3)
@RequestMapping("/transfer/{transferId}") (4)
public class TransferWorkflowEntity extends WorkflowEntity<TransferState> { (1)
1 Create a class that extends WorkflowEntity<S>, where S is the state type this entity will store (i.e. TransferState).
2 Annotate this class with @EntityType and pass a unique name for this entity type.
3 Annotate this class with @EntityKey and pass the name of the key that will be used as the entity’s unique identifier.
4 Use Spring’s RequestMapping annotation to define the route to your entity.
The EntityKey transferId must match a path parameter (i.e. transferId) and such value needs to be unique per entity. On the other hand, the EntityType transfer is common for all instances of this workflow entity but must be stable - cannot be changed after a production deploy - and unique across the different entity types.

Starting workflow

Having created the basis of our workflow entity, we will now define how to launch a workflow with a command handler. In the example below, we define a new endpoint that will accept StartTransfer command and return an Effect to start a workflow by providing a transition to the first step. Also, we will update the state with an initial value.

src/main/java/com/example/transfer/TransferWorkflowEntity.java
public record Withdraw(String from, int amount) {
}

@PutMapping
public Effect<Message> startTransfer(@RequestBody Transfer transfer) {
  if (transfer.amount() <= 0) {
    return effects().error("transfer amount should be greater than zero"); (1)
  } else if (currentState() != null) {
    return effects().error("transfer already started"); (2)
  } else {

    TransferState initialState = new TransferState(transfer); (3)

    Withdraw withdrawInput = new Withdraw(transfer.from(), transfer.amount());

    return effects()
        .updateState(initialState) (4)
        .transitionTo("withdraw", withdrawInput) (5)
        .thenReply(new Message("transfer started")); (6)
  }
}
1 The validation ensures the transfer amount is greater than zero and it fails for calls with illegal values by returning an Effect with effects().error.
2 We should handle the situation when the workflow is already running and return a proper message. Otherwise, we might corrupt the running workflow.
3 From the incoming command we create an initial TransferState.
4 We instruct Kalix to persist the new state.
5 With the transitionTo method, we inform that the name of the first step is "withdraw" and the input for this step is a Withdraw object.
6 The last instruction is to inform the caller that the workflow was successfully started.
For simplicity purposes, we are reusing the internal Transfer record as a request body. This should be a separate class and our domain state model shouldn’t be exposed as an entity public API.
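One possible way to keep the domain model out of the public API is sketched below in plain Java. StartTransferRequest, toDomain and validate are hypothetical names that do not appear in the sample project; the validate method only mirrors the two checks from the command handler above and returns the error message instead of an Effect.

```java
import java.util.Optional;

// Plain-Java sketch; StartTransferSketch, StartTransferRequest, toDomain and
// validate are hypothetical names and are not Kalix API.
final class StartTransferSketch {

  // Internal domain record, same shape as TransferState.Transfer.
  record Transfer(String from, String to, int amount) {
  }

  // Public request type, free to evolve independently of the domain model.
  record StartTransferRequest(String from, String to, int amount) {
    Transfer toDomain() {
      return new Transfer(from, to, amount);
    }
  }

  // Returns an error message, or an empty Optional when the workflow may start.
  static Optional<String> validate(StartTransferRequest request, boolean alreadyStarted) {
    if (request.amount() <= 0) {
      return Optional.of("transfer amount should be greater than zero");
    } else if (alreadyStarted) {
      return Optional.of("transfer already started");
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    StartTransferRequest request = new StartTransferRequest("a", "b", 10);
    System.out.println(validate(request, false));
    System.out.println(validate(request, true));
    System.out.println(request.toDomain());
  }
}
```

In the real handler, the alreadyStarted flag would correspond to the currentState() != null check.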

Workflow definition

One missing piece of our transfer workflow implementation is a workflow definition method, which composes all steps and connects them with transitions. A workflow Step has a unique name, an action to perform (e.g. a deferred call to an existing Kalix component, or an asynchronous call to any external service) and a transition to select the next step (or an end transition to finish the workflow, in the case of the last step).

src/main/java/com/example/transfer/TransferWorkflowEntity.java
public record Deposit(String to, int amount) {
}

@Override
public Workflow<TransferState> definition() {
  Step withdraw =
      step("withdraw") (1)
          .call((Withdraw cmd) -> {
            String withdrawUri = "/wallet/" + cmd.from() + "/withdraw/" + cmd.amount();
            return kalixClient.patch(withdrawUri, String.class);
          }) (2)
          .andThen(__ -> {
            Deposit depositInput = new Deposit(currentState().transfer().to(), currentState().transfer().amount());
            return effects()
                .updateState(currentState().withStatus(SUCCESSFUL_WITHDRAWAL))
                .transitionTo("deposit", depositInput); (3)
          });

  Step deposit =
      step("deposit") (1)
          .call((Deposit cmd) -> {
            String depositUri = "/wallet/" + cmd.to() + "/deposit/" + cmd.amount();
            return kalixClient.patch(depositUri, String.class);
          }) (4)
          .andThen(__ -> {
            return effects()
                .updateState(currentState().withStatus(COMPLETED))
                .end(); (5)
          });

  return workflow() (6)
      .addStep(withdraw)
      .addStep(deposit);
}
1 Each step should have a unique name.
2 We instruct Kalix to run a given deferred call to withdraw funds from a wallet.
3 After successful withdrawal we return an Effect that will update the workflow state and move to the next step called "deposit". An input parameter for this step is a Deposit record.
4 Another workflow step action to deposit funds to a given wallet.
5 This time we return an effect that will stop workflow processing by using the special end method.
6 We collect all steps to form a workflow definition.
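To make the control flow easier to follow, the plain-Java sketch below simulates how named steps chain through transitions until an end transition is reached. It is only a mental model under simplifying assumptions: WorkflowSketch, Transition and run are hypothetical names, the wallets are an in-memory map rather than Kalix entities, and durability, retries and failure handling are not modeled at all.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Plain-Java simulation of step chaining; none of these types are Kalix API.
final class WorkflowSketch {

  enum Status { STARTED, SUCCESSFUL_WITHDRAWAL, COMPLETED }

  record State(String from, String to, int amount, Status status) {
    State withStatus(Status newStatus) {
      return new State(from, to, amount, newStatus);
    }
  }

  // Result of a step: the updated state plus the next step name (empty means "end").
  record Transition(State state, Optional<String> next) {
  }

  // Runs the steps in order, starting from "withdraw", against in-memory wallets.
  static State run(Map<String, Integer> wallets, State initial) {
    Map<String, Function<State, Transition>> steps = new HashMap<>();
    steps.put("withdraw", s -> {
      wallets.merge(s.from(), -s.amount(), Integer::sum);
      return new Transition(s.withStatus(Status.SUCCESSFUL_WITHDRAWAL), Optional.of("deposit"));
    });
    steps.put("deposit", s -> {
      wallets.merge(s.to(), s.amount(), Integer::sum);
      return new Transition(s.withStatus(Status.COMPLETED), Optional.empty());
    });

    State state = initial;
    Optional<String> next = Optional.of("withdraw");
    while (next.isPresent()) {
      Transition transition = steps.get(next.get()).apply(state);
      state = transition.state();
      next = transition.next();
    }
    return state;
  }

  public static void main(String[] args) {
    Map<String, Integer> wallets = new HashMap<>(Map.of("a", 100, "b", 0));
    State result = run(wallets, new State("a", "b", 10, Status.STARTED));
    System.out.println(result.status() + " " + wallets);
  }
}
```

In the real Workflow Entity, the loop is driven by Kalix itself, which persists the state after each transition so that the workflow can continue from the last completed step.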

Retrieving state

To access the current state of the workflow entity we can use currentState() (as with other entities). However, if this is the first command we are receiving for this workflow entity, the state will be null. We can change this by overriding the emptyState method. The following example shows the implementation of a read-only command handler (accessed through GET /transfer/{transferId}):

src/main/java/com/example/transfer/TransferWorkflowEntity.java
@GetMapping (1)
public Effect<TransferState> getTransferState() {
  if (currentState() == null) {
    return effects().error("transfer not started");
  } else {
    return effects().reply(currentState()); (2)
  }
}
1 Marks this method as a command handler for GET requests.
2 Returns the current state as reply for the request.
For simplicity purposes, we are returning the internal state directly back to the requester. In a real-world scenario, it’s usually better to instead convert this internal domain model into a public model so the internal representation is free to evolve without breaking clients code.
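A minimal plain-Java sketch of such a conversion is shown below. TransferView and its factory method of are hypothetical names that are not part of the sample project; the domain records are repeated only so that the example compiles on its own.

```java
// Plain-Java sketch; TransferViewSketch and TransferView are hypothetical names.
final class TransferViewSketch {

  record Transfer(String from, String to, int amount) {
  }

  enum TransferStatus { STARTED, SUCCESSFUL_WITHDRAWAL, COMPLETED }

  // Internal domain state, same shape as TransferState.
  record TransferState(Transfer transfer, TransferStatus status) {
  }

  // Flat public model: clients depend on this type, not on the internal
  // nesting or on the enum used by the domain model.
  record TransferView(String from, String to, int amount, String status) {
    static TransferView of(TransferState state) {
      Transfer transfer = state.transfer();
      return new TransferView(transfer.from(), transfer.to(), transfer.amount(), state.status().name());
    }
  }

  public static void main(String[] args) {
    TransferState state = new TransferState(new Transfer("a", "b", 10), TransferStatus.STARTED);
    System.out.println(TransferView.of(state));
  }
}
```

With a type like this, the GET handler could reply with the view instead of currentState(), leaving the internal records free to change.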

The full source code of the transfer workflow is available here. Follow the README file to run and test it.