Implementing Views

You can access a single Entity with its id. You might want to retrieve multiple Entities, or retrieve them using an attribute other than the key. Kalix Views allow you to achieve this. By creating multiple Views, you can optimize for query performance against each one.

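Views can be defined from any of the following:

  • Value Entity state changes

  • Event Sourced Entity events

  • messages received from a topic

The remainder of this page describes:

  • how to transform results

  • how to modify a View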

Be aware that Views are not updated immediately when Entity state changes. Kalix updates Views as quickly as possible, but the update is not instant; eventually, all changes become visible in the query results. View updates might also take longer during failure scenarios than during normal operation.
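
The delay can be pictured with a small standalone model. The sketch below is plain Java and does not use the Kalix SDK; `EventuallyConsistentView`, `entityChanged`, and `processPending` are hypothetical names chosen for illustration. State changes are queued first and only become visible to queries once the queue has been drained.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;

// Standalone model of an eventually consistent view (not Kalix SDK code).
class EventuallyConsistentView {
  // State changes that have been emitted but not yet applied to the view.
  private final Queue<String[]> pending = new ArrayDeque<>();
  // The queryable view table: customer id -> name.
  private final Map<String, String> table = new HashMap<>();

  // Called when an entity changes state; the view is NOT updated yet.
  void entityChanged(String customerId, String name) {
    pending.add(new String[] {customerId, name});
  }

  // Called later by the simulated runtime; applies all queued changes in order.
  void processPending() {
    while (!pending.isEmpty()) {
      String[] change = pending.poll();
      table.put(change[0], change[1]);
    }
  }

  // A query only sees changes that have already been applied.
  Optional<String> nameOf(String customerId) {
    return Optional.ofNullable(table.get(customerId));
  }
}
```

A caller that writes to an Entity and immediately queries the View should therefore be prepared to see the previous result for a short time.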

View’s Effect API

The View’s Effect defines the operations that Kalix should perform when an event, a message or a state change is handled by a View.

A View Effect can either:

  • update the view state

  • delete the view state

  • ignore the event or state change notification (and not update the view state)
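
To make the three outcomes concrete, here is a minimal standalone model in plain Java. It does not use the Kalix SDK: `Effect`, `ViewTable`, and `apply` are hypothetical names, and the real SDK exposes effects through its own API.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone model of the three view effects (not Kalix SDK code).
final class Effect {
  enum Kind { UPDATE, DELETE, IGNORE }

  final Kind kind;
  final String newState; // only set for UPDATE

  private Effect(Kind kind, String newState) {
    this.kind = kind;
    this.newState = newState;
  }

  static Effect update(String newState) { return new Effect(Kind.UPDATE, newState); }
  static Effect delete() { return new Effect(Kind.DELETE, null); }
  static Effect ignore() { return new Effect(Kind.IGNORE, null); }
}

// A view table keyed by entity id; apply() performs what the effect describes.
final class ViewTable {
  final Map<String, String> rows = new HashMap<>();

  void apply(String entityId, Effect effect) {
    switch (effect.kind) {
      case UPDATE:
        rows.put(entityId, effect.newState);
        break;
      case DELETE:
        rows.remove(entityId);
        break;
      case IGNORE:
        break; // leave the row untouched
    }
  }
}
```

The handler only describes what should happen; in this model the table, standing in for the runtime, is what actually changes the stored row.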

Creating a View from a Value Entity

Consider an example of a Customer Registry service with a customer Value Entity. When customer state changes, the entire state is emitted as a value change. Value changes update any associated Views. To create a View that lists customers by their name:

This example assumes the following customer state is defined in a customer_domain.proto file:

Java
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3";
package customer.domain;
option java_outer_classname = "CustomerDomain";
message CustomerState {
  string customer_id = 1;
  string email = 2;
  string name = 3;
  Address address = 4;
}

message Address {
  string street = 1;
  string city = 2;
}
Scala
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3";
package customer.domain;
message CustomerState {
  string customer_id = 1;
  string email = 2;
  string name = 3;
  Address address = 4;
}

message Address {
  string street = 1;
  string city = 2;
}

As well as a Value Entity service that will produce the state changes consumed by the View:

Java
src/main/proto/customer/api/customer_api.proto
service CustomerService {
  option (kalix.codegen) = {
    value_entity: {
      name: "customer.domain.CustomerValueEntity"
      type_id: "customer"
      state: "customer.domain.CustomerState"
    }
  };

  rpc Create(Customer) returns (google.protobuf.Empty) {}
  rpc ChangeName(ChangeNameRequest) returns (google.protobuf.Empty) {}
  rpc ChangeAddress(ChangeAddressRequest) returns (google.protobuf.Empty) {}
  rpc GetCustomer(GetCustomerRequest) returns (Customer) {}
  rpc Delete(DeleteCustomerRequest) returns (google.protobuf.Empty) {}
}
Scala
src/main/proto/customer/api/customer_api.proto
service CustomerService {
  option (kalix.codegen) = {
    value_entity: {
      name: "customer.domain.CustomerValueEntity"
      type_id: "customer"
      state: "customer.domain.CustomerState"
    }
  };

  rpc Create(Customer) returns (google.protobuf.Empty) {}
  rpc ChangeName(ChangeNameRequest) returns (google.protobuf.Empty) {}
  rpc ChangeAddress(ChangeAddressRequest) returns (google.protobuf.Empty) {}
  rpc GetCustomer(GetCustomerRequest) returns (Customer) {}
  rpc Delete(DeleteCustomerRequest) returns (google.protobuf.Empty) {}
}

Define the View service descriptor

To get a View of multiple customers by their name, define the View as a service in Protobuf:

Java
src/main/proto/customer/view/customer_view.proto
syntax = "proto3";

package customer.view;

option java_outer_classname = "CustomerViewModel";

import "customer/domain/customer_domain.proto";
import "kalix/annotations.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";

service CustomerByName {
  option (kalix.codegen) = {
    view: {} (1)
  };

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {(2)
    option (kalix.method).eventing.in = {(3)
      value_entity: "customer"
    };
    option (kalix.method).view.update = {(4)
      table: "customers"
    };
  }

  rpc DeleteCustomer(google.protobuf.Empty) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      value_entity: "customer"
      handle_deletes: true (5)
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) {(6)
    option (kalix.method).view.query = {(7)
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}

message ByNameRequest {
  string customer_name = 1;
}
Scala
src/main/proto/customer/view/customer_view.proto
syntax = "proto3";

package customer.view;

import "customer/domain/customer_domain.proto";
import "kalix/annotations.proto";
import "google/protobuf/empty.proto";

service CustomerByName {
  option (kalix.codegen) = {
    view: {} (1)
  };

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) { (2)
    option (kalix.method).eventing.in = { (3)
      value_entity: "customer"
    };
    option (kalix.method).view.update = { (4)
      table: "customers"
    };
  }

  rpc DeleteCustomer(google.protobuf.Empty) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      value_entity: "customer"
      handle_deletes: true (5)
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) { (6)
    option (kalix.method).view.query = { (7)
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}

message ByNameRequest {
  string customer_name = 1;
}
1 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that a View must be generated.
2 The UpdateCustomer method defines how Kalix will update the view.
3 The source of the View is the "customer" Value Entity. This identifier is defined in the type_id: "customer" property of the (kalix.codegen).value_entity option in the customer_api.proto file.
4 The (kalix.method).view.update annotation defines that this method is used for updating the View. You must define the table attribute for the table to be used in the query. Pick any name and use it in the query SELECT statement. When a view model is the same as the method input parameter, Kalix will optimize the processing and won’t call this method. However, this behavior can be overridden with the (kalix.method).view.update.transform_updates flag set to true.
5 The second (kalix.method).eventing.in annotation with handle_deletes: true and (required) google.protobuf.Empty as a parameter defines that this method is used for handling Value Entity deletes. Similar to update methods, Kalix will optimize it and not call the method if not necessary, unless you mark it with (kalix.method).view.update.transform_updates: true. In this case, you indicate to Kalix that you want to provide a custom delete handler implementation.
6 The GetCustomers method defines the query to retrieve a stream of customers.
7 The (kalix.method).view.query annotation defines that this method is used as a query of the View.
In this sample we use the internal domain.CustomerState as the state of the view. This is convenient since it allows automatic updates of the view without any logic, but it has the drawback that it implicitly makes the domain.CustomerState type a part of the public service API. To avoid this, transform the incoming update into a different state type, as shown in Creating a View from an Event Sourced Entity.

If the query should only return one result, remove the stream from the return type:

Java
rpc GetCustomer(ByEmailRequest) returns (domain.CustomerState) { (1)
  option (kalix.method).view.query = {
    query: "SELECT * FROM customers WHERE email = :email"
  };
}
Scala
rpc GetCustomer(ByEmailRequest) returns (domain.CustomerState) { (1)
  option (kalix.method).view.query = {
    query: "SELECT * FROM customers WHERE email = :email"
  };
}
1 Without stream when expecting single result.

When no result is found, a non-streamed request fails with gRPC status code NOT_FOUND. A streamed call instead completes with an empty stream.
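
The difference between the two result shapes can be sketched in plain Java, without the Kalix SDK. `CustomerQueries`, `emailsByName`, and `nameByEmail` are hypothetical names, and `NoSuchElementException` is only a stand-in for the gRPC NOT_FOUND status.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;

// Standalone model of streamed versus single-result queries (not Kalix SDK code).
final class CustomerQueries {
  // Each row is {name, email}.
  private final List<String[]> rows = new ArrayList<>();

  void add(String name, String email) {
    rows.add(new String[] {name, email});
  }

  // Streamed query: zero matches simply yields an empty result.
  List<String> emailsByName(String name) {
    List<String> result = new ArrayList<>();
    for (String[] row : rows) {
      if (row[0].equals(name)) {
        result.add(row[1]);
      }
    }
    return result;
  }

  // Single-result query: zero matches is an error (stand-in for NOT_FOUND).
  String nameByEmail(String email) {
    for (String[] row : rows) {
      if (row[1].equals(email)) {
        return row[0];
      }
    }
    throw new NoSuchElementException("NOT_FOUND: " + email);
  }
}
```

Clients of a single-result query should therefore handle the not-found case explicitly, while clients of a streamed query only need to handle an empty result.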

Handling Value Entity deletes

The View state corresponding to an Entity is not automatically deleted when the Entity is deleted.

When the source of eventing.in is a Value Entity, the handle_deletes: true annotation of (kalix.method).eventing.in marks the method as the handler for Value Entity deletes. The method must have google.protobuf.Empty as a parameter. With such a method defined, Kalix deletes the View state automatically; you can also write your own delete handler by setting the (kalix.method).view.update.transform_updates flag to true.

Registering a View

Once you’ve defined a View, register it with Kalix by invoking the KalixFactory.withComponents method in the Main class.

Java
src/main/java/customer/Main.java
public static Kalix createKalix() {
  return KalixFactory.withComponents(
    CustomerValueEntity::new,
    CustomerActionImpl::new,
    CustomerByEmailView::new,
    CustomerByNameView::new,
    CustomerDetailsByNameView::new,
    CustomerSummaryByNameView::new,
    CustomersResponseByCityView::new,
    CustomersResponseByNameView::new);
}
Scala
src/main/scala/customer/Main.scala
def createKalix(): Kalix = {
  KalixFactory.withComponents(
    new CustomerValueEntity(_),
    new CustomerActionImpl(_),
    new CustomerByEmailView(_),
    new CustomerByNameView(_),
    new CustomerDetailsByNameView(_),
    new CustomerSummaryByNameView(_),
    new CustomersResponseByCityView(_),
    new CustomersResponseByNameView(_))
}

Creating a View from an Event Sourced Entity

Create a View from an Event Sourced Entity by using events that the Entity emits to build a state representation. Using a Customer Registry service example, to create a View for querying customers by name:

The example assumes a customer_domain.proto file that defines the events that will update the View on name changes:

Java
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3";
package customer.domain;
option java_outer_classname = "CustomerDomain";
message CustomerState {
  string customer_id = 1;
  string email = 2;
  string name = 3;
  Address address = 4;
}

message Address {
  string street = 1;
  string city = 2;
}
message CustomerCreated {
  CustomerState customer = 1;
}

message CustomerNameChanged {
  string new_name = 1;
}

message CustomerAddressChanged {
  Address new_address = 1;
}
Scala
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3";
package customer.domain;
message CustomerState {
  string customer_id = 1;
  string email = 2;
  string name = 3;
  Address address = 4;
}

message Address {
  string street = 1;
  string city = 2;
}
message CustomerCreated {
  CustomerState customer = 1;
}

message CustomerNameChanged {
  string new_name = 1;
}

message CustomerAddressChanged {
  Address new_address = 1;
}

As well as an Event Sourced Entity service that will produce the events consumed by the View:

Java
src/main/proto/customer/api/customer_api.proto
service CustomerService {
  option (kalix.codegen) = {
    event_sourced_entity: {
      name: "customer.domain.CustomerEntity"
      type_id: "customers"
      state: "customer.domain.CustomerState"
      events: [
        "customer.domain.CustomerCreated",
        "customer.domain.CustomerNameChanged",
        "customer.domain.CustomerAddressChanged"]
    }
  };
  rpc Create(Customer) returns (google.protobuf.Empty) {}
  rpc ChangeName(ChangeNameRequest) returns (google.protobuf.Empty) {}
  rpc ChangeAddress(ChangeAddressRequest) returns (google.protobuf.Empty) {}
  rpc GetCustomer(GetCustomerRequest) returns (Customer) {}
}
Scala
src/main/proto/customer/api/customer_api.proto
service CustomerService {
  option (kalix.codegen) = {
    event_sourced_entity: {
      name: "customer.domain.CustomerEntity"
      type_id: "customers"
      state: "customer.domain.CustomerState"
      events: [
        "customer.domain.CustomerCreated",
        "customer.domain.CustomerNameChanged",
        "customer.domain.CustomerAddressChanged"]
    }
  };

  rpc Create(Customer) returns (google.protobuf.Empty) {}
  rpc ChangeName(ChangeNameRequest) returns (google.protobuf.Empty) {}
  rpc ChangeAddress(ChangeAddressRequest) returns (google.protobuf.Empty) {}
  rpc GetCustomer(GetCustomerRequest) returns (Customer) {}
}

Define a View descriptor to consume events

The following lines in the .proto file define a View to consume the CustomerCreated, CustomerNameChanged, and CustomerAddressChanged events:

Java
src/main/proto/customer/customer_view.proto
syntax = "proto3";

package customer.view;

option java_outer_classname = "CustomerViewModel";

import "customer/domain/customer_domain.proto";
import "customer/api/customer_api.proto";
import "kalix/annotations.proto";
import "google/protobuf/any.proto";

message ByNameRequest {
  string customer_name = 1;
}

service CustomerByName {
  option (kalix.codegen) = { (1)
    view: {}
  };

  rpc ProcessCustomerCreated(domain.CustomerCreated) returns (api.Customer) { (2)
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers" (3)
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }

  rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (api.Customer) { (2)
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers" (4)
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }

  rpc ProcessCustomerAddressChanged(domain.CustomerAddressChanged) returns (api.Customer) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }

  rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
      ignore: true
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream api.Customer) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}
1 The kalix.codegen option configures code generation to provide base classes and an initial implementation for the class transforming events to updates of the state.
2 Define an update method for each event.
3 The source of the View is the journal of the "customers" Event Sourced Entity. This identifier is defined in the type_id: "customers" property of the (kalix.codegen).event_sourced_entity option in the customer_api.proto file.
4 The same event_sourced_entity for all update methods. Note the required table attribute. Use any name, which you will reference in the query SELECT statement.
Scala
src/main/proto/customer/customer_view.proto
syntax = "proto3";

package customer.view;

import "customer/domain/customer_domain.proto";
import "customer/api/customer_api.proto";
import "kalix/annotations.proto";
import "google/protobuf/any.proto";

message ByNameRequest {
  string customer_name = 1;
}

service CustomerByName {
  option (kalix.codegen) = { (1)
    view: {}
  };

  rpc ProcessCustomerCreated(domain.CustomerCreated) returns (api.Customer) { (2)
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers" (3)
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }

  rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (api.Customer) { (2)
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers" (4)
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }

  rpc ProcessCustomerAddressChanged(domain.CustomerAddressChanged) returns (api.Customer) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }

  rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
      ignore: true
    };
  }

  rpc GetCustomers(ByNameRequest) returns (stream api.Customer) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM customers WHERE name = :customer_name"
    };
  }
}
1 The kalix.codegen option configures code generation to provide base classes and an initial implementation for the class transforming events to updates of the state.
2 Define an update method for each event.
3 The source of the View is the journal of the "customers" Event Sourced Entity. This identifier is defined in the type_id: "customers" property of the (kalix.codegen).event_sourced_entity option in the customer_api.proto file.
4 The same event_sourced_entity for all update methods. Note the required table attribute. Use any name, which you will reference in the query SELECT statement.

See Query syntax reference for more examples of valid query syntax.

Create a transformation class

Next, you need to define how to transform events into state that can be used in the View. An Event Sourced Entity can emit many types of events. You need to define a method for each event type. If a View does not use all events, you need to ignore unneeded events as shown in the IgnoreOtherEvents update method.

The code-generation will generate an implementation class with an initial empty implementation which we’ll discuss below.

View update handlers are implemented in the CustomerByNameView class as methods that override abstract methods from AbstractCustomerByNameView. The methods take the current view state as the first parameter and the event as the second parameter. They return an UpdateEffect, which describes the next processing action, such as updating the view state.

When adding or changing the rpc definitions, including name, parameter and return messages, in the .proto files the corresponding methods are regenerated in the abstract class (AbstractCustomerByNameView). This means that the compiler will assist you with such changes. The IDE can typically fill in missing method signatures and such.

Java
src/main/java/customer/view/CustomerByNameView.java
import kalix.javasdk.view.ViewContext;
import com.google.protobuf.Any;
import customer.domain.CustomerDomain;
import customer.api.CustomerApi;

public class CustomerByNameView extends AbstractCustomerByNameView { (1)

  public CustomerByNameView(ViewContext context) {}

  @Override
  public CustomerApi.Customer emptyState() { (2)
    return null;
  }

  @Override (3)
  public UpdateEffect<CustomerApi.Customer> processCustomerCreated(
    CustomerApi.Customer state,
    CustomerDomain.CustomerCreated customerCreated) {
    if (state != null) {
      return effects().ignore(); // already created
    } else {
      return effects().updateState(convertToApi(customerCreated.getCustomer()));
    }
  }

  @Override (3)
  public UpdateEffect<CustomerApi.Customer> processCustomerNameChanged(
    CustomerApi.Customer state,
    CustomerDomain.CustomerNameChanged customerNameChanged) {
    return effects().updateState(
        state.toBuilder().setName(customerNameChanged.getNewName()).build());
  }

  @Override (3)
  public UpdateEffect<CustomerApi.Customer> processCustomerAddressChanged(
    CustomerApi.Customer state,
    CustomerDomain.CustomerAddressChanged customerAddressChanged) {
    return effects().updateState(
        state.toBuilder().setAddress(convertToApi(customerAddressChanged.getNewAddress())).build());
  }

  private CustomerApi.Customer convertToApi(CustomerDomain.CustomerState s) {
    CustomerApi.Address address = CustomerApi.Address.getDefaultInstance();
    if (s.hasAddress()) {
      address = convertToApi(s.getAddress());
    }
    return CustomerApi.Customer.newBuilder()
        .setCustomerId(s.getCustomerId())
        .setEmail(s.getEmail())
        .setName(s.getName())
        .setAddress(address)
        .build();
  }

  private CustomerApi.Address convertToApi(CustomerDomain.Address a) {
    return CustomerApi.Address.newBuilder()
        .setStreet(a.getStreet())
        .setCity(a.getCity())
        .build();
  }
}
1 Extends the generated AbstractCustomerByNameView, which extends View.
2 Defines the initial, empty, state that is used before any updates.
3 One method for each event.
Scala
src/main/scala/customer/view/CustomerByNameView.scala
import kalix.scalasdk.view.View.UpdateEffect
import kalix.scalasdk.view.ViewContext
import com.google.protobuf.any.{Any => ScalaPbAny}
import customer.api
import customer.api.Customer
import customer.domain
import customer.domain.CustomerAddressChanged
import customer.domain.CustomerCreated
import customer.domain.CustomerNameChanged
import customer.domain.CustomerState

class CustomerByNameView(context: ViewContext) extends AbstractCustomerByNameView { (1)

  override def emptyState: Customer = Customer.defaultInstance (2)

  override def processCustomerCreated(
      state: Customer,
      customerCreated: CustomerCreated): UpdateEffect[Customer] = (3)
    if (state != emptyState) effects.ignore() // already created
    else effects.updateState(convertToApi(customerCreated.customer.get))

  override def processCustomerNameChanged(
      state: Customer,
      customerNameChanged: CustomerNameChanged): UpdateEffect[Customer] = (3)
    effects.updateState(state.copy(name = customerNameChanged.newName))

  override def processCustomerAddressChanged(
      state: Customer,
      customerAddressChanged: CustomerAddressChanged): UpdateEffect[Customer] = (3)
    effects.updateState(state.copy(address = customerAddressChanged.newAddress.map(convertToApi)))

  private def convertToApi(customer: CustomerState): Customer =
    Customer(
      customerId = customer.customerId,
      name = customer.name,
      email = customer.email,
      address = customer.address.map(convertToApi))

  private def convertToApi(address: domain.Address): api.Address =
    api.Address(street = address.street, city = address.city)
}
1 Extends the generated AbstractCustomerByNameView, which extends View.
2 Defines the initial, empty, state that is used before any updates.
3 One method for each event.
This type of update transformation is a natural fit for Events emitted by an Event Sourced Entity, but it can also be used for Value Entities. For example, if the View representation is different from the Entity state you might want to transform it before presenting the View to the client.

Ignoring events

When consuming events, each event must be matched by a Protobuf service method. In case your View is only interested in certain events, you may declare a method to receive all events that are not received by the other methods. If an event type is not handled, the View will fail. Views are designed to restart, but since the handler is missing, it will fail again. Therefore, it’s important to define methods for all events or define a catch-all method in case you want to discard some events.

Java
src/main/proto/customer/customer_view.proto
rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) {
  option (kalix.method).eventing.in = {
    event_sourced_entity: "customers"
    ignore: true
  };
}
Scala
src/main/proto/customer/customer_view.proto
rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) {
  option (kalix.method).eventing.in = {
    event_sourced_entity: "customers"
    ignore: true
  };
}

The IgnoreOtherEvents method here is defined as a catch-all because it has input type Any. Instead of using a catch-all it can be better to define concrete methods for all known event types that should be ignored because then there is no risk of accidentally ignoring events that are added in later evolution of the service.

When adding the ignore: true annotation the corresponding implementation is not needed in the component. It is more efficient to use ignore: true than implementing the method with an immediate reply.
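
The matching rules described in this section can be modelled in a few lines of plain Java. This is a standalone sketch rather than Kalix SDK code, and `EventDispatcher`, `on`, `ignore`, and `ignoreEverythingElse` are hypothetical names. It shows why an unmatched event type fails, and how a concrete ignore differs from a catch-all.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Standalone model of event-to-handler matching (not Kalix SDK code).
final class EventDispatcher {
  private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();
  private boolean catchAllIgnore = false;

  // Register a handler for one concrete event type.
  <E> void on(Class<E> type, Consumer<E> handler) {
    handlers.put(type, event -> handler.accept(type.cast(event)));
  }

  // Register an explicit "ignore" for one concrete event type.
  void ignore(Class<?> type) {
    handlers.put(type, event -> {});
  }

  // Register a catch-all that silently drops every unmatched event.
  void ignoreEverythingElse() {
    catchAllIgnore = true;
  }

  void dispatch(Object event) {
    Consumer<Object> handler = handlers.get(event.getClass());
    if (handler != null) {
      handler.accept(event);
    } else if (!catchAllIgnore) {
      // Mirrors the failure described above: no method matches this event type.
      throw new IllegalStateException("No handler for " + event.getClass().getSimpleName());
    }
  }
}
```

With only concrete ignores registered, a newly introduced event type surfaces as a failure instead of being dropped unnoticed, which is the trade-off against the catch-all.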

Handling Event Sourced Entity deletes

The View state corresponding to an Entity is not automatically deleted when the Entity is deleted.

To delete from the View you can use the deleteState() effect from an event transformation method.
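
As a sketch of the idea, the standalone Java model below treats a hypothetical `CustomerDeleted` event (an assumption, not part of the example's customer_domain.proto) as the trigger for removing the row. It does not use the Kalix SDK; in a real View the transformation method would return the deleteState() effect rather than mutate a map directly.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone model of deleting view state in response to an event (not Kalix SDK code).
final class CustomerNamesView {
  // Hypothetical events; CustomerDeleted is an assumption made for this sketch.
  static final class CustomerCreated {
    final String name;

    CustomerCreated(String name) {
      this.name = name;
    }
  }

  static final class CustomerDeleted {}

  private final Map<String, String> rows = new HashMap<>();

  // Creation events insert or replace the row for the entity.
  void onEvent(String entityId, CustomerCreated event) {
    rows.put(entityId, event.name);
  }

  // Deletion events remove the row, which is what a delete effect asks for.
  void onEvent(String entityId, CustomerDeleted event) {
    rows.remove(entityId);
  }

  boolean contains(String entityId) {
    return rows.containsKey(entityId);
  }
}
```

The point of the model is that deletion is driven by an event the Entity emits, so the Entity has to emit such an event for the View to react to it.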

Register the View

Register the View class with Kalix:

Java
src/main/java/customer/Main.java
public static Kalix createKalix() {
  return KalixFactory.withComponents(
    CustomerEntity::new,
    CustomerByNameView::new
  );
}
Scala
src/main/scala/customer/Main.scala
def createKalix(): Kalix = {
  KalixFactory.withComponents(
    new CustomerEntity(_),
    new CustomerByNameView(_)
  )
}

Creating a View from a topic

The source of a View can be an eventing topic. You define it in the same way as shown in Creating a View from an Event Sourced Entity or Creating a View from a Value Entity, but the View definition must set eventing.in.topic to a given topic name. Note that on the producer side you must manually pass the ce-subject metadata, which the View component requires. See the example below for how to pass such metadata.

Java
com/example/actions/CounterJournalToTopicWithMetaAction.java
public class CounterJournalToTopicWithMetaAction extends AbstractCounterJournalToTopicWithMetaAction {

  public CounterJournalToTopicWithMetaAction(ActionCreationContext creationContext) {}

  @Override
  public Effect<Increased> onIncreased(ValueIncreased valueIncreased) {
    Increased increased = Increased.newBuilder().setValue(valueIncreased.getValue()).build();
    String counterId = actionContext().metadata().get("ce-subject").orElseThrow(); (1)
    Metadata metadata = Metadata.EMPTY.add("ce-subject", counterId);
    return effects().reply(increased, metadata); (2)
  }
}
1 The ce-subject attribute is the entity id.
2 The effect replies with the updated metadata together with the message payload.
Scala
com/example/actions/CounterJournalToTopicWithMetaAction.scala
class CounterJournalToTopicWithMetaAction(creationContext: ActionCreationContext)
    extends AbstractCounterJournalToTopicWithMetaAction {

  override def onIncreased(valueIncreased: ValueIncreased): Action.Effect[Increased] = {
    val increased = Increased(valueIncreased.value)
    val counterId = actionContext.metadata.get("ce-subject").get (1)
    val metadata = Metadata.empty.add("ce-subject", counterId)
    effects.reply(increased, metadata) (2)
  }
}
1 The ce-subject attribute is the entity id.
2 The effect replies with the updated metadata together with the message payload.

How to transform results

When creating a View, you can transform the results as a relational projection instead of using a SELECT * statement.

Relational projection

Instead of using SELECT * you can define which columns are used in the response message:

Java
message CustomerSummary {
  string id = 1;
  string name = 2;
}

service CustomerSummaryByName {
  option (kalix.codegen) = {
    view: {}
  };

  rpc GetCustomers(ByNameRequest) returns (stream CustomerSummary) {
    option (kalix.method).view.query = {
      query: "SELECT customer_id AS id, name FROM customers WHERE name = :customer_name"
    };
  }

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      value_entity: "customer"
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }
}
Scala
message CustomerSummary {
  string id = 1;
  string name = 2;
}

service CustomerSummaryByName {
  option (kalix.codegen) = {
    view: {}
  };

  rpc GetCustomers(ByNameRequest) returns (stream CustomerSummary) {
    option (kalix.method).view.query = {
      query: "SELECT customer_id AS id, name FROM customers WHERE name = :customer_name"
    };
  }

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      value_entity: "customer"
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }
}

In a similar way, you can include values from the request message in the response, for example :request_id:

SELECT :request_id, customer_id as id, name FROM customers WHERE name = :customer_name

Along with renaming fields, results can be projected into a different message structure than the one used for storing the view state. Nested messages can be created by grouping together fields in parentheses. For example, the customer state is projected into a different result structure in this view:

Java
message CustomerDetails {
  string id = 1;
  string name = 2;
  BillingDetails billing_details = 3;
}

message BillingDetails {
  string address = 1;
  string city = 2;
  string email = 3;
}

service CustomerDetailsByName {
  rpc GetCustomers(ByNameRequest) returns (stream CustomerDetails) {
    option (kalix.method).view.query = {
      query: "SELECT"
             " customer_id AS id," (1)
             " name," (2)
             " (address.street AS address, address.city, email) AS billing_details " (3)
             "FROM customers "
             "WHERE name = :customer_name"
    };
  }
}
1 The customer_id column is projected into the id field in the result message.
2 The customer name column is projected directly into the result message.
3 A nested message is constructed with the address street (renamed to address), the address city, and the email columns, projected into billing_details.
Scala
message CustomerDetails {
  string id = 1;
  string name = 2;
  BillingDetails billing_details = 3;
}

message BillingDetails {
  string address = 1;
  string city = 2;
  string email = 3;
}

service CustomerDetailsByName {
  rpc GetCustomers(ByNameRequest) returns (stream CustomerDetails) {
    option (kalix.method).view.query = {
      query: "SELECT"
             " customer_id AS id," (1)
             " name," (2)
             " (address.street AS address, address.city, email) AS billing_details " (3)
             "FROM customers "
             "WHERE name = :customer_name"
    };
  }
}
1 The customer_id column is projected into the id field in the result message.
2 The customer name column is projected directly into the result message.
3 A nested message is constructed with the address street (renamed to address), the address city, and the email columns, projected into billing_details.
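
What the nested projection above produces can be illustrated with a standalone Java sketch. It does not use the Kalix SDK or generated Protobuf classes; `ProjectionSketch`, `CustomerRow`, and `project` are hypothetical names, and the stored row is flattened for brevity.

```java
// Standalone model of the nested projection above (not Kalix SDK code).
final class ProjectionSketch {
  // Stored view row, flattened for brevity; mirrors the columns used by the query.
  static final class CustomerRow {
    final String customerId, name, email, street, city;

    CustomerRow(String customerId, String name, String email, String street, String city) {
      this.customerId = customerId;
      this.name = name;
      this.email = email;
      this.street = street;
      this.city = city;
    }
  }

  // Result shapes mirroring the BillingDetails and CustomerDetails messages.
  static final class BillingDetails {
    final String address, city, email;

    BillingDetails(String address, String city, String email) {
      this.address = address;
      this.city = city;
      this.email = email;
    }
  }

  static final class CustomerDetails {
    final String id, name;
    final BillingDetails billingDetails;

    CustomerDetails(String id, String name, BillingDetails billingDetails) {
      this.id = id;
      this.name = name;
      this.billingDetails = billingDetails;
    }
  }

  // customer_id becomes id, name is copied, and street, city and email are grouped.
  static CustomerDetails project(CustomerRow row) {
    BillingDetails billing = new BillingDetails(row.street, row.city, row.email);
    return new CustomerDetails(row.customerId, row.name, billing);
  }
}
```

In the query itself this mapping is expressed declaratively, so no such code has to be written by hand.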

Response message including the result

Instead of streamed results you can include the results in a repeated field in the response message:

Java
message CustomersResponse {
  repeated domain.CustomerState customers = 1; (1)
}

service CustomersResponseByName {
  option (kalix.codegen) = {
    view: {}
  };

  rpc GetCustomers(ByNameRequest) returns (CustomersResponse) { (2)
    option (kalix.method).view.query = {
      query: "SELECT * AS customers FROM customers WHERE name = :customer_name" (3)
    };
  }

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      value_entity: "customer"
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }
}
Scala
message CustomersResponse {
  repeated domain.CustomerState customers = 1; (1)
}

service CustomersResponseByName {
  option (kalix.codegen) = {
    view: {}
  };

  rpc GetCustomers(ByNameRequest) returns (CustomersResponse) { (2)
    option (kalix.method).view.query = {
      query: "SELECT * AS customers FROM customers WHERE name = :customer_name" (3)
    };
  }

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      value_entity: "customer"
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }
}
1 The response message contains a repeated field.
2 The return type is not streamed.
3 The repeated field is referenced in the query with * AS customers.

How to modify a View

Kalix creates indexes for the View based on the defined queries. For example, the following query will result in a View with an index on the name column:

SELECT * FROM customers WHERE name = :customer_name

If the query is changed, Kalix might need to add other indexes. For example, changing the above query to filter on the city would mean that Kalix needs to build a View with the index on the city column.

SELECT * FROM customers WHERE address.city = :city

Such changes require you to define a new View. Kalix will then rebuild it from the source event log or value changes.

Views from topics cannot be rebuilt from the source messages, because it’s not possible to consume all events from the topic again. The new View will be built from new messages published to the topic.

Building the new View may take some time if many events have to be processed. The recommended way to change a View is therefore a multi-step process with two deployments:

  1. Define the new View, and keep the old View intact. A new View is defined by a new service in Protobuf. The viewId is the same as the service name, i.e. it will be a different viewId than the old View. Keep the registration of the old service in Main.

  2. Deploy the new View, and let it rebuild. Verify that the new query works as expected. The old View can still be used.

  3. Remove the old View definition and rename the new service to the old name if the public API is compatible, but keep the new viewId by defining it as shown below.

  4. Deploy the second change.

This is how to define a custom viewId:

Java
src/main/java/customer/Main.java
public static Kalix createKalix() {
  Kalix kalix = new Kalix();
  kalix.register(CustomerByNameViewProvider.of(CustomerByNameView::new)
      .withViewId("CustomerByNameV2"));
  kalix.register(CustomerEntityProvider.of(CustomerEntity::new));
  return kalix;
}
Scala
src/main/scala/customer/Main.scala
def createKalix(): Kalix =
  Kalix()
    .register(
      CustomerByNameViewProvider(new CustomerByNameView(_))
        .withViewId("CustomerByNameV2"))
    .register(CustomerEntityProvider(new CustomerEntity(_)))

The View definitions are stored and validated when a new version is deployed. There will be an error message if the changes are not compatible.

Drop obsolete view data

The data for old Views that are no longer actively used can be dropped using the kalix CLI service views commands.

A summary of all views for a running service can be listed using the views list command:

> kalix service views list customer-registry
NAME               ACTIVE   LAST UPDATED
CustomerByName     false    1d
CustomerByNameV2   true     5m

Any views that are inactive and no longer needed can be dropped using the views drop command:

> kalix service views drop customer-registry CustomerByName
The data for view 'CustomerByName' of service 'customer-registry' has successfully been dropped.

> kalix service views list customer-registry
NAME               ACTIVE   LAST UPDATED
CustomerByNameV2   true     10m

Streaming view updates

A query can provide a near real-time stream of results, emitting new entries that match the query as they are added or updated in the view.

This is done by adding the option stream_updates to a query method with a stream reply:

Java
message ByCityRequest {
  string city = 1;
}
service CustomerByCityStreaming {
  option (kalix.codegen) = {
    view: {}
  };

  // update methods omitted
  rpc GetCustomers(ByCityRequest) returns (stream api.Customer) { (1)
    option (kalix.method).view.query = {
      query: "SELECT * FROM customers WHERE address.city = :city", (2)
      stream_updates: true (3)
    };
  }
}
1 Return type must be a stream.
2 A regular query, in this case showing all customers from a specific city.
3 The stream_updates option set to true.
Scala
message ByCityRequest {
  string city = 1;
}
service CustomerByCityStreaming {
  option (kalix.codegen) = {
    view: {}
  };

  // update methods omitted
  rpc GetCustomers(ByCityRequest) returns (stream api.Customer) { (1)
    option (kalix.method).view.query = {
      query: "SELECT * FROM customers WHERE address.city = :city", (2)
      stream_updates: true (3)
    };
  }
}
1 Return type must be a stream.
2 A regular query, in this case showing all customers from a specific city.
3 The stream_updates option set to true.

This will first list the complete result for the query and then keep the response stream open, emitting new or updated entries matching the query as they are added to the view. The stream does not complete until the client closes it.

Note: This is not intended as a transport for service-to-service propagation of updates, and it does not guarantee delivery. For such use cases, publish events to a topic instead; see Publishing and Subscribing with Actions.

Query syntax reference

Define View queries in a language that is similar to SQL. The following examples illustrate the syntax. To retrieve:

  • All customers without any filtering conditions (no WHERE clause):

    SELECT * FROM customers
  • Customers with a name matching the customer_name property of the request message:

    SELECT * FROM customers WHERE name = :customer_name
  • Customers matching the customer_name AND city properties of the request message:

    SELECT * FROM customers WHERE name = :customer_name AND address.city = :city
  • Customers in a city matching a literal value:

    SELECT * FROM customers WHERE address.city = 'New York'

Filter predicates

Use filter predicates in WHERE conditions to further refine results.

Comparison operators

The following comparison operators are supported:

  • = equals

  • != not equals

  • > greater than

  • >= greater than or equals

  • < less than

  • <= less than or equals

Logical operators

Combine filter conditions with the AND operator, and negate using the NOT operator. Group conditions using parentheses.

Note: OR support is currently disabled, until it can be more efficiently indexed.

SELECT * FROM customers WHERE
  name = :customer_name AND NOT (address.city = 'New York' AND age > 65)

Array operators

Use IN or = ANY to check whether a value is contained in a group of values or in an array column or parameter (a repeated field in the Protobuf message).

Use IN with a list of values or parameters:

SELECT * FROM customers WHERE email IN ('bob@example.com', :some_email)

Use = ANY to check against an array column (a repeated field in the Protobuf message):

SELECT * FROM customers WHERE :some_email = ANY(emails)

Or use = ANY with a repeated field in the request parameters:

SELECT * FROM customers WHERE email = ANY(:some_emails)

Pattern matching

Use LIKE to pattern match on strings. The standard SQL LIKE patterns are supported, with _ (underscore) matching a single character, and % (percent sign) matching any sequence of zero or more characters.

SELECT * FROM customers WHERE name LIKE 'Bob%'

Note: For index efficiency, the pattern must have a non-wildcard prefix or suffix. A pattern like '%foo%' is not supported. Given this limitation, only constant patterns with literal strings are supported; patterns in request parameters are not allowed.
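To make the wildcard rules concrete, the following plain Java sketch translates a LIKE pattern into a regular expression and checks the prefix or suffix restriction described above. It only illustrates the matching semantics and is not how Kalix evaluates queries. The class and method names are hypothetical, and treating _ as a wildcard for the prefix or suffix check is an assumption.

```java
import java.util.regex.Pattern;

// Illustration only: LikePatternSketch is a hypothetical helper that mimics
// standard SQL LIKE matching. It is not part of the Kalix SDK.
final class LikePatternSketch {

  // Translates a LIKE pattern to a regex: '_' matches exactly one character,
  // '%' matches zero or more characters, everything else is literal.
  static Pattern toRegex(String like) {
    StringBuilder regex = new StringBuilder();
    for (char c : like.toCharArray()) {
      if (c == '_') {
        regex.append('.');
      } else if (c == '%') {
        regex.append(".*");
      } else {
        regex.append(Pattern.quote(String.valueOf(c)));
      }
    }
    return Pattern.compile(regex.toString(), Pattern.DOTALL);
  }

  static boolean matches(String like, String value) {
    return toRegex(like).matcher(value).matches();
  }

  // True when the pattern starts or ends with a non-wildcard character.
  // Assumption: both '_' and '%' count as wildcards for this check.
  static boolean hasLiteralPrefixOrSuffix(String like) {
    if (like.isEmpty()) {
      return false;
    }
    char first = like.charAt(0);
    char last = like.charAt(like.length() - 1);
    return !isWildcard(first) || !isWildcard(last);
  }

  private static boolean isWildcard(char c) {
    return c == '_' || c == '%';
  }

  public static void main(String[] args) {
    System.out.println(matches("Bob%", "Bobby"));          // true
    System.out.println(matches("B_b", "Bob"));             // true
    System.out.println(matches("Bob%", "Rob"));            // false
    System.out.println(hasLiteralPrefixOrSuffix("%foo%")); // false
  }
}
```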

Use the text_search function to search text values for words, with automatic tokenization and normalization based on language-specific configuration. The text_search function takes the text column to search, the query (as a parameter or literal string), and an optional language configuration.

text_search(<column>, <query parameter or string>, [<configuration>])

If the query contains multiple words, the text search will find values that contain all of these words (logically combined with AND), with tokenization and normalization automatically applied.

The following text search language configurations are supported: 'danish', 'dutch', 'english', 'finnish', 'french', 'german', 'hungarian', 'italian', 'norwegian', 'portuguese', 'romanian', 'russian', 'simple', 'spanish', 'swedish', 'turkish'. By default, a 'simple' configuration will be used, without language-specific features.

SELECT * FROM customers WHERE text_search(profile, :search_words, 'english')

Note: Text search is currently only available for deployed services, and can’t be used in local testing.

Data types

The following data types are supported, for their corresponding Protobuf types. Arrays are created for a repeated field in a Protobuf message. Timestamps can be stored and compared using the google.protobuf.Timestamp message type.

Data type            Protobuf type
Text                 string
Integer              int32
Long (Big Integer)   int64
Float (Real)         float
Double               double
Boolean              bool
Byte String          bytes
Array                repeated fields
Timestamp            google.protobuf.Timestamp

Optional fields

Fields in a Protobuf message that were not given a value are handled as the default value of the field data type.

In some use cases it is important to express explicitly that a value is missing. A view column can do this in three ways:

  • mark the message field as optional

  • use one of the Protobuf "wrapper" types for the field (messages in the package google.protobuf ending with Value)

  • make the field a part of a nested message and omit that whole nested message, for example address.street where the lack of an address message implies there is no street field.

Optional fields with values present can be queried just like regular view fields:

SELECT * FROM customers WHERE phone_number = :number

Finding results with missing values can be done using IS NULL:

SELECT * FROM customers WHERE phone_number IS NULL

Finding entries with any value present can be queried using IS NOT NULL:

SELECT * FROM customers WHERE phone_number IS NOT NULL

Optional fields in query request messages are handled like normal fields if they have a value. Missing optional request parameters, however, are treated as an invalid request and lead to a bad request response.

Sorting

Results for a view query can be sorted. Use ORDER BY with view columns to sort results in ascending (ASC, by default) or descending (DESC) order.

If no explicit ordering is specified in a view query, results will be returned in the natural index order, which is based on the filter predicates in the query.

SELECT * FROM customers WHERE name = :name AND age > :min_age ORDER BY age DESC

Note: Some orderings may be rejected if the view index cannot be efficiently ordered. Generally, to order by a column, it should also appear in the WHERE conditions.

Paging

Splitting a query result into one "page" at a time rather than returning the entire result at once is possible in two ways:

  • with a count based offset or

  • a token based offset.

In both cases OFFSET and LIMIT are used:

  • OFFSET specifies at which offset in the result to start

  • LIMIT specifies a maximum number of results to return

Count based offset

The values can either be static, defined up front in the query:

SELECT * FROM customers LIMIT 10

Or come from fields in the request message:

SELECT * FROM customers OFFSET :start_from LIMIT :max_customers

Note: Using numeric offsets can lead to missing or duplicated entries in the result if entries are added to or removed from the view between requests for the pages.
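A client that pages with a count based offset has to advance the offset itself, adding the page size after each request. The following plain Java sketch shows such a loop. The names OffsetPagingSketch and PageQuery are hypothetical, and an in-memory list stands in for the view, so this is an illustration of the bookkeeping rather than real client code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: hypothetical names. The in-memory list stands in for a
// view queried with
// "SELECT * FROM customers OFFSET :start_from LIMIT :max_customers".
final class OffsetPagingSketch {

  // Stand-in for a call to the view query method.
  interface PageQuery {
    List<String> fetch(int startFrom, int maxCustomers);
  }

  // Reads all pages by adding the page size to the offset after each request.
  static List<String> readAll(PageQuery query, int pageSize) {
    List<String> all = new ArrayList<>();
    int offset = 0;
    while (true) {
      List<String> page = query.fetch(offset, pageSize);
      all.addAll(page);
      if (page.size() < pageSize) {
        return all; // a short or empty page means the end was reached
      }
      offset += pageSize;
    }
  }

  // Simulates OFFSET and LIMIT over a fixed list of customer names.
  static PageQuery inMemory(List<String> customers) {
    return (startFrom, max) -> {
      int from = Math.min(startFrom, customers.size());
      int to = Math.min(from + max, customers.size());
      return new ArrayList<>(customers.subList(from, to));
    };
  }

  public static void main(String[] args) {
    List<String> customers = Arrays.asList("Alice", "Bob", "Carol", "Dave", "Eve");
    System.out.println(readAll(inMemory(customers), 2));
  }
}
```

Because the offset is tracked on the client, entries added to or removed from the view between two requests shift the positions of later entries, which is the cause of the missing or duplicated entries mentioned in the note above.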

Token based offset

The count based offset requires that you keep track of how far you got by adding the page size to the offset for each query.

An alternative is to use a string token, emitted by Kalix, that identifies how far into the result set the paging has reached. This is done with the functions next_page_token() and page_token_offset().

When reading the first page, an empty token is provided to page_token_offset. For each returned result page, next_page_token() returns a new token that can be used to read the next page. Once the last page has been read, an empty token is returned (see also has_more for determining whether the last page was reached).

The size of each page can optionally be specified using LIMIT. If it is not present, a default page size of 100 is used.

With a request and response message for the view like this:

message Request {
    string page_token = 1;
}

message Response {
    repeated Customer customers = 1;
    string next_page_token = 2;
}

A query such as the one below will allow for reading through the view in pages, each containing 10 customers:

SELECT * AS customers, next_page_token() AS next_page_token
FROM customers
OFFSET page_token_offset(:page_token)
LIMIT 10

The token value is not meant to be parseable into any meaningful information other than being a token for reading the next page.
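On the client side, token based paging comes down to a loop that starts with an empty token and follows the returned token until it is empty again. The plain Java sketch below illustrates that loop. TokenPagingSketch, Page and PageQuery are hypothetical names, and the fake backend encodes an index in the token purely for the sake of the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: hypothetical names. The in-memory backend stands in
// for the view and is not part of the Kalix SDK.
final class TokenPagingSketch {

  // Mirrors the Response message above: one page plus the next page token.
  static final class Page {
    final List<String> customers;
    final String nextPageToken;

    Page(List<String> customers, String nextPageToken) {
      this.customers = customers;
      this.nextPageToken = nextPageToken;
    }
  }

  // Stand-in for a call to the view query method with a Request message.
  interface PageQuery {
    Page fetch(String pageToken);
  }

  // Starts with an empty token and follows next_page_token until it is empty.
  static List<String> readAll(PageQuery query) {
    List<String> all = new ArrayList<>();
    String token = "";
    do {
      Page page = query.fetch(token);
      all.addAll(page.customers);
      token = page.nextPageToken;
    } while (!token.isEmpty());
    return all;
  }

  // Fake backend. Assumption for this sketch only: the token encodes an index.
  // Real tokens are opaque and should not be parsed by clients.
  static PageQuery inMemory(List<String> customers, int pageSize) {
    return token -> {
      int from = token.isEmpty() ? 0 : Integer.parseInt(token);
      int to = Math.min(from + pageSize, customers.size());
      String next = to < customers.size() ? String.valueOf(to) : "";
      return new Page(new ArrayList<>(customers.subList(from, to)), next);
    };
  }

  public static void main(String[] args) {
    List<String> customers = Arrays.asList("Alice", "Bob", "Carol");
    System.out.println(readAll(inMemory(customers, 2))); // [Alice, Bob, Carol]
  }
}
```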

Total count of results

To get the total number of results that will be returned over all pages, use COUNT(*) in a query that projects its results into a field. The total count will be returned in the aliased field (using AS) or otherwise into a field named count.

SELECT * AS customers, COUNT(*) AS total, has_more() AS more FROM customers LIMIT 10

Check if there are more pages

To check whether there are more pages left, use the has_more() function, which projects a boolean value into the result. This works for both count based and token based offset paging, and also when only using LIMIT without any OFFSET:

SELECT * AS customers, has_more() AS more_customers FROM customers LIMIT 10

This query will return more_customers = true when the view contains more than 10 customers.
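The relationship between the page contents, the total count, and the has_more flag can be sketched in plain Java as follows. HasMoreSketch and Page are hypothetical names and the list stands in for the customers view; the sketch only illustrates the expected values, under the assumption that the total covers all pages while the flag refers to results after the current page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: hypothetical names, not part of the Kalix SDK.
final class HasMoreSketch {

  // Mirrors a response with "customers", "total" and "more" fields.
  static final class Page {
    final List<String> customers;
    final int total;    // like COUNT(*): number of results over all pages
    final boolean more; // like has_more(): results remain after this page

    Page(List<String> customers, int total, boolean more) {
      this.customers = customers;
      this.total = total;
      this.more = more;
    }
  }

  // Simulates a query with OFFSET and LIMIT over an in-memory view.
  static Page query(List<String> view, int offset, int limit) {
    int from = Math.min(offset, view.size());
    int to = Math.min(from + limit, view.size());
    return new Page(new ArrayList<>(view.subList(from, to)), view.size(), to < view.size());
  }

  public static void main(String[] args) {
    List<String> view = Arrays.asList("Alice", "Bob", "Carol");
    Page first = query(view, 0, 2);
    System.out.println(first.customers + " total=" + first.total + " more=" + first.more);
    // [Alice, Bob] total=3 more=true
  }
}
```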

Advanced view queries

Advanced view queries include additional sort operations, grouping operations, joins across tables, and subquery support.

Advanced view queries are not available by default. Please contact the Kalix team if you require access to these features.

Joins and multiple tables

Advanced views can subscribe to events and changes from multiple entities or event sources. Data for multiple tables can then be joined using relational join operations, similar to SQL. Supported join types are:

  • (INNER) JOIN - only returns entries with matching values in both tables

  • LEFT (OUTER) JOIN - returns all entries in the left table, joined with any matching entries from the right table

  • RIGHT (OUTER) JOIN - returns all entries in the right table, joined with any matching entries from the left table

  • FULL (OUTER) JOIN - returns all entries from both tables, with joined entries for matching values
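The difference between an inner join and a left outer join can be sketched with in-memory data in plain Java. JoinSketch is a hypothetical name, and rendering the missing side of an unmatched entry as "null" is an assumption made for this illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: hypothetical helper showing join semantics on
// in-memory data. It is not part of the Kalix SDK.
final class JoinSketch {

  // customers holds customer ids; each order is {orderId, customerId}.
  // Each result row is rendered as "customerId:orderId".
  // With leftOuter = false this behaves like (INNER) JOIN,
  // with leftOuter = true like LEFT (OUTER) JOIN.
  static List<String> join(List<String> customers, List<String[]> orders, boolean leftOuter) {
    List<String> rows = new ArrayList<>();
    for (String customer : customers) {
      boolean matched = false;
      for (String[] order : orders) {
        if (order[1].equals(customer)) {
          rows.add(customer + ":" + order[0]);
          matched = true;
        }
      }
      if (!matched && leftOuter) {
        rows.add(customer + ":null"); // keep the left entry without a match
      }
    }
    return rows;
  }

  public static void main(String[] args) {
    List<String> customers = Arrays.asList("c1", "c2");
    List<String[]> orders = Arrays.asList(
        new String[] {"o1", "c1"},
        new String[] {"o2", "c1"});
    System.out.println(join(customers, orders, false)); // [c1:o1, c1:o2]
    System.out.println(join(customers, orders, true));  // [c1:o1, c1:o2, c2:null]
  }
}
```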

In these examples, the Customer Registry used for simple views is extended to be a simple Store, adding Products and Orders for Customers. Customers and Products are implemented using Event Sourced Entities, while Orders is a Value Entity.

Each Product includes a name and a price:

Java
src/main/proto/store/product/domain/product_domain.proto
message ProductState {
  string product_id = 1;
  string product_name = 2;
  Money price = 3;
}

message Money {
  string currency = 1;
  int64 units = 2;
  int32 cents = 3;
}

message ProductCreated {
  ProductState product = 1;
}

message ProductNameChanged {
  string new_name = 1;
}

message ProductPriceChanged {
  Money new_price = 1;
}
Scala
src/main/proto/store/product/domain/product_domain.proto
message ProductState {
  string product_id = 1;
  string product_name = 2;
  Money price = 3;
}

message Money {
  string currency = 1;
  int64 units = 2;
  int32 cents = 3;
}

message ProductCreated {
  ProductState product = 1;
}

message ProductNameChanged {
  string new_name = 1;
}

message ProductPriceChanged {
  Money new_price = 1;
}

Each Order has an id, refers to the Customer and Product ids for this order, has the quantity of the ordered product, and a timestamp for when the order was created:

Java
src/main/proto/store/order/domain/order_domain.proto
import "google/protobuf/timestamp.proto";

message OrderState {
  string order_id = 1;
  string product_id = 2;
  string customer_id = 3;
  int32 quantity = 4;
  google.protobuf.Timestamp created = 5;
}
Scala
src/main/proto/store/order/domain/order_domain.proto
import "google/protobuf/timestamp.proto";

message OrderState {
  string order_id = 1;
  string product_id = 2;
  string customer_id = 3;
  int32 quantity = 4;
  google.protobuf.Timestamp created = 5;
}

A view can then subscribe to the events or changes for each of the Customer, Order, and Product entities by defining multiple update methods on the service, each writing to a different table.

Java
src/main/proto/store/view/joined/orders_view.proto
import "store/customer/domain/customer_domain.proto";
import "store/order/domain/order_domain.proto";
import "store/product/domain/product_domain.proto";

service JoinedCustomerOrders {
  option (kalix.codegen) = {
    view: {}
  };

  rpc ProcessCustomerCreated(customer.domain.CustomerCreated) returns (customer.domain.CustomerState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers",  (1)
    };
  }

  rpc ProcessCustomerNameChanged(customer.domain.CustomerNameChanged) returns (customer.domain.CustomerState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers",  (1)
    };
  }

  rpc ProcessCustomerAddressChanged(customer.domain.CustomerAddressChanged) returns (customer.domain.CustomerState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers",  (1)
    };
  }

  rpc ProcessProductCreated(product.domain.ProductCreated) returns (product.domain.ProductState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "products"
    };
    option (kalix.method).view.update = {
      table: "products",  (1)
    };
  }

  rpc ProcessProductNameChanged(product.domain.ProductNameChanged) returns (product.domain.ProductState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "products"
    };
    option (kalix.method).view.update = {
      table: "products",  (1)
    };
  }

  rpc ProcessProductPriceChanged(product.domain.ProductPriceChanged) returns (product.domain.ProductState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "products"
    };
    option (kalix.method).view.update = {
      table: "products",  (1)
    };
  }

  rpc UpdateOrder(order.domain.OrderState) returns (order.domain.OrderState) {
    option (kalix.method).eventing.in = {
      value_entity: "orders"
    };
    option (kalix.method).view.update = {
      table: "orders" (1)
    };
  }
}
1 Each update method stores its state type in a different table for customers, products, and orders.
Scala
src/main/proto/store/view/joined/orders_view.proto
import "store/customer/domain/customer_domain.proto";
import "store/order/domain/order_domain.proto";
import "store/product/domain/product_domain.proto";

service JoinedCustomerOrders {
  option (kalix.codegen) = {
    view: {}
  };

  rpc ProcessCustomerCreated(customer.domain.CustomerCreated) returns (customer.domain.CustomerState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers",  (1)
    };
  }

  rpc ProcessCustomerNameChanged(customer.domain.CustomerNameChanged) returns (customer.domain.CustomerState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers",  (1)
    };
  }

  rpc ProcessCustomerAddressChanged(customer.domain.CustomerAddressChanged) returns (customer.domain.CustomerState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "customers"
    };
    option (kalix.method).view.update = {
      table: "customers",  (1)
    };
  }

  rpc ProcessProductCreated(product.domain.ProductCreated) returns (product.domain.ProductState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "products"
    };
    option (kalix.method).view.update = {
      table: "products",  (1)
    };
  }

  rpc ProcessProductNameChanged(product.domain.ProductNameChanged) returns (product.domain.ProductState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "products"
    };
    option (kalix.method).view.update = {
      table: "products",  (1)
    };
  }

  rpc ProcessProductPriceChanged(product.domain.ProductPriceChanged) returns (product.domain.ProductState) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "products"
    };
    option (kalix.method).view.update = {
      table: "products",  (1)
    };
  }

  rpc UpdateOrder(order.domain.OrderState) returns (order.domain.OrderState) {
    option (kalix.method).eventing.in = {
      value_entity: "orders"
    };
    option (kalix.method).view.update = {
      table: "orders" (1)
    };
  }
}
1 Each update method stores its state type in a different table for customers, products, and orders.

The view query can then JOIN across these tables, to return all orders for a specified customer, and include the customer and product details with each order:

Java
src/main/proto/store/view/joined/orders_view.proto
service JoinedCustomerOrders {
  // update methods...

  rpc Get(CustomerOrdersRequest) returns (stream CustomerOrder) { (1)
    option (kalix.method).view.query = {
      query: "SELECT * " (2)
             "FROM customers "
             "JOIN orders ON orders.customer_id = customers.customer_id " (3)
             "JOIN products ON products.product_id = orders.product_id " (4)
             "WHERE customers.customer_id = :customer_id " (5)
             "ORDER BY orders.created" (6)
    };
  }
}

message CustomerOrdersRequest {
  string customer_id = 1;
}

message CustomerOrder {
  string order_id = 1;
  string product_id = 2;
  string product_name = 3;
  product.domain.Money price = 4;
  int32 quantity = 5;
  string customer_id = 6;
  string email = 7;
  string name = 8;
  customer.domain.Address address = 9;
  google.protobuf.Timestamp created = 10;
}
1 The service method returns a stream of customer orders.
2 Select all columns from the joined entries to project into the combined CustomerOrder result type.
3 Join customers with orders on a matching customer id.
4 Join products with orders on a matching product id.
5 Find orders for a particular customer.
6 Sort all the orders by their created timestamp.
Scala
src/main/proto/store/view/joined/orders_view.proto
service JoinedCustomerOrders {
  // update methods...

  rpc Get(CustomerOrdersRequest) returns (stream CustomerOrder) { (1)
    option (kalix.method).view.query = {
      query: "SELECT * " (2)
             "FROM customers "
             "JOIN orders ON orders.customer_id = customers.customer_id " (3)
             "JOIN products ON products.product_id = orders.product_id " (4)
             "WHERE customers.customer_id = :customer_id " (5)
             "ORDER BY orders.created" (6)
    };
  }
}

message CustomerOrdersRequest {
  string customer_id = 1;
}

message CustomerOrder {
  string order_id = 1;
  string product_id = 2;
  string product_name = 3;
  product.domain.Money price = 4;
  int32 quantity = 5;
  string customer_id = 6;
  string email = 7;
  string name = 8;
  customer.domain.Address address = 9;
  google.protobuf.Timestamp created = 10;
}
1 The service method returns a stream of customer orders.
2 Select all columns from the joined entries to project into the combined CustomerOrder result type.
3 Join customers with orders on a matching customer id.
4 Join products with orders on a matching product id.
5 Find orders for a particular customer.
6 Sort all the orders by their created timestamp.

In the example above, each CustomerOrder returned will contain the same customer details. The results can instead include the customer details once, and then all of the ordered products in a collection, using a relational projection in the SELECT clause:

Java
src/main/proto/store/view/nested/orders_view.proto
service NestedCustomerOrders {
  // update methods...

  rpc Get(CustomerOrdersRequest) returns (CustomerOrders) { (1)
    option (kalix.method).view.query = {
      query: "SELECT customers.*, (orders.*, products.*) AS orders " (2)
             "FROM customers "
             "JOIN orders ON orders.customer_id = customers.customer_id "
             "JOIN products ON products.product_id = orders.product_id "
             "WHERE customers.customer_id = :customer_id "
             "ORDER BY orders.created"
    };
  }
}

message CustomerOrdersRequest {
  string customer_id = 1;
}

message CustomerOrders {
  string customer_id = 1;
  string email = 2;
  string name = 3;
  customer.domain.Address address = 4;
  repeated CustomerOrder orders = 5; (3)
}

message CustomerOrder {
  string customer_id = 1;
  string order_id = 2;
  string product_id = 3;
  string product_name = 4;
  product.domain.Money price = 5;
  int32 quantity = 6;
  google.protobuf.Timestamp created = 7;
}
1 A single CustomerOrders message is returned, which will have the customer details and all orders for this customer.
2 The customer columns are projected into the result message, and the order and product columns are combined into a nested message and projected into the orders field.
3 The orders (repeated) field will contain the nested CustomerOrder messages.
Scala
src/main/proto/store/view/nested/orders_view.proto
service NestedCustomerOrders {
  // update methods...

  rpc Get(CustomerOrdersRequest) returns (CustomerOrders) { (1)
    option (kalix.method).view.query = {
      query: "SELECT customers.*, (orders.*, products.*) AS orders " (2)
             "FROM customers "
             "JOIN orders ON orders.customer_id = customers.customer_id "
             "JOIN products ON products.product_id = orders.product_id "
             "WHERE customers.customer_id = :customer_id "
             "ORDER BY orders.created"
    };
  }
}

message CustomerOrdersRequest {
  string customer_id = 1;
}

message CustomerOrders {
  string customer_id = 1;
  string email = 2;
  string name = 3;
  customer.domain.Address address = 4;
  repeated CustomerOrder orders = 5; (3)
}

message CustomerOrder {
  string customer_id = 1;
  string order_id = 2;
  string product_id = 3;
  string product_name = 4;
  product.domain.Money price = 5;
  int32 quantity = 6;
  google.protobuf.Timestamp created = 7;
}
1 A single CustomerOrders message is returned, which will have the customer details and all orders for this customer.
2 The customer columns are projected into the result message, and the order and product columns are combined into a nested message and projected into the orders field.
3 The orders (repeated) field will contain the nested CustomerOrder messages.
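Conceptually, the nested projection above groups the flat joined rows by customer and collects the order and product columns into a repeated field. The plain Java sketch below illustrates that grouping with in-memory data. NestedProjectionSketch is a hypothetical name and plain strings stand in for the CustomerOrders and CustomerOrder messages; it does not describe how Kalix executes the query.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: hypothetical names, not part of the Kalix SDK.
final class NestedProjectionSketch {

  // Each flat joined row is {customerId, orderId, productName}, comparable
  // to one CustomerOrder from the earlier "SELECT *" join query.
  // The result maps each customer id to its collected orders, similar to
  // projecting "(orders.*, products.*) AS orders" into a repeated field.
  static Map<String, List<String>> nest(List<String[]> joinedRows) {
    Map<String, List<String>> ordersByCustomer = new LinkedHashMap<>();
    for (String[] row : joinedRows) {
      ordersByCustomer
          .computeIfAbsent(row[0], id -> new ArrayList<>())
          .add(row[1] + " (" + row[2] + ")");
    }
    return ordersByCustomer;
  }

  public static void main(String[] args) {
    List<String[]> rows = Arrays.asList(
        new String[] {"c1", "o1", "Laptop"},
        new String[] {"c1", "o2", "Mouse"});
    System.out.println(nest(rows)); // {c1=[o1 (Laptop), o2 (Mouse)]}
  }
}
```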

A relational projection for a JOIN query can also restructure the results. For example, the shipping details for a customer can be constructed in a particular form, and the product orders transformed into a different nested message structure:

Java
src/main/proto/store/view/structured/orders_view.proto
service StructuredCustomerOrders {
  // update methods...

  rpc Get(CustomerOrdersRequest) returns (CustomerOrders) {
    option (kalix.method).view.query = {
      query: "SELECT"
             " customers.customer_id AS id," (1)
             " (name,"
             "  address.street AS address1,"
             "  address.city AS address2,"
             "  email AS contact_email) AS shipping," (2)
             " (products.product_id AS id,"
             "  product_name AS name,"
             "  quantity,"
             "  (price.currency, price.units, price.cents) AS value," (3)
             "  order_id,"
             "  created AS order_created) AS orders " (4)
             "FROM customers "
             "JOIN orders ON orders.customer_id = customers.customer_id "
             "JOIN products ON products.product_id = orders.product_id "
             "WHERE customers.customer_id = :customer_id "
             "ORDER BY orders.created" (5)
    };
  }
}

message CustomerOrdersRequest {
  string customer_id = 1;
}

message CustomerOrders {
  string id = 1;
  CustomerShipping shipping = 2;
  repeated ProductOrder orders = 3;
}

message CustomerShipping {
  string name = 1;
  string address1 = 2;
  string address2 = 3;
  string contact_email = 4;
}

message ProductOrder {
  string id = 1;
  string name = 2;
  int32 quantity = 3;
  ProductValue value = 4;
  string order_id = 5;
  google.protobuf.Timestamp order_created = 6;
}

message ProductValue {
  string currency = 1;
  int64 units = 2;
  int32 cents = 3;
}
1 The customer_id is renamed to just id in the result type.
2 Customer shipping details are transformed and combined into a nested message.
3 The product price is reconstructed into a ProductValue message, nested within the order message.
4 The order and associated product information is transformed and combined into a collection of ProductOrder messages.
5 The nested orders in the result will still be sorted by their created timestamps.
Scala
src/main/proto/store/view/structured/orders_view.proto
service StructuredCustomerOrders {
  // update methods...

  rpc Get(CustomerOrdersRequest) returns (CustomerOrders) {
    option (kalix.method).view.query = {
      query: "SELECT"
             " customers.customer_id AS id," (1)
             " (name,"
             "  address.street AS address1,"
             "  address.city AS address2,"
             "  email AS contact_email) AS shipping," (2)
             " (products.product_id AS id,"
             "  product_name AS name,"
             "  quantity,"
             "  (price.currency, price.units, price.cents) AS value," (3)
             "  order_id,"
             "  created AS order_created) AS orders " (4)
             "FROM customers "
             "JOIN orders ON orders.customer_id = customers.customer_id "
             "JOIN products ON products.product_id = orders.product_id "
             "WHERE customers.customer_id = :customer_id "
             "ORDER BY orders.created" (5)
    };
  }
}

message CustomerOrdersRequest {
  string customer_id = 1;
}

message CustomerOrders {
  string id = 1;
  CustomerShipping shipping = 2;
  repeated ProductOrder orders = 3;
}

message CustomerShipping {
  string name = 1;
  string address1 = 2;
  string address2 = 3;
  string contact_email = 4;
}

message ProductOrder {
  string id = 1;
  string name = 2;
  int32 quantity = 3;
  ProductValue value = 4;
  string order_id = 5;
  google.protobuf.Timestamp order_created = 6;
}

message ProductValue {
  string currency = 1;
  int64 units = 2;
  int32 cents = 3;
}
1 The customer_id is renamed to just id in the result type.
2 Customer shipping details are transformed and combined into a nested message.
3 The product price is reconstructed into a ProductValue message, nested within the order message.
4 The order and associated product information is transformed and combined into a collection of ProductOrder messages.
5 The nested orders in the result will still be sorted by their created timestamps.

Note: Rather than transforming results in a relational projection, it’s also possible to transform the stored state in the update methods for the view, using the transform_updates option.

Enable advanced views

Advanced view queries are not available by default. Please contact the Kalix team if you require access to these features.

For local development, the advanced view features can be enabled in integration tests using the testkit settings:

Java
src/it/java/store/view/joined/JoinedCustomerOrdersViewIntegrationTest.java
public static final KalixTestKitExtension testKit =
    new KalixTestKitExtension(
        Main.createKalix(), KalixTestKit.Settings.DEFAULT.withAdvancedViews());
Scala
src/test/scala/store/view/joined/JoinedCustomerOrdersIntegrationSpec.scala
private val testKit = KalixTestKit(Main.createKalix(), KalixTestKit.DefaultSettings.withAdvancedViews()).start()

For running a local Kalix Runtime in Docker, with advanced view features enabled, set the environment variable VIEW_FEATURES_ALL: "true".

Testing the View

Testing Views is very similar to testing other subscription integrations.

Consider a View definition that subscribes to changes from the customer Value Entity:

src/main/proto/customer/view/customer_view.proto
message ByCityRequest {
  repeated string cities = 1;
}

service CustomersResponseByCity {
  option (kalix.codegen) = {
    view: {}
  };

  rpc GetCustomers(ByCityRequest) returns (CustomersResponse) {
    option (kalix.method).view.query = {
      query: "SELECT * AS customers FROM customers WHERE address.city = ANY(:cities)"
    };
  }

  rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) {
    option (kalix.method).eventing.in = {
      value_entity: "customer"
    };
    option (kalix.method).view.update = {
      table: "customers"
    };
  }
}
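
The query above returns every customer whose address.city matches any of the values passed in the repeated cities field. As a rough illustration of that matching rule only, the following plain Java sketch filters an in-memory list in the same way. The CityQuerySketch class, its Customer record, and the byCities method are hypothetical names introduced for this example; they are not part of the Kalix SDK or of the generated code.

```java
import java.util.List;
import java.util.Set;

public class CityQuerySketch {

  // Hypothetical stand-in for a stored view row; not the generated CustomerState class.
  public record Customer(String id, String name, String city) {}

  // Keeps the rows whose city equals any of the requested cities,
  // mirroring the intent of `WHERE address.city = ANY(:cities)`.
  public static List<Customer> byCities(List<Customer> rows, List<String> cities) {
    Set<String> wanted = Set.copyOf(cities);
    return rows.stream()
        .filter(customer -> wanted.contains(customer.city()))
        .toList();
  }

  public static void main(String[] args) {
    List<Customer> rows = List.of(
        new Customer("1", "Johanna", "Porto"),
        new Customer("2", "Bob", "London"),
        new Customer("3", "Alice", "Wroclaw"));

    List<String> names = byCities(rows, List.of("Porto", "London")).stream()
        .map(Customer::name)
        .toList();

    System.out.println(names); // prints [Johanna, Bob]
  }
}
```

An empty cities list matches nothing in this sketch, which is the behavior one would expect from an equality check against an empty set of values.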

An integration test for this View can be implemented as follows.

Java
public class CustomersResponseByCityViewIntegrationTest {

  @RegisterExtension
  public static final KalixTestKitExtension testKit =
      new KalixTestKitExtension(
          Main.createKalix(),
          DEFAULT.withValueEntityIncomingMessages("customer")); (1)

  private final CustomersResponseByCity viewClient;

  public CustomersResponseByCityViewIntegrationTest() {
    viewClient = testKit.getGrpcClient(CustomersResponseByCity.class);
  }

  @Test
  public void shouldFindCustomersByCity() {
    EventingTestKit.IncomingMessages customerEvents = testKit.getValueEntityIncomingMessages("customer"); (2)

    CustomerState johanna = CustomerState.newBuilder().setCustomerId("1")
        .setEmail("johanna@example.com")
        .setName("Johanna")
        .setAddress(Address.newBuilder().setStreet("Cool Street").setCity("Porto").build())
        .build();

    CustomerState bob = CustomerState.newBuilder().setCustomerId("2")
        .setEmail("bob@example.com")
        .setName("Bob")
        .setAddress(Address.newBuilder().setStreet("Baker Street").setCity("London").build())
        .build();

    CustomerState alice = CustomerState.newBuilder().setCustomerId("3")
        .setEmail("alice@example.com")
        .setName("Alice")
        .setAddress(Address.newBuilder().setStreet("Long Street").setCity("Wroclaw").build())
        .build();

    customerEvents.publish(johanna, "1"); (3)
    customerEvents.publish(bob, "2");
    customerEvents.publish(alice, "3");

    await()
        .ignoreExceptions()
        .atMost(5, TimeUnit.SECONDS)
        .untilAsserted(() -> {
              ByCityRequest byCities = ByCityRequest.newBuilder().addAllCities(List.of("Porto", "London")).build();

              CustomerViewModel.CustomersResponse customersResponse = viewClient
                  .getCustomers(byCities) (4)
                  .toCompletableFuture().get(2, TimeUnit.SECONDS);

              assertTrue(customersResponse.getCustomersList().containsAll(List.of(johanna, bob)));
            }
        );
  }
}
1 Mocks incoming messages from the customer Value Entity.
2 Gets the IncomingMessages instance used to publish test messages as if they came from the customer Value Entity.
3 Publishes test data.
4 Queries the view and asserts the results.
Scala
class CustomersResponseByCityViewIntegrationSpec
    extends AnyWordSpec
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures
    with Eventually {
  private val testKit =
    KalixTestKit(Main.createKalix(), DefaultSettings.withValueEntityIncomingMessages("customer")) (1)
      .start()

  private val viewClient = testKit.getGrpcClient(classOf[CustomersResponseByCity])

  "CustomersResponseByCityView" should {

    "find customers by city" in {
      val customerEvents = testKit.getValueEntityIncomingMessages("customer") (2)

      val johanna = CustomerState("1", "johanna@example.com", "Johanna", Some(Address("Cool Street", "Porto")))
      val bob = CustomerState("2", "bob@example.com", "Bob", Some(Address("Baker Street", "London")))
      val alice = CustomerState("3", "alice@example.com", "Alice", Some(Address("Long Street", "Wroclaw")))

      customerEvents.publish(johanna, "1") (3)
      customerEvents.publish(bob, "2")
      customerEvents.publish(alice, "3")

      eventually {

        val customersResponse = viewClient
          .getCustomers(ByCityRequest(Seq("Porto", "London"))) (4)
          .futureValue

        customersResponse.customers should contain only (johanna, bob)
      }
    }
  }
}
1 Mocks incoming messages from the customer Value Entity.
2 Gets the IncomingMessages instance used to publish test messages as if they came from the customer Value Entity.
3 Publishes test data.
4 Queries the view and asserts the results.
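
Both tests wrap the query in a retry (await() in Java, eventually in Scala) because Views are updated asynchronously, so the published state may not be visible to the first query. The following plain Java sketch shows the idea behind such a retry loop without any test library. The EventuallySketch class and its untilAsserted method are hypothetical names chosen for this illustration; this is not how Awaitility or ScalaTest implement it.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

public class EventuallySketch {

  // Re-runs the assertion until it passes or the time budget is used up.
  // When the budget is exhausted, the last failure is rethrown to the caller.
  public static void untilAsserted(Duration atMost, Duration pollInterval, Runnable assertion) {
    long start = System.nanoTime();
    while (true) {
      try {
        assertion.run();
        return; // the assertion passed
      } catch (RuntimeException | AssertionError failure) {
        if (System.nanoTime() - start >= atMost.toNanos()) {
          throw failure;
        }
        try {
          Thread.sleep(pollInterval.toMillis());
        } catch (InterruptedException interrupted) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while waiting", interrupted);
        }
      }
    }
  }

  public static void main(String[] args) {
    // Simulates a view whose update only becomes visible on the third query.
    AtomicInteger queries = new AtomicInteger();

    untilAsserted(Duration.ofSeconds(5), Duration.ofMillis(50), () -> {
      if (queries.incrementAndGet() < 3) {
        throw new AssertionError("view not updated yet");
      }
    });

    System.out.println("Passed after " + queries.get() + " queries");
  }
}
```

Keeping the overall budget generous and the poll interval short lets the test pass as soon as the View catches up, while still failing with the last assertion error if it never does.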