Publishing and Subscribing
A very common use case when building microservices is to publish and subscribe to a stream of events. The source of events can be the journal of an event sourced entity, the state changes of a value entity, or a Google Cloud Pub/Sub or Apache Kafka topic used for asynchronous messaging between services.
In this section, we will explore how you can use an Action to:
- Subscribe to events emitted by an event sourced entity within the same service.
- Subscribe to state changes emitted by a value entity within the same service.
- Subscribe to events from Event Sourced Entities published as service to service eventing.
- Subscribe to external events from Google Cloud Pub/Sub or Apache Kafka.
- Publish events to a Google Cloud Pub/Sub or Apache Kafka topic.
Messages are guaranteed to be delivered at least once. This means that receivers must be able to handle duplicate messages.
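One common way to tolerate duplicates is to make the receiver idempotent by remembering which messages it has already applied. The following is a minimal sketch in plain Java, not part of the Kalix SDK: the class name `DeduplicatingCounter`, the `messageId` parameter, and the in-memory set are all assumptions made for illustration. A real receiver would need a unique id supplied with each message and would keep the seen ids in durable storage.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical receiver that applies each message at most once. */
final class DeduplicatingCounter {
  // Ids of messages that were already applied (in memory only in this sketch).
  private final Set<String> seenIds = new HashSet<>();
  private int total = 0;

  /**
   * Applies the increment only the first time a given message id is seen.
   * Returns true if the message was applied, false if it was a duplicate.
   */
  boolean onIncreased(String messageId, int value) {
    if (!seenIds.add(messageId)) {
      return false; // redelivery of an already handled message: ignore it
    }
    total += value;
    return true;
  }

  int total() {
    return total;
  }
}
```

With this shape, delivering the same message twice changes the total only once, which is the property an at-least-once consumer needs.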
Publishing Entity events to a Topic
To illustrate how to publish entity events, we will assume the existence of an Event Sourced Counter entity that emits events of types ValueIncreased and ValueDecreased. We will have the events delivered to an Action, apply some transformation, and let them be published to a topic.
In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker see Configure message brokers.
- Java
src/main/proto/com/example/actions/counter_topic.proto

    syntax = "proto3";

    package com.example.actions;

    import "kalix/annotations.proto";
    import "com/example/domain/counter_domain.proto"; // (1)
    import "google/protobuf/empty.proto";
    import "google/protobuf/any.proto";

    option java_outer_classname = "CounterTopicApi";

    message Increased {
      int32 value = 1;
    }

    message Decreased {
      int32 value = 1;
    }

    service CounterJournalToTopic {
      option (kalix.codegen) = {
        action: {} // (2)
      };

      rpc OnIncreased (com.example.domain.ValueIncreased) returns (Increased) { // (3)
        option (kalix.method).eventing.in = { // (4)
          event_sourced_entity: "counter"
        };
        option (kalix.method).eventing.out = { // (5)
          topic: "counter-events"
        };
      }

      rpc OnDecreased (com.example.domain.ValueDecreased) returns (Decreased) {
        option (kalix.method).eventing.in = {
          event_sourced_entity: "counter"
        };
        option (kalix.method).eventing.out = {
          topic: "counter-events"
        };
      }
    }

(1) Import the Counter Domain file containing the definitions of the events. This is typically a proto definition within the same service.
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) The OnIncreased method receives the event ValueIncreased and returns the transformed type Increased.
(4) The (kalix.method).eventing.in annotation indicates that events from the entity type id counter should be delivered to this method (when the type is ValueIncreased).
(5) The (kalix.method).eventing.out annotation indicates that the returned value from this method should be published to the topic called counter-events.

- Scala
src/main/proto/com/example/actions/counter_topic.proto

    syntax = "proto3";

    package com.example.actions;

    import "kalix/annotations.proto";
    import "com/example/domain/counter_domain.proto"; // (1)
    import "google/protobuf/empty.proto";
    import "google/protobuf/any.proto";

    message Increased {
      int32 value = 1;
    }

    message Decreased {
      int32 value = 1;
    }

    service CounterJournalToTopic {
      option (kalix.codegen) = {
        action: {} // (2)
      };

      rpc OnIncreased (com.example.domain.ValueIncreased) returns (Increased) { // (3)
        option (kalix.method).eventing.in = { // (4)
          event_sourced_entity: "counter"
        };
        option (kalix.method).eventing.out = { // (5)
          topic: "counter-events"
        };
      }

      rpc OnDecreased (com.example.domain.ValueDecreased) returns (Decreased) {
        option (kalix.method).eventing.in = {
          event_sourced_entity: "counter"
        };
        option (kalix.method).eventing.out = {
          topic: "counter-events"
        };
      }
    }

(1) Import the Counter Domain file containing the definitions of the events. This is typically a proto definition within the same service.
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) The OnIncreased method receives the event ValueIncreased and returns the transformed type Increased.
(4) The (kalix.method).eventing.in annotation indicates that events from the entity type id counter should be delivered to this method (when the type is ValueIncreased).
(5) The (kalix.method).eventing.out annotation indicates that the returned value from this method should be published to the topic called counter-events.
The class CounterJournalToTopicAction gets generated for us based on the proto file defined above.
- Java
src/main/java/com/example/actions/CounterJournalToTopicAction.java

    public class CounterJournalToTopicAction extends AbstractCounterJournalToTopicAction {

      public CounterJournalToTopicAction(ActionCreationContext creationContext) {}

      @Override
      public Effect<CounterTopicApi.Increased> onIncreased(CounterDomain.ValueIncreased valueIncreased) {
        CounterTopicApi.Increased increased = // (1)
            CounterTopicApi.Increased.newBuilder()
                .setValue(valueIncreased.getValue())
                .build();

        return effects().reply(increased); // (2)
      }
    }

(1) We convert the incoming domain event CounterDomain.ValueIncreased to the outgoing topic API CounterTopicApi.Increased.
(2) We use the converted object to build a reply. The CounterTopicApi.Increased message will be published to the topic.

- Scala
src/main/scala/com/example/actions/CounterJournalToTopicAction.scala

    class CounterJournalToTopicAction(creationContext: ActionCreationContext)
        extends AbstractCounterJournalToTopicAction {

      override def onIncreased(valueIncreased: ValueIncreased): Action.Effect[Increased] = {
        effects.reply(Increased(valueIncreased.value)) // (1)
      }
    }

(1) We convert the incoming domain event ValueIncreased to the outgoing topic API Increased and return that as a reply.
In this example we have published Protobuf messages to the topic, which is convenient if the consuming end is also a Kalix service. For an external consumer, Protobuf may not be a supported format. For details on publishing other formats see Handling Serialization.
Subscribing to state changes from a Value Entity
Similar to subscribing to events from an Event Sourced Entity, you can also subscribe to state changes from a Value Entity.
- Java
src/main/proto/com/example/actions/counter_states_sub.proto

    syntax = "proto3";

    package com.example.actions;

    import "kalix/annotations.proto";
    import "com/example/domain/counter_domain.proto"; // (1)
    import "google/protobuf/empty.proto";

    option java_outer_classname = "StateSubscriptionApi";

    service CounterStateSubscription {
      option (kalix.codegen) = {
        action: {} // (2)
      };

      rpc OnUpdateState (com.example.domain.CounterState) returns (google.protobuf.Empty) {
        option (kalix.method).eventing.in = {
          value_entity: "counter" // (3)
        };
      }

      rpc OnDeleteEntity (google.protobuf.Empty) returns (google.protobuf.Empty) {
        option (kalix.method).eventing.in = {
          value_entity: "counter"
          handle_deletes: true // (4)
        };
      }
    }

(1) Import the Counter Domain from the Value Entity example (see Implementing Value Entities).
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) The (kalix.method).eventing.in annotation indicates that state changes from the value entity type id counter should be delivered to this method.
(4) The second (kalix.method).eventing.in annotation with handle_deletes: true and (required) google.protobuf.Empty as a parameter defines that this method is used for handling value entity deletes.

- Scala
src/main/proto/com/example/actions/counter_states_sub.proto

    syntax = "proto3";

    package com.example.actions;

    import "kalix/annotations.proto";
    import "com/example/domain/counter_domain.proto"; // (1)
    import "google/protobuf/empty.proto";

    service CounterStateSubscription {
      option (kalix.codegen) = {
        action: {} // (2)
      };

      rpc OnUpdateState (com.example.domain.CounterState) returns (google.protobuf.Empty) {
        option (kalix.method).eventing.in = {
          value_entity: "counter" // (3)
        };
      }

      rpc OnDeleteEntity (google.protobuf.Empty) returns (google.protobuf.Empty) {
        option (kalix.method).eventing.in = {
          value_entity: "counter"
          handle_deletes: true // (4)
        };
      }
    }

(1) Import the Counter Domain from the Value Entity example (see Implementing Value Entities).
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) The (kalix.method).eventing.in annotation indicates that state changes from the value entity type id counter should be delivered to this method.
(4) The second (kalix.method).eventing.in annotation with handle_deletes: true and (required) google.protobuf.Empty as a parameter defines that this method is used for handling value entity deletes.
Subscribing and acting upon
Another possible usage for Actions is to consume events and act upon them.
For example, you may consume events from one entity or from a topic, transform them into commands, and send those to another entity or an external system. This is similar to the usage explained in Actions as Controller, except that the Action is driven by the flow of incoming events instead of external user requests.
For that purpose, it’s enough to add the (kalix.method).eventing.in annotation and omit the (kalix.method).eventing.out annotation.
Service to Service Eventing
Kalix provides brokerless at-least-once event delivery across Kalix services through Service to Service eventing.
The source of the events is an Event Sourced Entity. Its events can be published as a stream and consumed by another Kalix service without the need to set up a message broker.
Note: For eventing from an entity inside the same Kalix service as the consuming component, use a regular subscription to the entity instead of Service to Service eventing.
Event Producer
The event producer controls which entity to publish events for. Each published entity is identified by a stream id so that one Kalix service can publish more than one of the entity types it contains.
- Java
src/main/proto/customer/api/direct_customer_events.proto

    service CustomerEventsService {
      option (kalix.codegen) = {
        action: {}
      };
      option (kalix.service).eventing.in = { // (1)
        event_sourced_entity: "customers"
        // skip/filter events that there is no transform method for (AddressChanged)
        ignore_unknown: true // (2)
      };
      option (kalix.service).eventing.out.direct.event_stream_id = "customer_events"; // (3)

      // limit access to only other services in same project
      option (kalix.service).acl.allow = { service: "*" }; // (4)

      // transform methods for turning internal event types to public API
      rpc TransformCustomerCreated(domain.CustomerCreated) returns (Created) { } // (5)
      rpc TransformCustomerNameChanged(domain.CustomerNameChanged) returns (NameChanged) {}
    }

(1) eventing.in identifies which event sourced entity to publish events for.
(2) Ignore any event types not handled by a method and move on with the event stream, rather than fail, which is the default.
(3) eventing.out.direct.event_stream_id marks the public identifier for consumers of this stream.
(4) An ACL annotation, allowing access from other Kalix services, but not the public internet.
(5) All methods on the service are transformer methods for turning internal event message types into public API message types for other services to consume.

- Scala
src/main/proto/customer/api/direct_customer_events.proto

    service CustomerEventsService {
      option (kalix.codegen) = {
        action: {}
      };
      option (kalix.service).eventing.in = { // (1)
        event_sourced_entity: "customers"
        // skip/filter events that there is no transform method for (AddressChanged)
        ignore_unknown: true // (2)
      };
      option (kalix.service).eventing.out.direct.event_stream_id = "customer_events"; // (3)

      // limit access to only other services in same project
      option (kalix.service).acl.allow = { service: "*" }; // (4)

      // transform methods for turning internal event types to public API
      rpc TransformCustomerCreated(domain.CustomerCreated) returns (Created) { } // (5)
      rpc TransformCustomerNameChanged(domain.CustomerNameChanged) returns (NameChanged) {}
    }

(1) eventing.in identifies which event sourced entity to publish events for.
(2) Ignore any event types not handled by a method and move on with the event stream, rather than fail, which is the default.
(3) eventing.out.direct.event_stream_id marks the public identifier for consumers of this stream.
(4) An ACL annotation, allowing access from other Kalix services, but not the public internet.
(5) All methods on the service are transformer methods for turning internal event message types into public API message types for other services to consume.
The implementation of the producer creates the public API messages and uses the regular Action effects API to return the messages to publish:
- Java
src/main/java/customer/api/CustomerEventsServiceAction.java

    @Override
    public Effect<CustomerEventsApi.Created> transformCustomerCreated(CustomerDomain.CustomerCreated customerCreated) {
      CustomerDomain.CustomerState customer = customerCreated.getCustomer();
      return effects().reply(CustomerEventsApi.Created.newBuilder()
          .setCustomerId(customer.getCustomerId())
          .setCustomerName(customer.getName())
          .setEmail(customer.getEmail())
          .build());
    }

    @Override
    public Effect<CustomerEventsApi.NameChanged> transformCustomerNameChanged(CustomerDomain.CustomerNameChanged customerNameChanged) {
      // Note: customer_id is not present in the event or elsewhere here, but will be available as subject id
      // from the metadata on the consuming side
      return effects().reply(CustomerEventsApi.NameChanged.newBuilder()
          .setCustomerName(customerNameChanged.getNewName())
          .build());
    }

- Scala
src/main/scala/customer/api/CustomerEventsServiceAction.scala

    override def transformCustomerCreated(customerCreated: domain.CustomerCreated): Action.Effect[Created] = {
      val customer = customerCreated.getCustomer
      effects.reply(Created(customer.customerId, customer.name, customer.email))
    }

    override def transformCustomerNameChanged(
        customerNameChanged: domain.CustomerNameChanged): Action.Effect[NameChanged] = {
      // Note: customer_id is not present in the event or elsewhere here, but will be available as subject id
      // from the metadata on the consuming side
      effects.reply(NameChanged(customerNameChanged.newName))
    }
Event Consumer
The consumer can be an Action or a View, annotated with (kalix.service).eventing.in.direct with a service identifying the publishing service, and the event_stream_id to subscribe to.
Since the consumer is in a separate service, we must include the message descriptors for the messages the producing side produces:
- Java
src/main/proto/customer/api/publisher_api.proto

    message Created {
      string customer_id = 1;
      string customer_name = 2;
      string email = 3;
    }

    message NameChanged {
      string customer_name = 1;
    }

- Scala
src/main/proto/customer/api/publisher_api.proto

    message Created {
      string customer_id = 1;
      string customer_name = 2;
      string email = 3;
    }

    message NameChanged {
      string customer_name = 1;
    }
We then define a component subscribing to the service to service publisher. In this example we do that with a View:
- Java
src/main/proto/customer/view/customer_view.proto

    service AllCustomersView {
      option (kalix.codegen) = {
        view: {}
      };

      // consume events published by java-protobuf-eventsourced-customer-registry/CustomerEventsServiceAction
      option (kalix.service).eventing.in.direct = { // (1)
        service: "customer-registry" // (2)
        event_stream_id: "customer_events" // (3)
      };

      rpc ProcessCustomerCreated(api.Created) returns (Customer) { // (4)
        option (kalix.method).view.update = {
          table: "all_customers"
        };
      }

      rpc ProcessCustomerNameChanged(api.NameChanged) returns (Customer) {
        option (kalix.method).view.update = {
          table: "all_customers"
        };
      }

      rpc GetCustomers(google.protobuf.Empty) returns (stream Customer) {
        option (kalix.method).view.query = {
          query: "SELECT * FROM all_customers"
        };
      }
    }

(1) Service level eventing.in.direct block.
(2) The name of the Kalix service publishing the event stream.
(3) The public event_stream_id of the specific stream from the publisher.
(4) One update method per message type that the stream may contain.

- Scala
src/main/proto/customer/view/customer_view.proto

    service AllCustomersView {
      option (kalix.codegen) = {
        view: {}
      };

      // consume events published by scala-protobuf-eventsourced-customer-registry/CustomerEventsServiceAction
      option (kalix.service).eventing.in.direct = { // (1)
        service: "customer-registry" // (2)
        event_stream_id: "customer_events" // (3)
      };

      rpc ProcessCustomerCreated(api.Created) returns (Customer) { // (4)
        option (kalix.method).view.update = {
          table: "all_customers"
        };
      }

      rpc ProcessCustomerNameChanged(api.NameChanged) returns (Customer) {
        option (kalix.method).view.update = {
          table: "all_customers"
        };
      }

      rpc GetCustomers(google.protobuf.Empty) returns (stream Customer) {
        option (kalix.method).view.query = {
          query: "SELECT * FROM all_customers"
        };
      }
    }

(1) Service level eventing.in.direct block.
(2) The name of the Kalix service publishing the event stream.
(3) The public event_stream_id of the specific stream from the publisher.
(4) One update method per message type that the stream may contain.
If you’re looking to test this locally, you will likely need to run the two services on different ports. For more details, consult Running multiple services.
Deployment dependent source of events
It is possible to use environment variables to control the name of the service that a consumer consumes from. This is useful, for example, for using the same image in staging and production deployments while having them consume from different source services.
Referencing environment variables is done with the syntax ${VAR_NAME} in the service string in the consumer eventing.in.direct block.
Changing the service name after it has once been deployed means the consumer will start over from the beginning of the event stream.
See kalix service deploy for details on how to set environment variables when deploying a service.
Handling Serialization
You do not need to handle serialization for messages. Kalix functions serve gRPC interfaces, and the input and output messages are protobuf messages that get serialized to the protobuf format.
The gRPC services are also exposed as HTTP endpoints with JSON messages. See Transcoding HTTP.
Subscribing to a Topic
It’s also possible to subscribe to a Pub/Sub or Kafka topic. To receive messages from a topic, annotate a service method in the Protobuf service definition with the option (kalix.method).eventing.in annotation and specify the topic name in the topic section of the annotation.
In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker see Configure message brokers.
For illustration purposes, we can add a second Action that consumes from the Pub/Sub topic counter-events from the previous example.
- Java
src/main/proto/com/example/actions/counter_topic_sub.proto

    syntax = "proto3";

    package com.example.actions;

    import "kalix/annotations.proto";
    import "com/example/actions/counter_topic.proto"; // (1)
    import "google/protobuf/empty.proto";

    option java_outer_classname = "CounterTopicSubApi";

    service CounterTopicSubscription {
      option (kalix.codegen) = {
        action: {} // (2)
      };

      rpc OnIncreased (com.example.actions.Increased) returns (google.protobuf.Empty) {
        option (kalix.method).eventing.in = { // (3)
          topic: "counter-events"
        };
      }

      rpc OnDecreased (com.example.actions.Decreased) returns (google.protobuf.Empty) {
        option (kalix.method).eventing.in = {
          topic: "counter-events"
        };
      }
    }

(1) Import the Counter Topic types from the previous example.
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) Define methods for each of the possible incoming messages and annotate them with (kalix.method).eventing.in indicating that the source of events is the topic counter-events.

- Scala
src/main/proto/com/example/actions/counter_topic_sub.proto

    syntax = "proto3";

    package com.example.actions;

    import "kalix/annotations.proto";
    import "com/example/actions/counter_topic.proto"; // (1)
    import "google/protobuf/empty.proto";

    service CounterTopicSubscription {
      option (kalix.codegen) = {
        action: {} // (2)
      };

      rpc OnIncreased (com.example.actions.Increased) returns (google.protobuf.Empty) {
        option (kalix.method).eventing.in = { // (3)
          topic: "counter-events"
        };
      }

      rpc OnDecreased (com.example.actions.Decreased) returns (google.protobuf.Empty) {
        option (kalix.method).eventing.in = {
          topic: "counter-events"
        };
      }
    }

(1) Import the Counter Topic types from the previous example.
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) Define methods for each of the possible incoming messages and annotate them with (kalix.method).eventing.in indicating that the source of events is the topic counter-events.
The class CounterTopicSubscriptionAction gets generated for us based on the proto file defined above.
- Java
src/main/java/com/example/actions/CounterTopicSubscriptionAction.java

    public class CounterTopicSubscriptionAction extends AbstractCounterTopicSubscriptionAction {

      private Logger logger = LoggerFactory.getLogger(getClass());

      public CounterTopicSubscriptionAction(ActionCreationContext creationContext) {}

      @Override
      public Effect<Empty> onIncreased(CounterTopicApi.Increased increased) {
        logger.info("Received increase event: " + increased.toString());
        return effects().reply(Empty.getDefaultInstance());
      }

      @Override
      public Effect<Empty> onDecreased(CounterTopicApi.Decreased decreased) {
        logger.info("Received decrease event: " + decreased.toString());
        return effects().reply(Empty.getDefaultInstance());
      }
    }

- Scala
src/main/scala/com/example/actions/CounterTopicSubscriptionAction.scala

    class CounterTopicSubscriptionAction(creationContext: ActionCreationContext)
        extends AbstractCounterTopicSubscriptionAction {

      private val logger = LoggerFactory.getLogger(getClass())

      /** Handler for "OnIncreased". */
      override def onIncreased(increased: Increased): Action.Effect[Empty] = {
        logger.info("Received increase event: " + increased.toString())
        effects.reply(Empty.defaultInstance)
      }

      /** Handler for "OnDecreased". */
      override def onDecreased(decreased: Decreased): Action.Effect[Empty] = {
        logger.info("Received decrease event: " + decreased.toString())
        effects.reply(Empty.defaultInstance)
      }
    }
The events from the topic are delivered to the new Action. The implementation may vary. In this simplified example we just log each event, but the Action could instead forward it to some other component or an external service.
Receiving CloudEvents
Kalix uses the CloudEvents standard when receiving from and publishing to topics. The CloudEvents specification standardizes message metadata so that systems can integrate more easily.
Describing the structure of the message payload is the CloudEvents feature most important to Kalix.
An example of that is the capability to send serialized Protobuf messages and have Kalix deserialize them accordingly.
To allow proper reading of Protobuf messages from topics, the messages need to specify the following message attributes:
- Content-Type = application/protobuf
- ce-specversion = 1.0
- ce-type = fully qualified protobuf message name (e.g., shopping.cart.api.TopicOperation)
(The ce- prefixed attributes are part of the CloudEvents specification.)
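As an illustration of the attributes listed above, the sketch below assembles them into a plain map that a sending system could attach to a message. It uses only the Java standard library. The class and method names are hypothetical, and how the attributes are actually attached depends on the broker client in use (for example as Kafka headers or Pub/Sub attributes).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that collects the attributes for a Protobuf payload. */
final class ProtobufCloudEventAttributes {

  /** Returns the three attributes for the given fully qualified message name. */
  static Map<String, String> forType(String fullyQualifiedMessageName) {
    Map<String, String> attributes = new LinkedHashMap<>();
    attributes.put("Content-Type", "application/protobuf");
    attributes.put("ce-specversion", "1.0");
    attributes.put("ce-type", fullyQualifiedMessageName);
    return attributes;
  }
}
```

Calling `ProtobufCloudEventAttributes.forType("shopping.cart.api.TopicOperation")` yields a map with exactly the three entries from the list, with `ce-type` set to the given message name.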
The Protobuf rpc declaration uses the expected Protobuf message type and specifies the topic to subscribe to. You’ll normally want to share the exact Protobuf message declaration with the sending system.
- Java
A proto definition of an Action that consumes CloudEvent messages can look like this:
src/main/proto/com/example/topics_action.proto

    syntax = "proto3";

    package com.example;

    import "kalix/annotations.proto";
    import "google/protobuf/empty.proto";

    option java_outer_classname = "MyTopics";

    message TopicOperation {
      string operation = 1;
    }

    service MyTopicsAction {
      option (kalix.codegen) = {
        action: {}
      };

      rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) { // (1)
        option (kalix.method).eventing.in = {
          topic: "shopping-cart-protobuf-cloudevents"
        };
      }
    }

(1) When consuming a CloudEvent containing a Protobuf message the handler request must have the message type specified in the metadata.

- Scala
A proto definition of an Action that consumes CloudEvent messages can look like this:
src/main/proto/com/example/json/json_api.proto

    syntax = "proto3";

    package com.example;

    import "kalix/annotations.proto";
    import "google/protobuf/empty.proto";

    message TopicOperation {
      string operation = 1;
    }

    service MyTopicsAction {
      option (kalix.codegen) = {
        action: {}
      };

      rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) { // (1)
        option (kalix.method).eventing.in = {
          topic: "shopping-cart-protobuf-cloudevents"
        };
      }
    }

(1) When consuming a CloudEvent containing a Protobuf message the handler request must have the message type specified in the metadata.
Receiving messages from an external source
When a message arrives from a topic, Kalix detects the message payload type based on the Content-Type or ce-datacontenttype header or attribute of the message. If there is no such metadata, the content is handled as raw bytes.
If the content type starts with application/protobuf, application/x-protobuf or application/vnd.google.protobuf, the payload is expected to also have a ce-type header or attribute identifying the concrete protobuf message type. Such messages will be decoded into the described message type before being handed to a topic subscriber method, which must accept that specific message type.
If the publishing service is also a Kalix service, this is handled transparently for you as shown in the previous section.
For messages that are consumed from or published to topics when interacting with external services, it can be a requirement to use a format other than protobuf. Other supported message formats include JSON, text, or raw bytes.
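The content-type rules described in this section (Protobuf here, with JSON, text, and bytes covered in the subsections that follow) can be summarized as a small classification function. This is only a sketch of the documented rules in plain Java, not the code Kalix runs. The names `PayloadKinds`, `Kind`, and `classify` are hypothetical, and details such as case handling and stripping of parameters are assumptions.

```java
import java.util.Locale;

/** Hypothetical classifier mirroring the documented content-type rules. */
final class PayloadKinds {

  enum Kind { PROTOBUF, JSON, TEXT, BYTES }

  static Kind classify(String contentType) {
    if (contentType == null || contentType.trim().isEmpty()) {
      return Kind.BYTES; // no content type present: treated as raw bytes
    }
    // Assumption: ignore parameters such as "; charset=utf-8" and letter case.
    String type = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
    if (type.startsWith("application/protobuf")
        || type.startsWith("application/x-protobuf")
        || type.startsWith("application/vnd.google.protobuf")) {
      return Kind.PROTOBUF; // also expects a ce-type naming the message type
    }
    if (type.startsWith("application/json")
        || (type.startsWith("application/") && type.endsWith("+json"))) {
      return Kind.JSON; // subscriber method accepts google.protobuf.Any
    }
    if (type.startsWith("text/")) {
      return Kind.TEXT; // subscriber method accepts google.protobuf.StringValue
    }
    return Kind.BYTES; // application/octet-stream or an unknown type
  }
}
```

The fall-through to `BYTES` reflects that a missing or unrecognized content type is handled as a binary message rather than rejected.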
In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker see Configure message brokers.
JSON
If the incoming content type starts with application/json or application/…+json, the message is treated as JSON, possibly with a ce-type field identifying a specific type of object in the JSON. The topic subscriber method must accept a protobuf Any message.
Kalix provides a utility to serialize and deserialize JSON messages based on Jackson.
- Java
Kalix provides the JsonSupport utility to serialize and deserialize JSON messages.
A proto definition of an Action that consumes JSON messages and produces JSON messages can look like this:
src/main/proto/com/example/json/json_api.proto

    import "kalix/annotations.proto";
    import "google/protobuf/any.proto";
    import "google/protobuf/empty.proto";

    option java_outer_classname = "MyServiceApi";

    message KeyValue {
      string key = 1;
      int32 value = 2;
    }

    service MyService {
      option (kalix.codegen) = {
        action: {}
      };

      rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) { // (1)
        option (kalix.method).eventing.in = {
          topic: "notifications"
        };
      }

      rpc Produce(KeyValue) returns (google.protobuf.Any) { // (2)
        option (kalix.method).eventing.out = {
          topic: "notifications"
        };
      }
    }

(1) When consuming JSON messages from a topic the input type must be google.protobuf.Any.
(2) When producing a JSON message to a topic the return type must be google.protobuf.Any.
The type_url in the google.protobuf.Any must start with json.kalix.io/. The suffix of the type_url is a type hint of the concrete message type that is encoded.

- Scala
Kalix provides the JsonSupport utility to serialize and deserialize JSON messages.
A proto definition of an Action that consumes JSON messages and produces JSON messages can look like this:
src/main/proto/com/example/json/json_api.proto

    import "kalix/annotations.proto";
    import "google/protobuf/any.proto";
    import "google/protobuf/empty.proto";

    message KeyValue {
      string key = 1;
      int32 value = 2;
    }

    service MyService {
      option (kalix.codegen) = {
        action: {}
      };

      rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) { // (1)
        option (kalix.method).eventing.in = {
          topic: "notifications"
        };
      }

      rpc Produce(KeyValue) returns (google.protobuf.Any) { // (2)
        option (kalix.method).eventing.out = {
          topic: "notifications"
        };
      }
    }

(1) When consuming JSON messages from a topic the input type must be google.protobuf.any.Any.
(2) When producing a JSON message to a topic the return type must be google.protobuf.any.Any.
The type_url in the google.protobuf.any.Any must start with json.kalix.io/. The suffix of the type_url is a type hint of the concrete message type that is encoded.
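To make the type_url convention concrete, here is a minimal sketch in plain Java of composing and splitting such a URL. The class `JsonTypeUrls` and its methods are hypothetical and exist only for illustration. The JsonSupport utility is the intended way to produce and read these Any values, so hand-written code like this should normally not be needed.

```java
/** Hypothetical helpers illustrating the json.kalix.io/ type_url convention. */
final class JsonTypeUrls {

  static final String PREFIX = "json.kalix.io/";

  /** Builds a type_url from a type hint, e.g. a class or message name. */
  static String typeUrlFor(String typeHint) {
    return PREFIX + typeHint;
  }

  /** True if the type_url marks a JSON payload under this convention. */
  static boolean isJson(String typeUrl) {
    return typeUrl != null && typeUrl.startsWith(PREFIX);
  }

  /** Extracts the type hint, i.e. everything after the prefix. */
  static String typeHintOf(String typeUrl) {
    if (!isJson(typeUrl)) {
      throw new IllegalArgumentException("Not a JSON type_url: " + typeUrl);
    }
    return typeUrl.substring(PREFIX.length());
  }
}
```

A consumer handling several JSON shapes on one topic could branch on the extracted hint before decoding, under the assumption that the producer sets a meaningful hint.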
The corresponding implementation class:
- Java
src/main/java/com/example/json/MyServiceAction.java

    import kalix.javasdk.JsonSupport;
    import kalix.javasdk.action.ActionCreationContext;
    import com.google.protobuf.Any;
    import com.google.protobuf.Empty;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyServiceAction extends AbstractMyServiceAction {

      private static final Logger LOG = LoggerFactory.getLogger(MyServiceAction.class);

      public MyServiceAction(ActionCreationContext creationContext) {}

      @Override
      public Effect<Empty> consume(Any any) {
        JsonKeyValueMessage jsonMessage = JsonSupport.decodeJson(JsonKeyValueMessage.class, any); // (1)
        LOG.info("Consumed " + jsonMessage);
        return effects().reply(Empty.getDefaultInstance());
      }

      @Override
      public Effect<Any> produce(MyServiceApi.KeyValue keyValue) {
        JsonKeyValueMessage jsonMessage =
            new JsonKeyValueMessage(keyValue.getKey(), keyValue.getValue()); // (2)
        Any jsonAny = JsonSupport.encodeJson(jsonMessage); // (3)
        return effects().reply(jsonAny);
      }
    }

(1) Decode the JSON message to a Java class JsonKeyValueMessage.
(2) Convert the Protobuf message KeyValue to a Java class JsonKeyValueMessage.
(3) Encode the Java class JsonKeyValueMessage to JSON.

- Scala
src/main/scala/com/example/json/MyServiceAction.scala

    class MyServiceAction(creationContext: ActionCreationContext) extends AbstractMyServiceAction {

      private val log = LoggerFactory.getLogger(classOf[MyServiceAction])

      override def consume(any: ScalaPbAny): Action.Effect[Empty] = {
        val jsonMessage = JsonSupport.decodeJson[JsonKeyValueMessage](any) // (1)
        log.info("Consumed {}", jsonMessage)
        effects.reply(Empty.defaultInstance)
      }

      override def produce(keyValue: KeyValue): Action.Effect[ScalaPbAny] = {
        val jsonMessage = JsonKeyValueMessage(keyValue.key, keyValue.value) // (2)
        val jsonAny = JsonSupport.encodeJson(jsonMessage) // (3)
        effects.reply(jsonAny)
      }
    }

(1) Decode the JSON message to a Scala class JsonKeyValueMessage.
(2) Convert the Protobuf message KeyValue to a Scala class JsonKeyValueMessage.
(3) Encode the Scala class JsonKeyValueMessage to JSON.
Kalix uses Jackson to serialize JSON.
Text
If the content type starts with text/ it is treated as a string message. The topic subscriber method must accept the google.protobuf.StringValue message.
- Java
-
A
proto
definition of an Action that consumes String messages can look like this:src/main/proto/com/example/json/json_api.protosyntax = "proto3"; package com.example; import "kalix/annotations.proto"; import "google/protobuf/wrappers.proto"; (1) import "google/protobuf/empty.proto"; option java_outer_classname = "MyTopics"; service MyTopicsAction { option (kalix.codegen) = { action: {} }; rpc ConsumeStringTopic(google.protobuf.StringValue) returns (google.protobuf.Empty) { (2) option (kalix.method).eventing.in = { topic: "strings_topic" }; } }
1 google.protobuf.StringValue requires the import google/protobuf/wrappers.proto.
2 When consuming text messages from a topic, the input type must be google.protobuf.StringValue.
- Scala
-
A proto definition of an Action that consumes String messages can look like this:
src/main/proto/com/example/topics_action.proto
syntax = "proto3"; package com.example; import "kalix/annotations.proto"; import "google/protobuf/wrappers.proto"; (1) import "google/protobuf/empty.proto"; service MyTopicsAction { option (kalix.codegen) = { action: {} }; rpc ConsumeStringTopic(google.protobuf.StringValue) returns (google.protobuf.Empty) { (2) option (kalix.method).eventing.in = { topic: "strings_topic" }; } }
1 google.protobuf.StringValue requires the import google/protobuf/wrappers.proto.
2 When consuming text messages from a topic, the input type must be google.protobuf.StringValue.
If an action has a return type of StringValue and publishes to a topic, the events published to the topic will have content-type text/plain; charset=utf-8.
Bytes
If the content type is application/octet-stream, no content type is present, or the type is unknown to Kalix, the message is treated as a binary message. The topic subscriber method must accept the google.protobuf.BytesValue message.
- Java
-
A proto definition of an Action that consumes binary messages with raw bytes can look like this:
src/main/proto/com/example/topics_action.proto
syntax = "proto3"; package com.example; import "kalix/annotations.proto"; import "google/protobuf/wrappers.proto"; (1) import "google/protobuf/empty.proto"; option java_outer_classname = "MyTopics"; service MyTopicsAction { option (kalix.codegen) = { action: {} }; rpc ConsumeRawBytesTopic(google.protobuf.BytesValue) returns (google.protobuf.Empty) { (2) option (kalix.method).eventing.in = { topic: "bytes_topic" }; } }
1 google.protobuf.BytesValue requires the import google/protobuf/wrappers.proto.
2 When consuming raw bytes messages from a topic, the input type must be google.protobuf.BytesValue.
- Scala
-
A proto definition of an Action that consumes binary messages with raw bytes can look like this:
src/main/proto/com/example/json/json_api.proto
syntax = "proto3"; package com.example; import "kalix/annotations.proto"; import "google/protobuf/wrappers.proto"; (1) import "google/protobuf/empty.proto"; service MyTopicsAction { option (kalix.codegen) = { action: {} }; rpc ConsumeRawBytesTopic(google.protobuf.BytesValue) returns (google.protobuf.Empty) { (2) option (kalix.method).eventing.in = { topic: "bytes_topic" }; } }
1 google.protobuf.BytesValue requires the import google/protobuf/wrappers.proto.
2 When consuming raw bytes messages from a topic, the input type must be google.protobuf.BytesValue.
If an action has a return type of BytesValue and publishes to a topic, the events published to the topic will have content-type application/octet-stream.
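The content-type rules for text and binary messages can be summarised in a small plain-Java sketch. It is only a model of the rules described in this section, not Kalix's own code: the class ContentTypeSketch, the PayloadKind labels, and the classify method are hypothetical names, and the application/json and application/protobuf branches are assumptions included so that those types are not reported as unknown.

```java
import java.util.Locale;

/** Illustrative model of the content-type rules above; not part of the Kalix SDK. */
final class ContentTypeSketch {

    /** Hypothetical labels for how the payload would be handed to a subscriber method. */
    enum PayloadKind { PROTOBUF, JSON, TEXT, BYTES }

    static PayloadKind classify(String contentType) {
        if (contentType == null || contentType.isBlank()) {
            return PayloadKind.BYTES; // no content type present
        }
        // Drop parameters such as "; charset=utf-8" and normalise the case.
        String mediaType = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);
        if (mediaType.equals("application/protobuf")) {
            return PayloadKind.PROTOBUF; // assumption: handled as a Protobuf message
        }
        if (mediaType.equals("application/json")) {
            return PayloadKind.JSON; // assumption: handled as a JSON message
        }
        if (mediaType.startsWith("text/")) {
            return PayloadKind.TEXT; // subscriber accepts google.protobuf.StringValue
        }
        // application/octet-stream, or any type this sketch does not recognise:
        // subscriber accepts google.protobuf.BytesValue
        return PayloadKind.BYTES;
    }

    public static void main(String[] args) {
        System.out.println(classify("text/plain; charset=utf-8"));
        System.out.println(classify("application/octet-stream"));
        System.out.println(classify(null));
    }
}
```

The fall-through to BYTES mirrors the rule that a missing or unknown content type is treated as binary, so a subscriber that accepts BytesValue acts as the safest default.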
Deployment dependent topic names
Environment variables can control the name of the topic that events are consumed from or published to. This is useful, for example, for using the same image in staging and production deployments while having them interact with separate topics.
Referencing environment variables is done with the syntax ${VAR_NAME} in the topic string in eventing.in.topic or eventing.out.topic blocks. See kalix service deploy for details on how to set environment variables when deploying a service.
Changing the topic name after it has once been deployed for an event consumer means the consumer will start over from the beginning of the topic.
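To make the ${VAR_NAME} substitution concrete, here is a minimal plain-Java sketch of how such a placeholder could be expanded. It is not how Kalix resolves topic names internally: the class TopicNameSketch, the resolve method, the DEPLOY_ENV variable, and the choice to fail on an unset variable are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative placeholder expansion for topic names; not part of the Kalix SDK. */
final class TopicNameSketch {

    // Matches ${VAR_NAME} and captures VAR_NAME.
    private static final Pattern PLACEHOLDER =
        Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

    /** Replaces every ${VAR_NAME} in the topic string with its value from env. */
    static String resolve(String topic, Map<String, String> env) {
        Matcher matcher = PLACEHOLDER.matcher(topic);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String value = env.get(matcher.group(1));
            if (value == null) {
                // Assumption of this sketch: an unset variable is an error.
                throw new IllegalArgumentException(
                    "Environment variable not set: " + matcher.group(1));
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        // Hypothetical topic string as it might appear in eventing.out.topic.
        String template = "counter-events-${DEPLOY_ENV}";
        System.out.println(resolve(template, Map.of("DEPLOY_ENV", "staging")));
        System.out.println(resolve(template, Map.of("DEPLOY_ENV", "production")));
    }
}
```

With the same template, the staging and production deployments end up on different topics purely through their environment, which is the scenario described above.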
Accessing the Entity ID
- Java
-
For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContext.
You can access the ActionContext through method actionContext().
src/main/java/com/example/actions/CounterJournalToTopicAction.java
@Override public Effect<CounterTopicApi.Increased> onIncreased(CounterDomain.ValueIncreased valueIncreased) { Optional<String> counterId = actionContext().eventSubject(); (1) ... }
- Scala
-
For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContext.
You can access the ActionContext through method actionContext.
src/main/proto/customer/domain/customer_domain.proto
override def onIncreased(valueIncreased: ValueIncreased): Action.Effect[Increased] = { val counterId = actionContext.eventSubject (1) ... }
Ignoring events
When consuming events, each event must be matched by a Protobuf service method. In case your component is only interested in certain events, you may declare a method to receive all events that are not received by the other methods. If an event type is not handled, the Action will fail. Actions are designed to restart, but since the handler is missing, it will fail again. Therefore, it’s important to define methods for all events or define a catch-all method in case you want to discard some events.
- Java
-
src/main/proto/com/example/actions/counter_topic.proto
rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) { option (kalix.method).eventing.in = { (1) event_sourced_entity: "counter" ignore: true (2) }; }
1 We must annotate it with a (kalix.method).eventing.in.
2 Set ignore: true option.
- Scala
-
src/main/proto/customer/domain/customer_domain.proto
rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) { option (kalix.method).eventing.in = { (1) event_sourced_entity: "counter" ignore: true (2) }; }
1 We must annotate it with a (kalix.method).eventing.in.
2 Set ignore: true option.
The Ignore method here is defined as a catch-all because it has input type Any. Instead of using a catch-all, it can be better to define concrete methods for all known event types that should be ignored, because then there is no risk of accidentally ignoring events that are added in later evolution of the service.
When adding the ignore: true annotation, the corresponding implementation is not needed in the component. It is more efficient to use ignore: true than implementing the method with an immediate reply.
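The difference between a catch-all and concrete ignore methods can be illustrated with a small plain-Java model of event dispatch. This is a sketch of the behaviour described above, not Kalix's implementation: IgnoreDispatchSketch, its methods, and the ValueReset event type (standing in for an event added in a later version of the service) are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Illustrative model of handled, ignored, and unhandled event types. */
final class IgnoreDispatchSketch {

    enum Outcome { HANDLED, IGNORED }

    private final Map<String, Consumer<Object>> handlers = new HashMap<>();
    private final Set<String> ignoredTypes = new HashSet<>();
    private boolean catchAllIgnore = false;

    /** Registers a handler for one event type. */
    IgnoreDispatchSketch handle(String type, Consumer<Object> handler) {
        handlers.put(type, handler);
        return this;
    }

    /** Marks one concrete event type as ignored. */
    IgnoreDispatchSketch ignore(String type) {
        ignoredTypes.add(type);
        return this;
    }

    /** Models a catch-all ignore method with input type Any. */
    IgnoreDispatchSketch ignoreEverythingElse() {
        catchAllIgnore = true;
        return this;
    }

    Outcome dispatch(String type, Object event) {
        Consumer<Object> handler = handlers.get(type);
        if (handler != null) {
            handler.accept(event);
            return Outcome.HANDLED;
        }
        if (ignoredTypes.contains(type) || catchAllIgnore) {
            return Outcome.IGNORED;
        }
        // No matching method: the subscriber fails, and would fail again after a restart.
        throw new IllegalStateException("No handler for event type " + type);
    }

    public static void main(String[] args) {
        IgnoreDispatchSketch concrete = new IgnoreDispatchSketch()
            .handle("ValueIncreased", event -> { })
            .ignore("ValueDecreased");
        System.out.println(concrete.dispatch("ValueDecreased", 1));
        try {
            concrete.dispatch("ValueReset", 0); // new event type fails loudly
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }

        IgnoreDispatchSketch catchAll = new IgnoreDispatchSketch()
            .handle("ValueIncreased", event -> { })
            .ignoreEverythingElse();
        System.out.println(catchAll.dispatch("ValueReset", 0)); // silently dropped
    }
}
```

With concrete ignores, the hypothetical ValueReset event surfaces as a failure that prompts a decision; with the catch-all, it is discarded without notice.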
Testing the Integration
When a Kalix service relies on a broker, it might be useful to use integration tests to assert that those boundaries work as intended. For such scenarios, you can either:
-
Use TestKit’s mocked topic:
-
this offers a general API to inject messages into topics or read the messages written to another topic, regardless of the specific broker integration you have configured.
-
-
Run an external broker instance:
-
if you’re interested in running your integration tests against a real instance, you need to provide the broker instance yourself by running it in a separate process in your local setup and make sure to disable the use of TestKit’s test broker. Currently, the only external broker supported in integration tests is Google PubSub Emulator.
-
TestKit Mocked Incoming Messages
Following up on the counter entity example used above, let’s consider an example (composed of 2 Actions and 1 Event Sourced entity) as pictured below:
In this example:
-
commands are consumed from an external topic counter-commands and forwarded to a Counter entity;
-
the Counter entity is an Event Sourced Entity and has its events published to another topic counter-events.
To test this flow, we will take advantage of the TestKit to push commands into the counter-commands topic and check what messages are produced to the counter-events topic.
- Java
-
src/it/java/com/example/CounterTopicIntegrationTest.java
import kalix.javasdk.testkit.EventingTestKit; import kalix.javasdk.testkit.KalixTestKit; import kalix.javasdk.testkit.junit.jupiter.KalixTestKitExtension; // ... public class CounterTopicIntegrationTest { /** * The test kit starts both the service container and the Kalix Runtime. */ @RegisterExtension public static final KalixTestKitExtension testKit = new KalixTestKitExtension(Main.createKalix(), KalixTestKit.Settings.DEFAULT .withTopicIncomingMessages("counter-commands") .withTopicOutgoingMessages("counter-events")); (1) private EventingTestKit.IncomingMessages commandsTopic; private EventingTestKit.OutgoingMessages eventsTopic; public CounterTopicIntegrationTest() { commandsTopic = testKit.getTopicIncomingMessages("counter-commands"); (2) eventsTopic = testKit.getTopicOutgoingMessages("counter-events"); (3) } @Test public void verifyCounterCommandsAndPublish() { var counterId = "test-topic"; var increaseCmd = CounterApi.IncreaseValue.newBuilder().setCounterId(counterId).setValue(4).build(); var decreaseCmd = CounterApi.DecreaseValue.newBuilder().setCounterId(counterId).setValue(1).build(); commandsTopic.publish(increaseCmd, counterId); (4) commandsTopic.publish(decreaseCmd, counterId); var increasedEvent = eventsTopic.expectOneTyped(CounterTopicApi.Increased.class); (5) var decreasedEvent = eventsTopic.expectOneTyped(CounterTopicApi.Decreased.class); assertEquals(increaseCmd.getValue(), increasedEvent.getPayload().getValue()); (6) assertEquals(decreaseCmd.getValue(), decreasedEvent.getPayload().getValue()); } }
1 Start the TestKit. Set the configuration to mock incoming messages from the counter-commands topic and mock outgoing messages from the counter-events topic.
2 Get IncomingMessages for the topic named counter-commands from the TestKit.
3 Get OutgoingMessages for the topic named counter-events from the TestKit.
4 Build 2 commands and publish both to the topic. The counterId is passed as the subject id of the message.
5 Read 2 messages, one at a time. We pass in the expected class type for the next message.
6 Assert the received messages have the same value as the commands sent.
- Scala
-
src/test/scala/com/example/CounterServiceIntegrationSpec.scala
import kalix.scalasdk.testkit.{ KalixTestKit, Message } import org.scalatest.BeforeAndAfterEach // ... class CounterServiceIntegrationSpec extends AnyWordSpec with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with ScalaFutures { private val testKit = KalixTestKit( Main.createKalix(), KalixTestKit.DefaultSettings .withTopicIncomingMessages("counter-commands") .withTopicOutgoingMessages("counter-events")).start() (1) private val commandsTopic = testKit.getTopicIncomingMessages("counter-commands") (2) private val eventsTopic = testKit.getTopicOutgoingMessages("counter-events") (3) "CounterService" must { val counterId = "xyz" "handle commands from topic and publishing related events out" in { commandsTopic.publish(IncreaseValue(counterId, 4), counterId) (4) commandsTopic.publish(DecreaseValue(counterId, 1), counterId) val Message(incEvent, _) = eventsTopic.expectOneTyped[Increased] (5) val Message(decEvent, _) = eventsTopic.expectOneTyped[Decreased] incEvent shouldBe Increased(4) (6) decEvent shouldBe Decreased(1) } } override def afterAll(): Unit = { testKit.stop() super.afterAll() } }
1 Start the TestKit. Set the configuration to mock incoming messages from the counter-commands topic and mock outgoing messages from the counter-events topic.
2 Get IncomingMessages for topic named counter-commands from the TestKit.
3 Get OutgoingMessages for topic named counter-events from the TestKit.
4 Build 2 commands and publish both to the topic. Note the counterId is passed as the subject id of the message.
5 Read 2 messages, one at a time and assert the received messages values. Note we pass in the expected class type for the next message.
In the example above we take advantage of the TestKit to serialize and deserialize the messages and to pass all the required metadata automatically. However, the API also lets you read and write raw bytes, construct your own metadata, or read multiple messages at once.
Metadata
Typically, messages are published with associated metadata. If you want to construct your own Metadata to be consumed by a service or make sure the messages published out of your service have specific metadata attached, you can do so using the TestKit, as shown below.
- Java
-
src/it/java/com/example/CounterTopicIntegrationTest.java
@Test public void verifyCounterCommandsAndPublishWithMetadata() { var counterId = "test-topic-metadata"; var increaseCmd = CounterApi.IncreaseValue.newBuilder().setCounterId(counterId).setValue(10).build(); var metadata = CloudEvent.of( (1) "cmd1", URI.create("CounterTopicIntegrationTest"), increaseCmd.getDescriptorForType().getFullName()) .withSubject(counterId) (2) .asMetadata() .add("Content-Type", "application/protobuf"); (3) commandsTopic.publish(testKit.getMessageBuilder().of(increaseCmd, metadata)); (4) var increasedEvent = eventsTopicWithMeta.expectOneTyped(CounterTopicApi.Increased.class, Duration.ofSeconds(10)); var actualMd = increasedEvent.getMetadata(); (5) assertEquals(counterId, actualMd.asCloudEvent().subject().get()); (6) assertEquals("application/protobuf", actualMd.get("Content-Type").get()); }
1 Build a CloudEvent object with the 3 required attributes, respectively: id, source and type.
2 Add the subject to which the message is related, that is the counterId.
3 Set the mandatory header "Content-Type" accordingly.
4 Build and publish the message along with its metadata to topic commandsTopic.
5 Upon receiving the message, access the metadata.
6 Assert the headers Content-Type and ce-subject (every CloudEvent header is prefixed with "ce-") have the expected values.
- Scala
-
src/test/scala/com/example/CounterServiceIntegrationSpec.scala
"allow passing and reading metadata for messages" in { val increaseCmd = IncreaseValue(counterId, 4) val md = CloudEvent( (1) id = "cmd1", source = URI.create("CounterServiceIntegrationSpec"), `type` = increaseCmd.companion.javaDescriptor.getFullName) .withSubject(counterId) (2) .asMetadata .add("Content-Type", "application/protobuf"); (3) commandsTopic.publish(Message(increaseCmd, md)) (4) val Message(incEvent, actualMd) = eventsTopicWithMeta.expectOneTyped[Increased] (5) incEvent shouldBe Increased(4) actualMd.get("Content-Type") should contain("application/protobuf") (6) actualMd.asCloudEvent.subject should contain(counterId) }
1 Build a CloudEvent object with the 3 required attributes, respectively: id, source and type.
2 Add the subject to which the message is related, that is the counterId.
3 Set the mandatory header "Content-Type" accordingly.
4 Build and publish the message along with its metadata to topic commandsTopic.
5 Receive the message of correct type and extract Metadata.
6 Assert the headers Content-Type and ce-subject (every CloudEvent header is prefixed with "ce-") have the expected values.
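The "ce-" prefix mentioned in the last callout can be pictured as a flat header map. The following plain-Java sketch only illustrates that naming convention; it does not use or reproduce the Kalix Metadata or CloudEvent classes. The class CloudEventHeadersSketch, the toHeaders method, and the com.example.IncreaseValue type name are hypothetical.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative mapping of CloudEvent attributes to "ce-" prefixed headers. */
final class CloudEventHeadersSketch {

    static Map<String, String> toHeaders(
            String id, URI source, String type, Optional<String> subject, String contentType) {
        Map<String, String> headers = new LinkedHashMap<>();
        // The three attributes the examples above pass when building the CloudEvent.
        headers.put("ce-id", id);
        headers.put("ce-source", source.toString());
        headers.put("ce-type", type);
        // The subject is optional; in the examples it carries the counterId.
        subject.ifPresent(value -> headers.put("ce-subject", value));
        // Content-Type is a regular header and is not prefixed.
        headers.put("Content-Type", contentType);
        return headers;
    }

    public static void main(String[] args) {
        Map<String, String> headers = toHeaders(
            "cmd1",
            URI.create("CounterTopicIntegrationTest"),
            "com.example.IncreaseValue", // hypothetical fully qualified message name
            Optional.of("test-topic-metadata"),
            "application/protobuf");
        headers.forEach((name, value) -> System.out.println(name + ": " + value));
    }
}
```

Seen this way, the assertions in the tests above read the subject attribute back through the CloudEvent view and the Content-Type through the plain header lookup.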
One Suite, Multiple Tests
When running multiple test cases under the same test suite, and thus using a common TestKit instance, you might face issues if unconsumed messages from previous tests interfere with the current one. To avoid this, be sure to:
-
have the tests run in sequence, not in parallel;
-
clear the contents of the topics in use before the test.
As an alternative, you can consider using different test suites which will use independent TestKit instances.
- Java
-
src/it/java/com/example/CounterTopicIntegrationTest.java
@BeforeEach (1) public void clearTopics() { eventsTopic.clear(); (2) eventsTopicWithMeta.clear(); }
1 Run this before each test.
2 Clear the topic ignoring any unread messages.
- Scala
-
src/test/scala/com/example/CounterServiceIntegrationSpec.scala
override def beforeEach(): Unit = { (1) eventsTopic.clear() (2) eventsTopicWithMeta.clear() }
1 Override method from trait BeforeAndAfterEach.
2 Clear the topic ignoring any unread messages.
Despite the example, you are neither forced to clear all topics nor to do it before each test. You can do it selectively, or you might not even need it depending on your tests and the flows they test.
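Why leftover messages matter can be seen with a tiny in-memory stand-in for a mocked topic. This plain-Java sketch is not the TestKit API: InMemoryTopicSketch and its publish, expectOne, and clear methods are hypothetical and only imitate the first-in, first-out reading of a shared topic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

/** Illustrative FIFO topic shared between test cases; not the Kalix TestKit. */
final class InMemoryTopicSketch<T> {

    private final Deque<T> messages = new ArrayDeque<>();

    void publish(T message) {
        messages.addLast(message);
    }

    /** Returns the oldest unread message, or fails if the topic is empty. */
    T expectOne() {
        if (messages.isEmpty()) {
            throw new NoSuchElementException("No message available");
        }
        return messages.removeFirst();
    }

    /** Drops all unread messages. */
    void clear() {
        messages.clear();
    }

    public static void main(String[] args) {
        InMemoryTopicSketch<String> eventsTopic = new InMemoryTopicSketch<>();

        // First test publishes an event but never reads it.
        eventsTopic.publish("Increased(4)");

        // Second test, without clearing: it reads the stale event from the first test.
        eventsTopic.publish("Increased(10)");
        System.out.println("without clear: " + eventsTopic.expectOne());

        // Second test, with clearing first: it reads its own event.
        eventsTopic.clear();
        eventsTopic.publish("Increased(10)");
        System.out.println("with clear: " + eventsTopic.expectOne());
    }
}
```

The same reasoning explains the advice to run such tests in sequence: parallel tests would interleave their messages on the shared topic even if each one cleared it first.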
External Broker
To run an integration test against a real instance of Google PubSub (or its Emulator) or Kafka, use the TestKit settings to override the default eventing support, as shown below:
- Java
-
private static final KalixTestKitExtension testKit = new KalixTestKitExtension( Main.createKalix(), Settings.DEFAULT.withEventingSupport(EventingSupport.GOOGLE_PUBSUB) );
- Scala
-
private val testKit = KalixTestKit( Main.createKalix(), DefaultSettings.withEventingSupport(GooglePubSub) ).start()