Publishing and Subscribing with Actions
A very common use case when building Microservices is to publish and subscribe to a stream of events. The source of events can be the journal of an event sourced entity, the value entity state changes, a Google Cloud Pub/Sub or Apache Kafka topic for asynchronous messaging between services.
With Actions you can:
-
Subscribe to events emitted by an event sourced entity within the same service.
-
Subscribe to state changes emitted by a value entity within the same service.
-
Subscribe to events from Event Sourced Entities published as service to service eventing
-
Subscribe to external events from Google Cloud Pub/Sub or Apache Kafka.
-
Publish events to a Google Cloud Pub/Sub or Apache Kafka topic.
Messages are guaranteed to be delivered at least once. This means that receivers must be able to handle duplicate messages.
Publishing Entity events to a Topic
To illustrate how to publish entity events, let’s assume the existence of an Event Sourced Counter entity that emits events of types: ValueIncreased
and ValueDecreased
. You will get the events delivered to an Action, apply some transformation and let them be published to a topic.
You can subscribe an Action to events from an Event Source Entity by annotating a method with @Subscribe.EventSourcedEntity
and specifying the class of the entity. The input type of the method must be the same of the events generated by the entity.
To publish the events you need to add the annotation @Publish.Topic
to the method subscribed to the events and add the name of the topic.
import kalix.javasdk.action.Action;
import kalix.javasdk.annotations.Publish;
import kalix.javasdk.annotations.Subscribe;
public class CounterJournalToTopicAction extends Action {
@Subscribe.EventSourcedEntity(value = Counter.class) (1)
@Publish.Topic("counter-events") (2)
public Action.Effect<CounterEvent> onValueIncreased(ValueIncreased event){ (3)
return effects().reply(event); (4)
}
}
1 | Subscribing to the events from the Counter. |
2 | Publishing to a topic name 'counter-events'. |
3 | Setting the method input type to the events produced by the counter. |
4 | Any return Action.Effect response is valid. |
The messages stored in the topic are serialized as JSON. You cannot change this serialization. |
Subscribing to a Value Entity
You can subscribe an Action to events from a Value Entity. It works the same as an Event Sourced Entity except for the annotation. To receive messages from the entity, annotate a service method @Subscribe.ValueEntiy
and specify the class of the entity.
Subscribing to a Topic
To receive messages from a Google Cloud Pub/Sub or Apache Kafka topic, annotate the service method @Subscribe.Topic
and specify the topic name.
In the following example the events from the topic are delivered to the Action and logged.
import kalix.javasdk.action.Action;
import kalix.javasdk.annotations.Subscribe;
public class CounterTopicSubscriptionAction extends Action {
private Logger logger = LoggerFactory.getLogger(CounterTopicSubscriptionAction.class);
@Subscribe.Topic(value = "counter-events") (1)
public Action.Effect<Confirmed> onValueIncreased(ValueIncreased event){ (2)
logger.info("Received increased event: " + event.toString());
return effects().reply(Confirmed.instance); (3)
}
@Subscribe.Topic(value = "counter-events") (4)
public Action.Effect<Confirmed> onValueMultiplied(ValueMultiplied event){ (5)
logger.info("Received multiplied event: " + event.toString());
return effects().reply(Confirmed.instance); (6)
}
}
1 | Subscribing to the ValueIncreased from topic 'counter-events'. |
2 | Setting the method input type to the events produced by the counter. |
3 | Any return is valid. |
4 | Subscribing to the ValueMultiplied from topic 'counter-events'. |
5 | Setting the method input type to the events produced by the counter. |
6 | Any return is valid. |
The events from the topic are delivered to the Action. The implementation may vary: for this simplified example you are just logging it, but it could be a forward to some other component or external service.
The return value of the method is an Action.Effect
with message Confirmed
, but can be any other of type Action.Effect<>
if the return type of the method defines it. The Kalix framework needs the type Effect
to ensure that the event was successfully processed. If no exception is thrown and the method returns a effects().reply
, the framework assumes that the event was successfully processed and marks it as such. This allows the next event to be sent to the subscribing method.
However, if an exception is raised and not handled, or the method return effects().error()
this action will not process any more events until the necessary handling of the current event is added such its return is a reply
. Otherwise, it will raise the same error over and over again until the application is fixed and restarted.
By default, Kalix assumes the messages in the topic were serialized as JSON and as such, deserializes them into the input type of your method. You can find more about this in Handling Serialization. |
Receiving messages from an external Topic
In the above example, you consumed JSON messages from a topic that you control. If you are consuming an external topic, the message format may not be under your control and may not be JSON. If this is the case the Java SDK can’t consume from that topic.
Type level annotations for subscribing
You can subscribe to a topic or an Event Source Entity by adding @Subscribe
as a type level annotation, at the top of the class. This provides additional functionality for subscribing: all methods returning Action.Effect
are selected to process incoming events. The Action will fail if it receives an event for which there is no method handler, unless the subscription is set with ignoreUnknown = true
.
In the following example you can take a look at how the Action is configured to ignore unknown messages because it only has a method handler for ValueIncrease
, while it could also receive a ValueMultiplied
.
import kalix.javasdk.action.Action;
import kalix.javasdk.annotations.Subscribe;
@Subscribe.EventSourcedEntity(value = Counter.class, ignoreUnknown = true) (1)
public class SubscribeTypeLevelAction extends Action {
private Logger logger = LoggerFactory.getLogger(SubscribeTypeLevelAction.class);
public Action.Effect<Confirmed> onIncrease(ValueIncreased event){ (2)
logger.info("Received increased event: " + event.toString());
return effects().reply(Confirmed.instance); (3)
}
}
1 | Set to ignore unknown events. |
2 | Only processing ValueIncreased events. |
3 | Any return is valid. |
If you don’t add ignoreUnknown=true
, the action would fail when processing a ValueMultiplied
. The default is false.
Subscribing and acting upon
Another possible usage for Actions is to consume events and act upon.
For example, you may consume events from one entity or from a topic, transform to commands and send to an another entity or an external system. This is similar to the usage explained in Actions as Controller, except that the Action is driven by the flow of incoming events instead of external user requests.
Accessing the Entity ID
For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject
, accessible through eventSubject
in the ActionContext
.
You can access the ActionContext
through method
actionContext()
.