Service to Service Eventing

Kalix provides brokerless at-least-once event delivery across Kalix services through the Service to Service eventing.

The source of the events is an Event Sourced Entity. Its events can be published as a stream and consumed by another Kalix service without the need to set up a message broker.

Note

For eventing from an entity inside the same Kalix service as the consuming component, use regular Subscription to the entity instead of Service to Service eventing.

Event Producer

The event producer controls which entity to publish events for. Each entity published is identified by a stream id so that one Kalix service can publish more than one of the entity types it contains.

Java
src/main/proto/customer/api/direct_customer_events.proto
service CustomerEventsService {
    option (kalix.codegen) = {
        action: {}
    };
    option (kalix.service).eventing.in = { (1)
        event_sourced_entity: "customers"
        // skip/filter events that there is no transform method for (AddressChanged)
        ignore_unknown: true (2)
    };
    option (kalix.service).eventing.out.direct.event_stream_id = "customer_events"; (3)
    // limit access to only other services in same project
    option (kalix.service).acl.allow = { service: "*" }; (4)

    // transform methods for turning internal event types to public API
    rpc TransformCustomerCreated(domain.CustomerCreated) returns (Created) { } (5)
    rpc TransformCustomerNameChanged(domain.CustomerNameChanged) returns (NameChanged) {}

}
1 eventing.in identifying which event sourced entity to publish events for
2 Ignore any event types not handled by a method and move on with the event stream, rather than fail which is the default
3 eventing.out.direct.event_stream_id marks the public identifier for consumers of this stream
4 An ACL annotation, allowing access from other Kalix services, but not the public internet
5 All methods on the service are transformer methods for turning internal event message types into public API message types for other services to consume
Scala
src/main/proto/customer/api/direct_customer_events.proto
service CustomerEventsService {
    option (kalix.codegen) = {
        action: {}
    };
    option (kalix.service).eventing.in = { (1)
        event_sourced_entity: "customers"
        // skip/filter events that there is no transform method for (AddressChanged)
        ignore_unknown: true (2)
    };
    option (kalix.service).eventing.out.direct.event_stream_id = "customer_events"; (3)
    // limit access to only other services in same project
    option (kalix.service).acl.allow = { service: "*" }; (4)

    // transform methods for turning internal event types to public API
    rpc TransformCustomerCreated(domain.CustomerCreated) returns (Created) { }  (5)
    rpc TransformCustomerNameChanged(domain.CustomerNameChanged) returns (NameChanged) {}

}
1 eventing.in identifying which event sourced entity to publish events for
2 Ignore any event types not handled by a method and move on with the event stream, rather than fail which is the default
3 eventing.out.direct.event_stream_id marks the public identifier for consumers of this stream
4 An ACL annotation, allowing access from other Kalix services, but not the public internet
5 All methods on the service are transformer methods for turning internal event message types into public API message types for other services to consume

The implementation of the producer creates the public API messages and uses the regular Action effects API to return the messages to publish:

Java
src/main/java/customer/api/CustomerEventsServiceAction.java
@Override
public Effect<CustomerEventsApi.Created> transformCustomerCreated(CustomerDomain.CustomerCreated customerCreated) {
  CustomerDomain.CustomerState customer = customerCreated.getCustomer();
  return effects().reply(CustomerEventsApi.Created.newBuilder()
      .setCustomerId(customer.getCustomerId())
      .setCustomerName(customer.getName())
      .setEmail(customer.getEmail())
      .build());
}

@Override
public Effect<CustomerEventsApi.NameChanged> transformCustomerNameChanged(CustomerDomain.CustomerNameChanged customerNameChanged) {
  // Note: customer_id is not present in the event or elsewhere here, but will be available as subject id
  // from the metadata on the consuming side
  return effects().reply(CustomerEventsApi.NameChanged.newBuilder()
      .setCustomerName(customerNameChanged.getNewName())
      .build());
}
Scala
src/main/scala/customer/api/CustomerEventsServiceAction.scala
override def transformCustomerCreated(customerCreated: domain.CustomerCreated): Action.Effect[Created] = {
  val customer = customerCreated.getCustomer
  effects.reply(Created(customer.customerId, customer.name, customer.email))
}

