Customer Registry with Kafka in Java/Protobuf
Create a customer registry that includes publishing to Kafka. Package it into a container, and run it on Kalix.
In this sample you will learn:
-
How to add additional functionality, allowing to publish customer’s events to Kafka.
-
How to package the customer registry into a container.
-
How to deploy and run the customer registry on Kalix.
Before you begin
-
If you’re new to Kalix, create an account so you can try out Kalix for free.
-
You’ll need to install the Kalix CLI to deploy from a terminal window.
-
You’ll also need
-
Java 11 or higher
-
Configure your project message broker to use Kafka and create the topic
customer_changes
by following the Configure message brokers how-to. When setting the broker the configuration file you pass should be likekafka/my-dev.kafka.properties
that you can find in this project.
If you want to bypass writing code and jump straight to the deployment:
|
Start from the Customer Registry Entity
Start by downloading the Customer Registry sample source code using the Kalix CLI:
kalix quickstart download customer-registry-java
In this guide, we will describe how to subscribe to events from the entity and forward them to a Kafka Broker. How to do this with an Action. Publishing an event each time a customer is created or updated
Define an Action
The customer_action.proto
will contain the definition of this action.
-
In your project, create a directory for your protobuf file,
src/main/proto/customer/action
.- Linux or macOS
-
mkdir -p ./src/main/proto/customer/action
- Windows 10+
-
mkdir src/main/proto/customer/action
-
Create a
customer_action.proto
file and save it in thesrc/main/proto/customer/action
directory. -
Add declarations for:
-
The protobuf syntax version,
proto3
. -
The package name,
customer.action
. -
The required Java outer classname,
CustomerAction
. Messages defined in this file will be generated as inner classes. -
Import
customer/api/customer_api.proto
,customer/domain/customer_domain.proto
, and Kalixkalix/annotations.proto
.src/main/proto/customer/action/customer_action.protosyntax = "proto3"; package customer.action; option java_outer_classname = "CustomerAction"; import "customer/api/customer_api.proto"; import "customer/domain/customer_domain.proto"; import "kalix/annotations.proto";
-
-
Add the service definition. The service definition is annotated with
kalix.codegen
indicating we want to generate an Action for this service. -
Add declarations for:
-
Listening to
customer.domain.CustomerState
events from the value entitycustomer
by using the optioneventing.in
. -
Publishing to the Kafka topic
customer_changes
by using the optioneventing.out
.src/main/proto/customer/action/customer_action.protoservice CustomerStateSubscription { option (kalix.codegen) = { action: {} }; rpc OnStateChange (customer.domain.CustomerState) returns (customer.api.Customer) { option (kalix.method).eventing.in = { value_entity: "customers" }; option (kalix.method).eventing.out = { topic: "customer_changes" }; } }
-
-
Run
mvn compile
from the project root directory to generate source classes in which you add business logic.mvn compile
Create an Action
Actions are stateless functions that can be triggered in multiple ways. In this case, the action is triggered by each value change customer.domain.CustomerState
received by the Value Entity customer.domain.Customer
.
-
If it’s not open already, open
src/main/java/customer/action/CustomerStateSubscriptionAction.java
for editing. -
Modify the
onStateChange
method by adding the logic to handle the action. The complete method should include the following:src/main/java/customer/action/CustomerStateSubscriptionAction.java@Override public Effect<CustomerApi.Customer> onStateChange(CustomerDomain.CustomerState customerState) { // not populating address for public consumption CustomerApi.Customer customer = CustomerApi.Customer.newBuilder() .setCustomerId(customerState.getCustomerId()) .setEmail(customerState.getEmail()) .setName(customerState.getName()) .build(); logger.info("Publishing public customer state out: {}", customer); return effects().reply(customer); }
-
The incoming message contains the updated state of the customer entity and this action converts it to a
CustomerApi.Customer
and passes it to the Pub/Sub mechanism of choice. In this example Kafka.
-
Package and deploy your service
To build and publish the container image and then deploy the service, follow these steps:
-
If you haven’t done so yet, sign in to your Kalix account. If this is your first time using Kalix, this will let you register an account, create your first project, and set this project as the default.
kalix auth login
-
Use the
deploy
target to build the container image, publish it to the container registry as configured in thepom.xml
file, and use the targetkalix:deploy
to automatically deploy the service to Kalix:mvn deploy kalix:deploy
If you time stamp your image. For example, <dockerTag>${project.version}-${build.timestamp}</dockerTag>
you must always run both targets in one pass, i.e.mvn deploy kalix:deploy
. You cannot runmvn deploy
first and thenmvn kalix:deploy
because they will have different timestamps and thus different `dockerTag`s. This makes it impossible to reference the image in the repository from the second target. -
You can verify the status of the deployed service using:
kalix service list
Invoke your service
Once the service has started successfully, you can start a proxy locally to access the service:
kalix service proxy <service name> --grpcui
The --grpcui
option also starts and opens a gRPC web UI for exploring and invoking the service (available at http://127.0.0.1:8080/ui/).
Or you can use command line gRPC or HTTP clients, such as grpcurl
or curl
, to invoke the service through the proxy at localhost:8080
, using plaintext connections.
A customer can be created using the Create
method on CustomerService
, in the gRPC web UI, or with grpcurl
:
grpcurl \ -d '{ "customer_id": "abc123", "email": "someone@example.com", "name": "Someone", "address": { "street": "123 Some Street", "city": "Somewhere" } }' \ --plaintext localhost:8080 \ customer.api.CustomerService/Create
The GetCustomer
method can be used to retrieve this customer, in the gRPC web UI, or with grpcurl
:
grpcurl \ -d '{"customer_id": "abc123"}' \ --plaintext localhost:8080 \ customer.api.CustomerService/GetCustomer
Check the change made to the Customer appears in the topic customer_changes
in the Kafka cluster on your Confluent Cloud.
Next steps
-
You can learn more about Value Entities in the reference documentation.
-
Continue this example by adding Views, which makes it possible to query the customer registry.