Implementing Event Sourced Entities in JavaScript
Event Sourced Entities persist their state with ACID semantics, scale horizontally, and isolate failures. They use the Event Sourcing Model: rather than persisting the current state, they persist all of the events that led to the current state. Kalix stores these events in a journal.
An Event Sourced Entity must not update its in-memory state directly as a result of a command. Instead, when handling a command requires a state change, the command handler emits events. Event handlers then receive these events, and that is the point at which the in-memory state can and should be changed.
When you need to read state in your service, ask yourself: what events should I be listening to? When you need to write state, ask yourself: what events should I be emitting?
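The command, event, and state cycle described above can be sketched in plain JavaScript. This is a minimal illustration, not the Kalix runtime: `runCommand`, the `type` field on events, and the simplified `addItem` handler are hypothetical names chosen for this sketch.

```javascript
// Hypothetical stand-in for the runtime; not part of the Kalix SDK.
function runCommand(state, journal, commandHandler, eventHandlers, command) {
  const emitted = [];
  const ctx = { emit: event => emitted.push(event) };

  // The command handler decides what happened; it does not change state itself.
  const reply = commandHandler(command, state, ctx);

  // Emitted events are appended to the journal, then applied by event handlers.
  let newState = state;
  for (const event of emitted) {
    journal.push(event);
    newState = eventHandlers[event.type](event, newState);
  }
  return { reply, state: newState };
}

// Command handler: emits an event and never touches the cart.
function addItem(command, cart, ctx) {
  ctx.emit({
    type: "ItemAdded",
    item: { productId: command.productId, quantity: command.quantity }
  });
  return {};
}

// Event handlers: the only place where the state changes.
const eventHandlers = {
  ItemAdded: (event, cart) => ({ items: [...cart.items, event.item] })
};
```

Calling `runCommand({ items: [] }, [], addItem, eventHandlers, { productId: "p1", quantity: 2 })` returns the reply together with a new state that contains the added item.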
To load an Entity, Kalix reads the journal and replays events to compute the Entity’s current state. As an optimization, by default, Event Sourced Entities persist state snapshots periodically. This allows Kalix to recreate an Entity from the most recent snapshot plus any events saved after the snapshot.
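As a rough sketch of this recovery step, assuming a hypothetical journal in which every entry carries a sequence number and a snapshot records the sequence number it was taken at (the real storage layout is internal to Kalix):

```javascript
// Rebuild an entity's state from an optional snapshot plus later events.
// `recoverState` and the journal/snapshot shapes are assumptions for this sketch.
function recoverState(initialState, snapshot, journal, applyEvent) {
  // Start from the latest snapshot if there is one, otherwise from the initial state.
  let state = snapshot ? snapshot.state : initialState;
  const fromSequence = snapshot ? snapshot.sequence : 0;

  // Replay only the events saved after the snapshot was taken.
  for (const entry of journal) {
    if (entry.sequence > fromSequence) {
      state = applyEvent(entry.event, state);
    }
  }
  return state;
}

// A simplified event handler that tracks product ids only.
function applyEvent(event, cart) {
  switch (event.type) {
    case "ItemAdded":
      return { items: [...cart.items, event.productId] };
    case "ItemRemoved":
      return { items: cart.items.filter(id => id !== event.productId) };
    default:
      return cart;
  }
}
```

Replaying the full journal from the initial state and replaying from a snapshot should produce the same result; the snapshot only shortens the replay.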
In contrast with typical create, read, update, delete (CRUD) systems, event sourcing allows the state of the Entity to be reliably replicated to other services. Event Sourced Entities use offset tracking in the journal to record which portions of the system have replicated which events.
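The idea behind offset tracking can be illustrated with a small in-memory model. This is only a conceptual sketch: the `Journal` class, `append`, and `poll` are hypothetical names, and Kalix manages offsets internally rather than exposing an API like this.

```javascript
// Hypothetical in-memory journal that remembers how far each consumer has read.
class Journal {
  constructor() {
    this.events = [];
    this.offsets = new Map(); // consumer name -> number of events already delivered
  }

  append(event) {
    this.events.push(event);
  }

  // Return the events this consumer has not seen yet and advance its offset.
  poll(consumer) {
    const offset = this.offsets.get(consumer) ?? 0;
    const unseen = this.events.slice(offset);
    this.offsets.set(consumer, this.events.length);
    return unseen;
  }
}
```

Because each consumer has its own offset, a consumer that starts late still receives every event from the beginning, while one that is up to date receives only what is new.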
To learn more about event sourcing, check out the free Lightbend Academy course, Reactive Architecture: CQRS & Event Sourcing.
Persistence types and serialization
Event Sourced Entities persist events and snapshots in the journal, so Kalix must serialize them. Kalix automatically detects whether an emitted event or snapshot is a protobuf type, and serializes it using protobufjs. However, as a JavaScript developer, you may find it more natural to use JSON. Serialization options for JavaScript services describes how to do that. See the protobuf documentation for more information on protobufjs.
While protobuf is the recommended format for persisting events, we recommend that you do not persist your service’s public protobuf messages. Instead, create new messages for storage, even if they are identical. This may introduce some overhead in converting from one type to the other, but it allows the service’s public interface to evolve independently of its data storage format, which should be private.
The following shows an example shopping cart definition in a domain.proto file:
syntax = "proto3";

package example.shoppingcart.domain;

message LineItem {
  string productId = 1;
  string name = 2;
  int32 quantity = 3;
}

message ItemAdded {
  LineItem item = 1;
}

message ItemRemoved {
  string productId = 1;
}

message CheckedOut {}

message Cart {
  repeated LineItem items = 1;
  bool checkedout = 2;
}
In this file, the Cart message represents the state snapshot, while ItemAdded, ItemRemoved, and CheckedOut are events. Note that the event names are in the past tense: events are facts, indisputable things that happened in the past. A fact never becomes false: after an item has been added to a shopping cart, it never becomes untrue that the item was added. The item can be removed, but that doesn’t change the fact that it was added; it only changes the current state of the cart. Always name events in the past tense to reflect the indisputable facts they represent.
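Keeping the public API messages separate from the persisted domain messages, as recommended earlier, usually means writing small conversion functions. The sketch below assumes a public `AddLineItem` command with `productId`, `name`, and `quantity` fields; the function names `toItemAdded` and `toApiCart` are hypothetical.

```javascript
// Convert the public API command (assumed shape) into the private domain event.
function toItemAdded(addLineItem) {
  return {
    item: {
      productId: addLineItem.productId,
      name: addLineItem.name,
      quantity: addLineItem.quantity
    }
  };
}

// Convert the private domain Cart state into a public API reply.
// The internal checkedout flag is deliberately not exposed here.
function toApiCart(domainCart) {
  return {
    items: domainCart.items.map(item => ({
      productId: item.productId,
      name: item.name,
      quantity: item.quantity
    }))
  };
}
```

With this split, a field can be renamed or added in the storage format without changing what API clients see, and the other way around.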
Creating an entity
Create an Event Sourced Entity with the EventSourcedEntity class.
- JavaScript
-
// Package name and constructor arguments aligned with the TypeScript example.
const EventSourcedEntity = require("@kalix-io/kalix-javascript-sdk").EventSourcedEntity;

const entity = new EventSourcedEntity(
  ["shoppingcart.proto", "domain.proto"],
  "example.shoppingcart.ShoppingCartService",
  "shopping-cart",
  {
    snapshotEvery: 100
  }
);
- TypeScript
-
import { EventSourcedEntity } from "@kalix-io/kalix-javascript-sdk";

const entity: EventSourcedEntity = new EventSourcedEntity(
  ["shoppingcart.proto", "domain.proto"],
  "example.shoppingcart.ShoppingCartService",
  "shopping-cart",
  {
    snapshotEvery: 100
  }
);
Using protobuf types
When you pass an event or snapshot to persist, Kalix needs to know how to serialize it. A regular object does not carry enough information for protobuf to know how to serialize it. To emit an event or snapshot, you must first look up the protobuf types, and then use the create method to create them.
The EventSourcedEntity class provides a helper method called lookupType. So before implementing, we’ll use lookupType to get these types to use later.
- JavaScript
-
const pkg = "example.shoppingcart.domain.";
const ItemAdded = entity.lookupType(pkg + "ItemAdded");
const ItemRemoved = entity.lookupType(pkg + "ItemRemoved");
const Cart = entity.lookupType(pkg + "Cart");
- TypeScript
-
const pkg = "example.shoppingcart.domain.";
const ItemAdded = entity.lookupType(pkg + "ItemAdded");
const ItemRemoved = entity.lookupType(pkg + "ItemRemoved");
const Cart = entity.lookupType(pkg + "Cart");
Initial state
When there are no snapshots persisted for an entity (such as when the entity is first created), the entity needs to have an initial state. Note that Event Sourced Entities are not explicitly created; they are implicitly created when a command arrives for them. Nothing is persisted until an event is emitted for that Entity. So, if user "X" opens their shopping cart for the first time, an Entity will be created for them, with no events in the log. It will just be in the initial state.
To create the initial state, we set the initial callback. This takes the id of the entity being created, and returns a new empty state, in this case, an empty shopping cart:
- JavaScript
-
entity.initial = entityId => Cart.create({ items: [] });
- TypeScript
-
entity.setInitial(entityId => Cart.create({ items: [] }));
Note the use of Cart.create(), which creates a protobuf message using the Cart protobuf message type that we looked up earlier.
Behavior
Now we need to define the behavior for our entity. The behavior consists of two parts: command handlers and event handlers.
Command handlers
A command handler is a function that takes a command, the current state, and an EventSourcedEntityCommandContext. It implements a service call on the Entity’s gRPC interface.
The command is the input message type for the gRPC service call. For example, the GetCart service call has an input type of GetShoppingCart, while the AddItem service call has an input type of AddLineItem. The command will be an object that matches the structure of these protobuf types.
The command handler must return a message of the same type as the output type of the gRPC service call. In the case of our GetCart command, this must be a Cart message. Note that, unlike the state and events, this message does not need to be created using a looked-up protobuf message type: Kalix already knows the output type of the gRPC service call and so can infer it. The reply only has to be a plain JavaScript object that matches the structure of the protobuf type.
The following shows the implementation of the GetCart command handler. This command handler is read-only: it doesn’t emit any events, it just returns some state:
- JavaScript
-
function getCart(request, cart) {
  return cart;
}
- TypeScript
-
function getCart(request: GetShoppingCart, cart: State): replies.Reply {
  return replies.message(cart);
}
Emitting events from commands
Commands that modify the state MUST do so by emitting events.
The only way a command handler may modify its state is by emitting an event. Any modifications made directly to the state from the command handler will not be persisted, and will be lost as soon as the command handler finishes executing.
A command handler may emit an event by using the emit method on the EventSourcedEntityCommandContext.
The following command handler example emits an event:
- JavaScript
-
function addItem(addItem, cart, ctx) {
  if (addItem.quantity < 1) {
    ctx.fail(
      "Quantity for item " + addItem.productId + " must be greater than zero.",
      3, // optional parameter, sets the gRPC status code to 3 - INVALID_ARGUMENT
    );
  } else {
    const itemAdded = ItemAdded.create({
      item: {
        productId: addItem.productId,
        name: addItem.name,
        quantity: addItem.quantity
      }
    });
    ctx.emit(itemAdded);
    return {};
  }
}
- TypeScript
-
function addItem(
  addItem: AddLineItem,
  cart: State,
  ctx: Context
): replies.Reply {
  if (addItem.quantity < 1) {
    return replies.failure(
      "Quantity for item " + addItem.productId + " must be greater than zero.",
      replies.GrpcStatus.InvalidArgument, // optional parameter, customise gRPC code
    );
  }

  const itemAdded = ItemAdded.create({
    item: {
      productId: addItem.productId,
      name: addItem.name,
      quantity: addItem.quantity
    }
  });
  ctx.emit(itemAdded);
  return replies.message({});
}
This command handler also validates the command by ensuring that the quantity of items added is greater than zero. In the JavaScript version, invoking fail fails the command; this method throws, so there’s no need to explicitly throw an exception. The TypeScript version returns replies.failure instead.
Event handlers
An event handler is invoked at two points:
- When restoring Entities from the journal, before any commands are handled
- Each time a new event is emitted
An event handler’s responsibility is to update the state of the entity according to the event. Event handlers are the only place where it’s safe to mutate the state of an Entity.
An event handler must be declared for each type of event that gets emitted. The type is defined by the protobuf message type in the case of protobuf events, or by the type property on a JSON object in the case of JSON events. The mapping of these type names to functions is discussed later; for now we’ll just look at the functions.
Event handlers take the event they are handling, and the state, and must return the new state. The handler may update the existing state passed in, but it still has to return that state as its return value. The command handler accumulates the emitted events and applies the events to the managed state after command processing.
Here’s an example event handler for the ItemAdded event:
- JavaScript
-
function itemAdded(added, cart) {
  const existing = cart.items.find(item => {
    return item.productId === added.item.productId;
  });

  if (existing) {
    existing.quantity = existing.quantity + added.item.quantity;
  } else {
    cart.items.push(added.item);
  }

  return cart;
}
- TypeScript
-
function itemAdded(added: ItemAdded, cart: State): State {
  const existing = cart.items.find(item => {
    return item.productId === added.item?.productId;
  });

  if (existing) {
    existing.quantity = (existing.quantity || 0) + (added.item?.quantity || 0);
  } else if (added.item) {
    // Only add the item when the event actually carries one.
    cart.items.push(added.item);
  }

  return cart;
}
Setting the behavior
Once you have command handler and event handler functions implemented, you can set your behavior. The behavior callback takes the current state of the entity, and returns an object with two properties: commandHandlers and eventHandlers. The callback may return different sets of handlers according to the current state. This is explored later; for now we’ll just implement an entity with one set of handlers.
The behavior callback can be set through the behavior property on the entity, or through the setBehavior method as in the TypeScript example:
- JavaScript
-
entity.behavior = cart => {
  return {
    commandHandlers: {
      AddItem: addItem,
      RemoveItem: removeItem,
      GetCart: getCart
    },
    eventHandlers: {
      ItemAdded: itemAdded,
      ItemRemoved: itemRemoved
    }
  };
};
- TypeScript
-
entity.setBehavior((cart: State) => {
  return {
    commandHandlers: {
      AddItem: addItem,
      RemoveItem: removeItem,
      GetCart: getCart
    },
    eventHandlers: {
      ItemAdded: itemAdded,
      ItemRemoved: itemRemoved
    }
  };
});
The command handlers are a mapping of the gRPC service call names to the command handler functions we implemented. Note that the names start with an uppercase letter, following the gRPC convention for service call names.
The event handlers are a mapping of event names to the event handler functions that we implemented. The event names must match the type of the events that are being persisted. In the case of protobuf messages, this is either the fully qualified name of the protobuf message, or the unqualified name of the protobuf message. For JSON messages, this is the value of the type property in the message.
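The behavior above refers to removeItem and itemRemoved, which are not shown on this page. A minimal sketch of what they might look like follows, assuming the RemoveItem command carries a `productId` field. The `ItemRemoved` constant here is a stand-in so the sketch runs on its own; in the entity it would be the type returned by lookupType.

```javascript
// Stand-in for entity.lookupType("example.shoppingcart.domain.ItemRemoved"),
// used only so that this sketch runs without the SDK.
const ItemRemoved = { create: fields => ({ ...fields }) };

// Command handler: rejects unknown products, otherwise emits ItemRemoved.
function removeItem(removeItem, cart, ctx) {
  const existing = cart.items.find(item => {
    return item.productId === removeItem.productId;
  });

  if (!existing) {
    ctx.fail("Item " + removeItem.productId + " is not in the cart.");
  } else {
    ctx.emit(ItemRemoved.create({ productId: removeItem.productId }));
    return {};
  }
}

// Event handler: drops the removed product from the cart state.
function itemRemoved(removed, cart) {
  cart.items = cart.items.filter(item => {
    return item.productId !== removed.productId;
  });
  return cart;
}
```

As with addItem, the command handler only validates and emits; the cart itself is changed in the event handler.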
Multiple behaviors
In the examples above, our shopping cart entity only has one behavior. An entity may have different states, where command and event handling may differ according to the state it is currently in. While this could be implemented using if statements in the handlers, Kalix also provides multiple behavior support, so that an entity can change its behavior. This multiple behavior support allows implementing entities as finite state machines.
The Entity’s behavior can be changed by returning different sets of handlers from the behavior callback after inspecting the state. This callback is invoked each time a handler is needed, so there’s no need to explicitly transition behaviors.
In the example below, we show a shopping cart that also has a checkout command. Once checked out, the shopping cart no longer accepts any commands to add or remove items: its state changes, and therefore so does its behavior.
- JavaScript
-
// Look up the CheckedOut event type defined in domain.proto.
const CheckedOut = entity.lookupType("example.shoppingcart.domain.CheckedOut");

function checkout(checkout, cart, ctx) {
  ctx.emit(CheckedOut.create({}));
  return {};
}

function checkedOut(checkedOut, cart) {
  // The field is named "checkedout" in domain.proto.
  cart.checkedout = true;
  return cart;
}

function alreadyCheckedOut(cmd, state, ctx) {
  ctx.fail("Cart is already checked out!");
}

entity.behavior = cart => {
  if (cart.checkedout) {
    return {
      commandHandlers: {
        AddItem: alreadyCheckedOut,
        RemoveItem: alreadyCheckedOut,
        Checkout: alreadyCheckedOut,
        GetCart: getCart
      },
      eventHandlers: {}
    };
  } else {
    return {
      commandHandlers: {
        AddItem: addItem,
        RemoveItem: removeItem,
        Checkout: checkout,
        GetCart: getCart
      },
      eventHandlers: {
        ItemAdded: itemAdded,
        ItemRemoved: itemRemoved,
        CheckedOut: checkedOut
      }
    };
  }
};
- TypeScript
-
// Look up the CheckedOut event type defined in domain.proto.
const CheckedOut = entity.lookupType("example.shoppingcart.domain.CheckedOut");

function checkout(checkout: Checkout, cart: State, ctx: Context): replies.Reply {
  ctx.emit(CheckedOut.create({}));
  return replies.message({});
}

function checkedOut(checkedOut: CheckedOut, cart: State): State {
  // The field is named "checkedout" in domain.proto.
  cart.checkedout = true;
  return cart;
}

function alreadyCheckedOut(): replies.Reply {
  return replies.failure("Cart is already checked out!");
}

entity.setBehavior((cart: State) => {
  if (cart.checkedout) {
    return {
      commandHandlers: {
        AddItem: alreadyCheckedOut,
        RemoveItem: alreadyCheckedOut,
        Checkout: alreadyCheckedOut,
        GetCart: getCart
      },
      eventHandlers: {}
    };
  } else {
    return {
      commandHandlers: {
        AddItem: addItem,
        RemoveItem: removeItem,
        Checkout: checkout,
        GetCart: getCart
      },
      eventHandlers: {
        ItemAdded: itemAdded,
        ItemRemoved: itemRemoved,
        CheckedOut: checkedOut
      }
    };
  }
});
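To see why no explicit transition is needed, it can help to model how handlers are selected. The sketch below is a simplified illustration, not the Kalix runtime: `dispatch` is a hypothetical function, and events carry a `type` field purely for this example.

```javascript
// Hypothetical dispatcher: asks the behavior callback for handlers on every call.
function dispatch(behavior, state, commandName, command) {
  const emitted = [];
  const ctx = {
    emit: event => emitted.push(event),
    fail: message => { throw new Error(message); }
  };

  // Command handlers are chosen from the state as it is right now.
  const reply = behavior(state).commandHandlers[commandName](command, state, ctx);

  for (const event of emitted) {
    // Handlers are re-selected for each event, since an event may switch the behavior.
    state = behavior(state).eventHandlers[event.type](event, state);
  }
  return { reply, state };
}

// Two behaviors selected by the checkedout flag, as in a finite state machine.
const behavior = cart => cart.checkedout
  ? {
      commandHandlers: {
        Checkout: (cmd, s, ctx) => ctx.fail("Cart is already checked out!")
      },
      eventHandlers: {}
    }
  : {
      commandHandlers: {
        Checkout: (cmd, s, ctx) => { ctx.emit({ type: "CheckedOut" }); return {}; }
      },
      eventHandlers: {
        CheckedOut: (event, s) => ({ ...s, checkedout: true })
      }
    };
```

The first Checkout command succeeds and flips the flag; a second Checkout against the resulting state is routed to the failing handler, without any code that explicitly switches behaviors.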
Starting the entity
You can let Kalix start entities by adding the entities to a Kalix server instance:
- JavaScript
-
// Package name aligned with the TypeScript example.
const Kalix = require("@kalix-io/kalix-javascript-sdk").Kalix;

const server = new Kalix();
server.addComponent(require("./shoppingcart"));
server.start();
- TypeScript
-
import { Kalix } from "@kalix-io/kalix-javascript-sdk";
import shoppingcartEntity from "./shoppingcart";

new Kalix().addComponent(shoppingcartEntity).start();