Service to Service Eventing
Kalix provides brokerless, at-least-once event delivery between Kalix services through Service to Service eventing.
The source of the events is an Event Sourced Entity. Its events can be published as a stream and consumed by another Kalix service without the need to set up a message broker.
Note: For eventing from an entity inside the same Kalix service as the consuming component, use a regular Subscription to the entity instead of Service to Service eventing.
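Because delivery is at-least-once, a consumer may occasionally see the same event more than once, so its handling should tolerate redelivery. The following is a minimal plain-Java sketch of one way to do that, not Kalix API: the `Delivery` envelope, its `sequence` field (assumed to start at 1 and increase per subject), and the `DedupingConsumer` class are all hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical envelope: subject and sequence are assumptions for this sketch,
// not types or fields taken from the Kalix SDK.
record Delivery(String subject, long sequence, String newName) {}

final class DedupingConsumer {
  // Highest sequence number applied so far, per subject (entity id).
  private final Map<String, Long> lastSeen = new HashMap<>();
  // Current name per subject, standing in for the consumer's own state.
  private final Map<String, String> names = new HashMap<>();
  private int applied = 0;

  // Returns true if the delivery was applied, false if it was a redelivery.
  boolean handle(Delivery d) {
    long last = lastSeen.getOrDefault(d.subject(), 0L);
    if (d.sequence() <= last) {
      return false; // already processed, ignore the duplicate
    }
    names.put(d.subject(), d.newName());
    lastSeen.put(d.subject(), d.sequence());
    applied++;
    return true;
  }

  String nameOf(String subject) {
    return names.get(subject);
  }

  int appliedCount() {
    return applied;
  }
}
```

An update that simply overwrites state, such as setting a name, is already idempotent on its own. Explicit tracking like this matters more when the handler has side effects that must not be repeated.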
Event Producer
The event producer controls which entities to publish events for. Each published entity is identified by a stream id, so that one Kalix service can publish more than one of the entity types it contains.
@Subscribe.EventSourcedEntity(
    value = CustomerEntity.class, // (1)
    ignoreUnknown = true) // (2)
@Publish.Stream(id = "customer_events") // (3)
@Acl(allow = @Acl.Matcher(service = "*")) // (4)
public class CustomerEventsService extends Action {

  public Effect<CustomerPublicEvent.Created> onEvent( // (5)
      CustomerEvent.CustomerCreated created) {
    return effects().reply(
        new CustomerPublicEvent.Created(created.email(), created.name()));
  }

  public Effect<CustomerPublicEvent.NameChanged> onEvent( // (5)
      CustomerEvent.NameChanged nameChanged) {
    return effects().reply(new CustomerPublicEvent.NameChanged(nameChanged.newName()));
  }
}
(1) Identifies which event sourced entity to publish events for.
(2) Ignores any event types not handled by a method and moves on with the event stream, rather than failing, which is the default.
(3) Public identifier for consumers of this stream.
(4) Allows access from other Kalix services, but not from the public internet.
(5) All methods on the service are transformer methods that turn internal event message types into public API message types for other services to consume.
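The transformation from internal to public event types can be illustrated without the Kalix SDK. The sketch below is plain Java under stated assumptions: the record fields are inferred from the accessors used in the producer example (`email()`, `name()`, `newName()`), while `AddressChanged` and `PublicEventMapper` are hypothetical names added to show an internal event with no public counterpart, which the producer would skip thanks to `ignoreUnknown = true`.

```java
import java.util.Optional;

// Internal events: field names assumed from the accessors in the producer example.
sealed interface CustomerEvent {
  record CustomerCreated(String email, String name) implements CustomerEvent {}
  record NameChanged(String newName) implements CustomerEvent {}
  // Hypothetical internal-only event that is never published.
  record AddressChanged(String newAddress) implements CustomerEvent {}
}

// Public API events exposed to other services.
sealed interface CustomerPublicEvent {
  record Created(String email, String name) implements CustomerPublicEvent {}
  record NameChanged(String newName) implements CustomerPublicEvent {}
}

// Hypothetical helper mirroring what the transformer methods do.
final class PublicEventMapper {
  // Maps an internal event to its public form, or empty if it is not published.
  static Optional<CustomerPublicEvent> toPublic(CustomerEvent event) {
    if (event instanceof CustomerEvent.CustomerCreated created) {
      return Optional.of(
          new CustomerPublicEvent.Created(created.email(), created.name()));
    } else if (event instanceof CustomerEvent.NameChanged changed) {
      return Optional.of(new CustomerPublicEvent.NameChanged(changed.newName()));
    }
    return Optional.empty(); // no public counterpart, skipped
  }
}
```

Keeping the public types separate from the internal ones lets the entity's event model evolve without breaking consumers in other services.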
Event Consumer
The consumer can be an Action or a View, annotated with @Subscribe.Stream, where service identifies the publishing service and id identifies the stream to subscribe to.
We then define a component subscribing to the service to service publisher. In this example we do that with a View:
src/main/java/customer/views/CustomersByNameView.java
@Table("customers_by_name")
@Subscribe.Stream( // (1)
    service = "customer-registry", // (2)
    id = "customer_events" // (3)
)
public class CustomersByNameView extends View<Customer> {

  public UpdateEffect<Customer> onEvent( // (4)
      CustomerPublicEvent.Created created) {
    var id = updateContext().eventSubject().get();
    return effects().updateState(
        new Customer(id, created.email(), created.name()));
  }

  public UpdateEffect<Customer> onEvent(
      CustomerPublicEvent.NameChanged nameChanged) {
    var updated = viewState().withName(nameChanged.newName());
    return effects().updateState(updated);
  }

  @GetMapping("/customers/by_name/{name}")
  @Query("SELECT * FROM customers_by_name WHERE name = :name")
  @Acl(allow = @Acl.Matcher(principal = Acl.Principal.INTERNET))
  public Flux<Customer> findByName(@PathVariable String name) {
    return null;
  }
}
(1) Annotate the component with @Subscribe.Stream to subscribe to an event stream from another Kalix service.
(2) The name of the Kalix service publishing the event stream.
(3) The public identifier of the specific stream from the publisher.
(4) One update method per message type that the stream may contain.
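The view's update methods rely on a `Customer` state type with a three-argument constructor and a `withName` method, which the listing does not show. A minimal sketch of what such a type could look like follows; the field names `id`, `email`, and `name` are assumptions inferred from how the view constructs and updates the state, not the actual definition from the sample project.

```java
// Assumed shape of the view state, inferred from
// new Customer(id, created.email(), created.name()) and viewState().withName(...).
record Customer(String id, String email, String name) {

  // Returns a copy with the name replaced, leaving the original unchanged.
  Customer withName(String newName) {
    return new Customer(id, email, newName);
  }
}
```

Using an immutable record with a copy method means each update handler returns a new state value rather than mutating the existing one.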