Serialization options for Java and Scala services
You do not need to handle serialization for messages yourself. Kalix functions serve gRPC interfaces, and their input and output messages are protobuf messages that are serialized to the protobuf format.
The gRPC services are also exposed as HTTP endpoints with JSON messages. See Transcoding HTTP.
When consuming Messages from Topics
When a message arrives from a topic, Kalix detects the message payload type based on the Content-Type or ce-datacontenttype header or attribute of the message. If there is no such metadata, the content is handled as raw bytes.
If the content type starts with application/protobuf, application/x-protobuf or application/vnd.google.protobuf, the payload is expected to also have a ce-type header or attribute identifying the concrete protobuf message type. Such messages are decoded into the described message type before being handed to a topic subscriber method, which must accept that specific message type.
If the publishing service is also a Kalix service, this is handled transparently for you as shown in Publishing and Subscribing with Actions.
Messages that are consumed from or published to topics when interacting with external services may be required to use a format other than protobuf. Other supported message formats include JSON, text, and raw bytes.
In the Protobuf descriptors, only topic names are referenced; no additional details about how to connect to the topics are needed. When deploying the application, there must be a broker configuration in the Kalix project, with credentials and details on how to connect to the broker. For details about configuring a broker, see Configure message brokers.
JSON
If the incoming content type starts with application/json or application/…+json, the payload is treated as JSON. The message may also carry a ce-type field identifying the specific type of the object in the JSON. The topic subscriber method must accept a protobuf Any message.
Kalix provides a utility to serialize and deserialize JSON messages based on Jackson.
- Java
Kalix provides the JsonSupport utility to serialize and deserialize JSON messages.
A proto definition of an Action that consumes JSON messages and produces JSON messages can look like this:
src/main/proto/com/example/json/json_api.proto
    import "kalix/annotations.proto";
    import "google/protobuf/any.proto";
    import "google/protobuf/empty.proto";

    option java_outer_classname = "MyServiceApi";

    message KeyValue {
      string key = 1;
      int32 value = 2;
    }

    service MyService {
      option (kalix.codegen) = {
        action: {}
      };

      rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) { // (1)
        option (kalix.method).eventing.in = {
          topic: "notifications"
        };
      }

      rpc Produce(KeyValue) returns (google.protobuf.Any) { // (2)
        option (kalix.method).eventing.out = {
          topic: "notifications"
        };
      }
    }
(1) When consuming JSON messages from a topic, the input type must be google.protobuf.Any.
(2) When producing a JSON message to a topic, the return type must be google.protobuf.Any.
The type_url in the google.protobuf.Any must start with json.kalix.io/. The suffix of the type_url is a type hint of the concrete message type that is encoded.
- Scala
Kalix provides the JsonSupport utility to serialize and deserialize JSON messages.
A proto definition of an Action that consumes JSON messages and produces JSON messages can look like this:
src/main/proto/com/example/json/json_api.proto
    import "kalix/annotations.proto";
    import "google/protobuf/any.proto";
    import "google/protobuf/empty.proto";

    message KeyValue {
      string key = 1;
      int32 value = 2;
    }

    service MyService {
      option (kalix.codegen) = {
        action: {}
      };

      rpc Consume(google.protobuf.Any) returns (google.protobuf.Empty) { // (1)
        option (kalix.method).eventing.in = {
          topic: "notifications"
        };
      }

      rpc Produce(KeyValue) returns (google.protobuf.Any) { // (2)
        option (kalix.method).eventing.out = {
          topic: "notifications"
        };
      }
    }
(1) When consuming JSON messages from a topic, the input type must be google.protobuf.any.Any.
(2) When producing a JSON message to a topic, the return type must be google.protobuf.any.Any.
The type_url in the google.protobuf.any.Any must start with json.kalix.io/. The suffix of the type_url is a type hint of the concrete message type that is encoded.
The corresponding implementation class:
- Java
src/main/java/com/example/json/MyServiceAction.java
    import kalix.javasdk.JsonSupport;
    import kalix.javasdk.action.ActionCreationContext;
    import com.google.protobuf.Any;
    import com.google.protobuf.Empty;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class MyServiceAction extends AbstractMyServiceAction {

      private static final Logger LOG = LoggerFactory.getLogger(MyServiceAction.class);

      public MyServiceAction(ActionCreationContext creationContext) {}

      // Subscriber for the "notifications" topic: receives the JSON payload wrapped in an Any.
      @Override
      public Effect<Empty> consume(Any any) {
        JsonKeyValueMessage jsonMessage = JsonSupport.decodeJson(JsonKeyValueMessage.class, any); // (1)
        LOG.info("Consumed {}", jsonMessage);
        return effects().reply(Empty.getDefaultInstance());
      }

      // Publisher for the "notifications" topic: replies with the JSON payload wrapped in an Any.
      @Override
      public Effect<Any> produce(MyServiceApi.KeyValue keyValue) {
        JsonKeyValueMessage jsonMessage =
            new JsonKeyValueMessage(keyValue.getKey(), keyValue.getValue()); // (2)
        Any jsonAny = JsonSupport.encodeJson(jsonMessage); // (3)
        return effects().reply(jsonAny);
      }
    }
(1) Decode the JSON message to a Java class JsonKeyValueMessage.
(2) Convert the Protobuf message KeyValue to a Java class JsonKeyValueMessage.
(3) Encode the Java class JsonKeyValueMessage to JSON.
- Scala
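src/main/scala/com/example/json/MyServiceAction.scala
    import com.google.protobuf.any.{ Any => ScalaPbAny }
    import com.google.protobuf.empty.Empty
    import kalix.scalasdk.JsonSupport
    import kalix.scalasdk.action.Action
    import kalix.scalasdk.action.ActionCreationContext
    import org.slf4j.LoggerFactory

    class MyServiceAction(creationContext: ActionCreationContext) extends AbstractMyServiceAction {

      private val log = LoggerFactory.getLogger(classOf[MyServiceAction])

      // Subscriber for the "notifications" topic: receives the JSON payload wrapped in an Any.
      override def consume(any: ScalaPbAny): Action.Effect[Empty] = {
        val jsonMessage = JsonSupport.decodeJson[JsonKeyValueMessage](any) // (1)
        log.info("Consumed {}", jsonMessage)
        effects.reply(Empty.defaultInstance)
      }

      // Publisher for the "notifications" topic: replies with the JSON payload wrapped in an Any.
      override def produce(keyValue: KeyValue): Action.Effect[ScalaPbAny] = {
        val jsonMessage = JsonKeyValueMessage(keyValue.key, keyValue.value) // (2)
        val jsonAny = JsonSupport.encodeJson(jsonMessage) // (3)
        effects.reply(jsonAny)
      }
    }
(1) Decode the JSON message to a Scala class JsonKeyValueMessage.
(2) Convert the Protobuf message KeyValue to a Scala class JsonKeyValueMessage.
(3) Encode the Scala class JsonKeyValueMessage to JSON.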
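The type_url rule described for the proto definition above (prefix json.kalix.io/, suffix as a type hint) can be illustrated with a small plain-Java sketch. It does not use the Kalix SDK; the class name JsonTypeUrls and the example type hint are hypothetical, and the exact hint that JsonSupport writes is not specified here.

```java
// Hypothetical helper, not part of the Kalix SDK: it only illustrates the type_url convention.
final class JsonTypeUrls {

    // Prefix that the text above requires for JSON payloads wrapped in google.protobuf.Any.
    static final String JSON_PREFIX = "json.kalix.io/";

    /** Returns true if the type_url marks the Any as carrying a JSON payload. */
    static boolean isJson(String typeUrl) {
        return typeUrl != null && typeUrl.startsWith(JSON_PREFIX);
    }

    /** Returns the suffix after the prefix (the type hint); rejects non-JSON type_urls. */
    static String typeHint(String typeUrl) {
        if (!isJson(typeUrl)) {
            throw new IllegalArgumentException("Not a JSON type_url: " + typeUrl);
        }
        return typeUrl.substring(JSON_PREFIX.length());
    }

    public static void main(String[] args) {
        // The hint value below is an assumption chosen for illustration only.
        String typeUrl = JSON_PREFIX + "com.example.json.JsonKeyValueMessage";
        System.out.println(isJson(typeUrl));
        System.out.println(typeHint(typeUrl));
    }
}
```

In an Action you would normally leave this to JsonSupport; the sketch is only meant to make the prefix and suffix roles concrete.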
Kalix uses Jackson to serialize JSON.
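The JsonKeyValueMessage class used in the implementation above is not shown. A minimal sketch of what its Java form might look like follows, assuming the mapper behind JsonSupport behaves like a default Jackson ObjectMapper, which detects public fields as properties and uses a no-argument constructor when deserializing. The field names mirror the KeyValue protobuf message; everything else is an assumption.

```java
// Hypothetical shape for JsonKeyValueMessage; the original documentation does not show this class.
// It is declared without `public` so the snippet compiles in any file; in a project it would
// usually be a public class in its own file.
final class JsonKeyValueMessage {

    // Public fields: with Jackson's default settings these become the JSON properties
    // "key" and "value".
    public String key;
    public int value;

    // No-argument constructor, which default Jackson deserialization relies on.
    public JsonKeyValueMessage() {}

    // Convenience constructor matching the call in the Action's produce method.
    public JsonKeyValueMessage(String key, int value) {
        this.key = key;
        this.value = value;
    }

    @Override
    public String toString() {
        return "JsonKeyValueMessage{key='" + key + "', value=" + value + "}";
    }

    public static void main(String[] args) {
        System.out.println(new JsonKeyValueMessage("answer", 42));
    }
}
```

An immutable class with Jackson annotations on the constructor is an equally valid design; public fields are used here only to keep the sketch free of library imports.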
Text
If the content type starts with text/, it is treated as a string message. The topic subscriber method must accept the google.protobuf.StringValue message.
- Java
A proto definition of an Action that consumes String messages can look like this:
src/main/proto/com/example/topics_action.proto
    syntax = "proto3";
    package com.example;

    import "kalix/annotations.proto";
    import "google/protobuf/wrappers.proto"; // (1)
    import "google/protobuf/empty.proto";

    option java_outer_classname = "MyTopics";

    service MyTopicsAction {
      option (kalix.codegen) = {
        action: {}
      };

      rpc ConsumeStringTopic(google.protobuf.StringValue) returns (google.protobuf.Empty) { // (2)
        option (kalix.method).eventing.in = {
          topic: "strings_topic"
        };
      }
    }
(1) google.protobuf.StringValue requires the import google/protobuf/wrappers.proto.
(2) When consuming text messages from a topic, the input type must be google.protobuf.StringValue.
- Scala
A proto definition of an Action that consumes String messages can look like this:
src/main/proto/com/example/topics_action.proto
    syntax = "proto3";
    package com.example;

    import "kalix/annotations.proto";
    import "google/protobuf/wrappers.proto"; // (1)
    import "google/protobuf/empty.proto";

    service MyTopicsAction {
      option (kalix.codegen) = {
        action: {}
      };

      rpc ConsumeStringTopic(google.protobuf.StringValue) returns (google.protobuf.Empty) { // (2)
        option (kalix.method).eventing.in = {
          topic: "strings_topic"
        };
      }
    }
(1) google.protobuf.StringValue requires the import google/protobuf/wrappers.proto.
(2) When consuming text messages from a topic, the input type must be google.protobuf.StringValue.
If an action has a return type of StringValue and publishes to a topic, the events published to the topic will have content-type text/plain; charset=utf-8.
Bytes
If the content type is application/octet-stream, no content type is present, or the type is unknown to Kalix, the message is treated as a binary message. The topic subscriber method must accept the google.protobuf.BytesValue message.
- Java
A proto definition of an Action that consumes binary messages with raw bytes can look like this:
src/main/proto/com/example/topics_action.proto
    syntax = "proto3";
    package com.example;

    import "kalix/annotations.proto";
    import "google/protobuf/wrappers.proto"; // (1)
    import "google/protobuf/empty.proto";

    option java_outer_classname = "MyTopics";

    service MyTopicsAction {
      option (kalix.codegen) = {
        action: {}
      };

      rpc ConsumeRawBytesTopic(google.protobuf.BytesValue) returns (google.protobuf.Empty) { // (2)
        option (kalix.method).eventing.in = {
          topic: "bytes_topic"
        };
      }
    }
(1) google.protobuf.BytesValue requires the import google/protobuf/wrappers.proto.
(2) When consuming raw bytes messages from a topic, the input type must be google.protobuf.BytesValue.
- Scala
A proto definition of an Action that consumes binary messages with raw bytes can look like this:
src/main/proto/com/example/topics_action.proto
    syntax = "proto3";
    package com.example;

    import "kalix/annotations.proto";
    import "google/protobuf/wrappers.proto"; // (1)
    import "google/protobuf/empty.proto";

    service MyTopicsAction {
      option (kalix.codegen) = {
        action: {}
      };

      rpc ConsumeRawBytesTopic(google.protobuf.BytesValue) returns (google.protobuf.Empty) { // (2)
        option (kalix.method).eventing.in = {
          topic: "bytes_topic"
        };
      }
    }
(1) google.protobuf.BytesValue requires the import google/protobuf/wrappers.proto.
(2) When consuming raw bytes messages from a topic, the input type must be google.protobuf.BytesValue.
If an action has a return type of BytesValue and publishes to a topic, the events published to the topic will have content-type application/octet-stream.
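The content-type rules from the sections above can be summarized in a small plain-Java sketch. It is not Kalix code: the class PayloadKinds, its enum, and its method names are hypothetical, and stripping parameters such as "; charset=utf-8" and ignoring case are assumptions about how such matching might reasonably be done.

```java
import java.util.Locale;

// Hypothetical summary of the documented rules; not the Kalix implementation.
final class PayloadKinds {

    enum Kind { PROTOBUF, JSON, TEXT, BYTES }

    /** Maps a Content-Type (or ce-datacontenttype) value to the payload kind described above. */
    static Kind classify(String contentType) {
        // No content type present: handled as raw bytes.
        if (contentType == null || contentType.trim().isEmpty()) {
            return Kind.BYTES;
        }
        // Assumption: parameters and letter case do not affect the classification.
        String mediaType = contentType.split(";", 2)[0].trim().toLowerCase(Locale.ROOT);

        if (mediaType.startsWith("application/protobuf")
                || mediaType.startsWith("application/x-protobuf")
                || mediaType.startsWith("application/vnd.google.protobuf")) {
            return Kind.PROTOBUF;
        }
        if (mediaType.startsWith("application/json")
                || (mediaType.startsWith("application/") && mediaType.endsWith("+json"))) {
            return Kind.JSON;
        }
        if (mediaType.startsWith("text/")) {
            return Kind.TEXT;
        }
        // application/octet-stream and any type unknown to the rules: binary.
        return Kind.BYTES;
    }

    /** The input type the topic subscriber method must accept for each kind. */
    static String handlerInputType(Kind kind) {
        switch (kind) {
            case PROTOBUF: return "the protobuf message type named by ce-type";
            case JSON:     return "google.protobuf.Any";
            case TEXT:     return "google.protobuf.StringValue";
            default:       return "google.protobuf.BytesValue";
        }
    }

    public static void main(String[] args) {
        Kind kind = classify("text/plain; charset=utf-8");
        System.out.println(kind + " -> " + handlerInputType(kind));
    }
}
```

A lookup like this can be handy when checking what an external producer must send so that a given subscriber method receives the payload in the expected wrapper type.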
Receiving CloudEvents
Kalix uses the CloudEvents standard when receiving from and publishing to topics. The CloudEvents specification standardizes message metadata so that systems can integrate more easily.
Describing the structure of the message payload is the CloudEvents feature most important to Kalix.
An example of that is the capability to send serialized Protobuf messages and have Kalix deserialize them accordingly.
To allow proper reading of Protobuf messages from topics, the messages need to specify the following message attributes:
- Content-Type = application/protobuf
- ce-specversion = 1.0
- ce-type = fully qualified protobuf message name (e.g., shopping.cart.api.TopicOperation)
(The ce- prefixed attributes are part of the CloudEvents specification.)
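For a sending system that is not a Kalix service, the attribute list above might be assembled as in the following plain-Java sketch. The class ProtobufCloudEventAttributes and its methods are hypothetical; how the attributes are attached to a message (for example as broker headers) depends on the broker client and is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for an external producer; not part of the Kalix SDK.
final class ProtobufCloudEventAttributes {

    /** Builds the three attributes listed above for a serialized protobuf payload. */
    static Map<String, String> forMessage(String fullyQualifiedMessageName) {
        if (fullyQualifiedMessageName == null || fullyQualifiedMessageName.isEmpty()) {
            throw new IllegalArgumentException("ce-type requires a fully qualified message name");
        }
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("Content-Type", "application/protobuf");
        attributes.put("ce-specversion", "1.0");
        attributes.put("ce-type", fullyQualifiedMessageName);
        return attributes;
    }

    /** Checks that a set of attributes carries the three values from the list above. */
    static boolean hasRequiredAttributes(Map<String, String> attributes) {
        String type = attributes.get("ce-type");
        return "application/protobuf".equals(attributes.get("Content-Type"))
                && "1.0".equals(attributes.get("ce-specversion"))
                && type != null
                && !type.isEmpty();
    }

    public static void main(String[] args) {
        Map<String, String> attributes = forMessage("shopping.cart.api.TopicOperation");
        attributes.forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```

The message bytes themselves would be the protobuf-serialized form of the type named in ce-type, which is why sharing the exact message declaration with the sending system matters.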
The Protobuf rpc declaration uses the expected Protobuf message type and specifies the topic to subscribe to. You’ll normally want to share the exact Protobuf message declaration with the sending system.
- Java
A proto definition of an Action that consumes CloudEvent messages can look like this:
src/main/proto/com/example/topics_action.proto
    syntax = "proto3";
    package com.example;

    import "kalix/annotations.proto";
    import "google/protobuf/empty.proto";

    option java_outer_classname = "MyTopics";

    message TopicOperation {
      string operation = 1;
    }

    service MyTopicsAction {
      option (kalix.codegen) = {
        action: {}
      };

      rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) { // (1)
        option (kalix.method).eventing.in = {
          topic: "shopping-cart-protobuf-cloudevents"
        };
      }
    }
(1) When consuming a CloudEvent containing a Protobuf message, the handler request must have the message type specified in the metadata.
- Scala
A proto definition of an Action that consumes CloudEvent messages can look like this:
src/main/proto/com/example/topics_action.proto
    syntax = "proto3";
    package com.example;

    import "kalix/annotations.proto";
    import "google/protobuf/empty.proto";

    message TopicOperation {
      string operation = 1;
    }

    service MyTopicsAction {
      option (kalix.codegen) = {
        action: {}
      };

      rpc ProtobufFromTopic(TopicOperation) returns (google.protobuf.Empty) { // (1)
        option (kalix.method).eventing.in = {
          topic: "shopping-cart-protobuf-cloudevents"
        };
      }
    }
(1) When consuming a CloudEvent containing a Protobuf message, the handler request must have the message type specified in the metadata.