Calling other services

Kalix services

In some cases it is useful to call a component in another service, for example interacting with multiple entities or actions, aggregating their return values or performing a multi-step workflow. Forwards and Side Effects allow us to trigger other services but does not make it possible to compose and transform the results of calling them.

Calling other Kalix services in the same project from an Action is done by invoking them over gRPC much like how an external client would. The service is however identified only by the name it has been deployed as, Kalix takes care of routing requests to the service and keeping the data safe by encrypting the connection for us.

In this sample we will make an action that does two sequential calls to the Value Entity Counter service, deployed with the service name "counter."

We start by adding the public API of the counter to the src/main/proto directory of our project.

Since the proto file of a Kalix service contains annotations that cause the Java/Protobuf SDK code generation to generate services, and we only want to consume the service, we need to start by removing the annotations.

Copy the API definition proto file from the other service into the proto directory but remove all kalix.service option blocks as well as all other Kalix annotations and the import "kalix/annotations.proto" from it.

This is how the counter descriptor looks with all annotations removed:

Java
src/main/proto/com/example/counter_api.proto
syntax = "proto3";

package com.example;

import "google/protobuf/empty.proto";

option java_outer_classname = "CounterApi";

message IncreaseValue {
  string counter_id = 1;
  int32 value = 2;
}

message DecreaseValue {
  string counter_id = 1;
  int32 value = 2;
}

message ResetValue {
  string counter_id = 1;
}

message GetCounter {
  string counter_id = 1;
}

message CurrentCounter {
  int32 value = 1;
}

