Serialization options for Java and Scala services

You do not need to handle serialization for messages. Kalix functions serve gRPC interfaces, and the input and output messages are protobuf messages that get serialized to the protobuf format.

The gRPC services are also exposed as HTTP endpoints with JSON messages. See Transcoding HTTP.

When consuming messages from topics

When a message arrives from a topic, Kalix detects the message payload type based on the Content-Type or ce-datacontenttype header or attribute of the message. If there is no such metadata, the content is handled as raw bytes.

If the content type starts with application/protobuf, application/x-protobuf, or application/vnd.google.protobuf, the payload is expected to also have a ce-type header or attribute identifying the concrete protobuf message type. Such messages are decoded into the described message type before being handed to a topic subscriber method, which must accept that specific message type.

If the publishing service is also a Kalix service, this is handled transparently for you as shown in Publishing and Subscribing with Actions.

For messages that are consumed from or published to topics when interacting with external services, it can be a requirement to use a format other than protobuf. Other supported message formats include JSON, text, or raw bytes.
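The content-type rules described in this section can be summarized as a small dispatch function. The following is a minimal sketch in plain Java, not the Kalix implementation: the class PayloadKinds, the enum Kind, and the method classify are hypothetical names, and ignoring letter case and media-type parameters is an assumption. The application/…+json rule is interpreted here as "any application/ subtype ending in +json".

```java
import java.util.Locale;

public class PayloadKinds {

  /** The payload categories described in this section. */
  public enum Kind { PROTOBUF, JSON, TEXT, BYTES }

  /**
   * Classifies a message by its content type.
   * Pass null when the message carries no content-type metadata.
   */
  public static Kind classify(String contentType) {
    if (contentType == null || contentType.trim().isEmpty()) {
      return Kind.BYTES; // no metadata: handled as raw bytes
    }
    // Assumption: drop parameters such as "; charset=utf-8" and ignore case.
    String mediaType = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);

    if (mediaType.startsWith("application/protobuf")
        || mediaType.startsWith("application/x-protobuf")
        || mediaType.startsWith("application/vnd.google.protobuf")) {
      return Kind.PROTOBUF; // also needs a ce-type naming the message type
    }
    if (mediaType.startsWith("application/json")
        || (mediaType.startsWith("application/") && mediaType.endsWith("+json"))) {
      return Kind.JSON; // subscriber accepts google.protobuf.Any
    }
    if (mediaType.startsWith("text/")) {
      return Kind.TEXT; // subscriber accepts google.protobuf.StringValue
    }
    return Kind.BYTES; // application/octet-stream or unknown type
  }
}
```

The order of the checks matters only in that every unmatched type falls through to raw bytes, which mirrors the fallback behavior described for missing or unknown content types.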

In the Protobuf descriptors, only topic names are referenced; no additional details about how to connect to the topics are needed. When deploying the application, there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker, see Configure message brokers.

JSON

If the incoming content type starts with application/json or application/…+json, the payload is treated as JSON. The message may also carry a ce-type field identifying the specific type of the object in the JSON. The topic subscriber method must accept a protobuf Any message.

Kalix provides a utility to serialize and deserialize JSON messages based on Jackson.

Java

Kalix provides the JsonSupport utility to serialize and deserialize JSON messages.

A proto definition of an Action that consumes JSON messages and produces JSON messages can look like this:

src/main/proto/com/example/json/json_api.proto
syntax = "proto3";
package com.example.json;

import "kalix/annotations.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "MyServiceApi";

message KeyValue {
  string key = 1;
  int32 value = 2;
}

service MyService {
  option (kalix.codegen) = {
    action: {}
  };

  rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) { (1)
    option (kalix.method).eventing.in = {
      topic:  "notifications"
    };
  }

  rpc Produce(KeyValue) returns (google.protobuf.Any) { (2)
    option (kalix.method).eventing.out = {
      topic:  "notifications"
    };
  }

}
1 When consuming JSON messages from a topic the input type must be google.protobuf.Any.
2 When producing a JSON message to a topic the return type must be google.protobuf.Any.
The type_url in the google.protobuf.Any must start with json.kalix.io/. The suffix of the type_url is a type hint of the concrete message type that is encoded.
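The type_url convention above can be illustrated with plain string handling. This is a minimal sketch, not part of the Kalix SDK: the class JsonTypeUrls and its methods are hypothetical helpers, and the type hint used in the test values is only an example.

```java
public class JsonTypeUrls {

  /** Prefix that marks a google.protobuf.Any as carrying JSON. */
  public static final String JSON_TYPE_URL_PREFIX = "json.kalix.io/";

  /** Builds a type_url from a type hint (hypothetical helper). */
  public static String typeUrlFor(String typeHint) {
    return JSON_TYPE_URL_PREFIX + typeHint;
  }

  /** Returns true when the type_url follows the JSON convention. */
  public static boolean isJsonTypeUrl(String typeUrl) {
    return typeUrl != null && typeUrl.startsWith(JSON_TYPE_URL_PREFIX);
  }

  /** Extracts the type hint, i.e. everything after the prefix. */
  public static String typeHintOf(String typeUrl) {
    if (!isJsonTypeUrl(typeUrl)) {
      throw new IllegalArgumentException("Not a JSON type_url: " + typeUrl);
    }
    return typeUrl.substring(JSON_TYPE_URL_PREFIX.length());
  }
}
```

In practice JsonSupport sets and reads the type_url for you; a helper like this is only useful when inspecting an Any by hand, for example while debugging.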
Scala

Kalix provides the JsonSupport utility to serialize and deserialize JSON messages. A proto definition of an Action that consumes JSON messages and produces JSON messages can look like this:

src/main/proto/com/example/json/json_api.proto
syntax = "proto3";
package com.example.json;

import "kalix/annotations.proto";
import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";

message KeyValue {
  string key = 1;
  int32 value = 2;
}

service MyService {
  option (kalix.codegen) = {
    action: {}
  };

  rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) { (1)
    option (kalix.method).eventing.in = {
      topic:  "notifications"
    };
  }

  rpc Produce(KeyValue) returns (google.protobuf.Any) { (2)
    option (kalix.method).eventing.out = {
      topic:  "notifications"
    };
  }

}
1 When consuming JSON messages from a topic the input type must be google.protobuf.any.Any.
2 When producing a JSON message to a topic the return type must be google.protobuf.any.Any.
The type_url in the google.protobuf.any.Any must start with json.kalix.io/. The suffix of the type_url is a type hint of the concrete message type that is encoded.

The corresponding implementation class:

Java
src/main/java/com/example/json/MyServiceAction.java
import kalix.javasdk.JsonSupport;
import kalix.javasdk.action.ActionCreationContext;
import com.google.protobuf.Any;
import com.google.protobuf.Empty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyServiceAction extends AbstractMyServiceAction {

  private static final Logger LOG = LoggerFactory.getLogger(MyServiceAction.class);

  public MyServiceAction(ActionCreationContext creationContext) {}

  @Override
  public Effect<Empty> consume(Any any) {
    JsonKeyValueMessage jsonMessage =
        JsonSupport.decodeJson(JsonKeyValueMessage.class, any); (1)
    LOG.info("Consumed {}", jsonMessage);
    return effects().reply(Empty.getDefaultInstance());
  }

  @Override
  public Effect<Any> produce(MyServiceApi.KeyValue keyValue) {
    JsonKeyValueMessage jsonMessage =
        new JsonKeyValueMessage(keyValue.getKey(), keyValue.getValue()); (2)
    Any jsonAny = JsonSupport.encodeJson(jsonMessage); (3)
    return effects().reply(jsonAny);
  }
}
1 Decode the JSON message to a Java class JsonKeyValueMessage.
2 Convert the Protobuf message KeyValue to a Java class JsonKeyValueMessage.
3 Encode the Java class JsonKeyValueMessage to JSON.
Scala
src/main/scala/com/example/json/MyServiceAction.scala
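import com.google.protobuf.any.{ Any => ScalaPbAny }
import com.google.protobuf.empty.Empty
import kalix.scalasdk.JsonSupport
import kalix.scalasdk.action.Action
import kalix.scalasdk.action.ActionCreationContext
import org.slf4j.LoggerFactory

class MyServiceAction(creationContext: ActionCreationContext) extends AbstractMyServiceAction {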

  private val log = LoggerFactory.getLogger(classOf[MyServiceAction])

  override def consume(any: ScalaPbAny): Action.Effect[Empty] = {
    val jsonMessage = JsonSupport.decodeJson[JsonKeyValueMessage](any) (1)
    log.info("Consumed {}", jsonMessage)
    effects.reply(Empty.defaultInstance)
  }

  override def produce(keyValue: KeyValue): Action.Effect[ScalaPbAny] = {
    val jsonMessage = JsonKeyValueMessage(keyValue.key, keyValue.value) (2)
    val jsonAny = JsonSupport.encodeJson(jsonMessage) (3)
    effects.reply(jsonAny)
  }
}
1 Decode the JSON message to a Scala class JsonKeyValueMessage.
2 Convert the Protobuf message KeyValue to a Scala class JsonKeyValueMessage.
3 Encode the Scala class JsonKeyValueMessage to JSON.

Kalix uses Jackson to serialize JSON.
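The JsonKeyValueMessage class used in the implementation above is not shown in this section. Because Jackson does the mapping, it needs to be a class Jackson can read and write. The following is one possible shape for the Java variant, given as a sketch: the field names mirror the KeyValue protobuf message, and it assumes the mapper behaves like a default Jackson ObjectMapper (public fields plus a no-argument constructor). The real sample class may differ, for example by using Jackson annotations.

```java
public class JsonKeyValueMessage {

  // Public fields are picked up by a default-configured Jackson ObjectMapper.
  public String key;
  public int value;

  // No-argument constructor so Jackson can instantiate the class when decoding.
  public JsonKeyValueMessage() {}

  // Convenience constructor matching the call in MyServiceAction.produce.
  public JsonKeyValueMessage(String key, int value) {
    this.key = key;
    this.value = value;
  }

  @Override
  public String toString() {
    return "JsonKeyValueMessage(" + key + ", " + value + ")";
  }
}
```

With this shape, a message built from the key "a" and the value 1 would be expected to serialize to a JSON object with a key field and a value field.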

Text

If the content type starts with text/, the payload is treated as a string message. The topic subscriber method must accept the google.protobuf.StringValue message.

Java

A proto definition of an Action that consumes String messages can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/wrappers.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "MyTopics";


service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ConsumeStringTopic(google.protobuf.StringValue) returns (google.protobuf.Empty) { (2)
    option (kalix.method).eventing.in = {
      topic: "strings_topic"
    };
  }
}
1 google.protobuf.StringValue requires the import google/protobuf/wrappers.proto.
2 When consuming text messages from a topic the input type must be google.protobuf.StringValue.
Scala

A proto definition of an Action that consumes String messages can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/wrappers.proto"; (1)
import "google/protobuf/empty.proto";


service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ConsumeStringTopic(google.protobuf.StringValue) returns (google.protobuf.Empty) { (2)
    option (kalix.method).eventing.in = {
      topic: "strings_topic"
    };
  }
}
1 google.protobuf.StringValue requires the import google/protobuf/wrappers.proto.
2 When consuming text messages from a topic the input type must be google.protobuf.StringValue.

If an action has a return type of StringValue and publishes to a topic, the events published to the topic will have content-type text/plain; charset=utf-8.

Bytes

If the content type is application/octet-stream, no content type is present, or the type is unknown to Kalix, the message is treated as a binary message. The topic subscriber method must accept the google.protobuf.BytesValue message.

Java

A proto definition of an Action that consumes binary messages with raw bytes can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/wrappers.proto"; (1)
import "google/protobuf/empty.proto";

option java_outer_classname = "MyTopics";


service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ConsumeRawBytesTopic(google.protobuf.BytesValue) returns (google.protobuf.Empty) { (2)
    option (kalix.method).eventing.in = {
      topic: "bytes_topic"
    };
  }
}
1 google.protobuf.BytesValue requires the import google/protobuf/wrappers.proto.
2 When consuming raw bytes messages from a topic the input type must be google.protobuf.BytesValue.
Scala

A proto definition of an Action that consumes binary messages with raw bytes can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/wrappers.proto"; (1)
import "google/protobuf/empty.proto";


service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ConsumeRawBytesTopic(google.protobuf.BytesValue) returns (google.protobuf.Empty) { (2)
    option (kalix.method).eventing.in = {
      topic: "bytes_topic"
    };
  }
}
1 google.protobuf.BytesValue requires the import google/protobuf/wrappers.proto.
2 When consuming raw bytes messages from a topic the input type must be google.protobuf.BytesValue.

If an action has a return type of BytesValue and publishes to a topic, the events published to the topic will have content-type application/octet-stream.

Receiving CloudEvents

Kalix uses the CloudEvents standard when receiving from and publishing to topics. The CloudEvents specification standardizes message metadata so that systems can integrate more easily.

Describing the structure of the message payload is the CloudEvents feature most important to Kalix.

An example of that is the capability to send serialized Protobuf messages and have Kalix deserialize them accordingly.

For Kalix to read Protobuf messages from topics correctly, each message must specify the following attributes:

  • Content-Type = application/protobuf

  • ce-specversion = 1.0

  • ce-type = fully qualified protobuf message name (e.g., shopping.cart.api.TopicOperation)

(The ce- prefixed attributes are part of the CloudEvents specification.)
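From the point of view of an external publisher, the attribute list above amounts to attaching three key/value pairs to each message. The following is a minimal sketch in plain Java: the class ProtobufCloudEventAttributes and its method are hypothetical, and how the resulting map is attached to a message (for example as record headers or message attributes) depends on the broker client and is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ProtobufCloudEventAttributes {

  /**
   * Builds the attributes listed above for a serialized Protobuf payload.
   *
   * @param fullyQualifiedMessageName e.g. "shopping.cart.api.TopicOperation"
   */
  public static Map<String, String> forMessageType(String fullyQualifiedMessageName) {
    if (fullyQualifiedMessageName == null || fullyQualifiedMessageName.isEmpty()) {
      throw new IllegalArgumentException("ce-type requires a message name");
    }
    Map<String, String> attributes = new LinkedHashMap<>();
    attributes.put("Content-Type", "application/protobuf");
    attributes.put("ce-specversion", "1.0");
    attributes.put("ce-type", fullyQualifiedMessageName);
    return attributes;
  }
}
```

The message body itself would be the Protobuf-serialized bytes of the message named in ce-type, which is why sharing the exact message declaration with the sending system matters.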

The Protobuf rpc declaration uses the expected Protobuf message type and specifies the topic to subscribe to. You’ll normally want to share the exact Protobuf message declaration with the sending system.

Java

A proto definition of an Action that consumes CloudEvent messages can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/empty.proto";

option java_outer_classname = "MyTopics";

message TopicOperation {
  string operation = 1;
}

service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) { (1)
    option (kalix.method).eventing.in = {
      topic:  "shopping-cart-protobuf-cloudevents"
    };
  }
}
1 When consuming a CloudEvent containing a Protobuf message, the handler's request type must be the message type specified in the message metadata (ce-type).
Scala

A proto definition of an Action that consumes CloudEvent messages can look like this:

src/main/proto/com/example/topics_action.proto
syntax = "proto3";
package com.example;

import "kalix/annotations.proto";
import "google/protobuf/empty.proto";

message TopicOperation {
  string operation = 1;
}

service MyTopicsAction {
  option (kalix.codegen) = {
    action: {}
  };

  rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) { (1)
    option (kalix.method).eventing.in = {
      topic:  "shopping-cart-protobuf-cloudevents"
    };
  }
}
1 When consuming a CloudEvent containing a Protobuf message, the handler's request type must be the message type specified in the message metadata (ce-type).