Timers

Timers allow for scheduling calls in the future. For example, to verify that some process have been completed or not.

Timers are persisted in the Kalix proxy and are guaranteed to run at least once.

When a timer is triggered, the scheduled call is executed. If successfully executed, the timer completes and is automatically removed. In case of a failure, the timer is rescheduled with a delay of 3 seconds. This process repeats until the call succeeds.

You can schedule a timer for any service method in Kalix, but you can only create a timer from within an Action and by passing a DeferredCall.

Timer features:

  • Timers are guaranteed to run at least once.

  • Timers can be scheduled to run at any time in the future.

  • Timers can be cancelled.

  • Timers are automatically removed once successfully completed.

  • Timers are re-scheduled in case of failures.

Timer limitations:

  • Timers can only be scheduled from within an Action.

  • Timers can only call other components (eg: Actions and Entities), therefore external service calls must be wrapped by an Action in the deployed service.

To demonstrate its functionality, let’s consider an Ordering Service composed of a Value Entity and an Action. The Action will work as a Controller receiving calls and delegating to the Order Entity.

Users can place an order, but the order must be confirmed within a period of time. You can compare it with an Ordering Food application where the restaurant needs to confirm if it can accept the order. If no confirmation is sent within some pre-defined period of time, the order is automatically cancelled.

Order Entity

Let’s have a look on how the Order Entity can be implemented.

Java
src/main/java/com/example/domain/Order.java
public class Order extends AbstractOrder {
  @SuppressWarnings("unused")
  private final String entityId;

  public Order(ValueEntityContext context) {
    this.entityId = context.entityId();
  }

  @Override
  public OrderDomain.OrderState emptyState() {
    return OrderDomain.OrderState.getDefaultInstance();
  }


  @Override
  public Effect<Empty> placeOrder(OrderDomain.OrderState currentState, OrderServiceApi.OrderRequest orderRequest) {
    OrderDomain.OrderState orderState =
        OrderDomain.OrderState.newBuilder()
            .setOrderNumber(orderRequest.getOrderNumber())
            .setItem(orderRequest.getItem())
            .setQuantity(orderRequest.getQuantity())
            .setPlaced(true) (1)
            .build();

    return effects().updateState(orderState).thenReply(Empty.getDefaultInstance());
  }

  @Override
  public Effect<Empty> confirm(OrderDomain.OrderState currentState, OrderServiceApi.ConfirmRequest confirmRequest) {
    if (currentState.getPlaced()) { (2)
      return effects()
          .updateState(currentState.toBuilder().setConfirmed(true).build())
          .thenReply(Empty.getDefaultInstance());
    } else {
      return effects().error(
        "No order found for '" + confirmRequest.getOrderNumber() + "'",
          Status.Code.NOT_FOUND); (3)
    }
  }

  @Override
  public Effect<Empty> cancel(OrderDomain.OrderState currentState, OrderServiceApi.CancelRequest cancelRequest) {
    if (!currentState.getPlaced()) {
      return effects().error(
          "No order found for " + cancelRequest.getOrderNumber(),
          Status.Code.NOT_FOUND); (4)
    } else if (currentState.getConfirmed()) {
      return effects().error(
          "Can not cancel an already confirmed order",
          Status.Code.INVALID_ARGUMENT); (5)
    } else {
      return effects().updateState(emptyState())
          .thenReply(Empty.getDefaultInstance()); (6)
    }
  }
}
1 The first method to look at is the placeOrder. It’s basically the creation of an order. Note that we set the placed field to true.
2 When confirming an Order, we must ensure that the Order was created before.
3 If the Order was never created, we return NOT_FOUND.
4 Cancelling an Order that was never placed also returns NOT_FOUND.
5 While cancelling an already confirmed order returns INVALID_ARGUMENT.
6 Finally, if the Order is placed, but not confirmed, the cancel method resets the order to the emptyState. This is the equivalent of deleting it.
Scala
src/main/scala/com/example/domain/Order.scala
class Order(context: ValueEntityContext) extends AbstractOrder {

  override def emptyState: OrderState =
    OrderState.defaultInstance

  override def placeOrder(currentState: OrderState, orderRequest: OrderRequest): ValueEntity.Effect[Empty] = {
    val placedOrder =
      OrderState(
      orderRequest.orderNumber,
      placed = true, (1)
      item = orderRequest.item,
      quantity = orderRequest.quantity)

    effects.updateState(placedOrder).thenReply(Empty.defaultInstance)
  }

