Publishing and Subscribing

A very common use case when building microservices is to publish and subscribe to a stream of events. The source of events can be the journal of an event sourced entity, the state changes of a value entity, or a Google Cloud Pub/Sub or Apache Kafka topic used for asynchronous messaging between services.

In this section, we will explore how you can use an Action to:

  • Subscribe to events emitted by an event sourced entity within the same service.

  • Subscribe to state changes emitted by a value entity within the same service.

  • Subscribe to events from Event Sourced Entities published through service to service eventing.

  • Subscribe to external events from Google Cloud Pub/Sub or Apache Kafka.

  • Publish events to a Google Cloud Pub/Sub or Apache Kafka topic.

Messages are guaranteed to be delivered at least once. This means that receivers must be able to handle duplicate messages.
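Because delivery is at least once, a handler can see the same message more than once. The following is a minimal sketch, in plain Java and independent of the Kalix SDK, of one way to make a receiver tolerate duplicates by remembering which message ids it has already processed. The class name DeduplicatingReceiver and the idea that every message carries a unique id are assumptions made for illustration; how such an id is obtained depends on the event source.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical idempotent receiver: a message is applied the first time its
 * id is seen and skipped on every redelivery.
 * Assumption: each message carries a unique, stable id.
 */
final class DeduplicatingReceiver {
  private final Set<String> processedIds = new HashSet<>();
  private final Consumer<String> sideEffect;

  DeduplicatingReceiver(Consumer<String> sideEffect) {
    this.sideEffect = sideEffect;
  }

  /** Returns true if the payload was processed, false if it was a duplicate. */
  synchronized boolean receive(String messageId, String payload) {
    if (!processedIds.add(messageId)) {
      return false; // already handled: ignore the redelivery
    }
    sideEffect.accept(payload);
    return true;
  }
}
```

The in-memory set is lost on restart, so a real service would need to keep the processed ids in durable state, or make the operation itself idempotent so that applying it twice has no further effect.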

Publishing Entity events to a Topic

To illustrate how to publish entity events, we will assume the existence of an Event Sourced Counter entity that emits events of the types ValueIncreased and ValueDecreased. We will have the events delivered to an Action, apply a transformation, and publish the results to a topic.

In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker see Configure message brokers.

Java
src/main/proto/com/example/actions/counter_topic.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

option java_outer_classname = "CounterTopicApi";

message Increased {
  int32 value = 1;
}

message Decreased {
  int32 value = 1;
}

service CounterJournalToTopic {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc OnIncreased (com.example.domain.ValueIncreased) returns (Increased) { (3)
    option (kalix.method).eventing.in = { (4)
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = { (5)
      topic: "counter-events"
    };
  }
  
  rpc OnDecreased (com.example.domain.ValueDecreased) returns (Decreased) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = {
      topic: "counter-events"
    };
  }

}
1 Import the Counter Domain file containing the definitions of the events. This is typically a proto definition within the same service.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The OnIncreased method receives the event ValueIncreased and returns the transformed type Increased.
4 The (kalix.method).eventing.in annotation indicates that events from the entity type id counter should be delivered to this method (when the type is ValueIncreased).
5 The (kalix.method).eventing.out annotation indicates that the returned value from this method should be published to the topic called counter-events.
Scala
src/main/proto/com/example/actions/counter_topic.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

message Increased {
  int32 value = 1;
}

message Decreased {
  int32 value = 1;
}

service CounterJournalToTopic {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc OnIncreased (com.example.domain.ValueIncreased) returns (Increased) { (3)
    option (kalix.method).eventing.in = { (4)
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = { (5)
      topic: "counter-events"
    };
  }
  
  rpc OnDecreased (com.example.domain.ValueDecreased) returns (Decreased) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = {
      topic: "counter-events"
    };
  }

}
1 Import the Counter Domain file containing the definitions of the events. This is typically a proto definition within the same service.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The OnIncreased method receives the event ValueIncreased and returns the transformed type Increased.
4 The (kalix.method).eventing.in annotation indicates that events from the entity type id counter should be delivered to this method (when the type is ValueIncreased).
5 The (kalix.method).eventing.out annotation indicates that the returned value from this method should be published to the topic called counter-events.

The class CounterJournalToTopicAction gets generated for us based on the proto file defined above.

Java
src/main/java/com/example/actions/CounterJournalToTopicAction.java
public class CounterJournalToTopicAction extends AbstractCounterJournalToTopicAction {
  public CounterJournalToTopicAction(ActionCreationContext creationContext) {}

  @Override
  public Effect<CounterTopicApi.Increased> onIncreased(CounterDomain.ValueIncreased valueIncreased) {
    CounterTopicApi.Increased increased = (1)
      CounterTopicApi.Increased.newBuilder()
        .setValue(valueIncreased.getValue())
        .build();

    return effects().reply(increased); (2)
  }
}
1 We convert the incoming domain event CounterDomain.ValueIncreased to the outgoing topic API CounterTopicApi.Increased.
2 We use the converted object to build a reply. The CounterTopicApi.Increased message will be published to the topic.
Scala
src/main/scala/com/example/actions/CounterJournalToTopicAction.scala
class CounterJournalToTopicAction(creationContext: ActionCreationContext) extends AbstractCounterJournalToTopicAction {

  override def onIncreased(valueIncreased: ValueIncreased): Action.Effect[Increased] = {
    effects.reply(Increased(valueIncreased.value)) (1)
  }
}
1 We convert the incoming domain event ValueIncreased to the outgoing topic API Increased and return that as a reply.

In this example we have published Protobuf messages to the topic, which is convenient if the consuming end is also a Kalix service. For an external consumer, Protobuf may not be a supported format. For details on publishing other formats, see Handling Serialization.

Subscribing to state changes from a Value Entity

Similar to subscribing to events from an Event Sourced Entity, you can also subscribe to state changes from a Value Entity.

Java
src/main/proto/com/example/actions/counter_states_sub.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "StateSubscriptionApi";

service CounterStateSubscription {
  option (kalix.codegen) = {
    action: {} (2)
  };

  rpc OnUpdateState (com.example.domain.CounterState) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter" (3)
    };
  }

  rpc OnDeleteEntity (google.protobuf.Empty) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter"
      handle_deletes: true (4)
    };
  }

}
1 Import the Counter Domain from the Value Entity example (see Implementing Value Entities).
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The (kalix.method).eventing.in annotation indicates that state changes from the value entity type id counter should be delivered to this method.
4 The second (kalix.method).eventing.in annotation with handle_deletes: true and (required) google.protobuf.Empty as a parameter defines that this method is used for handling value entity deletes.
Scala
src/main/proto/com/example/actions/counter_states_sub.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";

service CounterStateSubscription {
  option (kalix.codegen) = {
    action: {} (2)
  };

  rpc OnUpdateState (com.example.domain.CounterState) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter" (3)
    };
  }

  rpc OnDeleteEntity (google.protobuf.Empty) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter"
      handle_deletes: true (4)
    };
  }

}
1 Import the Counter Domain from the Value Entity example (see Implementing Value Entities).
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The (kalix.method).eventing.in annotation indicates that state changes from the value entity type id counter should be delivered to this method.
4 The second (kalix.method).eventing.in annotation with handle_deletes: true and (required) google.protobuf.Empty as a parameter defines that this method is used for handling value entity deletes.

Subscribing and acting upon

Another possible usage for Actions is to consume events and act upon them.

For example, you may consume events from one entity or from a topic, transform them into commands, and send those to another entity or an external system. This is similar to the usage explained in Actions as Controller, except that the Action is driven by the flow of incoming events instead of external user requests.

For that purpose, it’s enough to add the (kalix.method).eventing.in annotation and omit (kalix.method).eventing.out.

Service to Service Eventing

Kalix provides brokerless at-least-once event delivery across Kalix services through Service to Service eventing.

The source of the events is an Event Sourced Entity. Its events can be published as a stream and consumed by another Kalix service without the need to set up a message broker.

Note

For eventing from an entity inside the same Kalix service as the consuming component, use regular Subscription to the entity instead of Service to Service eventing.

Event Producer

The event producer controls which entity to publish events for. Each entity published is identified by a stream id so that one Kalix service can publish more than one of the entity types it contains.

Java
src/main/proto/customer/api/direct_customer_events.proto
service CustomerEventsService {
    option (kalix.codegen) = {
        action: {}
    };
    option (kalix.service).eventing.in = { (1)
        event_sourced_entity: "customers"
        // skip/filter events that there is no transform method for (AddressChanged)
        ignore_unknown: true (2)
    };
    option (kalix.service).eventing.out.direct.event_stream_id = "customer_events"; (3)
    // limit access to only other services in same project
    option (kalix.service).acl.allow = { service: "*" }; (4)

    // transform methods for turning internal event types to public API
    rpc TransformCustomerCreated(domain.CustomerCreated) returns (Created) { } (5)
    rpc TransformCustomerNameChanged(domain.CustomerNameChanged) returns (NameChanged) {}

}
1 eventing.in identifying which event sourced entity to publish events for.
2 Ignore any event types not handled by a method and move on with the event stream, rather than failing, which is the default.
3 eventing.out.direct.event_stream_id marks the public identifier for consumers of this stream.
4 An ACL annotation, allowing access from other Kalix services, but not the public internet.
5 All methods on the service are transformer methods for turning internal event message types into public API message types for other services to consume.
Scala
src/main/proto/customer/api/direct_customer_events.proto
service CustomerEventsService {
    option (kalix.codegen) = {
        action: {}
    };
    option (kalix.service).eventing.in = { (1)
        event_sourced_entity: "customers"
        // skip/filter events that there is no transform method for (AddressChanged)
        ignore_unknown: true (2)
    };
    option (kalix.service).eventing.out.direct.event_stream_id = "customer_events"; (3)
    // limit access to only other services in same project
    option (kalix.service).acl.allow = { service: "*" }; (4)

    // transform methods for turning internal event types to public API
    rpc TransformCustomerCreated(domain.CustomerCreated) returns (Created) { }  (5)
    rpc TransformCustomerNameChanged(domain.CustomerNameChanged) returns (NameChanged) {}

}
1 eventing.in identifying which event sourced entity to publish events for.
2 Ignore any event types not handled by a method and move on with the event stream, rather than failing, which is the default.
3 eventing.out.direct.event_stream_id marks the public identifier for consumers of this stream.
4 An ACL annotation, allowing access from other Kalix services, but not the public internet.
5 All methods on the service are transformer methods for turning internal event message types into public API message types for other services to consume.

The implementation of the producer creates the public API messages and uses the regular Action effects API to return the messages to publish:

Java
src/main/java/customer/api/CustomerEventsServiceAction.java
@Override
public Effect<CustomerEventsApi.Created> transformCustomerCreated(CustomerDomain.CustomerCreated customerCreated) {
  CustomerDomain.CustomerState customer = customerCreated.getCustomer();
  return effects().reply(CustomerEventsApi.Created.newBuilder()
      .setCustomerId(customer.getCustomerId())
      .setCustomerName(customer.getName())
      .setEmail(customer.getEmail())
      .build());
}

@Override
public Effect<CustomerEventsApi.NameChanged> transformCustomerNameChanged(CustomerDomain.CustomerNameChanged customerNameChanged) {
  // Note: customer_id is not present in the event or elsewhere here, but will be available as subject id
  // from the metadata on the consuming side
  return effects().reply(CustomerEventsApi.NameChanged.newBuilder()
      .setCustomerName(customerNameChanged.getNewName())
      .build());
}
Scala
src/main/scala/customer/api/CustomerEventsServiceAction.scala
override def transformCustomerCreated(customerCreated: domain.CustomerCreated): Action.Effect[Created] = {
  val customer = customerCreated.getCustomer
  effects.reply(Created(customer.customerId, customer.name, customer.email))
}

override def transformCustomerNameChanged(
    customerNameChanged: domain.CustomerNameChanged): Action.Effect[NameChanged] = {
  // Note: customer_id is not present in the event or elsewhere here, but will be available as subject id
  // from the metadata on the consuming side
  effects.reply(NameChanged(customerNameChanged.newName))
}

Event Consumer

The consumer can be an Action or a View, annotated with (kalix.service).eventing.in.direct with a service identifying the publishing service, and the event_stream_id to subscribe to.

Since the consumer is in a separate service, we must include the message descriptors for the messages the producing side produces:

Java
src/main/proto/customer/api/publisher_api.proto
message Created {
    string customer_id = 1;
    string customer_name = 2;
    string email = 3;
}
message NameChanged {
    string customer_name = 1;
}
Scala
src/main/proto/customer/api/publisher_api.proto
message Created {
    string customer_id = 1;
    string customer_name = 2;
    string email = 3;
}
message NameChanged {
    string customer_name = 1;
}

We then define a component subscribing to the service to service publisher. In this example we do that with a View:

Java
src/main/proto/customer/view/customer_view.proto
service AllCustomersView {
  option (kalix.codegen) = {
    view: {}
  };

  // consume events published by java-protobuf-eventsourced-customer-registry/CustomerEventsServiceAction
  option (kalix.service).eventing.in.direct = { (1)
    service: "customer-registry" (2)
    event_stream_id: "customer_events" (3)
  };

  rpc ProcessCustomerCreated(api.Created) returns (Customer) { (4)
    option (kalix.method).view.update = {
      table: "all_customers"
    };
  }

  rpc ProcessCustomerNameChanged(api.NameChanged) returns (Customer) {
    option (kalix.method).view.update = {
      table: "all_customers"
    };
  }

  rpc GetCustomers(google.protobuf.Empty) returns (stream Customer) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM all_customers"
    };
  }
}
1 Service level eventing.in.direct block.
2 The name of the Kalix service publishing the event stream.
3 The public event_stream_id of the specific stream from the publisher.
4 One update method per message type that the stream may contain.
Scala
src/main/proto/customer/view/customer_view.proto
service AllCustomersView {
  option (kalix.codegen) = {
    view: {}
  };

  // consume events published by scala-protobuf-eventsourced-customer-registry/CustomerEventsServiceAction
  option (kalix.service).eventing.in.direct = { (1)
    service: "customer-registry" (2)
    event_stream_id: "customer_events" (3)
  };

  rpc ProcessCustomerCreated(api.Created) returns (Customer) { (4)
    option (kalix.method).view.update = {
      table: "all_customers"
    };
  }

  rpc ProcessCustomerNameChanged(api.NameChanged) returns (Customer) {
    option (kalix.method).view.update = {
      table: "all_customers"
    };
  }

  rpc GetCustomers(google.protobuf.Empty) returns (stream Customer) {
    option (kalix.method).view.query = {
      query: "SELECT * FROM all_customers"
    };
  }
}
1 Service level eventing.in.direct block.
2 The name of the Kalix service publishing the event stream.
3 The public event_stream_id of the specific stream from the publisher.
4 One update method per message type that the stream may contain.
If you’re looking to test this locally, you will likely need to run the two services on different ports. For more details, consult Running multiple services.

Deployment dependent source of events

Environment variables can be used to control the name of the service that a consumer consumes from. This is useful, for example, for using the same image in staging and production deployments while having them consume from different source services.

Referencing environment variables is done with the syntax ${VAR_NAME} in the service string in the consumer eventing.in.direct block.

Changing the service name after the consumer has been deployed means the consumer will start over from the beginning of the event stream.

See kalix service deploy for details on how to set environment variables when deploying a service.
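To make the ${VAR_NAME} syntax described above concrete, here is a small sketch in plain Java of how such a placeholder in a service string could be resolved against a set of environment variables. This is not Kalix's implementation, which performs the substitution for you; the class name ServiceNameResolver, the choice to fail on an undefined variable, and the variable name in the usage note are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper illustrating ${VAR_NAME} substitution in a service string. */
final class ServiceNameResolver {
  // Matches ${NAME} where NAME looks like a typical environment variable name.
  private static final Pattern VAR =
      Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

  /** Replaces every ${NAME} in the template with env.get(NAME). */
  static String resolve(String template, Map<String, String> env) {
    Matcher matcher = VAR.matcher(template);
    StringBuilder resolved = new StringBuilder();
    while (matcher.find()) {
      String name = matcher.group(1);
      String value = env.get(name);
      if (value == null) {
        // Assumption: an undefined variable is treated as an error.
        throw new IllegalArgumentException("Undefined environment variable: " + name);
      }
      // quoteReplacement keeps '$' and '\' in the value from being interpreted.
      matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(resolved);
    return resolved.toString();
  }
}
```

For example, ServiceNameResolver.resolve("${CUSTOMER_SERVICE}", System.getenv()) would yield whatever the (hypothetical) CUSTOMER_SERVICE variable is set to in that deployment, while a string without placeholders is returned unchanged.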

Handling Serialization

You do not need to handle serialization for messages. Kalix functions serve gRPC interfaces, and the input and output messages are protobuf messages that get serialized to the protobuf format.

The gRPC services are also exposed as HTTP endpoints with JSON messages. See Transcoding HTTP.

Subscribing to a Topic

It’s also possible to subscribe to a Pub/Sub or Kafka topic. To receive messages from a topic, annotate a service method in the Protobuf service definition with the option (kalix.method).eventing.in annotation and specify the topic name in the topic section of the annotation.

In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker see Configure message brokers.

For illustration purposes, we can add a second Action that consumes from the Pub/Sub topic counter-events used in the previous example.

Java
src/main/proto/com/example/actions/counter_topic_sub.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/actions/counter_topic.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "CounterTopicSubApi";

service CounterTopicSubscription {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc OnIncreased (com.example.actions.Increased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = { (3)
      topic: "counter-events"
    };
  }

  rpc OnDecreased (com.example.actions.Decreased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic: "counter-events"
    };
  }
}
1 Import the Counter Topic types from previous example.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 Define methods for each of the possible incoming messages and annotate them with (kalix.method).eventing.in indicating that the source of events is the topic counter-events.
Scala
src/main/proto/com/example/actions/counter_topic_sub.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/actions/counter_topic.proto"; (1)
import "google/protobuf/empty.proto";

service CounterTopicSubscription {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc OnIncreased (com.example.actions.Increased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = { (3)
      topic: "counter-events"
    };
  }

  rpc OnDecreased (com.example.actions.Decreased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic: "counter-events"
    };
  }
}
1 Import the Counter Topic types from previous example.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
3 Define methods for each of the possible incoming messages and annotate them with (kalix.method).eventing.in indicating that the source of events is the topic counter-events.

The class CounterTopicSubscriptionAction gets generated for us based on the proto file defined above.

Java
src/main/java/com/example/actions/CounterTopicSubscriptionAction.java
public class CounterTopicSubscriptionAction extends AbstractCounterTopicSubscriptionAction {

  private Logger logger = LoggerFactory.getLogger(getClass());

  public CounterTopicSubscriptionAction(ActionCreationContext creationContext) {}

  @Override
  public Effect<Empty> onIncreased(CounterTopicApi.Increased increased) { (1)
    logger.info("Received increase event: " + increased.toString());
    return effects().reply(Empty.getDefaultInstance());
  }

  @Override
  public Effect<Empty> onDecreased(CounterTopicApi.Decreased decreased) {
    logger.info("Received decrease event: " + decreased.toString());
    return effects().reply(Empty.getDefaultInstance());
  }
}
Scala
src/main/scala/com/example/actions/CounterTopicSubscriptionAction.scala
class CounterTopicSubscriptionAction(creationContext: ActionCreationContext)
    extends AbstractCounterTopicSubscriptionAction {
  private val logger = LoggerFactory.getLogger(getClass())
  /** Handler for "OnIncreased". */
  override def onIncreased(increased: Increased): Action.Effect[Empty] = {
    logger.info("Received increase event: " + increased.toString())
    effects.reply(Empty.defaultInstance)
  }

  override def onDecreased(decreased: Decreased): Action.Effect[Empty] = {
    logger.info("Received decrease event: " + decreased.toString())
    effects.reply(Empty.defaultInstance)
  }
}

The events from the topic are delivered to the new Action. The implementation may vary: in this simplified example we just log each event, but the Action could instead forward it to some other component or external service.

Receiving CloudEvents

Kalix uses the CloudEvents standard when receiving from and publishing to topics. The CloudEvents specification standardizes message metadata so that systems can integrate more easily.

Describing the structure of the message payload is the CloudEvents feature most important to Kalix.

An example of that is the capability to send serialized Protobuf messages and have Kalix deserialize them accordingly.

To allow proper reading of Protobuf messages from topics, the messages need to specify the message attributes:

  • Content-Type = application/protobuf

  • ce-specversion = 1.0

  • ce-type = fully qualified protobuf message name (e.g., shopping.cart.api.TopicOperation)

(The ce- prefixed attributes are part of the CloudEvents specification.)
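As an illustration of the attribute list above, the following plain-Java sketch assembles the three attributes a sender would attach to a serialized Protobuf message. The class name ProtobufCloudEventAttributes is hypothetical, and how the resulting map is attached to an outgoing message (for example as Kafka headers or Pub/Sub attributes) depends on the broker client used by the sending system, which is not shown here.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that builds the attributes listed above. */
final class ProtobufCloudEventAttributes {

  /**
   * @param fullyQualifiedMessageName the Protobuf message name including its
   *     package, e.g. "shopping.cart.api.TopicOperation"
   */
  static Map<String, String> forType(String fullyQualifiedMessageName) {
    if (fullyQualifiedMessageName == null || fullyQualifiedMessageName.isBlank()) {
      throw new IllegalArgumentException("A message type name is required");
    }
    // LinkedHashMap keeps the attributes in the order they are documented.
    Map<String, String> attributes = new LinkedHashMap<>();
    attributes.put("Content-Type", "application/protobuf");
    attributes.put("ce-specversion", "1.0");
    attributes.put("ce-type", fullyQualifiedMessageName);
    return attributes;
  }
}
```

The value passed as the type name has to match the message type that the subscribing rpc method accepts, which is why sharing the Protobuf declaration with the sending system is recommended.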

The Protobuf rpc declaration uses the expected Protobuf message type and specifies the topic to subscribe to. You’ll normally want to share the exact Protobuf message declaration with the sending system.

Java

A proto definition of an Action that consumes CloudEvent messages can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "MyTopics";

message TopicOperation {
  string operation = 1;
}

service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) { (1)
    option (kalix.method).eventing.in = {
      topic:  "shopping-cart-protobuf-cloudevents"
    };
  }
}
1 When consuming a CloudEvent containing a Protobuf message, the handler's request type must be the message type specified in the metadata.
Scala

A proto definition of an Action that consumes CloudEvent messages can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/empty.proto";

message TopicOperation {
  string operation = 1;
}

service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) { (1)
    option (kalix.method).eventing.in = {
      topic:  "shopping-cart-protobuf-cloudevents"
    };
  }
}
1 When consuming a CloudEvent containing a Protobuf message, the handler's request type must be the message type specified in the metadata.

Receiving messages from an external source

When a message arrives from a topic, Kalix detects the message payload type based on the Content-Type or ce-datacontenttype header or attribute of the message. If there is no such metadata, the content is handled as raw bytes.

If the content type starts with application/protobuf, application/x-protobuf or application/vnd.google.protobuf the payload is expected to also have a ce-type header or attribute identifying the concrete protobuf message type. Such messages will be decoded into the described message type before being handed to a topic subscriber method, which must accept that specific message type.

If the publishing service is also a Kalix service, this is handled transparently for you as shown in the previous section.

For messages that are consumed from or published to topics when interacting with external services, it can be a requirement to use a format other than protobuf. Other supported message formats include JSON, text, or raw bytes.
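The content-type rules in this section can be summarized as a small classification function. The sketch below is plain Java written for illustration, not Kalix code: the names PayloadKinds and Kind are hypothetical, the JSON and text prefixes are the ones given in the JSON and Text subsections, and treating an unrecognized content type as raw bytes is an assumption (the text only states that messages without content-type metadata are handled as raw bytes).

```java
import java.util.Locale;

/** Hypothetical classifier mirroring the content-type rules described in this section. */
final class PayloadKinds {
  enum Kind { PROTOBUF, JSON, TEXT, BYTES }

  static Kind classify(String contentType) {
    if (contentType == null || contentType.isBlank()) {
      return Kind.BYTES; // no metadata: handled as raw bytes
    }
    // Drop parameters such as "; charset=utf-8" and normalize the case.
    String normalized = contentType.trim().toLowerCase(Locale.ROOT);
    int semicolon = normalized.indexOf(';');
    String mediaType =
        (semicolon >= 0 ? normalized.substring(0, semicolon) : normalized).trim();

    if (mediaType.startsWith("application/protobuf")
        || mediaType.startsWith("application/x-protobuf")
        || mediaType.startsWith("application/vnd.google.protobuf")) {
      return Kind.PROTOBUF; // also expects a ce-type naming the message type
    }
    if (mediaType.startsWith("application/json")
        || (mediaType.startsWith("application/") && mediaType.endsWith("+json"))) {
      return Kind.JSON; // subscriber method accepts google.protobuf.Any
    }
    if (mediaType.startsWith("text/")) {
      return Kind.TEXT; // subscriber method accepts google.protobuf.StringValue
    }
    return Kind.BYTES; // assumption: anything unrecognized is treated as raw bytes
  }
}
```

Each kind corresponds to a different parameter type on the subscribing rpc method, so the declared method signature has to agree with the content type the external system actually sends.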

In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker see Configure message brokers.

JSON

If the incoming content type starts with application/json or application/…+json, the payload is treated as JSON, possibly with a ce-type field identifying a specific type of object in the JSON. The topic subscriber method must accept a protobuf Any message.

Kalix provides a utility to serialize and deserialize JSON messages based on Jackson.

Java

Kalix provides the JsonSupport utility to serialize and deserialize JSON messages.

A proto definition of an Action that consumes JSON messages and produces JSON messages can look like this:

src/main/proto/com/example/json/json_api.proto
import "kalix/annotations.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "MyServiceApi";

message KeyValue {
  string key = 1;
  int32 value = 2;
}

service MyService {
  option (kalix.codegen) = {
    action: {}
  };

  rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) { (1)
    option (kalix.method).eventing.in = {
      topic:  "notifications"
    };
  }

  rpc Produce(KeyValue) returns (google.protobuf.Any) { (2)
    option (kalix.method).eventing.out = {
      topic:  "notifications"
    };
  }

}
1 When consuming JSON messages from a topic the input type must be google.protobuf.Any.
2 When producing a JSON message to a topic the return type must be google.protobuf.Any.
The type_url in the google.protobuf.Any must start with json.kalix.io/. The suffix of the type_url is a type hint of the concrete message type that is encoded.
Scala

Kalix provides the JsonSupport utility to serialize and deserialize JSON messages. A proto definition of an Action that consumes JSON messages and produces JSON messages can look like this:

src/main/proto/com/example/json/json_api.proto
import "kalix/annotations.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";

message KeyValue {
  string key = 1;
  int32 value = 2;
}

service MyService {
  option (kalix.codegen) = {
    action: {}
  };

  rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) { (1)
    option (kalix.method).eventing.in = {
      topic:  "notifications"
    };
  }

  rpc Produce(KeyValue) returns (google.protobuf.Any) { (2)
    option (kalix.method).eventing.out = {
      topic:  "notifications"
    };
  }

}
1 When consuming JSON messages from a topic the input type must be google.protobuf.any.Any.
2 When producing a JSON message to a topic the return type must be google.protobuf.any.Any.
The type_url in the google.protobuf.any.Any must start with json.kalix.io/. The suffix of the type_url is a type hint of the concrete message type that is encoded.

The corresponding implementation class:

Java
src/main/java/com/example/json/MyServiceAction.java
import kalix.javasdk.JsonSupport;
import kalix.javasdk.action.ActionCreationContext;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyServiceAction extends AbstractMyServiceAction {

  private static final Logger LOG = LoggerFactory.getLogger(MyServiceAction.class);

  public MyServiceAction(ActionCreationContext creationContext) {}

  @Override
  public Effect<Empty> consume(Any any) {
    JsonKeyValueMessage jsonMessage =
        JsonSupport.decodeJson(JsonKeyValueMessage.class, any); (1)
    LOG.info("Consumed " + jsonMessage);
    return effects().reply(Empty.getDefaultInstance());
  }

  @Override
  public Effect<Any> produce(MyServiceApi.KeyValue keyValue) {
    JsonKeyValueMessage jsonMessage =
        new JsonKeyValueMessage(keyValue.getKey(), keyValue.getValue()); (2)
    Any jsonAny = JsonSupport.encodeJson(jsonMessage); (3)
    return effects().reply(jsonAny);
  }
}
1 Decode the JSON message to a Java class JsonKeyValueMessage.
2 Convert the Protobuf message KeyValue to a Java class JsonKeyValueMessage.
3 Encode the Java class JsonKeyValueMessage to JSON.
Scala
src/main/scala/com/example/json/MyServiceAction.scala
class MyServiceAction(creationContext: ActionCreationContext) extends AbstractMyServiceAction {

  private val log = LoggerFactory.getLogger(classOf[MyServiceAction])

  override def consume(any: ScalaPbAny): Action.Effect[Empty] = {
    val jsonMessage = JsonSupport.decodeJson[JsonKeyValueMessage](any) (1)
    log.info("Consumed {}", jsonMessage)
    effects.reply(Empty.defaultInstance)
  }

  override def produce(keyValue: KeyValue): Action.Effect[ScalaPbAny] = {
    val jsonMessage = JsonKeyValueMessage(keyValue.key, keyValue.value) (2)
    val jsonAny = JsonSupport.encodeJson(jsonMessage) (3)
    effects.reply(jsonAny)
  }
}
1 Decode the JSON message to a Scala class JsonKeyValueMessage.
2 Convert the Protobuf message KeyValue to a Scala class JsonKeyValueMessage.
3 Encode the Scala class JsonKeyValueMessage to JSON.

Kalix uses Jackson to serialize JSON.
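
The type_url convention described above for JSON messages can be sketched in plain Java. This is only an illustration of the naming rule: the class JsonTypeUrls and its methods are hypothetical and not part of the Kalix SDK. In the examples above, JsonSupport is what encodes and decodes the message, so you would normally not build the type_url by hand.

```java
// Hypothetical helper for illustration; not part of the Kalix SDK.
final class JsonTypeUrls {

    // Prefix that marks a google.protobuf.Any payload as JSON.
    static final String JSON_PREFIX = "json.kalix.io/";

    private JsonTypeUrls() {}

    /** Builds a type_url from a type hint such as a class name. */
    static String forTypeHint(String typeHint) {
        return JSON_PREFIX + typeHint;
    }

    /** Returns true if the type_url marks the payload as JSON. */
    static boolean isJson(String typeUrl) {
        return typeUrl != null && typeUrl.startsWith(JSON_PREFIX);
    }

    /** Extracts the type hint, that is the suffix after the JSON prefix. */
    static String typeHintOf(String typeUrl) {
        if (!isJson(typeUrl)) {
            throw new IllegalArgumentException("Not a JSON type_url: " + typeUrl);
        }
        return typeUrl.substring(JSON_PREFIX.length());
    }
}
```

For example, JsonTypeUrls.forTypeHint("com.example.JsonKeyValueMessage") yields json.kalix.io/com.example.JsonKeyValueMessage, and typeHintOf recovers the suffix that a consumer can use to pick the class to decode into.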

Text

If the content type starts with text/ it is treated as a string message. The topic subscriber method must accept the google.protobuf.StringValue message.

Java

A proto definition of an Action that consumes String messages can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/wrappers.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "MyTopics";


service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ConsumeStringTopic(google.protobuf.StringValue) returns (google.protobuf.Empty) { (2)
    option (kalix.method).eventing.in = {
      topic: "strings_topic"
    };
  }
}
1 google.protobuf.StringValue requires the import google/protobuf/wrappers.proto.
2 When consuming text messages from a topic the input type must be google.protobuf.StringValue.
Scala

A proto definition of an Action that consumes String messages can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/wrappers.proto"; (1)
import "google/protobuf/empty.proto";


service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ConsumeStringTopic(google.protobuf.StringValue) returns (google.protobuf.Empty) { (2)
    option (kalix.method).eventing.in = {
      topic: "strings_topic"
    };
  }
}
1 google.protobuf.StringValue requires the import google/protobuf/wrappers.proto.
2 When consuming text messages from a topic the input type must be google.protobuf.StringValue.

If an action has a return type of StringValue and publishes to a topic, the events published to the topic will have content-type text/plain; charset=utf-8.

Bytes

If the content type is application/octet-stream, no content type is present, or the type is unknown to Kalix, the message is treated as a binary message. The topic subscriber method must accept the google.protobuf.BytesValue message.

Java

A proto definition of an Action that consumes binary messages with raw bytes can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/wrappers.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "MyTopics";


service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ConsumeRawBytesTopic(google.protobuf.BytesValue) returns (google.protobuf.Empty) { (2)
    option (kalix.method).eventing.in = {
      topic: "bytes_topic"
    };
  }
}
1 google.protobuf.BytesValue requires the import google/protobuf/wrappers.proto.
2 When consuming raw bytes messages from a topic the input type must be google.protobuf.BytesValue.
Scala

A proto definition of an Action that consumes binary messages with raw bytes can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/wrappers.proto"; (1)
import "google/protobuf/empty.proto";


service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ConsumeRawBytesTopic(google.protobuf.BytesValue) returns (google.protobuf.Empty) { (2)
    option (kalix.method).eventing.in = {
      topic: "bytes_topic"
    };
  }
}
1 google.protobuf.BytesValue requires the import google/protobuf/wrappers.proto.
2 When consuming raw bytes messages from a topic the input type must be google.protobuf.BytesValue.

If an action has a return type of BytesValue and publishes to a topic, the events published to the topic will have content-type application/octet-stream.
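
The content-type rules from the Text and Bytes sections can be summarized as a small decision function. This is a sketch of the rules as described here, not Kalix's actual implementation. The class ContentTypeRouting, the PayloadKind enum, and the assumption that application/json and application/protobuf are the content types Kalix recognizes as structured messages are all illustrative.

```java
import java.util.Locale;

// Hypothetical illustration of the rules above; not Kalix code.
final class ContentTypeRouting {

    enum PayloadKind {
        TEXT,        // subscriber method accepts google.protobuf.StringValue
        BYTES,       // subscriber method accepts google.protobuf.BytesValue
        STRUCTURED   // JSON or Protobuf messages (assumed known content types)
    }

    private ContentTypeRouting() {}

    static PayloadKind classify(String contentType) {
        // No content type present: treated as a binary message.
        if (contentType == null || contentType.trim().isEmpty()) {
            return PayloadKind.BYTES;
        }
        // Drop parameters such as "; charset=utf-8" before comparing.
        String lower = contentType.trim().toLowerCase(Locale.ROOT);
        int semicolon = lower.indexOf(';');
        String mediaType = (semicolon >= 0 ? lower.substring(0, semicolon) : lower).trim();

        if (mediaType.startsWith("text/")) {
            return PayloadKind.TEXT;
        }
        // Assumption: these are the structured content types known to Kalix.
        if (mediaType.equals("application/json") || mediaType.equals("application/protobuf")) {
            return PayloadKind.STRUCTURED;
        }
        // application/octet-stream and any unknown type: binary message.
        return PayloadKind.BYTES;
    }
}
```

With this sketch, "text/plain; charset=utf-8" is classified as TEXT, while "application/octet-stream", a missing content type, and an unrecognized type such as "image/png" are all classified as BYTES.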

Deployment dependent topic names

You can use environment variables to control the name of the topic that a service consumes from or produces to. This is useful, for example, when the same image is used in staging and production deployments but each deployment must interact with a separate topic.

Referencing environment variables is done with the syntax ${VAR_NAME} in the topic string in eventing.in.topic or eventing.out.topic blocks. See kalix service deploy for details on how to set environment variables when deploying a service.
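
To make the ${VAR_NAME} syntax concrete, the following sketch resolves placeholders in a topic string against a map of variables. Kalix performs the real substitution itself. TopicNameResolver is a hypothetical class, and failing on a missing variable is an assumption of this sketch, not documented Kalix behavior.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration of ${VAR_NAME} substitution; not Kalix code.
final class TopicNameResolver {

    // Matches ${VAR_NAME} and captures VAR_NAME.
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

    private TopicNameResolver() {}

    /** Replaces every ${VAR_NAME} in the topic string with its value from env. */
    static String resolve(String topic, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(topic);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String name = matcher.group(1);
            String value = env.get(name);
            if (value == null) {
                // Assumption of this sketch: a missing variable is an error.
                throw new IllegalStateException("Missing environment variable: " + name);
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```

For instance, TopicNameResolver.resolve("notifications-${DEPLOY_ENV}", Map.of("DEPLOY_ENV", "staging")) returns "notifications-staging". DEPLOY_ENV is a made-up variable name. Passing System.getenv() as the map would resolve against the real process environment.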

Changing the topic name after it has once been deployed for an event consumer means the consumer will start over from the beginning of the topic.

Accessing the Entity ID

Java

For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContext.

You can access the ActionContext through the actionContext() method.

src/main/java/com/example/actions/CounterJournalToTopicAction.java
@Override
public Effect<CounterTopicApi.Increased> onIncreased(CounterDomain.ValueIncreased valueIncreased) {
  Optional<String> counterId = actionContext().eventSubject(); (1)
  ...
}
Scala

For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContext.

You can access the ActionContext through the actionContext method.

src/main/proto/customer/domain/customer_domain.proto
override def onIncreased(valueIncreased: ValueIncreased): Action.Effect[Increased] = {
  val counterId = actionContext.eventSubject (1)
  ...
}

Ignoring events

When consuming events, each event must be matched by a Protobuf service method. In case your component is only interested in certain events, you may declare a method to receive all events that are not received by the other methods. If an event type is not handled, the Action will fail. Actions are designed to restart, but since the handler is missing, it will fail again. Therefore, it’s important to define methods for all events or define a catch-all method in case you want to discard some events.

Java
src/main/proto/com/example/actions/counter_topic.proto
rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) {
  option (kalix.method).eventing.in = { (1)
    event_sourced_entity: "counter"
    ignore: true (2)
  };
}
1 We must annotate it with a (kalix.method).eventing.in.
2 Set ignore: true option.
Scala
src/main/proto/customer/domain/customer_domain.proto
rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) {
  option (kalix.method).eventing.in = { (1)
    event_sourced_entity: "counter"
    ignore: true (2)
  };
}
1 We must annotate it with a (kalix.method).eventing.in.
2 Set ignore: true option.

The Ignore method here is defined as a catch-all because it has input type Any. Instead of using a catch-all it can be better to define concrete methods for all known event types that should be ignored because then there is no risk of accidentally ignoring events that are added in later evolution of the service.

When adding the ignore: true annotation the corresponding implementation is not needed in the component. It is more efficient to use ignore: true than implementing the method with an immediate reply.

Testing the Integration

When a Kalix service relies on a broker, it might be useful to use integration tests to assert that those boundaries work as intended. For such scenarios, you can either:

  • Use TestKit’s mocked topic:

    • this offers a general API to inject messages into topics or read the messages written to another topic, regardless of the specific broker integration you have configured.

  • Run an external broker instance:

    • if you’re interested in running your integration tests against a real instance, you need to provide the broker instance yourself by running it in a separate process in your local setup and make sure to disable the use of TestKit’s test broker. Currently, the only external broker supported in integration tests is Google PubSub Emulator.

TestKit Mocked Incoming Messages

Following up on the counter entity example used above, let’s consider an example (composed of 2 Actions and 1 Event Sourced entity) as pictured below:

eventing testkit sample

In this example:

  • commands are consumed from an external topic counter-commands and forwarded to a Counter entity;

  • the Counter entity is an Event Sourced Entity and has its events published to another topic counter-events.

To test this flow, we will use the TestKit to push commands into the counter-commands topic and check which messages are produced to the counter-events topic.

Java
src/it/java/com/example/CounterTopicIntegrationTest.java
import kalix.javasdk.testkit.EventingTestKit;
import kalix.javasdk.testkit.KalixTestKit;
import kalix.javasdk.testkit.junit.jupiter.KalixTestKitExtension;
// ...

public class CounterTopicIntegrationTest {

  /**
   * The test kit starts both the service container and the Kalix Runtime.
   */
  @RegisterExtension
  public static final KalixTestKitExtension testKit =
      new KalixTestKitExtension(Main.createKalix(), KalixTestKit.Settings.DEFAULT
          .withTopicIncomingMessages("counter-commands")
          .withTopicOutgoingMessages("counter-events")); (1)

  private EventingTestKit.IncomingMessages commandsTopic;
  private EventingTestKit.OutgoingMessages eventsTopic;

  public CounterTopicIntegrationTest() {
    commandsTopic = testKit.getTopicIncomingMessages("counter-commands"); (2)
    eventsTopic = testKit.getTopicOutgoingMessages("counter-events"); (3)
  }

  @Test
  public void verifyCounterCommandsAndPublish() {
    var counterId = "test-topic";

    var increaseCmd = CounterApi.IncreaseValue.newBuilder().setCounterId(counterId).setValue(4).build();
    var decreaseCmd = CounterApi.DecreaseValue.newBuilder().setCounterId(counterId).setValue(1).build();
    commandsTopic.publish(increaseCmd, counterId); (4)
    commandsTopic.publish(decreaseCmd, counterId);

    var increasedEvent = eventsTopic.expectOneTyped(CounterTopicApi.Increased.class); (5)
    var decreasedEvent = eventsTopic.expectOneTyped(CounterTopicApi.Decreased.class);
    assertEquals(increaseCmd.getValue(), increasedEvent.getPayload().getValue()); (6)
    assertEquals(decreaseCmd.getValue(), decreasedEvent.getPayload().getValue());
  }
}
1 Start the TestKit. Set the configuration to mock incoming messages from the counter-commands topic and mock outgoing messages from the counter-events topic.
2 Get IncomingMessages for the topic named counter-commands from the TestKit.
3 Get OutgoingMessages for the topic named counter-events from the TestKit.
4 Build 2 commands and publish both to the topic. The counterId is passed as the subject id of the message.
5 Read 2 messages, one at a time. We pass in the expected class type for the next message.
6 Assert the received messages have the same value as the commands sent.
Scala
src/test/scala/com/example/CounterServiceIntegrationSpec.scala
import kalix.scalasdk.testkit.{ KalixTestKit, Message }
import org.scalatest.BeforeAndAfterEach
// ...

class CounterServiceIntegrationSpec
    extends AnyWordSpec
    with Matchers
    with BeforeAndAfterEach
    with BeforeAndAfterAll
    with ScalaFutures {

  private val testKit = KalixTestKit(
    Main.createKalix(),
    KalixTestKit.DefaultSettings
      .withTopicIncomingMessages("counter-commands")
      .withTopicOutgoingMessages("counter-events")).start() (1)
  private val commandsTopic = testKit.getTopicIncomingMessages("counter-commands") (2)
  private val eventsTopic = testKit.getTopicOutgoingMessages("counter-events") (3)

  "CounterService" must {
    val counterId = "xyz"
    "handle commands from topic and publishing related events out" in {
      commandsTopic.publish(IncreaseValue(counterId, 4), counterId) (4)
      commandsTopic.publish(DecreaseValue(counterId, 1), counterId)

      val Message(incEvent, _) = eventsTopic.expectOneTyped[Increased] (5)
      val Message(decEvent, _) = eventsTopic.expectOneTyped[Decreased]
      incEvent shouldBe Increased(4) (6)
      decEvent shouldBe Decreased(1)
    }
  }

  override def afterAll(): Unit = {
    testKit.stop()
    super.afterAll()
  }
}
1 Start the TestKit. Set the configuration to mock incoming messages from the counter-commands topic and mock outgoing messages from the counter-events topic.
2 Get IncomingMessages for topic named counter-commands from the TestKit.
3 Get OutgoingMessages for topic named counter-events from the TestKit.
4 Build 2 commands and publish both to the topic. Note the counterId is passed as the subject id of the message.
5 Read 2 messages, one at a time. Note we pass in the expected class type for the next message.
6 Assert the received messages have the expected values.

In the example above we take advantage of the TestKit to serialize and deserialize the messages and to pass all the required metadata automatically. However, the API also lets you read and write raw bytes, construct your own metadata, or read multiple messages at once.

Metadata

Typically, messages are published with associated metadata. If you want to construct your own Metadata to be consumed by a service or make sure the messages published out of your service have specific metadata attached, you can do so using the TestKit, as shown below.

Java
src/it/java/com/example/CounterTopicIntegrationTest.java
@Test
public void verifyCounterCommandsAndPublishWithMetadata() {
  var counterId = "test-topic-metadata";
  var increaseCmd = CounterApi.IncreaseValue.newBuilder().setCounterId(counterId).setValue(10).build();

  var metadata = CloudEvent.of( (1)
          "cmd1",
          URI.create("CounterTopicIntegrationTest"),
          increaseCmd.getDescriptorForType().getFullName())
      .withSubject(counterId) (2)
      .asMetadata()
      .add("Content-Type", "application/protobuf"); (3)

  commandsTopic.publish(testKit.getMessageBuilder().of(increaseCmd, metadata)); (4)

  var increasedEvent = eventsTopicWithMeta.expectOneTyped(CounterTopicApi.Increased.class, Duration.ofSeconds(10));
  var actualMd = increasedEvent.getMetadata(); (5)
  assertEquals(counterId, actualMd.asCloudEvent().subject().get()); (6)
  assertEquals("application/protobuf", actualMd.get("Content-Type").get());
}
1 Build a CloudEvent object with the 3 required attributes, respectively: id, source and type.
2 Add the subject to which the message is related, that is the counterId.
3 Set the mandatory header "Content-Type" accordingly.
4 Build and publish the message along with its metadata to topic commandsTopic.
5 Upon receiving the message, access the metadata.
6 Assert the headers Content-Type and ce-subject (every CloudEvent header is prefixed with "ce-") have the expected values.
Scala
src/test/scala/com/example/CounterServiceIntegrationSpec.scala
"allow passing and reading metadata for messages" in {
  val increaseCmd = IncreaseValue(counterId, 4)
  val md = CloudEvent( (1)
    id = "cmd1",
    source = URI.create("CounterServiceIntegrationSpec"),
    `type` = increaseCmd.companion.javaDescriptor.getFullName)
    .withSubject(counterId) (2)
    .asMetadata
    .add("Content-Type", "application/protobuf") (3)

  commandsTopic.publish(Message(increaseCmd, md)) (4)

  val Message(incEvent, actualMd) = eventsTopicWithMeta.expectOneTyped[Increased] (5)
  incEvent shouldBe Increased(4)
  actualMd.get("Content-Type") should contain("application/protobuf") (6)
  actualMd.asCloudEvent.subject should contain(counterId)
}
1 Build a CloudEvent object with the 3 required attributes, respectively: id, source and type.
2 Add the subject to which the message is related, that is the counterId.
3 Set the mandatory header "Content-Type" accordingly.
4 Build and publish the message along with its metadata to topic commandsTopic.
5 Receive the message of correct type and extract Metadata.
6 Assert the headers Content-Type and ce-subject (every CloudEvent header is prefixed with "ce-") have the expected values.
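
The "ce-" prefix mentioned in the callouts can be illustrated with a plain Java mapping from CloudEvent attributes to message headers. This is a sketch of the naming convention only: CloudEventHeaders is a hypothetical class, and the TestKit and Kalix build the real metadata for you through CloudEvent and asMetadata.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the "ce-" header naming; not Kalix code.
final class CloudEventHeaders {

    private static final String PREFIX = "ce-";

    private CloudEventHeaders() {}

    /**
     * Maps the three required attributes (id, source, type) and the
     * optional subject to headers prefixed with "ce-".
     */
    static Map<String, String> toHeaders(String id, String source, String type, String subject) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put(PREFIX + "id", id);
        headers.put(PREFIX + "source", source);
        headers.put(PREFIX + "type", type);
        if (subject != null) {
            // The subject carries the entity ID, here the counterId.
            headers.put(PREFIX + "subject", subject);
        }
        return headers;
    }
}
```

Calling toHeaders("cmd1", "CounterTopicIntegrationTest", "com.example.IncreaseValue", "counter-1") produces the keys ce-id, ce-source, ce-type and ce-subject, which is why the tests above can read the subject back from the metadata. The type name and subject value in this call are placeholders.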

One Suite, Multiple Tests

When running multiple test cases in the same test suite, and thus sharing a common TestKit instance, unconsumed messages from previous tests can interfere with the current one. To avoid this, be sure to:

  • have the tests run in sequence, not in parallel;

  • clear the contents of the topics in use before the test.

As an alternative, you can consider using different test suites which will use independent TestKit instances.

Java
src/it/java/com/example/CounterTopicIntegrationTest.java
@BeforeEach (1)
public void clearTopics() {
  eventsTopic.clear(); (2)
  eventsTopicWithMeta.clear();
}
1 Run this before each test.
2 Clear the topic ignoring any unread messages.
Scala
src/test/scala/com/example/CounterServiceIntegrationSpec.scala
override def beforeEach(): Unit = { (1)
  eventsTopic.clear() (2)
  eventsTopicWithMeta.clear()
}
1 Override method from trait BeforeAndAfterEach.
2 Clear the topic ignoring any unread messages.

Although the example clears every topic before each test, you are not required to do so. You can clear topics selectively, or skip clearing entirely, depending on your tests and the flows they exercise.
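
To see why leftover messages matter, consider a toy in-memory model of an outgoing topic. FakeOutgoingTopic is a hypothetical class written for this illustration. It only mimics the first-in, first-out reading and the clear() behavior described above, and is not the TestKit's implementation.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

// Hypothetical toy model of a mocked outgoing topic; not TestKit code.
final class FakeOutgoingTopic<T> {

    private final Deque<T> messages = new ArrayDeque<>();

    /** Simulates the service publishing a message to the topic. */
    void publish(T message) {
        messages.addLast(message);
    }

    /** Reads the oldest unread message, failing if there is none. */
    T expectOne() {
        if (messages.isEmpty()) {
            throw new NoSuchElementException("No message available on topic");
        }
        return messages.removeFirst();
    }

    /** Drops all unread messages, as a per-test cleanup would. */
    void clear() {
        messages.clear();
    }

    int unreadCount() {
        return messages.size();
    }
}
```

If a first test publishes a message and never reads it, the next test's expectOne() returns that stale message instead of its own. Calling clear() between the two tests removes the stale message, so the second test reads only what it produced.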

External Broker

To run an integration test against a real instance of Google PubSub (or its Emulator) or Kafka, use the TestKit settings to override the default eventing support, as shown below:

Java
private static final KalixTestKitExtension testKit = new KalixTestKitExtension(
        Main.createKalix(),
        Settings.DEFAULT.withEventingSupport(EventingSupport.GOOGLE_PUBSUB)
    );
Scala
private val testKit = KalixTestKit(
    Main.createKalix(),
    DefaultSettings.withEventingSupport(GooglePubSub)
  ).start()