Publishing and Subscribing with Actions
A very common use case when building microservices is to publish and subscribe to a stream of events. The source of events can be the journal of an event sourced entity, the state changes of a value entity, or a Google Cloud Pub/Sub or Apache Kafka topic used for asynchronous messaging between services.
With Actions you can:
- subscribe to events emitted by an event sourced entity within the same service.
- subscribe to state changes emitted by a value entity within the same service.
- subscribe to external events from Google Cloud Pub/Sub or Apache Kafka.
- publish events to a Google Cloud Pub/Sub or Apache Kafka topic.
Messages are guaranteed to be delivered at least once. This means that receivers must be able to handle duplicate messages.
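One common way to tolerate duplicates is to make the receiver idempotent. The following is a minimal sketch, assuming every message carries a unique ID (for example a CloudEvent id). `IdempotentReceiver` is a hypothetical helper and not part of the Kalix SDK, and the in-memory set stands in for whatever durable store a real service would use.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical helper for illustration; not part of the Kalix SDK.
final class IdempotentReceiver<T> {
  // IDs of messages that were processed successfully.
  // In-memory only: a real service would persist this across restarts.
  private final Set<String> processedIds = new HashSet<>();
  private final Consumer<T> handler;

  IdempotentReceiver(Consumer<T> handler) {
    this.handler = handler;
  }

  // Runs the handler once per message ID; returns false for a duplicate.
  synchronized boolean receive(String messageId, T payload) {
    if (processedIds.contains(messageId)) {
      return false; // redelivery of a message that was already handled
    }
    handler.accept(payload);
    // Record the ID only after the handler succeeded, so that a failed
    // attempt is retried when the message is delivered again.
    processedIds.add(messageId);
    return true;
  }

  public static void main(String[] args) {
    List<String> handled = new ArrayList<>();
    IdempotentReceiver<String> receiver = new IdempotentReceiver<>(handled::add);
    receiver.receive("evt-1", "ItemAdded");
    receiver.receive("evt-1", "ItemAdded"); // duplicate delivery, skipped
    receiver.receive("evt-2", "ItemRemoved");
    System.out.println(handled); // prints [ItemAdded, ItemRemoved]
  }
}
```

The ID is recorded after the handler returns, so an exception in the handler leaves the message eligible for redelivery instead of silently dropping it.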
Publishing Entity events to a Topic
The Event Sourced Entity journal contains events that capture all state changes. By subscribing to the journal using the Event Sourced Entity type name, another component can receive all events emitted by entities of that type.
Subscribing to state changes from an Entity
To subscribe to an Event Sourced Entity journal, define a Protobuf rpc method for each journal event that you want to receive. Annotate these methods with the (kalix.method).eventing annotation and specify the entity type name of the Event Sourced Entity.
syntax = "proto3";
package shopping.product.actions;
import "kalix/annotations.proto";
import "cart/shopping_cart_domain.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";
service ToProductPopularityService { (1)
rpc ForwardAdded(shopping.cart.domain.ItemAdded) returns (google.protobuf.Empty) {
option (kalix.method).eventing.in = { (2)
event_sourced_entity: "eventsourced-shopping-cart" (3)
};
}
}
1 | create rpc methods for all Protobuf types in the journal |
2 | annotate the methods with (kalix.method).eventing |
3 | specify the Event Sourced Entity’s type name as journal source |
There is nothing specific required in the implementation of these methods. The implementation usually is an Action that forwards a converted message to a different component (e.g. an Event Sourced Entity).
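The conversion step mentioned above can be sketched in plain Java. This is only an illustration under stated assumptions: `ItemAdded` and `IncreasePopularity` below are hypothetical stand-ins for generated Protobuf classes, their fields (`productId`, `quantity`) are assumed rather than taken from the real message definitions, and the forwarding call itself is left out because it depends on the SDK.

```java
// Hypothetical stand-in for the generated event class; real fields may differ.
final class ItemAdded {
  final String productId;
  final int quantity;

  ItemAdded(String productId, int quantity) {
    this.productId = productId;
    this.quantity = quantity;
  }
}

// Hypothetical command accepted by the receiving component.
final class IncreasePopularity {
  final String productId;
  final int quantity;

  IncreasePopularity(String productId, int quantity) {
    this.productId = productId;
    this.quantity = quantity;
  }
}

// Hypothetical converter: maps a journal event to the command to forward.
final class ToProductPopularity {
  private ToProductPopularity() {}

  static IncreasePopularity convert(ItemAdded event) {
    // Copy over only the fields the receiving component needs.
    return new IncreasePopularity(event.productId, event.quantity);
  }
}
```

Keeping the conversion in a small pure function makes it easy to unit test separately from the messaging infrastructure.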
Subscribing to a Topic
It’s also possible to subscribe to a Pub/Sub topic. To receive messages from a Pub/Sub topic, annotate a service method in the Protobuf service definition with the (kalix.method).eventing.in annotation and specify the topic name in the topic section of the annotation.
In the Protobuf descriptors, only topic names are referenced; no additional details about how to connect to the topics are needed. When deploying the application, there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker, see Configure message brokers.
syntax = "proto3";
package shopping.cart.actions;
import "kalix/annotations.proto";
import "cart/shopping_cart_domain.proto";
import "google/protobuf/empty.proto";
service ShoppingCartAnalyticsService {
// get ItemAdded from the topic
rpc ProcessAdded(shopping.cart.domain.ItemAdded) returns (google.protobuf.Empty) {
option (kalix.method).eventing.in = { (1)
topic: "shopping-cart-events" (2)
};
}
}
1 | annotate the Protobuf rpc method with (kalix.method).eventing |
2 | use in and topic to subscribe to a topic |
There is nothing specific required in the implementation of ProcessAdded. In most cases the implementation is an Action that forwards a converted message to a different component (e.g. an Event Sourced Entity).
Receiving messages from an external Topic
In the example above, we consumed Protobuf messages from a topic that we control ourselves. When consuming an external topic, it’s very likely that the message format is not under your control and is not known by Kalix.
In such a case, the Action definition should receive a type that matches the type of the message payload. See Handling Serialization for more information on how to deal with data formats.
Subscribing and acting upon
Another possible use for Actions is to consume events and act upon them.
For example, you may consume events from one entity or from a topic, transform them into commands, and send them to another entity or an external system. This is similar to the usage explained in Forwarding and effects, except that the Action is driven by the flow of incoming events instead of external user requests.
For that purpose, it’s enough to add the (kalix.method).eventing.in annotation and omit the (kalix.method).eventing.out annotation.
Accessing the Entity ID
For many use cases, a subscriber to an event log triggers other services and needs to pass the entity ID to the receiver. By design, the events of an Event Sourced Entity do not include the entity ID, but it is made available to the subscriber in the CloudEvent metadata field subject, which is accessible through the ActionContext:
context.metadata().asCloudEvent().subject()
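Because a message may arrive without a subject, it can help to fail loudly rather than pass an empty ID downstream. The sketch below assumes that `subject()` exposes the value as an `Optional<String>`; that return type is an assumption here, and `EntityIds` is a hypothetical helper that is not part of the Kalix SDK.

```java
import java.util.Optional;

// Hypothetical helper for illustration; not part of the Kalix SDK.
final class EntityIds {
  private EntityIds() {}

  // Returns the entity ID carried in the CloudEvent subject,
  // or throws when the subject is missing or empty.
  static String requireEntityId(Optional<String> subject) {
    return subject
        .filter(id -> !id.isEmpty())
        .orElseThrow(() -> new IllegalStateException("event has no CloudEvent subject"));
  }
}
```

Under that assumption, a subscriber could call `EntityIds.requireEntityId(context.metadata().asCloudEvent().subject())` before building the message it sends to the receiver.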
Ignoring events
When listening to an event log, all events emitted by the Event Sourced entity must be matched by a Protobuf service method. In case your component is interested only in certain events, you may declare a method to receive all events that are not received by the other methods.
// handle other events which are not managed above
rpc CatchOthers(google.protobuf.Any) returns (google.protobuf.Empty) {
option (kalix.method).eventing.in = {
event_sourced_entity: "eventsourced-shopping-cart"
};
}
The corresponding implementation must exist in the component.