  override def confirm(currentState: OrderState, confirmRequest: example.ConfirmRequest): ValueEntity.Effect[Empty] =
    if (currentState.placed) (2)
      effects
        .updateState(currentState.copy(confirmed = true))
        .thenReply(Empty.defaultInstance)
    else
      effects
        .error(
          s"No order found for '${confirmRequest.orderNumber}'",
          Status.Code.NOT_FOUND) (3)

  override def cancel(currentState: OrderState, cancelRequest: example.CancelRequest): ValueEntity.Effect[Empty] =
    if (!currentState.placed)
      effects
        .error(
          s"No order found for '${cancelRequest.orderNumber}'",
          Status.Code.NOT_FOUND) (4)
    else if (currentState.confirmed)
      effects
        .error(
          "Can not cancel an already confirmed order",
          Status.Code.INVALID_ARGUMENT) (5)
    else
      effects.updateState(emptyState)
        .thenReply(Empty.defaultInstance) (6)

}
1 The first method to look at is the placeOrder. It’s basically the creation of an order. Note that we set the placed field to true.
2 When confirming an Order, we must ensure that the Order was created before.
3 If the Order was never created, we return NOT_FOUND.
4 Cancelling an Order that was never placed also returns NOT_FOUND.
5 While cancelling an already confirmed order returns INVALID_ARGUMENT.
6 Finally, if the Order is placed, but not confirmed, the cancel method resets the order to the emptyState. This is the equivalent of deleting it.

Order Action and Timers

As said before, the OrderAction will act as a controller intercepting incoming messages, running some logic and then calling the Order Entity.

Scheduling a timer

We will first look at OrderAction.placeOrder. Before delegating the request to the Order Entity, the Action creates a timer.

Java
src/main/java/com/example/actions/OrderAction.java
public class OrderAction extends AbstractOrderAction {
  private String timerName(OrderApi.OrderNumber orderNum) {
    return "order-expiration-timer-" + orderNum.getNumber();
  }

  @Override
  public Effect<OrderApi.OrderNumber> placeOrder(OrderApi.OrderRequest orderRequest) {

    OrderApi.OrderNumber orderNumber = (1)
        OrderApi.OrderNumber
            .newBuilder()
            .setNumber(UUID.randomUUID().toString())
            .build();

    CompletionStage<Done> timerRegistration = (2)
        timers().startSingleTimer(
            timerName(orderNumber), (3)
            Duration.ofMinutes(5), (4)
            components().orderAction().expire(orderNumber) (5)
        );

    OrderServiceApi.OrderRequest request = (6)
        OrderServiceApi.OrderRequest.newBuilder()
            .setOrderNumber(orderNumber.getNumber())
            .setItem(orderRequest.getItem())
            .setQuantity(orderRequest.getQuantity())
            .build();


    return effects().asyncReply( (7)
        timerRegistration
            .thenCompose(done -> components().order().placeOrder(request).execute())
            .thenApply(empty -> orderNumber)
    );
  }
}
1 First we generate a random identifier for the Order. We will use it for identifying the Order, but also as unique name for the timer.
2 We call the timers API to register a new timer. Note that it returns CompletionStage<Done>. A successul completion means that Kalix registered the timer.
3 Order number is used to generate a unique name for the timer.
4 We set the delay we want for the timer to trigger.
5 We scheduled call to OrderAction.expire method. We will cover it in a while.
6 We build a request for the Order entity.
7 Finally, we build an asyncReply by composing the timerResgistration CompletionStage with a call to place the order. We access the Order entity through components.order and we call execute() to run the call.
Scala
src/main/java/com/example/actions/OrderAction.scala
class OrderAction(creationContext: ActionCreationContext) extends AbstractOrderAction {
  def timerName(orderNum: OrderNumber) =
    "order-expiration-timer-" + orderNum.number

