Implementing Views in Java or Scala
You can access a single Entity with its Entity key. You might want to retrieve multiple Entities, or retrieve them using an attribute other than the key. Kalix Views allow you to achieve this. By creating multiple Views, you can optimize for query performance against each one.
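Views can be defined from any of the following:
-
Value Entity state changes (see Creating a View from a Value Entity)
-
Event Sourced Entity events (see Creating a View from an Event Sourced Entity)
-
Messages received from a topic (see Creating a View from a topic)
The remainder of this page describes:
-
How to transform results
-
How to modify a View
-
Streaming view updates
-
Query syntax reference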
Be aware that Views are not updated immediately when Entity state changes. Kalix does update Views as quickly as possible, but it is not instant and can take up to a few seconds for the changes to become visible in the query results. View updates might also take more time during failure scenarios than during normal operation.
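Because of this delay, code that writes to an Entity and then immediately queries a View (an integration test, for example) may need to retry the query. The following is a minimal sketch in plain Java and is not part of the Kalix SDK: ViewPolling and awaitResult are hypothetical names, and the query supplier stands in for whatever View query call your client makes.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical helper (not a Kalix API): retries a query until it yields a result. */
final class ViewPolling {

  private ViewPolling() {}

  /**
   * Calls query up to maxAttempts times, sleeping for delay between attempts,
   * and returns the first non-empty result, or empty if none appeared.
   */
  static <T> Optional<T> awaitResult(
      Supplier<Optional<T>> query, int maxAttempts, Duration delay) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Optional<T> result = query.get();
      if (result.isPresent()) {
        return result;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(delay.toMillis());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt flag
          return Optional.empty();
        }
      }
    }
    return Optional.empty(); // the change did not become visible in time
  }
}
```

A bounded number of attempts keeps a test from hanging if the View never receives the update; choose the attempt count and delay to cover the few seconds mentioned above.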
Creating a View from a Value Entity
Consider an example of a Customer Registry service with a customer Value Entity. When customer state changes, the entire state is emitted as a value change. Value changes update any associated Views. To create a View that lists customers by their name:
-
Define the View service descriptor for a service that selects customers by name and associates a table name with the View. The table is created and used by Kalix to store the View.
This example assumes the following customer state is defined in a customer_domain.proto file:
- Java
-
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3"; package customer.domain; option java_outer_classname = "CustomerDomain"; message CustomerState { string customer_id = 1; string email = 2; string name = 3; Address address = 4; } message Address { string street = 1; string city = 2; }
- Scala
-
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3"; package customer.domain; message CustomerState { string customer_id = 1; string email = 2; string name = 3; Address address = 4; } message Address { string street = 1; string city = 2; }
The example also assumes a Value Entity service that produces the state changes consumed by the View:
- Java
-
src/main/proto/customer/api/customer_api.proto
service CustomerService { option (kalix.codegen) = { value_entity: { name: "customer.domain.CustomerValueEntity" entity_type: "customers" state: "customer.domain.CustomerState" } }; rpc Create(Customer) returns (google.protobuf.Empty) {} rpc ChangeName(ChangeNameRequest) returns (google.protobuf.Empty) {} rpc ChangeAddress(ChangeAddressRequest) returns (google.protobuf.Empty) {} rpc GetCustomer(GetCustomerRequest) returns (Customer) {} }
- Scala
-
src/main/proto/customer/api/customer_api.proto
service CustomerService { option (kalix.codegen) = { value_entity: { name: "customer.domain.CustomerValueEntity" entity_type: "customers" state: "customer.domain.CustomerState" } }; rpc Create(Customer) returns (google.protobuf.Empty) {} rpc ChangeName(ChangeNameRequest) returns (google.protobuf.Empty) {} rpc ChangeAddress(ChangeAddressRequest) returns (google.protobuf.Empty) {} rpc GetCustomer(GetCustomerRequest) returns (Customer) {} }
Define the View service descriptor
To get a View of multiple customers by their name, define the View as a service in Protobuf:
- Java
-
src/main/proto/customer/view/customer_view.proto
syntax = "proto3"; package customer.view; option java_outer_classname = "CustomerViewModel"; import "customer/domain/customer_domain.proto"; import "kalix/annotations.proto"; import "google/protobuf/any.proto"; service CustomerByName { option (kalix.codegen) = { view: {} (1) }; rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) { (2) option (kalix.method).eventing.in = { (3) value_entity: "customers" }; option (kalix.method).view.update = { (4) table: "customers" }; } rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) { (5) option (kalix.method).view.query = { (6) query: "SELECT * FROM customers WHERE name = :customer_name" }; } } message ByNameRequest { string customer_name = 1; }
- Scala
-
src/main/proto/customer/view/customer_view.proto
syntax = "proto3"; package customer.view; import "customer/domain/customer_domain.proto"; import "kalix/annotations.proto"; import "google/protobuf/any.proto"; service CustomerByName { option (kalix.codegen) = { view: {} (1) }; rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) { (2) option (kalix.method).eventing.in = { (3) value_entity: "customers" }; option (kalix.method).view.update = { (4) table: "customers" }; } rpc GetCustomers(ByNameRequest) returns (stream domain.CustomerState) { (5) option (kalix.method).view.query = { (6) query: "SELECT * FROM customers WHERE name = :customer_name" }; } } message ByNameRequest { string customer_name = 1; }
1 | The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that a View must be generated. |
2 | The UpdateCustomer method defines how Kalix will update the view. |
3 | The source of the View is the "customers" Value Entity. This identifier is defined in the entity_type: "customers" property of the (kalix.codegen).value_entity option in the customer_api.proto file. |
4 | The (kalix.method).view.update annotation defines that this method is used for updating the View. You must define the table attribute for the table to be used in the query. Pick any name and use it in the query SELECT statement. |
5 | The GetCustomers method defines the query to retrieve a stream of customers. |
6 | The (kalix.method).view.query annotation defines that this method is used as a query of the View. |
In this sample we use the internal domain.CustomerState as the state of the view. This is convenient since it allows automatic updates of the view without any logic, but it has the drawback that it implicitly makes the domain.CustomerState type a part of the public service API. To avoid this, you can transform the incoming update into a different state type, as shown in Creating a View from an Event Sourced Entity.
If the query should only return one result, remove the stream from the return type:
- Java
-
rpc GetCustomer(ByEmailRequest) returns (domain.CustomerState) { (1) option (kalix.method).view.query = { query: "SELECT * FROM customers WHERE email = :email" }; }
- Scala
-
rpc GetCustomer(ByEmailRequest) returns (domain.CustomerState) { (1) option (kalix.method).view.query = { query: "SELECT * FROM customers WHERE email = :email" }; }
1 | Omit stream when expecting a single result. |
When no result is found, the request fails with gRPC status code NOT_FOUND. A streamed call completes with an empty stream when no result is found.
Registering a View
Once you’ve defined a View, register it with Kalix by invoking the KalixFactory.withComponents method in the Main class.
- Java
-
src/main/java/customer/Main.java
public static Kalix createKalix() { return KalixFactory.withComponents( CustomerValueEntity::new, CustomerActionImpl::new, CustomerByEmailView::new, CustomerByNameView::new, CustomerSummaryByNameView::new, CustomersResponseByNameView::new); }
- Scala
-
src/main/scala/customer/Main.scala
def createKalix(): Kalix = { KalixFactory.withComponents( new CustomerValueEntity(_), new CustomerActionImpl(_), new CustomerByEmailView(_), new CustomerByNameView(_), new CustomerSummaryByNameView(_), new CustomersResponseByNameView(_)) }
Creating a View from an Event Sourced Entity
Create a View from an Event Sourced Entity by using events that the Entity emits to build a state representation. Using a Customer Registry service example, to create a View for querying customers by name:
-
Define a View descriptor to consume events
-
Create a transformation class
-
Register the View
The example assumes a customer_domain.proto file that defines the events that will update the View on name changes:
- Java
-
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3"; package customer.domain; option java_outer_classname = "CustomerDomain"; message CustomerState { string customer_id = 1; string email = 2; string name = 3; Address address = 4; } message Address { string street = 1; string city = 2; } message CustomerCreated { CustomerState customer = 1; } message CustomerNameChanged { string new_name = 1; } message CustomerAddressChanged { Address new_address = 1; }
- Scala
-
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3"; package customer.domain; message CustomerState { string customer_id = 1; string email = 2; string name = 3; Address address = 4; } message Address { string street = 1; string city = 2; } message CustomerCreated { CustomerState customer = 1; } message CustomerNameChanged { string new_name = 1; } message CustomerAddressChanged { Address new_address = 1; }
The example also assumes an Event Sourced Entity service that produces the events consumed by the View:
- Java
-
src/main/proto/customer/api/customer_api.proto
service CustomerService { option (kalix.codegen) = { event_sourced_entity: { name: "customer.domain.CustomerEntity" entity_type: "customers" state: "customer.domain.CustomerState" events: [ "customer.domain.CustomerCreated", "customer.domain.CustomerNameChanged", "customer.domain.CustomerAddressChanged"] } }; rpc Create(Customer) returns (google.protobuf.Empty) {} rpc ChangeName(ChangeNameRequest) returns (google.protobuf.Empty) {} rpc ChangeAddress(ChangeAddressRequest) returns (google.protobuf.Empty) {} rpc GetCustomer(GetCustomerRequest) returns (Customer) {} }
- Scala
-
src/main/proto/customer/api/customer_api.proto
service CustomerService { option (kalix.codegen) = { event_sourced_entity: { name: "customer.domain.CustomerEntity" entity_type: "customers" state: "customer.domain.CustomerState" events: [ "customer.domain.CustomerCreated", "customer.domain.CustomerNameChanged", "customer.domain.CustomerAddressChanged"] } }; rpc Create(Customer) returns (google.protobuf.Empty) {} rpc ChangeName(ChangeNameRequest) returns (google.protobuf.Empty) {} rpc ChangeAddress(ChangeAddressRequest) returns (google.protobuf.Empty) {} rpc GetCustomer(GetCustomerRequest) returns (Customer) {} }
Define a View descriptor to consume events
The following lines in the .proto file define a View to consume the CustomerCreated and CustomerNameChanged events:
- Java
-
src/main/proto/customer/customer_view.proto
syntax = "proto3"; package customer.view; option java_outer_classname = "CustomerViewModel"; import "customer/domain/customer_domain.proto"; import "customer/api/customer_api.proto"; import "kalix/annotations.proto"; import "google/protobuf/any.proto"; message ByNameRequest { string customer_name = 1; } service CustomerByName { option (kalix.codegen) = { (1) view: {} }; rpc ProcessCustomerCreated(domain.CustomerCreated) returns (api.Customer) { (2) option (kalix.method).eventing.in = { event_sourced_entity: "customers" (3) }; option (kalix.method).view.update = { table: "customers" transform_updates: true (4) }; } rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (api.Customer) { (2) option (kalix.method).eventing.in = { event_sourced_entity: "customers" (5) }; option (kalix.method).view.update = { table: "customers" transform_updates: true (6) }; } rpc ProcessCustomerAddressChanged(domain.CustomerAddressChanged) returns (api.Customer) { option (kalix.method).eventing.in = { event_sourced_entity: "customers" }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; } rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) { option (kalix.method).eventing.in = { event_sourced_entity: "customers" (5) }; option (kalix.method).view.update = { table: "customers" transform_updates: true (6) }; }; rpc GetCustomers(ByNameRequest) returns (stream api.Customer) { option (kalix.method).view.query = { query: "SELECT * FROM customers WHERE name = :customer_name" }; } }
1 | The kalix.codegen option configures code generation to provide base classes and an initial implementation for the class transforming events to updates of the state. |
2 | Define an update method for each event. |
3 | The source of the View is the journal of the "customers" Event Sourced Entity. This identifier is defined in the entity_type: "customers" property of the (kalix.codegen).event_sourced_entity option in the customer_api.proto file. |
4 | Enable transform_updates to build the View state from the events. |
5 | The same event_sourced_entity for all update methods. Note the required table attribute. Use any name, which you will reference in the query SELECT statement. |
6 | Enable transform_updates for all update methods. |
- Scala
-
src/main/proto/customer/customer_view.proto
syntax = "proto3"; package customer.view; import "customer/domain/customer_domain.proto"; import "customer/api/customer_api.proto"; import "kalix/annotations.proto"; import "google/protobuf/any.proto"; message ByNameRequest { string customer_name = 1; } service CustomerByName { option (kalix.codegen) = { (1) view: {} }; rpc ProcessCustomerCreated(domain.CustomerCreated) returns (api.Customer) { (2) option (kalix.method).eventing.in = { event_sourced_entity: "customers" (3) }; option (kalix.method).view.update = { table: "customers" transform_updates: true (4) }; } rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (api.Customer) { (2) option (kalix.method).eventing.in = { event_sourced_entity: "customers" (5) }; option (kalix.method).view.update = { table: "customers" transform_updates: true (6) }; } rpc ProcessCustomerAddressChanged(domain.CustomerAddressChanged) returns (api.Customer) { option (kalix.method).eventing.in = { event_sourced_entity: "customers" }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; } rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) { option (kalix.method).eventing.in = { event_sourced_entity: "customers" (5) }; option (kalix.method).view.update = { table: "customers" transform_updates: true (6) }; }; rpc GetCustomers(ByNameRequest) returns (stream api.Customer) { option (kalix.method).view.query = { query: "SELECT * FROM customers WHERE name = :customer_name" }; } }
See Query syntax reference for more examples of valid query syntax.
Create a transformation class
Next, you need to define how to transform events to state that can be used in the View. An Event Sourced Entity can emit many types of events. If a View does not use all events, you need to ignore unneeded events as shown in the IgnoreOtherEvents update handler.
The code generation will generate an implementation class with an initial empty implementation, which we’ll discuss below.
View update handlers are implemented in the CustomerByNameView class as methods that override abstract methods from AbstractCustomerByNameView. The methods take the current view state as the first parameter and the event as the second parameter. They return an UpdateEffect, which describes the next processing actions, such as updating the view state.
When you add or change the rpc definitions in the .proto files, including names, parameters and return messages, the corresponding methods are regenerated in the abstract class (AbstractCustomerByNameView). This means that the compiler will assist you with such changes, and the IDE can typically fill in missing method signatures.
- Java
-
src/main/java/customer/view/CustomerByNameView.java
import kalix.javasdk.view.ViewContext;
import com.google.protobuf.Any;
import customer.domain.CustomerDomain;
import customer.api.CustomerApi;

public class CustomerByNameView extends AbstractCustomerByNameView { // (1)

  public CustomerByNameView(ViewContext context) {}

  @Override
  public CustomerApi.Customer emptyState() { // (2)
    return null;
  }

  @Override // (3)
  public UpdateEffect<CustomerApi.Customer> processCustomerCreated(
      CustomerApi.Customer state, CustomerDomain.CustomerCreated customerCreated) {
    if (state != null) {
      return effects().ignore(); // already created
    } else {
      return effects().updateState(convertToApi(customerCreated.getCustomer()));
    }
  }

  @Override // (3)
  public UpdateEffect<CustomerApi.Customer> processCustomerNameChanged(
      CustomerApi.Customer state, CustomerDomain.CustomerNameChanged customerNameChanged) {
    return effects().updateState(
        state.toBuilder().setName(customerNameChanged.getNewName()).build());
  }

  @Override // (3)
  public UpdateEffect<CustomerApi.Customer> processCustomerAddressChanged(
      CustomerApi.Customer state, CustomerDomain.CustomerAddressChanged customerAddressChanged) {
    return effects().updateState(
        state.toBuilder().setAddress(convertToApi(customerAddressChanged.getNewAddress())).build());
  }

  @Override
  public UpdateEffect<CustomerApi.Customer> ignoreOtherEvents(
      CustomerApi.Customer state, Any any) {
    return effects().ignore();
  }

  // Converts the domain state to the API type exposed by the View.
  private CustomerApi.Customer convertToApi(CustomerDomain.CustomerState s) {
    CustomerApi.Address address = CustomerApi.Address.getDefaultInstance();
    if (s.hasAddress()) {
      address = convertToApi(s.getAddress());
    }
    return CustomerApi.Customer.newBuilder()
        .setCustomerId(s.getCustomerId())
        .setEmail(s.getEmail())
        .setName(s.getName())
        .setAddress(address)
        .build();
  }

  private CustomerApi.Address convertToApi(CustomerDomain.Address a) {
    return CustomerApi.Address.newBuilder()
        .setStreet(a.getStreet())
        .setCity(a.getCity())
        .build();
  }
}
1 | Extends the generated AbstractCustomerByNameView, which extends View. |
2 | Defines the initial, empty, state that is used before any updates. |
3 | One method for each event. |
- Scala
-
src/main/scala/customer/view/CustomerByNameView.scala
import kalix.scalasdk.view.View.UpdateEffect
import kalix.scalasdk.view.ViewContext
import com.google.protobuf.any.{Any => ScalaPbAny}
import customer.api
import customer.api.Customer
import customer.domain
import customer.domain.CustomerAddressChanged
import customer.domain.CustomerCreated
import customer.domain.CustomerNameChanged
import customer.domain.CustomerState

class CustomerByNameView(context: ViewContext) extends AbstractCustomerByNameView { // (1)

  override def emptyState: Customer = Customer.defaultInstance // (2)

  override def processCustomerCreated(
      state: Customer,
      customerCreated: CustomerCreated): UpdateEffect[Customer] = // (3)
    if (state != emptyState) effects.ignore() // already created
    else effects.updateState(convertToApi(customerCreated.customer.get))

  override def processCustomerNameChanged(
      state: Customer,
      customerNameChanged: CustomerNameChanged): UpdateEffect[Customer] = // (3)
    effects.updateState(state.copy(name = customerNameChanged.newName))

  override def processCustomerAddressChanged(
      state: Customer,
      customerAddressChanged: CustomerAddressChanged): UpdateEffect[Customer] = // (3)
    effects.updateState(state.copy(address = customerAddressChanged.newAddress.map(convertToApi)))

  override def ignoreOtherEvents(state: Customer, any: ScalaPbAny): UpdateEffect[Customer] =
    effects.ignore()

  // Converts the domain state to the API type exposed by the View.
  private def convertToApi(customer: CustomerState): Customer =
    Customer(
      customerId = customer.customerId,
      name = customer.name,
      email = customer.email,
      address = customer.address.map(convertToApi))

  private def convertToApi(address: domain.Address): api.Address =
    api.Address(street = address.street, city = address.city)
}
1 | Extends the generated AbstractCustomerByNameView, which extends View. |
2 | Defines the initial, empty, state that is used before any updates. |
3 | One method for each event. |
This type of update transformation is a natural fit for Events emitted by an Event Sourced Entity, but it can also be used for Value Entities. For example, if the View representation is different from the Entity state you might want to transform it before presenting the View to the client.
Register the View
Register the View class with Kalix:
- Java
-
src/main/java/customer/Main.java
public static Kalix createKalix() { return KalixFactory.withComponents( CustomerEntity::new, CustomerByNameView::new); }
- Scala
-
src/main/scala/customer/Main.scala
def createKalix(): Kalix = { KalixFactory.withComponents( new CustomerEntity(_), new CustomerByNameView(_)) }
Creating a View from a topic
The source of a View can be an eventing topic. You define it in the same way as shown in Creating a View from an Event Sourced Entity or Creating a View from a Value Entity, but set a topic in the eventing.in annotation in the Protobuf file instead of an Entity type.
- Java
-
src/main/proto/customer/view/customer_view.proto
syntax = "proto3"; package customer.view; option java_outer_classname = "CustomerViewModel"; import "customer/domain/customer_domain.proto"; import "customer/api/customer_api.proto"; import "kalix/annotations.proto"; import "google/protobuf/any.proto"; service CustomerByNameFromTopic { rpc ProcessCustomerCreated(domain.CustomerCreated) returns (api.Customer) { option (kalix.method).eventing.in = { topic: "customers" (1) }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; } rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (api.Customer) { option (kalix.method).eventing.in = { topic: "customers" }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; } rpc ProcessCustomerAddressChanged(domain.CustomerAddressChanged) returns (api.Customer) { option (kalix.method).eventing.in = { topic: "customers" }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; } rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) { option (kalix.method).eventing.in = { topic: "customers" }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; }; rpc GetCustomers(ByNameRequest) returns (stream api.Customer) { option (kalix.method).view.query = { query: "SELECT * FROM customers WHERE name = :customer_name" }; } }
1 | This is the only difference from Creating a View from an Event Sourced Entity. |
- Scala
-
src/main/proto/customer/view/customer_view.proto
syntax = "proto3"; package customer.view; import "customer/domain/customer_domain.proto"; import "customer/api/customer_api.proto"; import "kalix/annotations.proto"; import "google/protobuf/any.proto"; service CustomerByNameFromTopic { rpc ProcessCustomerCreated(domain.CustomerCreated) returns (api.Customer) { option (kalix.method).eventing.in = { topic: "customers" (1) }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; } rpc ProcessCustomerNameChanged(domain.CustomerNameChanged) returns (api.Customer) { option (kalix.method).eventing.in = { topic: "customers" }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; } rpc ProcessCustomerAddressChanged(domain.CustomerAddressChanged) returns (api.Customer) { option (kalix.method).eventing.in = { topic: "customers" }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; } rpc IgnoreOtherEvents(google.protobuf.Any) returns (api.Customer) { option (kalix.method).eventing.in = { topic: "customers" }; option (kalix.method).view.update = { table: "customers" transform_updates: true }; }; rpc GetCustomers(ByNameRequest) returns (stream api.Customer) { option (kalix.method).view.query = { query: "SELECT * FROM customers WHERE name = :customer_name" }; } }
1 | This is the only difference from Creating a View from an Event Sourced Entity. |
How to transform results
When creating a View, you can transform the results as a relational projection instead of using a SELECT * statement.
Relational projection
Instead of using SELECT * you can define which columns are used in the response message:
- Java
-
message CustomerSummary { string id = 1; string name = 2; } service CustomerSummaryByName { option (kalix.codegen) = { view: {} }; rpc GetCustomers(ByNameRequest) returns (stream CustomerSummary) { option (kalix.method).view.query = { query: "SELECT customer_id AS id, name FROM customers WHERE name = :customer_name" }; } rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) { option (kalix.method).eventing.in = { value_entity: "customers" }; option (kalix.method).view.update = { table: "customers" }; } }
- Scala
-
message CustomerSummary { string id = 1; string name = 2; } service CustomerSummaryByName { option (kalix.codegen) = { view: {} }; rpc GetCustomers(ByNameRequest) returns (stream CustomerSummary) { option (kalix.method).view.query = { query: "SELECT customer_id AS id, name FROM customers WHERE name = :customer_name" }; } rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) { option (kalix.method).eventing.in = { value_entity: "customers" }; option (kalix.method).view.update = { table: "customers" }; } }
In a similar way, you can include values from the request message in the response, for example :request_id:
SELECT :request_id, customer_id as id, name FROM customers WHERE name = :customer_name
Response message including the result
Instead of streamed results you can include the results in a repeated field in the response message:
- Java
-
message CustomersResponse { repeated domain.CustomerState results = 1; (1) } service CustomersResponseByName { option (kalix.codegen) = { view: {} }; rpc GetCustomers(ByNameRequest) returns (CustomersResponse) { (2) option (kalix.method).view.query = { query: "SELECT * AS results FROM customers WHERE name = :customer_name" (3) }; } rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) { option (kalix.method).eventing.in = { value_entity: "customers" }; option (kalix.method).view.update = { table: "customers" }; } }
- Scala
-
message CustomersResponse { repeated domain.CustomerState results = 1; (1) } service CustomersResponseByName { option (kalix.codegen) = { view: {} }; rpc GetCustomers(ByNameRequest) returns (CustomersResponse) { (2) option (kalix.method).view.query = { query: "SELECT * AS results FROM customers WHERE name = :customer_name" (3) }; } rpc UpdateCustomer(domain.CustomerState) returns (domain.CustomerState) { option (kalix.method).eventing.in = { value_entity: "customers" }; option (kalix.method).view.update = { table: "customers" }; } }
1 | The response message contains a repeated field. |
2 | The return type is not streamed. |
3 | The repeated field is referenced in the query with * AS results. |
How to modify a View
Kalix creates indexes for the View based on the query. For example, the following query will result in a View with an index on the name column:
SELECT * FROM customers WHERE name = :customer_name
If the query is changed, Kalix might need to add other indexes. For example, changing the above query to filter on the city would mean that Kalix needs to build a View with the index on the city column.
SELECT * FROM customers WHERE address.city = :city
Such changes require you to define a new View. Kalix will then rebuild it from the source event log or value changes.
Views from topics cannot be rebuilt from the source messages, because it’s not possible to consume all events from the topic again. The new View will be built from new messages published to the topic.
Rebuilding a new View may take some time if there are many events that have to be processed. The recommended way when changing a View is multi-step, with two deployments:
-
Define the new View, and keep the old View intact. A new View is defined by a new service in Protobuf. The viewId is the same as the service name, i.e. it will be a different viewId than the old View. Keep the registration of the old service in Main.
-
Deploy the new View, and let it rebuild. Verify that the new query works as expected. The old View can still be used.
-
Remove the old View definition and rename the new service to the old name if the public API is compatible, but keep the new viewId by defining it as shown below.
-
Deploy the second change.
This is how to define a custom viewId:
- Java
-
src/main/java/customer/Main.java
public static Kalix createKalix() { Kalix kalix = new Kalix(); kalix.register(CustomerByNameViewProvider.of(CustomerByNameView::new) .withViewId("CustomerByNameV2")); kalix.register(CustomerEntityProvider.of(CustomerEntity::new)); return kalix; }
- Scala
-
src/main/scala/customer/Main.scala
def createKalix(): Kalix = Kalix() .register( CustomerByNameViewProvider(new CustomerByNameView(_)) .withViewId("CustomerByNameV2")) .register(CustomerEntityProvider(new CustomerEntity(_)))
The View definitions are stored and validated when a new version is deployed. There will be an error message if the changes are not compatible.
Streaming view updates
A query can provide a near real time stream of results for the query, emitting new entries matching the query as they are added or updated in the view.
This is done by adding the option stream_updates to a query method with a stream reply:
- Java
-
message ByCityRequest {
  string city = 1;
}

service CustomerByCityStreaming {
  option (kalix.codegen) = {
    view: {}
  };

  // update methods omitted

  rpc GetCustomers(ByCityRequest) returns (stream api.Customer) { // (1)
    option (kalix.method).view.query = {
      query: "SELECT * FROM customers WHERE address.city = :city", // (2)
      stream_updates: true // (3)
    };
  }
}
1 | Return type must be a stream. |
2 | A regular query, in this case showing all customers from a specific city. |
3 | The stream_updates option set to true. |
- Scala
-
message ByCityRequest {
  string city = 1;
}

service CustomerByCityStreaming {
  option (kalix.codegen) = {
    view: {}
  };

  // update methods omitted

  rpc GetCustomers(ByCityRequest) returns (stream api.Customer) { // (1)
    option (kalix.method).view.query = {
      query: "SELECT * FROM customers WHERE address.city = :city", // (2)
      stream_updates: true // (3)
    };
  }
}
1 | Return type must be a stream. |
2 | A regular query, in this case showing all customers from a specific city. |
3 | The stream_updates option set to true. |
This will first list the complete result for the query and then keep the response stream open, emitting new or updated entries matching the query as they are added to the view. The stream does not complete until the client closes it.
Note: This is not intended as a transport for service-to-service propagation of updates and does not guarantee delivery. For such use cases, publish events to a topic instead; see Publishing and Subscribing with Actions.
Query syntax reference
Define View queries in a language that is similar to SQL. The following examples illustrate the syntax. To retrieve:
-
All customers without any filtering conditions (no WHERE clause):
SELECT * FROM customers
-
Customers with a name matching the customer_name property of the request message:
SELECT * FROM customers WHERE name = :customer_name
-
Customers matching the customer_name AND city properties of the request message:
SELECT * FROM customers WHERE name = :customer_name AND address.city = :city
-
Customers in a city matching a literal value:
SELECT * FROM customers WHERE address.city = 'New York'
Filter predicates
Use filter predicates in WHERE conditions to further refine results.
Comparison operators
The following comparison operators are supported:
-
= equals
-
!= not equals
-
> greater than
-
>= greater than or equals
-
< less than
-
<= less than or equals
Logical operators
Combine filter conditions with the AND operator, and negate using the NOT operator. Group conditions using parentheses.
OR support is currently disabled, until it can be more efficiently indexed.
SELECT * FROM customers WHERE name = :customer_name AND NOT (address.city = 'New York' AND age > 65)
Array operators
Use IN or = ANY to check whether a value is contained in a group of values or in an array column or parameter (a repeated field in the Protobuf message).
Use IN with a list of values or parameters:
SELECT * FROM customers WHERE email IN ('bob@example.com', :some_email)
Use = ANY to check against an array column (a repeated field in the Protobuf message):
SELECT * FROM customers WHERE :some_email = ANY(emails)
Or use = ANY with a repeated field in the request parameters:
SELECT * FROM customers WHERE email = ANY(:some_emails)
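Read as plain Java, the three forms above correspond to simple membership checks. The sketch below is only a mental model: ArrayOperatorSketch and its method names are hypothetical, and it says nothing about how Kalix indexes or evaluates these queries.

```java
import java.util.List;

/** Hypothetical mental model of IN and = ANY (not a Kalix API). */
final class ArrayOperatorSketch {

  private ArrayOperatorSketch() {}

  // email IN ('bob@example.com', :some_email)
  static boolean emailInList(String email, String someEmail) {
    return List.of("bob@example.com", someEmail).contains(email);
  }

  // :some_email = ANY(emails), where emails is an array column
  static boolean anyOfColumn(String someEmail, List<String> emails) {
    return emails.contains(someEmail);
  }

  // email = ANY(:some_emails), where some_emails is a repeated request field
  static boolean anyOfParameter(String email, List<String> someEmails) {
    return someEmails.contains(email);
  }
}
```

The difference between the two = ANY forms is only where the array lives: in the stored row or in the request message.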
Pattern matching
Use LIKE to pattern match on strings. The standard SQL LIKE patterns are supported, with _ (underscore) matching a single character, and % (percent sign) matching any sequence of zero or more characters.
SELECT * FROM customers WHERE name LIKE 'Bob%'
Note: For index efficiency, the pattern must have a non-wildcard prefix or suffix, so a pattern like '%foo%' is not supported. Because of this limitation, only constant patterns with literal strings are supported; patterns in request parameters are not allowed.
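To make the wildcard rules concrete, the following Java sketch emulates LIKE matching and checks the prefix-or-suffix restriction. It is an illustration only and not how Kalix evaluates queries. The class and method names are hypothetical, escape characters are not handled, and "non-wildcard prefix or suffix" is interpreted here as "the first or the last character is a literal", which is an assumption.

```java
import java.util.regex.Pattern;

// Illustrative only: emulates SQL LIKE semantics in plain Java.
final class LikePatterns {

    // Translates a LIKE pattern to a regex: '_' is one character, '%' is zero or more.
    static boolean matches(String likePattern, String value) {
        StringBuilder regex = new StringBuilder();
        for (char c : likePattern.toCharArray()) {
            if (c == '_') {
                regex.append('.');
            } else if (c == '%') {
                regex.append(".*");
            } else {
                // Quote literals so regex metacharacters match themselves.
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString(), Pattern.DOTALL).matcher(value).matches();
    }

    static boolean isWildcard(char c) {
        return c == '_' || c == '%';
    }

    // Assumed reading of the restriction: the pattern starts or ends with a literal.
    static boolean hasLiteralPrefixOrSuffix(String likePattern) {
        if (likePattern.isEmpty()) {
            return false;
        }
        char first = likePattern.charAt(0);
        char last = likePattern.charAt(likePattern.length() - 1);
        return !isWildcard(first) || !isWildcard(last);
    }
}
```

Under these assumptions, 'Bob%' passes the check and matches both "Bob" and "Bobby", while '%foo%' fails the check.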
Text search
Use the text_search function to search text values for words, with automatic tokenization and normalization based on language-specific configuration. The text_search function takes the text column to search, the query (as a parameter or literal string), and an optional language configuration.
text_search(<column>, <query parameter or string>, [<configuration>])
If the query contains multiple words, the text search will find values that contain all of these words (logically combined with AND), with tokenization and normalization automatically applied.
The following text search language configurations are supported: 'danish', 'dutch', 'english', 'finnish', 'french', 'german', 'hungarian', 'italian', 'norwegian', 'portuguese', 'romanian', 'russian', 'simple', 'spanish', 'swedish', 'turkish'. By default, a 'simple' configuration will be used, without language-specific features.
SELECT * FROM customers WHERE text_search(profile, :search_words, 'english')
Note: Text search is currently only available for deployed services, and can’t be used in local testing.
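Because text search is not available in local testing, a rough stand-in can help when reasoning about the "all words must match" rule. The sketch below approximates it in plain Java by lowercasing and splitting on non-alphanumeric characters. This tokenization is an assumption that only loosely resembles a 'simple' configuration; the language-specific configurations apply additional normalization that is not modelled here. The class name is hypothetical.

```java
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Illustrative only: a naive approximation of multi-word text search.
final class SimpleTextSearch {

    // Lowercases the text and splits it on anything that is not a letter or digit.
    static Set<String> tokenize(String text) {
        Set<String> tokens = new HashSet<>();
        for (String token : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+")) {
            if (!token.isEmpty()) {
                tokens.add(token);
            }
        }
        return tokens;
    }

    // A value matches when it contains every word of the query (logical AND).
    static boolean matches(String text, String query) {
        return tokenize(text).containsAll(tokenize(query));
    }
}
```

Note that this sketch treats an empty query as matching every value, which may differ from the behavior of the deployed service.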
Data types
The following data types are supported, for their corresponding Protobuf types. Arrays are created for a repeated field in a Protobuf message. Timestamps can be stored and compared using the google.protobuf.Timestamp message type.
Data type | Protobuf type |
---|---|
Text | string |
Integer | int32 |
Long (Big Integer) | int64 |
Float (Real) | float |
Double | double |
Boolean | bool |
Byte String | bytes |
Array | repeated fields |
Timestamp | google.protobuf.Timestamp |
Optional fields
Fields in a Protobuf message that were not given a value are handled as [the default value](https://developers.google.com/protocol-buffers/docs/proto3#default) of the field data type.
In some use cases it is important to express explicitly that a value is missing. For a view column, this can be done in three ways:
-
mark the message field as optional
-
use one of the Protobuf "wrapper" types for the field (messages in the package google.protobuf ending with Value)
-
make the field part of a nested message and omit that whole nested message, for example address.street, where the lack of an address message implies there is no street field.
Optional fields with values present can be queried just like regular view fields:
SELECT * FROM customers WHERE phone_number = :number
Results with missing values can be found using IS NULL:
SELECT * FROM customers WHERE phone_number IS NULL
Entries with any value present can be found using IS NOT NULL:
SELECT * FROM customers WHERE phone_number IS NOT NULL
Optional fields in query request messages are handled like normal fields if they have a value. A missing optional request parameter, however, is treated as an invalid request and results in a bad request response.
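The difference between a present value and a missing one can be pictured with java.util.Optional. The sketch below mirrors the three phone_number queries above over an in-memory list. It illustrates the semantics only; the Customer class and method names are hypothetical and are not the generated Protobuf classes.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Illustrative only: models an optional view column with java.util.Optional.
final class OptionalColumnDemo {

    static final class Customer {
        final String name;
        final Optional<String> phoneNumber;

        Customer(String name, Optional<String> phoneNumber) {
            this.name = name;
            this.phoneNumber = phoneNumber;
        }
    }

    // WHERE phone_number IS NULL
    static List<String> isNull(List<Customer> rows) {
        return rows.stream()
            .filter(c -> !c.phoneNumber.isPresent())
            .map(c -> c.name)
            .collect(Collectors.toList());
    }

    // WHERE phone_number IS NOT NULL
    static List<String> isNotNull(List<Customer> rows) {
        return rows.stream()
            .filter(c -> c.phoneNumber.isPresent())
            .map(c -> c.name)
            .collect(Collectors.toList());
    }

    // WHERE phone_number = :number (a missing value never equals the parameter)
    static List<String> equalsNumber(List<Customer> rows, String number) {
        return rows.stream()
            .filter(c -> c.phoneNumber.isPresent() && c.phoneNumber.get().equals(number))
            .map(c -> c.name)
            .collect(Collectors.toList());
    }
}
```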
Sorting
Results for a view query can be sorted. Use ORDER BY with view columns to sort results in ascending (ASC, by default) or descending (DESC) order.
If no explicit ordering is specified in a view query, results will be returned in the natural index order, which is based on the filter predicates in the query.
SELECT * FROM customers WHERE name = :name AND age > :min_age ORDER BY age DESC
Note: Some orderings may be rejected if the view index cannot be efficiently ordered. Generally, to order by a column it should also appear in the WHERE conditions.
Paging
Splitting a query result into one "page" at a time, rather than returning the entire result at once, is possible in two ways:
-
with a count based offset, or
-
with a token based offset.
In both cases OFFSET and LIMIT are used.
OFFSET specifies at which offset in the result to start.
LIMIT specifies the maximum number of results to return.
Count based offset
The values can either be static, defined up front in the query:
SELECT * FROM customers LIMIT 10
Or come from fields in the request message:
SELECT * FROM customers OFFSET :start_from LIMIT :max_customers
Note: Using numeric offsets can lead to missing or duplicated entries in the result if entries are added to or removed from the view between requests for the pages.
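The following Java sketch emulates OFFSET and LIMIT over an in-memory list to show the client-side bookkeeping and why concurrent changes can cause duplicates. It is a simulation under simplifying assumptions (a fixed, ordered result list) and not the Kalix client API; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: emulates count based paging over an in-memory result.
final class CountOffsetPaging {

    // Emulates "OFFSET :offset LIMIT :limit" over an ordered result.
    static List<String> page(List<String> rows, int offset, int limit) {
        int from = Math.min(offset, rows.size());
        int to = Math.min(from + limit, rows.size());
        return new ArrayList<>(rows.subList(from, to));
    }

    // Client loop: advance the offset by the page size until a short page arrives.
    // pageSize must be positive, otherwise the loop would never terminate.
    static List<String> readAll(List<String> rows, int pageSize) {
        List<String> all = new ArrayList<>();
        int offset = 0;
        while (true) {
            List<String> current = page(rows, offset, pageSize);
            all.addAll(current);
            if (current.size() < pageSize) {
                break;
            }
            offset += pageSize;
        }
        return all;
    }
}
```

For example, with rows alice, bob, carol, dave and a page size of 2, the first page is alice, bob. If aaron is then inserted at the front before the second request, offset 2 returns bob, carol, so bob is seen twice.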
Token based offset
The count based offset requires that you keep track of how far you got by adding the page size to the offset for each query.
An alternative is to use a string token emitted by Kalix that identifies how far into the result set the paging has reached, using the functions next_page_token() and page_token_offset().
When reading the first page, an empty token is provided to page_token_offset. For each returned result page, next_page_token() returns a new token that can be used to read the next page. Once the last page has been read, an empty token is returned (see also has_more for determining whether the last page was reached).
The size of each page can optionally be specified using LIMIT. If it is not present, a default page size of 100 is used.
With a request and response message for the view like this:
message Request {
string page_token = 1;
}
message Response {
repeated Customer customers = 1;
string next_page_token = 2;
}
A query like this will allow for reading through the view in pages, each containing 10 customers:
SELECT * AS customers, next_page_token() AS next_page_token
FROM customers
OFFSET page_token_offset(:page_token)
LIMIT 10
The token value is not meant to be parseable into any meaningful information other than being a token for reading the next page.
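On the client side, token based paging reduces to a loop that starts with an empty token and stops when an empty token comes back. The sketch below shows that loop in Java. The Page class is a hypothetical mirror of the Response message above, and the view call is abstracted as a Function so the loop can run without a deployed service; inMemoryQuery is a stand-in whose token encoding is arbitrary, since real tokens are opaque.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative only: client-side loop for token based paging.
final class TokenPagingClient {

    // Hypothetical client-side mirror of the Response message.
    static final class Page {
        final List<String> customers;
        final String nextPageToken;

        Page(List<String> customers, String nextPageToken) {
            this.customers = customers;
            this.nextPageToken = nextPageToken;
        }
    }

    // Reads every page: starts with an empty token, stops when an empty token is returned.
    static List<String> readAll(Function<String, Page> query) {
        List<String> all = new ArrayList<>();
        String token = "";
        do {
            Page page = query.apply(token);
            all.addAll(page.customers);
            token = page.nextPageToken;
        } while (!token.isEmpty());
        return all;
    }

    // In-memory stand-in for the view query, for local experimentation only.
    // It encodes the next index as the token; never parse real tokens like this.
    static Function<String, Page> inMemoryQuery(List<String> rows, int pageSize) {
        return token -> {
            int from = token.isEmpty() ? 0 : Integer.parseInt(token);
            int to = Math.min(from + pageSize, rows.size());
            String next = to < rows.size() ? String.valueOf(to) : "";
            return new Page(new ArrayList<>(rows.subList(from, to)), next);
        };
    }
}
```

In a real service, the Function would wrap the call to the view's query method, passing the token in the request's page_token field.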
Total count of results
To get the total number of results that will be returned over all pages, use COUNT(*) in a query that projects its results into a field. The total count will be returned in the aliased field (using AS) or otherwise into a field named count.
SELECT * AS customers, COUNT(*) AS total, has_more() AS more FROM customers LIMIT 10
Check if there are more pages
To check if there are more pages left, use the function has_more(), which provides a boolean value in the result. This works for both count based and token based offset paging, and also when using only LIMIT without any OFFSET:
SELECT * AS customers, has_more() AS more_customers FROM customers LIMIT 10
This query will return more_customers = true when the view contains more than 10 customers.
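A client can combine the total count and the page size to plan its requests. The arithmetic below is a small Java sketch of that bookkeeping: the number of pages for a given total and limit, and a condition that agrees with the has_more() example above for a fixed result set. The class and method names are hypothetical, and the hasMore formula is an assumed equivalent rather than the documented implementation.

```java
// Illustrative only: client-side paging arithmetic.
final class PagingMath {

    // Number of pages needed for `total` results at `limit` results per page (limit > 0).
    static long totalPages(long total, int limit) {
        return (total + limit - 1) / limit;
    }

    // True when results remain after a page that started at `offset` and returned `returned` rows.
    static boolean hasMore(long total, long offset, int returned) {
        return offset + returned < total;
    }
}
```

With LIMIT 10 and no OFFSET, hasMore(11, 0, 10) is true and hasMore(10, 0, 10) is false, matching the "more than 10 customers" condition.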