Publishing and Subscribing with Actions
A very common use case when building microservices is to publish and subscribe to a stream of events. The source of events can be the journal of an Event Sourced Entity, the state changes of a Value Entity, or a Google Cloud Pub/Sub or Apache Kafka topic used for asynchronous messaging between services.
With Actions you can:
- subscribe to events emitted by an event sourced entity within the same service.
- subscribe to state changes emitted by a value entity within the same service.
- subscribe to events from Event Sourced Entities published as service to service eventing.
- subscribe to external events from Google Cloud Pub/Sub or Apache Kafka.
- publish events to a Google Cloud Pub/Sub or Apache Kafka topic.
Messages are guaranteed to be delivered at least once. This means that receivers must be able to handle duplicate messages.
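One common way to cope with at-least-once delivery is to make the receiver idempotent by remembering which messages it has already processed. The following is a minimal sketch of that idea in plain Java, not part of the Kalix SDK. The `DeduplicatingHandler` class and the notion of a unique message id (for example an entity id combined with a sequence number) are assumptions made for illustration, and the in-memory set would need to be replaced by durable storage in a real service.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical helper, not part of the Kalix SDK: wraps a message handler
 * so that a message whose id was already processed is skipped.
 */
final class DeduplicatingHandler<M> {
  // Assumption: ids are kept in memory only; a real service would persist them.
  private final Set<String> processedIds = new HashSet<>();
  private final Consumer<M> delegate;

  DeduplicatingHandler(Consumer<M> delegate) {
    this.delegate = delegate;
  }

  /** Returns true if the message was processed, false if it was a duplicate. */
  synchronized boolean handle(String messageId, M message) {
    if (processedIds.contains(messageId)) {
      return false; // redelivery of a message we already handled
    }
    delegate.accept(message);
    // Record the id only after the handler succeeded, so that a failed
    // attempt is retried when the message is delivered again.
    processedIds.add(messageId);
    return true;
  }
}
```

With this shape, a redelivered message has no further effect, while a message whose handler threw an exception is processed again on the next delivery.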
Publishing Entity events to a Topic
To illustrate how to publish entity events, we will assume the existence of an Event Sourced Counter entity that emits events of types ValueIncreased and ValueDecreased. We will get the events delivered to an Action, apply some transformation and let them be published to a topic.
In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker see Configure message brokers.
- Java
-
src/main/proto/com/example/actions/counter_topic.proto
syntax = "proto3";

package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; // (1)
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

option java_outer_classname = "CounterTopicApi";

message Increased {
  int32 value = 1;
}

message Decreased {
  int32 value = 1;
}

service CounterJournalToTopic {
  option (kalix.codegen) = {
    action: {} // (2)
  };

  rpc Increase (com.example.domain.ValueIncreased) returns (Increased) { // (3)
    option (kalix.method).eventing.in = { // (4)
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = { // (5)
      topic: "counter-events"
    };
  }

  rpc IncreaseConditional (com.example.domain.ValueIncreased) returns (Increased) {
    option (kalix.method).eventing.out = {
      topic: "counter-events"
    };
  }

  rpc Decrease (com.example.domain.ValueDecreased) returns (Decreased) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = {
      topic: "counter-events"
    };
  }
}
(1) Import the Counter Domain file containing the definitions of the events. This is typically a proto definition within the same service.
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) The Increase method receives the event ValueIncreased and returns the transformed type Increased.
(4) The (kalix.method).eventing.in annotation indicates that events from the entity type counter should be delivered to this method (when the type is ValueIncreased).
(5) The (kalix.method).eventing.out annotation indicates that the returned value from this method should be published to the topic called counter-events.
- Scala
-
src/main/proto/com/example/actions/counter_topic.proto
syntax = "proto3";

package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; // (1)
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

message Increased {
  int32 value = 1;
}

message Decreased {
  int32 value = 1;
}

service CounterJournalToTopic {
  option (kalix.codegen) = {
    action: {} // (2)
  };

  rpc Increase (com.example.domain.ValueIncreased) returns (Increased) { // (3)
    option (kalix.method).eventing.in = { // (4)
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = { // (5)
      topic: "counter-events"
    };
  }

  rpc IncreaseConditional (com.example.domain.ValueIncreased) returns (Increased) {
    option (kalix.method).eventing.out = {
      topic: "counter-events"
    };
  }

  rpc Decrease (com.example.domain.ValueDecreased) returns (Decreased) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = {
      topic: "counter-events"
    };
  }
}
(1) Import the Counter Domain file containing the definitions of the events. This is typically a proto definition within the same service.
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) The Increase method receives the event ValueIncreased and returns the transformed type Increased.
(4) The (kalix.method).eventing.in annotation indicates that events from the entity type counter should be delivered to this method (when the type is ValueIncreased).
(5) The (kalix.method).eventing.out annotation indicates that the returned value from this method should be published to the topic called counter-events.
The class CounterJournalToTopicAction gets generated for us based on the proto file defined above.
- Java
-
src/main/java/com/example/actions/CounterJournalToTopicAction.java
public class CounterJournalToTopicAction extends AbstractCounterJournalToTopicAction {

  public CounterJournalToTopicAction(ActionCreationContext creationContext) {}

  @Override
  public Effect<CounterTopicApi.Increased> increase(CounterDomain.ValueIncreased valueIncreased) {
    CounterTopicApi.Increased increased = // (1)
        CounterTopicApi.Increased.newBuilder()
            .setValue(valueIncreased.getValue())
            .build();

    return effects().reply(increased); // (2)
  }
}
(1) We convert the incoming domain event CounterDomain.ValueIncreased to the outgoing topic API CounterTopicApi.Increased.
(2) We use the converted object to build a reply. The CounterTopicApi.Increased message will be published to the topic.
- Scala
-
src/main/scala/com/example/actions/CounterJournalToTopicAction.scala
class CounterJournalToTopicAction(creationContext: ActionCreationContext) extends AbstractCounterJournalToTopicAction {

  override def increase(valueIncreased: ValueIncreased): Action.Effect[Increased] = {
    effects.reply(Increased(valueIncreased.value)) // (1)
  }
}
(1) We convert the incoming domain event ValueIncreased to the outgoing topic API Increased and return that as a reply.
In this example we have published Protobuf messages to the topic, which is convenient if the consuming end is also a Kalix service. An external consumer may not support Protobuf; for details on publishing other formats see Handling Serialization.
Subscribing to state changes from a Value Entity
Similar to subscribing to events from an Event Sourced Entity, you can also subscribe to state changes from a Value Entity.
- Java
-
src/main/proto/com/example/actions/counter_states_sub.proto
syntax = "proto3";

package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; // (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "StateSubscriptionApi";

service CounterStateSubscription {
  option (kalix.codegen) = {
    action: {} // (2)
  };

  rpc OnUpdateState (com.example.domain.CounterState) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter" // (3)
    };
  }

  rpc OnDeleteEntity (google.protobuf.Empty) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter"
      handle_deletes: true // (4)
    };
  }
}
(1) Import the Counter Domain from the Value Entity example (see Implementing Value Entities).
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) The (kalix.method).eventing.in annotation indicates that state changes from the value entity type counter should be delivered to this method.
(4) The second (kalix.method).eventing.in annotation with handle_deletes: true and (required) google.protobuf.Empty as a parameter defines that this method is used for handling value entity deletes.
- Scala
-
src/main/proto/com/example/actions/counter_states_sub.proto
syntax = "proto3";

package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; // (1)
import "google/protobuf/empty.proto";

service CounterStateSubscription {
  option (kalix.codegen) = {
    action: {} // (2)
  };

  rpc OnUpdateState (com.example.domain.CounterState) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter" // (3)
    };
  }

  rpc OnDeleteEntity (google.protobuf.Empty) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter"
      handle_deletes: true // (4)
    };
  }
}
(1) Import the Counter Domain from the Value Entity example (see Implementing Value Entities).
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) The (kalix.method).eventing.in annotation indicates that state changes from the value entity type counter should be delivered to this method.
(4) The second (kalix.method).eventing.in annotation with handle_deletes: true and (required) google.protobuf.Empty as a parameter defines that this method is used for handling value entity deletes.
Subscribing to a Topic
It’s also possible to subscribe to a Pub/Sub topic. To receive messages from a Pub/Sub topic, annotate a service method in the Protobuf service definition with the option (kalix.method).eventing.in annotation and specify the topic name in the topic section of the annotation.
In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker see Configure message brokers.
For illustration purposes, we can add a second Action that consumes from the Pub/Sub topic counter-events from the previous example.
- Java
-
src/main/proto/com/example/actions/counter_topic_sub.proto
syntax = "proto3";

package com.example.actions;

import "kalix/annotations.proto";
import "com/example/actions/counter_topic.proto"; // (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "CounterTopicSubApi";

service CounterTopicSubscription {
  option (kalix.codegen) = {
    action: {} // (2)
  };

  rpc Increase (com.example.actions.Increased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = { // (3)
      topic: "counter-events"
    };
  }

  rpc Decrease (com.example.actions.Decreased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic: "counter-events"
    };
  }
}
(1) Import the Counter Topic types from the previous example.
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) Define methods for each of the possible incoming messages and annotate them with (kalix.method).eventing.in indicating that the source of events is the topic counter-events.
- Scala
-
src/main/proto/com/example/actions/counter_topic_sub.proto
syntax = "proto3";

package com.example.actions;

import "kalix/annotations.proto";
import "com/example/actions/counter_topic.proto"; // (1)
import "google/protobuf/empty.proto";

service CounterTopicSubscription {
  option (kalix.codegen) = {
    action: {} // (2)
  };

  rpc Increase (com.example.actions.Increased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = { // (3)
      topic: "counter-events"
    };
  }

  rpc Decrease (com.example.actions.Decreased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic: "counter-events"
    };
  }
}
(1) Import the Counter Topic types from the previous example.
(2) The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
(3) Define methods for each of the possible incoming messages and annotate them with (kalix.method).eventing.in indicating that the source of events is the topic counter-events.
The class CounterTopicSubscriptionAction gets generated for us based on the proto file defined above.
- Java
-
src/main/java/com/example/actions/CounterTopicSubscriptionAction.java
public class CounterTopicSubscriptionAction extends AbstractCounterTopicSubscriptionAction {

  private Logger logger = LoggerFactory.getLogger(getClass());

  public CounterTopicSubscriptionAction(ActionCreationContext creationContext) {}

  // Handler for "Increase": logs the message and acknowledges it with an empty reply.
  @Override
  public Effect<Empty> increase(CounterTopicApi.Increased increased) {
    logger.info("Received increase event: " + increased.toString());
    return effects().reply(Empty.getDefaultInstance());
  }

  // Handler for "Decrease": logs the message and acknowledges it with an empty reply.
  @Override
  public Effect<Empty> decrease(CounterTopicApi.Decreased decreased) {
    logger.info("Received decrease event: " + decreased.toString());
    return effects().reply(Empty.getDefaultInstance());
  }
}
- Scala
-
src/main/scala/com/example/actions/CounterTopicSubscriptionAction.scala
class CounterTopicSubscriptionAction(creationContext: ActionCreationContext) extends AbstractCounterTopicSubscriptionAction {

  private val logger = LoggerFactory.getLogger(getClass())

  /** Handler for "Increase". */
  override def increase(increased: Increased): Action.Effect[Empty] = {
    logger.info("Received increase event: " + increased.toString())
    effects.reply(Empty.defaultInstance)
  }

  override def decrease(decreased: Decreased): Action.Effect[Empty] = {
    logger.info("Received decrease event: " + decreased.toString())
    effects.reply(Empty.defaultInstance)
  }
}
The events from the topic are delivered to the new Action. The implementation may vary: in this simplified example we are just logging the events, but the Action could instead forward them to some other component or external service.
Receiving messages from an external Topic
In the example above, we consumed Protobuf messages from a topic that we control ourselves. When consuming an external topic, it’s very likely that the message format is not under your control and is not known by Kalix.
In such a case, the Action definition should receive a type depending on the type of the message payload. See Handling Serialization for more information on how to deal with data formats.
Subscribing and acting upon
Another possible usage for Actions is to consume events and act upon them.
For example, you may consume events from one entity or from a topic, transform them into commands and send those to another entity or an external system. This is similar to the usage explained in Actions as Controller, except that the Action is driven by the flow of incoming events instead of external user requests.
For that purpose, it’s enough to add the (kalix.method).eventing.in annotation and omit the (kalix.method).eventing.out annotation.
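The consume-and-act pattern can be sketched in plain Java as a handler that turns each incoming event into a command for a downstream component. This is only an illustration of the shape of such a handler and does not use the Kalix SDK: `ValueIncreasedEvent`, `AddToTotalCommand`, `CommandSink` and `IncreaseEventConsumer` are hypothetical stand-ins, and in a real Action the event type would be generated from the Protobuf definition.

```java
/** Stand-in for the domain event; the real type is generated from Protobuf. */
final class ValueIncreasedEvent {
  final int value;

  ValueIncreasedEvent(int value) {
    this.value = value;
  }
}

/** Hypothetical command understood by another entity or an external system. */
final class AddToTotalCommand {
  final String counterId;
  final int amount;

  AddToTotalCommand(String counterId, int amount) {
    this.counterId = counterId;
    this.amount = amount;
  }
}

/** Hypothetical port to the receiving component. */
interface CommandSink {
  void send(AddToTotalCommand command);
}

/** Consumes events and acts upon them; nothing is published to a topic. */
final class IncreaseEventConsumer {
  private final CommandSink sink;

  IncreaseEventConsumer(CommandSink sink) {
    this.sink = sink;
  }

  void onValueIncreased(String counterId, ValueIncreasedEvent event) {
    // Transform the event into a command and hand it to the receiver.
    sink.send(new AddToTotalCommand(counterId, event.value));
  }
}
```

Because messages are delivered at least once, the receiver of such commands should tolerate receiving the same command more than once.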
Accessing the Entity ID
- Java
-
For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContext.
You can access the ActionContext through method actionContext().
src/main/java/com/example/actions/CounterJournalToTopicAction.java
@Override
public Effect<CounterTopicApi.Increased> increase(CounterDomain.ValueIncreased valueIncreased) {
  Optional<String> counterId = actionContext().eventSubject();
  ...
}
- Scala
-
For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContext.
You can access the ActionContext through method actionContext.
src/main/scala/com/example/actions/CounterJournalToTopicAction.scala
override def increase(valueIncreased: ValueIncreased): Action.Effect[Increased] = {
  val counterId = actionContext.eventSubject
  ...
}
Ignoring events
When consuming events, each event must be matched by a Protobuf service method. In case your component is only interested in certain events, you may declare a method to receive all events that are not received by the other methods. If an event type is not handled, the Action will fail. Actions are designed to restart, but since the handler is missing, it will fail again. Therefore, it’s important to define methods for all events or define a catch-all method in case you want to discard some events.
- Java
-
src/main/proto/com/example/actions/counter_topic.proto
rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) {
  option (kalix.method).eventing.in = { // (1)
    event_sourced_entity: "counter"
    ignore: true // (2)
  };
}
(1) We must annotate it with a (kalix.method).eventing.in.
(2) Set the ignore: true option.
- Scala
-
src/main/proto/com/example/actions/counter_topic.proto
rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) {
  option (kalix.method).eventing.in = { // (1)
    event_sourced_entity: "counter"
    ignore: true // (2)
  };
}
(1) We must annotate it with a (kalix.method).eventing.in.
(2) Set the ignore: true option.
The Ignore method here is defined as a catch-all because it has input type Any. Instead of using a catch-all, it can be better to define concrete methods for all known event types that should be ignored, because then there is no risk of accidentally ignoring events that are added in later evolution of the service.
When adding the ignore: true annotation, the corresponding implementation is not needed in the component. It is more efficient to use ignore: true than implementing the method with an immediate reply.
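The trade-off between a catch-all and explicitly ignored event types can be illustrated with a small, self-contained dispatcher in plain Java. This is a sketch of the reasoning only and not how Kalix routes events internally; `EventRouter` and its methods are hypothetical names. Event types are either handled, explicitly ignored, or rejected, so an event type added later fails loudly instead of being dropped silently.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical dispatcher: handled, explicitly ignored, or rejected. */
final class EventRouter {
  private final Map<Class<?>, Consumer<Object>> handlers = new HashMap<>();
  private final Set<Class<?>> ignored = new HashSet<>();

  /** Registers a handler for one concrete event type. */
  <E> EventRouter on(Class<E> type, Consumer<? super E> handler) {
    handlers.put(type, event -> handler.accept(type.cast(event)));
    return this;
  }

  /** Marks one concrete event type as known but deliberately discarded. */
  EventRouter ignore(Class<?> type) {
    ignored.add(type);
    return this;
  }

  /** Returns true if handled, false if explicitly ignored; fails for unknown types. */
  boolean route(Object event) {
    Consumer<Object> handler = handlers.get(event.getClass());
    if (handler != null) {
      handler.accept(event);
      return true;
    }
    if (ignored.contains(event.getClass())) {
      return false;
    }
    // No catch-all: a type nobody declared is surfaced as an error.
    throw new IllegalStateException("No handler for event type " + event.getClass().getName());
  }
}
```

A catch-all would correspond to replacing the final `throw` with `return false`, which is convenient but hides event types that were never considered.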
Deployment dependent topic names
It is possible to use environment variables to control the name of the topic that events are consumed from or produced to. This is useful, for example, for using the same image in staging and production deployments while having them interact with separate topics.
Referencing environment variables is done with the syntax ${VAR_NAME} in the topic string in eventing.in.topic or eventing.out.topic blocks.
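To make the effect of the ${VAR_NAME} syntax concrete, the following plain Java sketch resolves such placeholders in a topic string against a map of environment variables. It only illustrates the substitution and is not the Kalix implementation: the `TopicNames` class, the accepted variable-name pattern, and the choice to fail on an unset variable are assumptions.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper showing how ${VAR_NAME} placeholders could be resolved. */
final class TopicNames {
  // Assumption: variable names consist of letters, digits and underscores.
  private static final Pattern VAR = Pattern.compile("\\$\\{([A-Za-z_][A-Za-z0-9_]*)\\}");

  static String resolve(String topic, Map<String, String> env) {
    Matcher matcher = VAR.matcher(topic);
    StringBuffer resolved = new StringBuffer();
    while (matcher.find()) {
      String name = matcher.group(1);
      String value = env.get(name);
      if (value == null) {
        // Assumption: an unset variable is treated as an error in this sketch.
        throw new IllegalArgumentException("Environment variable not set: " + name);
      }
      matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
    }
    matcher.appendTail(resolved);
    return resolved.toString();
  }
}
```

For example, with a hypothetical variable `TOPIC_SUFFIX` set to `staging`, calling `TopicNames.resolve("counter-events-${TOPIC_SUFFIX}", System.getenv())` would yield `counter-events-staging`, while a topic string without placeholders is returned unchanged.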
Note that changing the topic name after it has been deployed for an event consumer means the consumer will start over from the beginning of the topic.
See kalix service deploy for details on how to set environment variables when deploying a service.