Service to Service Eventing
Kalix provides brokerless at-least-once event delivery across Kalix services through the Service to Service eventing.
The source of the events is an Event Sourced Entity. Its events can be published as a stream and consumed by another Kalix service without the need to set up a message broker.
- Note
-
For eventing from an entity inside the same Kalix service as the consuming component, use regular Subscription to the entity instead of Service to Service eventing.
Event Producer
The event producer controls which entity to publish events for. Each entity published is identified by a stream id
so that one Kalix service can publish more than one of the entity types it contains.
- Java
-
src/main/proto/customer/api/direct_customer_events.proto
service CustomerEventsService { option (kalix.codegen) = { action: {} }; option (kalix.service).eventing.in = { (1) event_sourced_entity: "customers" // skip/filter events that there is no transform method for (AddressChanged) ignore_unknown: true (2) }; option (kalix.service).eventing.out.direct.event_stream_id = "customer_events"; (3) // limit access to only other services in same project option (kalix.service).acl.allow = { service: "*" }; (4) // transform methods for turning internal event types to public API rpc TransformCustomerCreated(domain.CustomerCreated) returns (Created) { } (5) rpc TransformCustomerNameChanged(domain.CustomerNameChanged) returns (NameChanged) {} }
1 eventing.in
identifying which event sourced entity to publish events for.2 Ignore any event types not handled by a method and move on with the event stream, rather than fail which is the default. 3 eventing.out.direct.event_stream_id
marks the public identifier for consumers of this stream.4 An ACL annotation, allowing access from other Kalix services, but not the public internet. 5 All methods on the service are transformer methods for turning internal event message types into public API message types for other services to consume. - Scala
-
src/main/proto/customer/api/direct_customer_events.proto
service CustomerEventsService { option (kalix.codegen) = { action: {} }; option (kalix.service).eventing.in = { (1) event_sourced_entity: "customers" // skip/filter events that there is no transform method for (AddressChanged) ignore_unknown: true (2) }; option (kalix.service).eventing.out.direct.event_stream_id = "customer_events"; (3) // limit access to only other services in same project option (kalix.service).acl.allow = { service: "*" }; (4) // transform methods for turning internal event types to public API rpc TransformCustomerCreated(domain.CustomerCreated) returns (Created) { } (5) rpc TransformCustomerNameChanged(domain.CustomerNameChanged) returns (NameChanged) {} }
1 eventing.in
identifying which event sourced entity to publish events for.2 Ignore any event types not handled by a method and move on with the event stream, rather than fail which is the default. 3 eventing.out.direct.event_stream_id
marks the public identifier for consumers of this stream.4 An ACL annotation, allowing access from other Kalix services, but not the public internet. 5 All methods on the service are transformer methods for turning internal event message types into public API message types for other services to consume.
The implementation of the producer creates the public API messages and uses the regular Action effects
API to return the messages to publish:
- Java
-
src/main/java/customer/api/CustomerEventsServiceAction.java
@Override public Effect<CustomerEventsApi.Created> transformCustomerCreated(CustomerDomain.CustomerCreated customerCreated) { CustomerDomain.CustomerState customer = customerCreated.getCustomer(); return effects().reply(CustomerEventsApi.Created.newBuilder() .setCustomerId(customer.getCustomerId()) .setCustomerName(customer.getName()) .setEmail(customer.getEmail()) .build()); } @Override public Effect<CustomerEventsApi.NameChanged> transformCustomerNameChanged(CustomerDomain.CustomerNameChanged customerNameChanged) { // Note: customer_id is not present in the event or elsewhere here, but will be available as subject id // from the metadata on the consuming side return effects().reply(CustomerEventsApi.NameChanged.newBuilder() .setCustomerName(customerNameChanged.getNewName()) .build()); }
- Scala
-
src/main/scala/customer/api/CustomerEventsServiceAction.scala
override def transformCustomerCreated(customerCreated: domain.CustomerCreated): Action.Effect[Created] = { val customer = customerCreated.getCustomer effects.reply(Created(customer.customerId, customer.name, customer.email)) } override def transformCustomerNameChanged( customerNameChanged: domain.CustomerNameChanged): Action.Effect[NameChanged] = { // Note: customer_id is not present in the event or elsewhere here, but will be available as subject id // from the metadata on the consuming side effects.reply(NameChanged(customerNameChanged.newName)) }
Event Consumer
The consumer can be an Action or a View, annotated with (kalix.service).eventing.in.direct
with a service
identifying the publishing service, and the event_stream_id
to subscribe to.
Since the consumer is in a separate View, we must include the message descriptors for the messages the producing side produces:
- Java
-
src/main/proto/customer/api/publisher_api.proto
message Created { string customer_id = 1; string customer_name = 2; string email = 3; } message NameChanged { string customer_name = 1; }
- Scala
-
src/main/proto/customer/api/publisher_api.proto
message Created { string customer_id = 1; string customer_name = 2; string email = 3; } message NameChanged { string customer_name = 1; }
We then define a component subscribing to the service to service publisher. In this example we do that with a View:
- Java
-
src/main/proto/customer/view/customer_view.proto
service AllCustomersView { option (kalix.codegen) = { view: {} }; // consume events published by java-protobuf-eventsourced-customer-registry/CustomerEventsServiceAction option (kalix.service).eventing.in.direct = { (1) service: "customer-registry" (2) event_stream_id: "customer_events" (3) }; rpc ProcessCustomerCreated(api.Created) returns (api.Customer) { (4) option (kalix.method).view.update = { table: "all_customers" transform_updates: true }; } rpc ProcessCustomerNameChanged(api.NameChanged) returns (api.Customer) { option (kalix.method).view.update = { table: "all_customers" transform_updates: true }; } rpc GetCustomers(google.protobuf.Empty) returns (stream api.Customer) { option (kalix.method).view.query = { query: "SELECT * FROM all_customers" }; } }
1 Service level eventing.in.direct
block.2 The name of the Kalix service publishing the event stream. 3 The public event_stream_id
of the specific stream from the publisher.4 One update method per message type that the stream may contain. - Scala
-
src/main/proto/customer/view/customer_view.proto
service AllCustomersView { option (kalix.codegen) = { view: {} }; // consume events published by scala-protobuf-eventsourced-customer-registry/CustomerEventsServiceAction option (kalix.service).eventing.in.direct = { (1) service: "customer-registry" (2) event_stream_id: "customer_events" (3) }; rpc ProcessCustomerCreated(api.Created) returns (api.Customer) { (4) option (kalix.method).view.update = { table: "all_customers" transform_updates: true }; } rpc ProcessCustomerNameChanged(api.NameChanged) returns (api.Customer) { option (kalix.method).view.update = { table: "all_customers" transform_updates: true }; } rpc GetCustomers(google.protobuf.Empty) returns (stream api.Customer) { option (kalix.method).view.query = { query: "SELECT * FROM all_customers" }; } }
1 Service level eventing.in.direct
block.2 The name of the Kalix service publishing the event stream. 3 The public event_stream_id
of the specific stream from the publisher.4 One update method per message type that the stream may contain.
Deployment dependent source of events
It is possible to use environment variables to control the name of the service that a consumer consumes from, this is useful for example for using the same image in staging and production deployments but having them consume from different source services.
Referencing environment variables is done with the syntax ${VAR_NAME}
in the service
string in the consumer eventing.in.direct
block.
Note that if changing the service
name after it has once been deployed means the consumer will start over from the beginning of the event stream.
See kalix service deploy for details on how to set environment variables when deploying a service.