override def transformCustomerNameChanged(
    customerNameChanged: domain.CustomerNameChanged): Action.Effect[NameChanged] = {
  // Note: customer_id is not present in the event or elsewhere here, but will be available as subject id
  // from the metadata on the consuming side
  effects.reply(NameChanged(customerNameChanged.newName))
}

Event Consumer

The consumer can be an Action or a View, annotated with (kalix.service).eventing.in.direct with a service identifying the publishing service, and the event_stream_id to subscribe to.

Since the consumer is in a separate View, we must include the message descriptors for the messages the producing side produces:

Java
src/main/proto/customer/api/publisher_api.proto
message Created {
    string customer_id = 1;
    string customer_name = 2;
    string email = 3;
}
message NameChanged {
    string customer_name = 1;
}
Scala
src/main/proto/customer/api/publisher_api.proto
message Created {
    string customer_id = 1;
    string customer_name = 2;
    string email = 3;
}
message NameChanged {
    string customer_name = 1;
}

We then define a component subscribing to the service to service publisher. In this example we do that with a View:

Java
src/main/proto/customer/view/customer_view.proto
service AllCustomersView {
  option (kalix.codegen) = {
    view: {}
  };

  // consume events published by java-eventsourced-customer-registry/CustomerEventsServiceAction
  option (kalix.service).eventing.in.direct = { (1)
    service: "customer-registry" (2)
    event_stream_id: "customer_events" (3)
  };

  rpc ProcessCustomerCreated(api.Created) returns (api.Customer) { (4)
    option (kalix.method).view.update = {
      table: "all_customers"
      transform_updates: true
    };
  }

  rpc ProcessCustomerNameChanged(api.NameChanged) returns (api.Customer) {
    option (kalix.method).view.update = {
      table: "all_customers"
      transform_updates: true
    };
  }

  rpc GetCustomers(google.protobuf.Empty) returns (stream api.Customer) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM all_customers"
    };
  }
}
1 Service level eventing.in.direct block
2 The name of the Kalix service publishing the event stream
3 The public event_stream_id of the specific stream from the publisher
4 One update method per message type that the stream may contain
Scala
src/main/proto/customer/view/customer_view.proto
service AllCustomersView {
  option (kalix.codegen) = {
    view: {}
  };

  // consume events published by scala-eventsourced-customer-registry/CustomerEventsServiceAction
  option (kalix.service).eventing.in.direct = { (1)
    service: "customer-registry" (2)
    event_stream_id: "customer_events" (3)
  };

  rpc ProcessCustomerCreated(api.Created) returns (api.Customer) { (4)
    option (kalix.method).view.update = {
      table: "all_customers"
      transform_updates: true
    };
  }

  rpc ProcessCustomerNameChanged(api.NameChanged) returns (api.Customer) {
    option (kalix.method).view.update = {
      table: "all_customers"
      transform_updates: true
    };
  }

  rpc GetCustomers(google.protobuf.Empty) returns (stream api.Customer) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM all_customers"
    };
  }
}
1 Service level eventing.in.direct block
2 The name of the Kalix service publishing the event stream
3 The public event_stream_id of the specific stream from the publisher
4 One update method per message type that the stream may contain

Deployment dependent source of events

It is possible to use environment variables to control the name of the service that a consumer consumes from, this is useful for example for using the same image in staging and production deployments but having them consume from different source services.

Referencing environment variables is done with the syntax ${VAR_NAME} in the service string in the consumer eventing.in.direct block.

Note that if changing the service name after it has once been deployed means the consumer will start over from the beginning of the event stream.

See kalix service deploy for details on how to set environment variables when deploying a service.