service CounterService {
  rpc Increase (IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease (DecreaseValue) returns (google.protobuf.Empty);
  rpc Reset (ResetValue) returns (google.protobuf.Empty);
  rpc GetCurrentCounter (GetCounter) returns (CurrentCounter);
}
Scala
src/main/proto/com/example/counter_api.proto
syntax = "proto3";

package com.example;

import "google/protobuf/empty.proto";

message IncreaseValue {
  string counter_id = 1;
  int32 value = 2;
}

message DecreaseValue {
  string counter_id = 1;
  int32 value = 2;
}

message ResetValue {
  string counter_id = 1;
}

message GetCounter {
  string counter_id = 1;
}

message CurrentCounter {
  int32 value = 1;
}

service CounterService {
  rpc Increase (IncreaseValue) returns (google.protobuf.Empty);
  rpc Decrease (DecreaseValue) returns (google.protobuf.Empty);
  rpc Reset (ResetValue) returns (google.protobuf.Empty);
  rpc GetCurrentCounter (GetCounter) returns (CurrentCounter);
}

The Akka gRPC protocPlugin will now generate Akka gRPC service interface and client classes when we compile the project. In this case the service interface com.example.CounterService client class com.example.CounterServiceClient is generated.

Creating an instance of the service is done for us, by calling the getGrpcClient method on the context of the action.

In our delegating service implementation:

Java
src/main/java/com/example/DelegatingServiceAction.java
@Override
public Effect<DelegatingServiceApi.Result> addAndReturn(DelegatingServiceApi.Request request) {
  CounterService counterService = actionContext().getGrpcClient(CounterService.class, "counter"); (1)

  CounterApi.IncreaseValue increaseValue = CounterApi.IncreaseValue.newBuilder()
      .setCounterId(request.getCounterId())
      .setValue(1)
      .build();
  CompletionStage<Empty> increaseCompleted = counterService.increase(increaseValue);  (2)

  CompletionStage<CounterApi.CurrentCounter> currentCounterValueAfter = increaseCompleted.thenCompose((empty) -> (3)
      // once increase completed successfully, ask for the current state after
      counterService.getCurrentCounter(CounterApi.GetCounter.newBuilder().setCounterId(request.getCounterId()).build())
  );

  // turn the reply from the other service into our reply type
  CompletionStage<DelegatingServiceApi.Result> result = currentCounterValueAfter.thenApply(currentCounter ->  (4)
      DelegatingServiceApi.Result.newBuilder().setValue(currentCounter.getValue()).build());

  return effects().asyncReply(result);  (5)
}
1 We call actionContext().getGrpcClient with the service interface of the API and the name that the service was deployed with.
2 Calling a unary gRPC method returns the reply as a java.util.concurrent.CompletionStage. When execution continues through our method, the result will likely not yet have arrived, so we cannot look at it like a regular variable.
3 Instead we register a callback to execute once the increase call completes successfully, here we ignore the empty increase response and call getCurrentCounter, this makes the two calls sequential. (Note however that some other client calling the counter API could have changed the counter in between the calls).
4 Once we get a response for the getCurrentCounter we transform that into a Response - the response type of this action method.
5 We can reply with a CompletionStage<Result> using effects().asyncReply. Once the CompletionStage completes, the client will get the response back.
Scala
src/main/scala/com/example/DelegatingServiceAction.scala
override def addAndReturn(request: Request): Action.Effect[Result] = {
  implicit val executionContext: ExecutionContext = ExecutionContext.global

  val counterService = actionContext.getGrpcClient(classOf[CounterService], "counter") (1)

  val increaseValue = IncreaseValue(counterId = request.counterId, value = 1)
  val increaseCompleted = counterService.increase(increaseValue) (2)

  val currentCounterValueAfter = increaseCompleted.flatMap(_ => (3)
    // once increase completed successfully, ask for the current state after
    counterService.getCurrentCounter(GetCounter(counterId = request.counterId))
  )

  // turn the reply from the other service into our reply type
  val result = currentCounterValueAfter.map(currentCounterValueAfter => (4)
    Result(currentCounterValueAfter.value))

  effects.asyncReply(result) (5)
}
1 We call actionContext.getGrpcClient with the service interface of the API and the name that the service was deployed with.
2 Calling a unary gRPC method returns the reply as a scala.concurrent.Future. When execution continues through our method, the result will likely not yet have arrived, so we cannot look at it like a regular variable.
3 Instead we register a callback to execute once the increase call completes successfully, here we ignore the empty increase response and call getCurrentCounter, this makes the two calls sequential. (Note however that some other client calling the counter API could have changed the counter in between the calls).
4 Once we get a response for the getCurrentCounter we transform that into a Response - the response type of this action method.
5 We can reply with a Future[Result] using effects.asyncReply. Once the Future completes, the client will get the response back.

To pass headers to this client you need to lift (by casting) the service interface to the specific client. In this example, casting the service interface com.example.CounterService to the client class com.example.CounterServiceClient. Once this is done you can build the desired method, add the headers, and then invoke the method with the desired input.

In our delegating implementation:

Java
src/main/java/com/example/DelegatingServiceAction.java
CounterApi.IncreaseValue increaseValue = CounterApi.IncreaseValue.newBuilder()
        .setCounterId(request.getCounterId())
        .setValue(1)
        .build();
CompletionStage<Empty> increaseCompleted = ((CounterServiceClient) counterService).increase() (1)
        .addHeader("key", "value") (2)
        .invoke(increaseValue);  (3)
1 Casting and building the increase() method
2 Add the headers to it.
3 Invoke the method with the desired increaseValue parameter.
Scala
src/main/scala/com/example/DelegatingServiceAction.scala
val increaseValue = IncreaseValue(counterId = request.counterId, value = 1)
val increaseCompleted = counterService.asInstanceOf[CounterServiceClient].increase() (1)
  .addHeader("key","value") (2)
  .invoke(increaseValue) (3)
1 Casting and building the increase() method
2 Add the headers to it.
3 Invoke the method with the desired increaseValue parameter.

For additional documentation about Akka gRPC see the Akka gRPC documentation.

External gRPC services

Calling a Kalix service in another project, or an arbitrary external gRPC service is done the same way as described above, with the difference that the service name string passed to getGrpcClient is present in the application.conf file of the project with details on how to connect to the external service. For a service named external-example-service config is expected to be found under akka.grpc.client.external-example-service.

See the Akka gRPC docs on consuming a service for details on configuring the client for a service name.