Exporting and importing data
Kalix allows you to export data out of your services and import data into them. This can be useful for the following use cases:
- Migrating data between environments
- Bootstrapping development environments with data
- Migrating data from an existing system into Kalix
- Migrating data out of Kalix
- Disaster recovery (note that Lightbend already backs up all data for disaster recovery purposes)
Kalix exports/imports data to/from cloud object stores local to your service: if your service is deployed to an AWS region, you can export/import data to/from S3, and if it is deployed to GCP, to/from GCS. Event sourced entities and value entities are supported; views are not currently supported.
The export format can be either protobuf or newline separated JSON.
Prepare
Before you can perform an export or import, you will need to set up your Kalix project so that it can access an object store in your cloud provider.
Preparing on AWS
On AWS, you will need to:
1. Create an S3 bucket for the export/import to be written to or read from.
2. Create a user for Kalix to use to perform operations on S3.
3. Create an IAM policy that grants allow permission for operations on the S3 bucket and objects in that bucket.
4. Attach the IAM policy to the user created in step 2.
5. Create an access key for the user created in step 2.
6. Place the access key created in step 5 in a Kalix secret.
Create an S3 bucket
We recommend creating an S3 bucket in the same region as the Kalix region your service is running in. If you already have a bucket that you would like to use for this, you can skip this step. For the purposes of these instructions, we’re going to call the bucket my-kalix-bucket.
Instructions for creating a bucket in AWS can be found here.
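If you prefer the AWS CLI, creating the bucket might look something like the following sketch (the bucket name and region are example values, and regions other than us-east-1 require the location constraint):
aws s3api create-bucket --bucket my-kalix-bucket \
  --region eu-west-1 \
  --create-bucket-configuration LocationConstraint=eu-west-1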
Create a user
For the purposes of these instructions, we’re going to call the user my-kalix-user. Instructions for creating users in AWS can be found here.
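With the AWS CLI, creating the user might look like this sketch (the user name is just the example used in these instructions):
aws iam create-user --user-name my-kalix-user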
Create an IAM policy
For imports, the IAM policy must grant ListBucket permissions on the bucket, and GetObject permissions on the object paths you wish to import from. For exports, PutObject and DeleteObject permissions will also be needed on the object paths. Here is an example policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::my-kalix-bucket",
        "arn:aws:s3:::my-kalix-bucket/*"
      ]
    }
  ]
}
Instructions for creating IAM policies in AWS can be found here.
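As a sketch, the policy above could also be created with the AWS CLI, assuming it has been saved to a local file (my-kalix-policy.json is an example file name):
aws iam create-policy --policy-name my-kalix-policy \
  --policy-document file://my-kalix-policy.json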
Attach IAM policy to user
Having created the policy, you can now attach it to the user you created earlier. Instructions for doing that can be found here.
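With the AWS CLI, attaching the policy might look like the following sketch, where the policy ARN is the one returned when the policy was created (the account id shown is a placeholder):
aws iam attach-user-policy --user-name my-kalix-user \
  --policy-arn arn:aws:iam::123456789012:policy/my-kalix-policy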
Create an access key
Now that the user has the right permissions, you need to create an access key for that user. Instructions for doing this can be found here. Copy the access key ID and secret access key for use in the next step.
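With the AWS CLI, this might look like the following sketch (the command prints the access key ID and secret access key in its output):
aws iam create-access-key --user-name my-kalix-user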
Create a Kalix secret with the access key
Now the access key that you created needs to be given to Kalix. This can be done by creating a Kalix secret. We’re going to call this secret my-credentials:
kalix secrets create generic my-credentials \
--literal access-key-id=C2VJCMV0IHNLY3JLDCBZZWNYZXQK \
--literal secret-access-key=dGhpcyBpcyB2ZXJ5IHNlY3JldCBpcyBpdCBub3QK
Note that the keys access-key-id and secret-access-key can be called by other names, but these are the default names that the data management tool expects, so using these names will require less configuration later on.
Preparing on GCP
On GCP, you will need to:
1. Create a GCS bucket for the export/import to be written to or read from.
2. Create a service account for Kalix to use to perform operations on GCS.
3. Grant the service account access to the GCS bucket.
4. Create a service account key for the service account.
5. Place the service account key JSON in a Kalix secret.
Create a GCS bucket
We recommend creating a GCS bucket in the same region as the Kalix service is running. If you already have a bucket that you would like to use for this, you can skip this step. For the purposes of these instructions, we’re going to call the bucket my-kalix-bucket.
Instructions for creating a bucket in GCS can be found here.
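If you use the gcloud CLI, creating the bucket might look like this sketch (the bucket name and location are example values):
gcloud storage buckets create gs://my-kalix-bucket --location=europe-west1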
Create a service account
For the purposes of these instructions, we’re going to call the service account my-kalix-service-account. Instructions for creating service accounts in GCP can be found here.
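With the gcloud CLI, this might look like the following sketch:
gcloud iam service-accounts create my-kalix-service-account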
Granting access
For exports, you will need to grant the service account the Storage Object Admin role (roles/storage.objectAdmin) on the bucket. For imports, you will need to grant the service account the Storage Object Viewer role (roles/storage.objectViewer) on the bucket. Instructions for managing IAM policies for buckets can be found here.
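As a sketch using the gcloud CLI, granting the Storage Object Admin role on the bucket might look like the following (the project id in the service account email is a placeholder):
gcloud storage buckets add-iam-policy-binding gs://my-kalix-bucket \
  --member=serviceAccount:my-kalix-service-account@my-project.iam.gserviceaccount.com \
  --role=roles/storage.objectAdmin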
Create the service account key
You will need to create a JSON service account key for my-kalix-service-account for Kalix to use to access the bucket. This can be done by following these instructions. Once the key is created, place it in a file called key.json.
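With the gcloud CLI, creating the key might look like this sketch (the project id in the service account email is a placeholder):
gcloud iam service-accounts keys create key.json \
  --iam-account=my-kalix-service-account@my-project.iam.gserviceaccount.com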
Create a Kalix secret
You now need to place the previously created key in a Kalix secret. For the purpose of these instructions, we’re going to call this secret my-credentials. While the name of the file in the secret can be anything, we recommend key.json as this is the default that subsequent commands will use.
kalix secrets create generic my-credentials --from-file key.json=path/to/key.json
Data format
Kalix can export and import data in either JSON or protobuf format. By default, exports are gzipped.
Protobuf
The protobuf schema for the export can be found here. While an export can be parsed and formatted as a single protobuf message, for large exports, this may not be practical or possible. Instead, we recommend a streaming approach for reading and writing exports.
Writing exports as streams
For streaming exports, we can take advantage of the fact that writing many protobuf messages of the same type to a stream, each carrying entries for a repeated field, produces a single large message in which the repeated field entries from each write are concatenated into one large repeated field.
public void export(Iterable<EventSourcedEntityEvent> events) throws IOException {
  var fileStream = new FileOutputStream("my-export.binpb");
  var stream = CodedOutputStream.newInstance(fileStream);
  // First write the header
  Export.newBuilder()
      .setVersion("v1")
      .setExportHeader(ExportHeader.newBuilder()
          .setId("my-export")
          .setRecordType(ExportRecordType.EVENT_SOURCED_ENTITY_EVENT)
      ).build()
      .writeTo(stream);
  for (var event : events) {
    // Now write each event as if it was a single export message with a single records field.
    // This has the effect of concatenating the event into the records field of the
    // previously written message.
    Export.newBuilder()
        .addRecords(event.toByteString())
        .build()
        .writeTo(stream);
  }
  // Flush and close.
  stream.flush();
  fileStream.close();
}
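The example above writes an uncompressed file. Since exports produced by Kalix are gzipped by default, if you want the file you write to follow the same convention you could wrap the file stream in a java.util.zip.GZIPOutputStream, as in this sketch (the file name is just an example):
// Sketch: wrap the file stream so the written export is gzip compressed
var fileStream = new GZIPOutputStream(new FileOutputStream("my-export.binpb.gz"));
var stream = CodedOutputStream.newInstance(fileStream);
// ... write the export as above; closing fileStream also finishes the gzip stream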
Reading exports as streams
Reading exports is a little more complex; it requires using the protobuf CodedInputStream class, but is nonetheless straightforward.
public void readExport(File file) throws IOException {
  var extensionRegistry = ExtensionRegistry.newInstance();
  var stream = CodedInputStream.newInstance(new FileInputStream(file));
  int tag = stream.readTag();
  while (tag != 0) {
    switch (WireFormat.getTagFieldNumber(tag)) {
      case 1:
        // Version field
        var version = stream.readString();
        if (!version.equals("v1")) {
          throw new RuntimeException("Unknown export version: " + version);
        }
        break;
      case 2:
        // Header field
        var headerBuilder = ExportHeader.newBuilder();
        stream.readMessage(headerBuilder, extensionRegistry);
        var header = headerBuilder.build();
        if (header.getRecordType() != ExportRecordType.EVENT_SOURCED_ENTITY_EVENT) {
          throw new RuntimeException("Expected event sourced entity events, but got: " + header.getRecordType().name());
        }
        break;
      case 3:
        // A record
        var builder = EventSourcedEntityEvent.newBuilder();
        stream.readMessage(builder, extensionRegistry);
        var event = builder.build();
        handleEvent(event);
        break;
      default:
        // Skip any unknown fields
        stream.skipField(tag);
        break;
    }
    tag = stream.readTag();
  }
}
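Similarly, if the export was written with the default gzip compression, the input stream can be wrapped in a java.util.zip.GZIPInputStream before handing it to CodedInputStream, as in this sketch:
// Sketch: read a gzip compressed export
var stream = CodedInputStream.newInstance(
    new GZIPInputStream(new FileInputStream(file)));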
JSON
JSON exports use the protobuf schema specified here, except that the export is formatted as newline separated JSON. The first line should be the Export object with no records field set. Each subsequent line should be a single record for the records field.
In addition to this, JSON payloads in the export are encoded specially. Rather than encoding the JSON bytes as base64 in the value field of the protobuf Any message, the JSON message is placed in a field called json, with any type associated with the message encoded in a field called type. Payloads that are not JSON are encoded as a regular protobuf Any message, with the bytes base64 encoded in the value field and the typeUrl field populated with the type of the protobuf message.
Here is an example JSON export:
{"version":"v1","exportHeader":{"recordType":"EVENT_SOURCED_ENTITY_EVENT"}}
{"entityType":"asset","entityId":"1","seqNr":1,"timestamp":"2024-01-12T00:19:14.006111Z","payload":{"type":"asset-moved","json":{"location":"workshop"}}}
{"entityType":"asset","entityId":"1","seqNr":2,"timestamp":"2024-01-12T00:20:29.782976Z","payload":{"type":"asset-moved","json":{"location":"toolshed"}}}
Running exports
The kalix service data export command can be used to run an export. For example:
kalix service data export assets --s3-secret my-credentials \
--s3-bucket my-bucket --include-event-sourced-entities
The format of the export is controlled using the --format flag, which can be either protobuf or json; the default is protobuf. Additionally, the compression algorithm used to compress/decompress the export can be specified using the --compression flag, which can be either gzip or none, with the default being gzip.
To control what gets exported, the --include-event-sourced-entities and --include-value-entities flags can be used.
By default, the export runs synchronously, displaying its progress as it runs. Cancelling the export command will not stop the export; it will continue to run in the background. The export command can be run asynchronously by passing the --async flag.
Exporting to S3
To export to S3, you must supply the S3 credentials secret via the --s3-secret argument. You also need to supply the bucket name using the --s3-bucket argument, and if that bucket doesn’t live in the same region as your service, you must supply the region name with the --s3-region argument.
An optional prefix can be prepended to the names of the files that get exported; this can be specified using the --s3-object-prefix argument.
Exporting to GCS
To export to GCS, you must supply the GCS credentials secret via the --gcs-secret argument. You also need to supply the bucket name using the --gcs-bucket argument.
An optional prefix can be prepended to the names of the files that get exported; this can be specified using the --gcs-object-prefix argument.
Running imports
The kalix service data import command can be used to run an import. For example:
kalix service data import assets --s3-secret my-credentials \
--s3-bucket my-bucket --include-event-sourced-entities
When importing to a service, the first thing Kalix will do is pause that service. If the service is running while the import is happening, the service’s data may be corrupted. If the import succeeds, Kalix will resume the service.
If you wish to clear the service’s database before importing, use the --truncate flag. This will delete all existing data in the service.
The names of the files to import can either be specified explicitly, by passing the --event-sourced-entity-object, --event-sourced-entity-snapshot-object and/or --value-entity-object flags, or Kalix can use the default filenames that it selects when creating an export. In the latter case, --s3-object-prefix and --gcs-object-prefix can be used, and --include-event-sourced-entities and --include-value-entities can be used to control which export types Kalix will look for.
The flags for specifying credentials and object names specific to S3 and GCS are the same as for exporting.
Managing tasks
When an import or export task is submitted, Kalix first validates the credentials and other aspects of the task to ensure that it can be issued. Once that validation is complete, the task is submitted to a task queue for processing. The status of the task can be queried by running kalix service data list-tasks.
If a task fails, Kalix will attempt to re-run it, using an exponential back-off. To cancel a task permanently, kalix service data cancel can be used.
If you wish to watch a task that is running, you can use kalix service data watch.