Publishing and Subscribing with Actions

A very common use case when building Microservices is to publish and subscribe to a stream of events. The source of events can be the journal of an event sourced entity, the value entity state changes, a Google Cloud Pub/Sub or Apache Kafka topic for asynchronous messaging between services.

With Actions you can:

  • subscribe to events emitted by an event sourced entity within the same service.

  • subscribe to state changes emitted by a value entity within the same service.

  • subscribe to events from Event Sourced Entities published as service to service eventing

  • subscribe to external events from Google Cloud Pub/Sub or Apache Kafka.

  • publish events to a Google Cloud Pub/Sub or Apache Kafka topic.

Messages are guaranteed to be delivered at least once. This means that receivers must be able to handle duplicate messages.

Publishing Entity events to a Topic

To illustrate how to publish entity events, we will assume the existence of an Event Sourced Counter entity that emits events of types: ValueIncreased and ValueDecreased. We will get the events delivered to an Action, apply some transformation and let them be published to a topic.

In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how connect to the broker. For details about configuring a broker see Configure message brokers

Java
src/main/proto/com/example/actions/counter_topic.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

option java_outer_classname = "CounterTopicApi";

message Increased {
  int32 value = 1;
}

message Decreased {
  int32 value = 1;
}

service CounterJournalToTopic {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc Increase (com.example.domain.ValueIncreased) returns (Increased) { (3)
    option (kalix.method).eventing.in = { (4)
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = { (5)
      topic:  "counter-events"
    };
  }

  rpc IncreaseConditional (com.example.domain.ValueIncreased) returns (Increased) {
    option (kalix.method).eventing.out = {
      topic:  "counter-events"
    };
  }

  rpc Decrease (com.example.domain.ValueDecreased) returns (Decreased) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = {
      topic:  "counter-events"
    };
  }

}
1 Import the Counter Domain file containing the definitions of the events. This is typically a proto definition within the same service.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The Increase method receives the event ValueIncreased and returns the transformed type Increased.
4 The (kalix.method).eventing.in annotation indicates that events from the entity type counter should be delivered to this method (when the type is ValueIncreased).
5 The (kalix.method).eventing.out annotation indicates that the returned value from this method should be published to the topic called counter-events.
Scala
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";
import "google/protobuf/any.proto";

message Increased {
  int32 value = 1;
}

message Decreased {
  int32 value = 1;
}

service CounterJournalToTopic {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc Increase (com.example.domain.ValueIncreased) returns (Increased) { (3)
    option (kalix.method).eventing.in = { (4)
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = { (5)
      topic:  "counter-events"
    };
  }

  rpc IncreaseConditional (com.example.domain.ValueIncreased) returns (Increased) {
    option (kalix.method).eventing.out = {
      topic:  "counter-events"
    };
  }

  rpc Decrease (com.example.domain.ValueDecreased) returns (Decreased) {
    option (kalix.method).eventing.in = {
      event_sourced_entity: "counter"
    };
    option (kalix.method).eventing.out = {
      topic:  "counter-events"
    };
  }

}
1 Import the Counter Domain file containing the definitions of the events. This is typically a proto definition within the same service.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The Increase method receives the event ValueIncreased and returns the transformed type Increased.
4 The (kalix.method).eventing.in annotation indicates that events from the entity type counter should be delivered to this method (when the type is ValueIncreased).
5 The (kalix.method).eventing.out annotation indicates that the returned value from this method should be published to the topic called counter-events.

The class CounterJournalToTopicAction gets generated for us based on the proto file defined above.

Java
src/main/java/com/example/actions/CounterJournalToTopicAction.java
public class CounterJournalToTopicAction extends AbstractCounterJournalToTopicAction {
  public CounterJournalToTopicAction(ActionCreationContext creationContext) {}

  @Override
  public Effect<CounterTopicApi.Increased> increase(CounterDomain.ValueIncreased valueIncreased) {
    CounterTopicApi.Increased increased = (1)
      CounterTopicApi.Increased.newBuilder()
        .setValue(valueIncreased.getValue())
        .build();

    return effects().reply(increased); (2)
  }
}
1 We convert the incoming domain event CounterDomain.ValueIncreased to the outgoing topic API CounterTopicApi.Increased.
2 We use the converted object to build a reply. The CounterTopicApi.Increased message will be published to the topic.
Scala
src/main/scala/com/example/actions/CounterJournalToTopicAction.scala
class CounterJournalToTopicAction(creationContext: ActionCreationContext) extends AbstractCounterJournalToTopicAction {

  override def increase(valueIncreased: ValueIncreased): Action.Effect[Increased] = {
    effects.reply(Increased(valueIncreased.value)) (1)
  }
}
1 We convert the incoming domain event ValueIncreased to the outgoing topic API Increased and return that as a reply.

In this example we have published Protobuf messages to the topic which is convenient if the consuming end is also a Kalix service. For an external consumer Protobuf may not be a supported format, for details on publishing other formats see Handling Serialization.

Subscribing to state changes from a Value Entity

Similar to subscribing to events from an Event Sourced Entity, you can also subscribe to state changes from a Value Entity.

Java
src/main/proto/com/example/actions/counter_states_sub.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "StateSubscriptionApi";

service CounterStateSubscription {
  option (kalix.codegen) = {
    action: {} (2)
  };

  rpc OnUpdateState (com.example.domain.CounterState) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter" (3)
    };
  }

  rpc OnDeleteEntity (google.protobuf.Empty) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter"
      handle_deletes: true (4)
    };
  }

}
1 Import the Counter Domain from the Value Entity example (see Implementing Value Entities).
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The (kalix.method).eventing.in annotation indicates that state changes from the value entity type counter should be delivered to this method.
4 The second (kalix.method).eventing.in annotation with handle_deletes: true and (required) google.protobuf.Empty as a parameter defines that this method is used for handling value entity deletes.
Scala
src/main/proto/com/example/actions/counter_states_sub.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/domain/counter_domain.proto"; (1)
import "google/protobuf/empty.proto";