  override def placeOrder(orderRequest: OrderRequest): Action.Effect[OrderNumber] = {

    val orderNumber = OrderNumber(UUID.randomUUID().toString) (1)

    val timerRegistration: Future[Done] = (2)
      timers.startSingleTimer(
        name = timerName(orderNumber), (3)
        delay = 5.minutes, (4)
        deferredCall = components.orderAction.expire(orderNumber) (5)
      )

    def placeOrder(): Future[OrderNumber] = (6)
      components.order
        .placeOrder(example
          .OrderRequest(orderNumber = orderNumber.number, item = orderRequest.item, quantity = orderRequest.quantity))
        .execute() (7)
        .map(_ => orderNumber)

    effects.asyncReply(timerRegistration.flatMap(_ => placeOrder())) (8)
  }

}
1 First we generate a random identifier for the Order. We will use it for identifying the Order, but also as unique name for the timer.
2 We call the timers API to register a new timer. Note that it returns Future[Done]. A successul completion means that Kalix registered the timer.
3 Order number is used to generate a unique name for the timer.
4 We set the delay we want for the timer to trigger.
5 We scheduled call to OrderAction.expire method. We will cover it in a while.
6 Next we define a method that will make a call to the Order entity.
7 We access the Order entity through components.order and we call execute() to run the call.
8 Finally, we build an asyncReply by chaining the timerResgistration and the placeOrder futures.

In a nutshell, we first requested Kalix to register a timer. When it completes, we know that the timer is persisted and will run at the specified time. We then proceed by placing the order.

The sequence of actions is important here. If we had called the entity first and then registered the timer, the Order could have been placed and the timer registration could have failed due to some network issue for example. In such a case, we would end up with an Order without an expiration timer.

But the inverse is also true. There is still the risk of registering the timer and then failing to place the Order. However, the implementation of the expire method can take that into account.

Handling the timer call

Let’s have a look at the OrderAction.expire method implementation.

Java
src/main/java/com/example/actions/OrderAction.java
public class OrderAction extends AbstractOrderAction {
  @Override
  public Effect<Empty> expire(OrderApi.OrderNumber orderNumber) {
    logger.info("Expiring order '{}'", orderNumber.getNumber());

    Predicate<StatusRuntimeException> validateErrorCodes = exception -> {
      Status.Code code = exception.getStatus().getCode();
      return code == Status.Code.NOT_FOUND || code == Status.Code.INVALID_ARGUMENT;
    };

    OrderServiceApi.CancelRequest cancelRequest =
        OrderServiceApi.CancelRequest.newBuilder()
            .setOrderNumber(orderNumber.getNumber())
            .build();

    CompletionStage<Empty> reply =
        components().order()
            .cancel(cancelRequest)
            .execute() (1)
            .thenApply(cancelled -> Empty.getDefaultInstance()) (2)
            .exceptionally(e -> { (3)
                  if (e instanceof StatusRuntimeException && validateErrorCodes.test((StatusRuntimeException) e)) {
                    // if NotFound or InvalidArgument, we don't need to re-try, and we can move on
                    // other kind of failures are not recovered and will trigger a re-try
                    return Empty.getDefaultInstance();
                  } else {
                    throw new StatusRuntimeException(Status.fromThrowable(e));
                  }
                }
            );
    return effects().asyncReply(reply);
  }
}
1 When the OrderAction receives the expiration call, it immediately tries to cancel the Order. We use the execute() method to run it. This method returns a CompletionStage.
2 If the CompletionStage completes successfuly, we are all good and we can simply return Empty.getDefaultInstance(). Since this method is returning normally, the timer will be considered as executed and will be removed from Kalix.
3 On the other hand, if the CompletionStage completes with a failure, we must decide if we will recover the call or not. If we recover, the timer will be considered as completed. If we will let the call fail, the timer will be re-scheduled.
Scala
src/main/java/com/example/actions/OrderAction.scala
class OrderAction(creationContext: ActionCreationContext) extends AbstractOrderAction {
  override def expire(orderNumber: OrderNumber): Action.Effect[Empty] = {
    logger.info("Expiring order '{}'", orderNumber.number)

    def validateErrorCodes(code: Status.Code) =
      code == Status.Code.NOT_FOUND || code == Status.Code.INVALID_ARGUMENT

    val result =
      components.order
        .cancel(example.CancelRequest(orderNumber.number))
        .execute() (1)
        .map { _ => Empty.defaultInstance } (2)
        .recover { (3)
          case ex: StatusRuntimeException if validateErrorCodes(ex.getStatus.getCode) =>
            // if NotFound or InvalidArgument, we don't need to re-try, and we can move on
            // other kind of failures are not recovered and will trigger a re-try
            Empty.defaultInstance
        }

    effects.asyncReply(result)
  }

}
1 When the OrderAction receives the expiration call, it immediately tries to cancel the Order. We use the execute() method to run it. This method returns a Future.
2 If the Future completes successfuly, we are all good and we can simply return Empty.defaultInstance. Since this method is returning normally, the timer will be considered as executed and will be removed from Kalix.
3 On the other hand, if the Future completes with a failure, we must decide if we will recover the call or not. If we recover, the timer will be considered as completed. If we will let the call fail, the timer will be re-scheduled.

