Customer Registry with Kafka in Java/Protobuf

Create a customer registry that includes publishing to Kafka. Package it into a container, and run it on Kalix.

In this sample you will learn:

  • How to add additional functionality, allowing to publish customer’s events to Kafka.

  • How to package the customer registry into a container.

  • How to deploy and run the customer registry on Kalix.

Before you begin

  • If you’re new to Kalix, create an account so you can try out Kalix for free.

  • You’ll need to install the Kalix CLI to deploy from a terminal window.

  • You’ll also need

If you want to bypass writing code and jump straight to the deployment:

  1. Download the source code using the Kalix CLI: kalix quickstart download customer-registry-kafka-java-protobuf

  2. Skip to Package and deploy your service.

Start from the Customer Registry Entity

Start by downloading the Customer Registry sample source code using the Kalix CLI:

kalix quickstart download customer-registry-java

In this guide, we will describe how to subscribe to events from the entity and forward them to a Kafka Broker. How to do this with an Action. Publishing an event each time a customer is created or updated

Define an Action

The customer_action.proto will contain the definition of this action.

  1. In your project, create a directory for your protobuf file, src/main/proto/customer/action.

    Linux or macOS
    mkdir -p ./src/main/proto/customer/action
    Windows 10+
    mkdir src/main/proto/customer/action
  2. Create a customer_action.proto file and save it in the src/main/proto/customer/action directory.

  3. Add declarations for:

    • The protobuf syntax version, proto3.

    • The package name, customer.action.

    • The required Java outer classname, CustomerAction. Messages defined in this file will be generated as inner classes.

    • Import customer/api/customer_api.proto,customer/domain/customer_domain.proto, and Kalix kalix/annotations.proto.

      src/main/proto/customer/action/customer_action.proto
      syntax = "proto3";
      package customer.action;
      
      option java_outer_classname = "CustomerAction";
      
      import "customer/api/customer_api.proto";
      import "customer/domain/customer_domain.proto";
      import "kalix/annotations.proto";
  4. Add the service definition. The service definition is annotated with kalix.codegen indicating we want to generate an Action for this service.

  5. Add declarations for:

    • Listening to customer.domain.CustomerState events from the value entity customer by using the option eventing.in.

    • Publishing to the Kafka topic customer_changes by using the option eventing.out.

      src/main/proto/customer/action/customer_action.proto
      service CustomerStateSubscription {
        option (kalix.codegen) = {
          action: {}
        };
      
        rpc OnStateChange (customer.domain.CustomerState) returns (customer.api.Customer) {
          option (kalix.method).eventing.in = {
            value_entity: "customers"
          };
          option (kalix.method).eventing.out = {
            topic: "customer_changes"
          };
        }
      }
  6. Run mvn compile from the project root directory to generate source classes in which you add business logic.

    mvn compile

Create an Action

Actions are stateless functions that can be triggered in multiple ways. In this case, the action is triggered by each value change customer.domain.CustomerState received by the Value Entity customer.domain.Customer.

  1. If it’s not open already, open src/main/java/customer/action/CustomerStateSubscriptionAction.java for editing.

  2. Modify the onStateChange method by adding the logic to handle the action. The complete method should include the following:

    src/main/java/customer/action/CustomerStateSubscriptionAction.java
      @Override
      public Effect<CustomerApi.Customer> onStateChange(CustomerDomain.CustomerState customerState) {
    
        // not populating address for public consumption
        CustomerApi.Customer customer = CustomerApi.Customer.newBuilder()
            .setCustomerId(customerState.getCustomerId())
            .setEmail(customerState.getEmail())
            .setName(customerState.getName())
            .build();
    
    
        logger.info("Publishing public customer state out: {}", customer);
        return effects().reply(customer);
      }
    • The incoming message contains the updated state of the customer entity and this action converts it to a CustomerApi.Customer and passes it to the Pub/Sub mechanism of choice. In this example Kafka.

Package and deploy your service

To build and publish the container image and then deploy the service, follow these steps:

  1. If you haven’t done so yet, sign in to your Kalix account. If this is your first time using Kalix, this will let you register an account, create your first project, and set this project as the default.

    kalix auth login
  2. Use the deploy target to build the container image, publish it to the container registry as configured in the pom.xml file, and use the target kalix:deploy to automatically deploy the service to Kalix:

    mvn deploy kalix:deploy
    If you time stamp your image. For example, <dockerTag>${project.version}-${build.timestamp}</dockerTag> you must always run both targets in one pass, i.e. mvn deploy kalix:deploy. You cannot run mvn deploy first and then mvn kalix:deploy because they will have different timestamps and thus different `dockerTag`s. This makes it impossible to reference the image in the repository from the second target.
  3. You can verify the status of the deployed service using:

    kalix service list

Invoke your service

Once the service has started successfully, you can start a proxy locally to access the service:

kalix service proxy <service name> --grpcui

The --grpcui option also starts and opens a gRPC web UI for exploring and invoking the service (available at http://127.0.0.1:8080/ui/).

Or you can use command line gRPC or HTTP clients, such as grpcurl or curl, to invoke the service through the proxy at localhost:8080, using plaintext connections.

A customer can be created using the Create method on CustomerService, in the gRPC web UI, or with grpcurl:

grpcurl \
  -d '{
    "customer_id": "abc123",
    "email": "someone@example.com",
    "name": "Someone",
    "address": {
      "street": "123 Some Street",
      "city": "Somewhere"
    }
  }' \
  --plaintext localhost:8080 \
  customer.api.CustomerService/Create

The GetCustomer method can be used to retrieve this customer, in the gRPC web UI, or with grpcurl:

grpcurl \
  -d '{"customer_id": "abc123"}' \
  --plaintext localhost:8080 \
  customer.api.CustomerService/GetCustomer

Check the change made to the Customer appears in the topic customer_changes in the Kafka cluster on your Confluent Cloud.

Next steps