service CounterStateSubscription {
  option (kalix.codegen) = {
    action: {} (2)
  };

  rpc OnUpdateState (com.example.domain.CounterState) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter" (3)
    };
  }

  rpc OnDeleteEntity (google.protobuf.Empty) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      value_entity: "counter"
      handle_deletes: true (4)
    };
  }

}
1 Import the Counter Domain from the Value Entity example (see Implementing Value Entities).
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
3 The (kalix.method).eventing.in annotation indicates that state changes from the value entity type counter should be delivered to this method.
4 The second (kalix.method).eventing.in annotation with handle_deletes: true and (required) google.protobuf.Empty as a parameter defines that this method is used for handling value entity deletes.

Subscribing to a Topic

It’s also possible to subscribe to a Pub/Sub topic. To receive messages from a Pub/Sub topic, annotate a service method in the Protobuf service definition with the option (kalix.method).eventing.in annotation and specify the topic name in the topic section of the annotation.

In the Protobuf descriptors, only topic names are referenced and no additional details about how to connect to the topics are needed. When deploying the application there must be a broker configuration in the Kalix project, with credentials and details on how connect to the broker. For details about configuring a broker see Configure message brokers

For illustration purpose, we can add a second Action that consumes from the Pub Sub topic counter-events from the previous example.

Java
src/main/proto/com/example/actions/counter_topic_sub.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/actions/counter_topic.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "CounterTopicSubApi";

service CounterTopicSubscription {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc Increase (com.example.actions.Increased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = { (3)
      topic:  "counter-events"
    };
  }

  rpc Decrease (com.example.actions.Decreased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic:  "counter-events"
    };
  }
}
1 Import the Counter Topic types from previous example.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix Maven plugin. This annotation indicates to the code-generation that an Action must be generated.
3 Define methods for each of the possible incoming messages and annotate them with (kalix.method).eventing.in indicating that the source of events is the topic counter-events.
Scala
src/main/proto/customer/domain/customer_domain.proto
syntax = "proto3";
package com.example.actions;

import "kalix/annotations.proto";
import "com/example/actions/counter_topic.proto"; (1)
import "google/protobuf/empty.proto";

service CounterTopicSubscription {
  option (kalix.codegen) = {
    action: {}  (2)
  };

  rpc Increase (com.example.actions.Increased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = { (3)
      topic:  "counter-events"
    };
  }

  rpc Decrease (com.example.actions.Decreased) returns (google.protobuf.Empty) {
    option (kalix.method).eventing.in = {
      topic:  "counter-events"
    };
  }
}
1 Import the Counter Topic types from previous example.
2 The protobuf option (kalix.codegen) is specific to code-generation as provided by the Kalix sbt plugin. This annotation indicates to the code-generation that an Action must be generated.
3 Define methods for each of the possible incoming messages and annotate them with (kalix.method).eventing.in indicating that the source of events is the topic counter-events.

The class CounterTopicSubscriptionAction gets generated for us based on the proto file defined above.

Java
src/main/java/com/example/actions/CounterTopicSubscriptionAction.java
public class CounterTopicSubscriptionAction extends AbstractCounterTopicSubscriptionAction {

  private Logger logger = LoggerFactory.getLogger(getClass());

  public CounterTopicSubscriptionAction(ActionCreationContext creationContext) {}