We have seen the Order.cancel implementation and we know that if we get a NOT_FOUND, it means that either the Order never existed or it was already cancelled (deleted). Or, we may get an INVALID_ARGUMENT error, meaning that the order has been confirmed in the meantime. In both cases, we can consider that the timer has become obsolete and don’t need to be rescheduled, therefore we recover the call.

For all other possible errors, the call to OrderAction.expire will fail and the timer will be re-scheduled.

Whenever we implement a method that is called from a timer, we need carefully handle errors inside that method. Failing to do so may cause the timer to keep re-scheduling. Therefore, we should ensure that any failure is properly handled and only propagated if the intention is to re-try the call.

Cancelling a timer

Next, we can have a look at OrderAction.confirm and OrderAction.cancel implementations. They are very similar. The only difference being the method they call on the Order entity.

Java
src/main/java/com/example/actions/OrderAction.java
public class OrderAction extends AbstractOrderAction {
  @Override
  public Effect<Empty> confirm(OrderApi.OrderNumber orderNumber) {
    logger.info("Confirming order '{}'", orderNumber.getNumber());
    OrderServiceApi.ConfirmRequest request =
        OrderServiceApi.ConfirmRequest.newBuilder()
            .setOrderNumber(orderNumber.getNumber())
            .build();

    CompletionStage<Empty> reply =
        components().order() (1)
            .confirm(request)
            .execute()
            .thenCompose(req -> timers().cancel(timerName(orderNumber))) (2)
            .thenApply(done -> Empty.getDefaultInstance());

    return effects().asyncReply(reply);
  }

  @Override
  public Effect<Empty> cancel(OrderApi.OrderNumber orderNumber) {
    logger.info("Cancelling order '{}'", orderNumber.getNumber());
    OrderServiceApi.CancelRequest request =
        OrderServiceApi.CancelRequest.newBuilder()
            .setOrderNumber(orderNumber.getNumber())
            .build();

    CompletionStage<Empty> reply =
        components().order()
            .cancel(request)
            .execute()
            .thenCompose(req -> timers().cancel(timerName(orderNumber)))
            .thenApply(done -> Empty.getDefaultInstance());

    return effects().asyncReply(reply);
  }
}
1 We first call the Order entity to execute the command.
2 If it succeeds, we remove the timer.
Scala
src/main/java/com/example/actions/OrderAction.scala
class OrderAction(creationContext: ActionCreationContext) extends AbstractOrderAction {
  override def confirm(orderNumber: OrderNumber): Action.Effect[Empty] = {
    logger.info("Confirming order '{}'", orderNumber.number)
    val reply =
      for {
        _ <- components.order (1)
          .confirm(example.ConfirmRequest(orderNumber.number))
          .execute()
        _ <- timers.cancel(timerName(orderNumber)) (2)
      } yield Empty.defaultInstance

    effects.asyncReply(reply)
  }

  override def cancel(orderNumber: OrderNumber): Action.Effect[Empty] = {
    logger.info("Cancelling order '{}'", orderNumber.number)
    val reply =
      for {
        _ <- components.order
          .cancel(example.CancelRequest(orderNumber.number))
          .execute()
        _ <- timers.cancel(timerName(orderNumber))
      } yield Empty.defaultInstance

    effects.asyncReply(reply)
  }

}
1 We first call the Order entity to execute the command.
2 If it succeeds, we remove the timer.

In both methods, we pass the request to the entity and when it completes, we cancel the timer.

Once more, the ordering is important. It’s not a problem if the call to cancel the timer fails. As we have seen in the OrderAction.expire implementation, if the timer is triggered, but is obsolete, we will properly recover from it and signal to Kalix that the timer can be removed.

We could have completely ignore the timer when handling the confirmation or the cancelling. The registered timer would then be triggered at some point later and the expire method would have handled the fact that it has become obsolete. However, it’s always of good measure to do some housekeeping to save resources.