  @Override
  public Effect<Empty> increase(CounterTopicApi.Increased increased) { (1)
    logger.info("Received increase event: " + increased.toString());
    return effects().reply(Empty.getDefaultInstance());
  }

  @Override
  public Effect<Empty> decrease(CounterTopicApi.Decreased decreased) {
    logger.info("Received increase event: " + decreased.toString());
    return effects().reply(Empty.getDefaultInstance());
  }
}
Scala
src/main/java/com/example/actions/CounterTopicSubscriptionAction.java
class CounterTopicSubscriptionAction(creationContext: ActionCreationContext)
    extends AbstractCounterTopicSubscriptionAction {
  private val logger = LoggerFactory.getLogger(getClass())
  /** Handler for "Increase". */
  override def increase(increased: Increased): Action.Effect[Empty] = {
    logger.info("Received increase event: " + increased.toString())
    effects.reply(Empty.defaultInstance)
  }

  override def decrease(decreased: Decreased): Action.Effect[Empty] = {
    logger.info("Received decrease event: " + decreased.toString())
    effects.reply(Empty.defaultInstance)
  }
}

The events from the topic are delivered to the new Action. The implementation may vary, for this simplified example we are just logging it, but it could a forward to some other component or external service.

Receiving messages from an external Topic

In the example above, we consumed Protobuf messages from a topic that we control ourselves. When consuming an external topic, it’s very likely that the message format is not under your control and is not known by Kalix.

In such case, the Action definition should receive a type depending on the type of the message payload. See Handling Serialization for more information on how to deal with data formats.

Subscribing and acting upon

Another possible usage for Actions is to consume events and act upon.

For example, you may consume events from one entity or from a topic, transform to commands and send to an another entity or an external system. This is similar to the usage explained in Actions as Controller, except that the Action is driven by the flow of incoming events instead of external user requests.

For that purpose, it’s enough to add the (kalix.method).eventing.in and omit the (kalix.method).eventing.out.

Accessing the Entity ID

Java

For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContextnew tab.

You can access the ActionContextnew tab through method actionContext().

src/main/java/com/example/actions/CounterJournalToTopicAction.java
@Override
public Effect<CounterTopicApi.Increased> increase(CounterDomain.ValueIncreased valueIncreased) {
  Optional<String> counterId = actionContext().eventSubject(); (1)
  ...
}
Scala

For many use cases, a subscriber to an event log will trigger other services and needs to pass the entity ID to the receiver. The events of an Event Sourced entity, by design, do not include the entity ID, but it is made available to the subscriber via the metadata field subject, accessible through eventSubject in the ActionContextnew tab.

You can access the ActionContextnew tab through method actionContext.

src/main/proto/customer/domain/customer_domain.proto
override def increase(valueIncreased: ValueIncreased): Action.Effect[Increased] = {
  val counterId = actionContext.eventSubject (1)
  ...
}

Ignoring events

When consuming events, each event must be matched by a Protobuf service method. In case your component is only interested in certain events, you may declare a method to receive all events that are not received by the other methods. If an event type is not handled, the Action will fail. Actions are designed to restart, but since the handler is missing, it will fail again. Therefore, it’s important to define methods for all events or define a catch-all method in case you want to discard some events.

Java
src/main/proto/com/example/actions/counter_topic.proto
rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) {
  option (kalix.method).eventing.in = { (1)
    event_sourced_entity: "counter"
    ignore: true (2)
  };
}
1 We must annotate it with a (kalix.method).eventing.in.
2 Set ignore: true option.
Scala
src/main/proto/customer/domain/customer_domain.proto
rpc Ignore(google.protobuf.Any) returns (google.protobuf.Empty) {
  option (kalix.method).eventing.in = { (1)
    event_sourced_entity: "counter"
    ignore: true (2)
  };
}
1 We must annotate it with a (kalix.method).eventing.in.
2 Set ignore: true option.

The Ignore method here is defined as a catch-all because it has input type Any. Instead of using a catch-all it can be better to define concrete methods for all known event types that should be ignored because then there is no risk of accidentally ignoring events that are added in later evolution of the service.

When adding the ignore: true annotation the corresponding implementation is not needed in the component. It is more efficient to use ignore: true than implementing the method with an immediate reply.

Deployment dependent topic names

It is possible to use environment variables to control the name of the topic that is used for consuming from or producing events to, this is useful for example for using the same image in staging and production deployments but having them interact with separate topics.

Referencing environment variables is done with the syntax ${VAR_NAME} in the topic string in eventing.in.topic or eventing.out.topic blocks.

Note that if changing the topic name after it has once been deployed for an event consumer means the consumer will start over from the beginning of the topic.

See kalix service deploy for details on how to set environment variables when deploying a service.