Hilla and Kafka. Part 3: Kafka Streams

René Wilby | Jul 15, 2025

Article series

This is the third part of a small article series about how to build Hilla web apps for Apache Kafka.

In this blog post, we will learn how to utilize Kafka Streams in web apps built with Hilla.

Setup Kafka and Hilla project

Please read the first blog post of this article series to find out how to set up a local Kafka Broker and how to initialize a new Hilla project.

Kafka Streams

A general introduction to Kafka Streams is out of scope for this blog post. Instead, we will focus on a specific feature that can be useful when building full-stack web apps using Hilla and Kafka Streams.

To enable Kafka Streams support in our Hilla application, we need to provide the required dependencies in our pom.xml.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.version}</version>
</dependency>

To use Kafka Streams, we need to add some configuration to our Hilla backend. We could create the required beans ourselves, as described in the Spring for Apache Kafka documentation, or we could use Spring Boot’s auto-configuration capabilities (as described in the Spring Boot documentation for Apache Kafka) by providing the required configuration in the application.properties of our Hilla backend.
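
For reference, the bean-based variant mentioned first could look roughly like the following sketch (the class name KafkaStreamsBeanConfig and the property values are just illustrations; we will not use this variant here):

@Configuration
@EnableKafkaStreams
public class KafkaStreamsBeanConfig {

    // The bean name must be the one expected by @EnableKafkaStreams
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "hilla-kafka-streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        return new KafkaStreamsConfiguration(props);
    }
}

In this article, we stick with the auto-configuration approach and configure everything in application.properties: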

# Set up Kafka Streams:
spring.kafka.streams.application-id=hilla-kafka-streams-example
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.client-id=reservation-client
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties[default.value.serde]=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.streams.properties[spring.json.value.default.type]=de.rwi.hillakafkastreamsexample.reservation.Reservation

Most of the configuration should be self-explanatory. When receiving messages, we want to deserialize each message into our Reservation model. For this purpose, we can use the JsonSerde provided by Spring. In addition, we configure our Reservation record to be the default type for incoming messages. This way we don’t have to care about message headers and their mapping.
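
For context, the Reservation model referenced in the configuration can be imagined as a simple Java record along these lines (a minimal sketch; the field names follow the columns used in the frontend later on, the exact types are assumptions):

// Hypothetical sketch of the Reservation record used as the default deserialization target
public record Reservation(String id, String date, String customer) {
}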

Now, we create a KafkaStreamsConfig that is responsible for creating a KStream bean that processes all messages received on the reservations topic.

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
        return streamsBuilder.stream("reservations");
    }
}

Interactive Queries

One feature that is very useful when building web apps based on Kafka Streams is Interactive Queries. Interactive Queries help us to query the latest state of our application - that is, the state that results from some kind of (stream) processing. The full state is typically split across many distributed instances of our app, and across many so-called state stores that are managed locally by these application instances. For simplicity, we assume that our web app has just one instance and that all the relevant state is available in one local state store. Querying remote state stores across all app instances is out of scope here (please take a look at the corresponding documentation for more details).

A local state store can be seen as a lightweight, embedded database that we can query, and we can configure which state should be materialized into such a store.

Query all reservations

The example application we created in the first two blog posts lists all available reservations. To achieve this, it has to listen to the reservations topic, read every reservation, and hold it in memory in a sink in order to provide it as a Flux to a consumer, which in our case is the Hilla frontend. We can change this to read all reservations from a local state store instead. This requires materializing the stream into a local state store called all-reservations-state-store. Spring’s Interactive Query support is centered around an API called KafkaStreamsInteractiveQueryService, which is a facade around the interactive query APIs in the Kafka Streams library. Our Hilla app needs to create an instance of this service as a bean, which allows the retrieval of a state store by its name.

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
            StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    }

    @Bean
    public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Reservation> stream = streamsBuilder.stream("reservations");

        // materialize all reservations by key
        stream
                .groupByKey()
                .reduce((Reservation aggregate, Reservation newValue) -> newValue,
                        Materialized.as("all-reservations-state-store"));

        return stream;
    }
}

We can now create a browser-callable service called KafkaStreamsService that reads all reservations from the local state store and returns them as a list.

@BrowserCallable
@AnonymousAllowed
public class KafkaStreamsService {

    private final KafkaStreamsInteractiveQueryService interactiveQueryService;

    public KafkaStreamsService(KafkaStreamsInteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
    }

    public @NonNull List<@NonNull Reservation> list(Pageable pageable) {
        List<Reservation> reservations = getAllReservationsFromStateStore();
        reservations = applySorting(pageable.getSort(), reservations);
        reservations = applyPagination(pageable, reservations);
        return reservations;
    }

    private List<Reservation> getAllReservationsFromStateStore() {
        ReadOnlyKeyValueStore<String, Reservation> allReservationsStateStore = interactiveQueryService
                .retrieveQueryableStore("all-reservations-state-store", QueryableStoreTypes.keyValueStore());
        List<Reservation> allReservations = new ArrayList<>();
        // The returned KeyValueIterator holds resources and should be closed after use
        try (KeyValueIterator<String, Reservation> iterator = allReservationsStateStore.all()) {
            while (iterator.hasNext()) {
                allReservations.add(iterator.next().value);
            }
        }
        return allReservations;
    }

    private List<Reservation> applySorting(Sort sort, List<Reservation> reservations) {
        if (sort != null && sort.isSorted()) {
            reservations.sort((r1, r2) -> {
                for (Sort.Order order : sort) {
                    try {
                        var field = Reservation.class.getDeclaredField(order.getProperty());
                        field.setAccessible(true);
                        Comparable<Object> v1 = (Comparable<Object>) field.get(r1);
                        Object v2 = field.get(r2);
                        int cmp = v1.compareTo(v2);
                        if (cmp != 0) {
                            return order.isAscending() ? cmp : -cmp;
                        }
                    } catch (Exception e) {
                        // Ignore and continue to next sort property
                    }
                }
                return 0;
            });
        }
        return reservations;
    }

    private List<Reservation> applyPagination(Pageable pageable, List<Reservation> reservations) {
        int pageSize = pageable.getPageSize();
        int pageNumber = pageable.getPageNumber();
        int fromIndex = Math.min(pageNumber * pageSize, reservations.size());
        int toIndex = Math.min(fromIndex + pageSize, reservations.size());

        return reservations.subList(fromIndex, toIndex);
    }
}

The list method receives a Pageable object and uses it to apply sorting and pagination to the reservations read from the local state store. This makes the method a perfect match for a Grid component in the Hilla frontend.

export default function ReservationsView() {
  const dataProvider = useDataProvider<Reservation>({
    list: (pageable) => KafkaStreamsService.list(pageable),
  });

  return (
    <VerticalLayout theme="padding" className="h-full">
      <ViewToolbar title="Reservations" />
      <Grid dataProvider={dataProvider.dataProvider}>
        <GridSortColumn path="id" />
        <GridSortColumn path="date" />
        <GridSortColumn path="customer" />
      </Grid>
    </VerticalLayout>
  );
}

Compared to the approach based on a KafkaListener, as described in the first blog post of this article series, a solution based on Interactive Queries offers a few advantages:

  • State can be persisted in state stores in the file system instead of in-memory.
  • Interactive Queries allow us to fetch the latest state directly from the state store, without persisting it in and querying it from an external database.
  • State stores are backed by Kafka changelogs, so state can be recovered after failures.

Keeping all reservations persisted and queryable could also be achieved with a traditional database. Writing reservations into a table once they have been received by a KafkaListener, as sketched below, is of course a valid option, and the result (from a frontend/UI perspective) might be the same. Which approach fits best will most likely depend on the concrete requirements. When you have to process large amounts of data, or need some kind of real-time processing, a state store might offer better performance.
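
A rough, hypothetical sketch of that database-backed alternative could look like this (assuming a Spring Data repository called ReservationRepository; the actual listener from the first blog post may differ):

@Component
public class ReservationPersistenceListener {

    // Hypothetical Spring Data repository used to persist reservations in a database table
    private final ReservationRepository reservationRepository;

    public ReservationPersistenceListener(ReservationRepository reservationRepository) {
        this.reservationRepository = reservationRepository;
    }

    // Store every received reservation in the database instead of a Kafka Streams state store
    @KafkaListener(topics = "reservations")
    public void onReservation(Reservation reservation) {
        reservationRepository.save(reservation);
    }
}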

But Kafka Streams and Interactive Queries can also help you in other areas, for example when you need aggregations on the data you process with Kafka.

Reservation dashboard

Dashboards are a typical requirement for business web apps. They usually aggregate some kind of data to give a quick overview. In our sample application, we could for example add a dashboard that shows the total number of reservations and the number of reservations per customer.

Dashboard

We can extend the existing Kafka Streams processing in our KafkaStreamsConfig.java with two more state stores that materialize the total number of reservations and the number of reservations per customer.

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
            StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    }

    @Bean
    public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Reservation> stream = streamsBuilder.stream("reservations");

        // materialize all reservations by key
        stream
                .groupByKey()
                .reduce((Reservation aggregate, Reservation newValue) -> newValue,
                        Materialized.as("all-reservations-state-store"));

        // materialize total number of reservations (count)
        stream
                .map((key, value) -> new KeyValue<>("total", value))
                .groupByKey()
                .count(Materialized.as("reservations-count-state-store"));

        // materialize total number of reservations per customer
        stream
                .groupBy((key, reservation) -> reservation.customer(), Grouped.with(null, null))
                .count(Materialized.as("reservations-count-per-customer-state-store"));

        return stream;
    }
}

The aggregated information can easily be provided to the frontend by extending the KafkaStreamsService.java accordingly.

@BrowserCallable
@AnonymousAllowed
public class KafkaStreamsService {

    // existing code omitted

    public @NonNull Long totalReservationsCount() {
        return getTotalReservationsCountFromStateStore();
    }

    private Long getTotalReservationsCountFromStateStore() {
        ReadOnlyKeyValueStore<String, Long> reservationsCountStateStore = interactiveQueryService
                .retrieveQueryableStore("reservations-count-state-store", QueryableStoreTypes.keyValueStore());
        Long totalReservationsCount = reservationsCountStateStore.get("total");
        return totalReservationsCount != null ? totalReservationsCount : 0;
    }

    public @NonNull List<@NonNull ReservationCountPerCustomer> reservationsCountPerCustomer() {
        return getReservationsCountPerCustomer();
    }

    private List<ReservationCountPerCustomer> getReservationsCountPerCustomer() {
        ReadOnlyKeyValueStore<String, Long> reservationsCountPerCustomerStateStore = interactiveQueryService
                .retrieveQueryableStore("reservations-count-per-customer-state-store",
                        QueryableStoreTypes.keyValueStore());
        List<ReservationCountPerCustomer> reservationsCountPerCustomer = new ArrayList<>();
        // The returned KeyValueIterator holds resources and should be closed after use
        try (KeyValueIterator<String, Long> iterator = reservationsCountPerCustomerStateStore.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, Long> element = iterator.next();
                reservationsCountPerCustomer.add(new ReservationCountPerCustomer(element.key, element.value));
            }
        }
        reservationsCountPerCustomer.sort((a, b) -> Long.compare(b.count(), a.count()));
        return reservationsCountPerCustomer;
    }
}

While getTotalReservationsCountFromStateStore only reads the value total from its state store, getReservationsCountPerCustomer reads the customers and their reservation counts from the other state store and sorts the result by the number of reservations.
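
The ReservationCountPerCustomer type used above can be a small Java record (a minimal sketch; the field names match the Grid columns in the view below):

// Simple DTO pairing a customer name with the number of reservations
public record ReservationCountPerCustomer(String customer, Long count) {
}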

Let’s have a look at the corresponding DashboardView that will call the new methods provided by the KafkaStreamsService.

export default function DashboardView() {
  const totalReservationsCount = useSignal(0);
  const reservationsCountPerCustomer = useSignal<ReservationCountPerCustomer[]>();

  useEffect(() => {
    KafkaStreamsService.totalReservationsCount().then((result) => (totalReservationsCount.value = result));
    KafkaStreamsService.reservationsCountPerCustomer().then((result) => (reservationsCountPerCustomer.value = result));
  }, []);

  return (
    <VerticalLayout theme="padding" className="h-full">
      <ViewToolbar title="Dashboard" />
      <HorizontalLayout theme="spacing">
        <Card cardTitle="Total Reservations" theme="elevated" className="self-start">
          <div className="text-center">
            <b>{totalReservationsCount}</b>
          </div>
        </Card>
        <Card cardTitle="Reservations per Customer" theme="elevated">
          <Grid items={reservationsCountPerCustomer.value}>
            <GridColumn path={'customer'} />
            <GridColumn path={'count'} />
          </Grid>
        </Card>
      </HorizontalLayout>
    </VerticalLayout>
  );
}

The view fetches the relevant data from the backend on its initial rendering. It uses the Card component to visualize the information.

Update dashboard automatically

Every time a new reservation message is available in the corresponding topic, the stream processing in our kStream bean will run, aggregate the data and update the existing local state stores. It would be nice if the dashboard updated itself accordingly, without the need for a manual refresh in the browser.

The first blog post of this article series already showed how we can create reactive browser callables that the frontend can subscribe to. We will use the same approach for the dashboard.

First, we extend KafkaStreamsConfig.java to publish an application event every time we receive a new reservation message.

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Bean
    public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
            StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    }

    @Bean
    public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Reservation> stream = streamsBuilder.stream("reservations");

        // materialize all reservations by key
        stream
                .groupByKey()
                .reduce((Reservation aggregate, Reservation newValue) -> newValue,
                        Materialized.as("all-reservations-state-store"));

        // materialize total number of reservations (count)
        stream
                .map((key, value) -> new KeyValue<>("total", value))
                .groupByKey()
                .count(Materialized.as("reservations-count-state-store"));

        // materialize total number of reservations per customer
        stream
                .groupBy((key, reservation) -> reservation.customer(), Grouped.with(null, null))
                .count(Materialized.as("reservations-count-per-customer-state-store"));

        // Publish event for each new reservation
        stream.peek((key, reservation) -> applicationEventPublisher.publishEvent(new NewReservationEvent()));

        return stream;
    }
}

The NewReservationEvent is just a simple Java record without any fields. Our KafkaStreamsService receives this application event and emits it to a sink. The sink is exposed as a Flux to the frontend.
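
Since the event carries no data, a minimal definition is enough:

// Marker event published for every reservation processed by the stream
public record NewReservationEvent() {
}

The extended KafkaStreamsService then looks like this: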

@BrowserCallable
@AnonymousAllowed
public class KafkaStreamsService {

    private final KafkaStreamsInteractiveQueryService interactiveQueryService;
    private final Many<NewReservationEvent> newReservationEventSink;

    public KafkaStreamsService(KafkaStreamsInteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
        this.newReservationEventSink = Sinks.many().replay().latest();
    }

    // existing methods omitted

    @EventListener
    public void handleNewReservationEvent(NewReservationEvent newReservationEvent) {
        newReservationEventSink.tryEmitNext(newReservationEvent);
    }

    public Flux<@NonNull NewReservationEvent> newReservationEvent() {
        return newReservationEventSink.asFlux();
    }
}

Finally, the DashboardView can subscribe to the Flux and refresh its data every time a new reservation has been processed in the backend.

export default function DashboardView() {
  const totalReservationsCount = useSignal(0);
  const reservationsCountPerCustomer = useSignal<ReservationCountPerCustomer[]>();

  const refreshData = useCallback(() => {
    KafkaStreamsService.totalReservationsCount().then((result) => (totalReservationsCount.value = result));
    KafkaStreamsService.reservationsCountPerCustomer().then((result) => (reservationsCountPerCustomer.value = result));
  }, [totalReservationsCount, reservationsCountPerCustomer]);

  useEffect(() => {
    refreshData();
  }, []);

  useEffect(() => {
    const newReservationEventSubscription = KafkaStreamsService.newReservationEvent().onNext(
      (_newReservationEvent: NewReservationEvent) => setTimeout(refreshData, 500)
    );
    return () => newReservationEventSubscription.cancel();
  }, []);

  return (
    <VerticalLayout theme="padding" className="h-full">
      <ViewToolbar title="Dashboard" />
      <HorizontalLayout theme="spacing">
        <Card cardTitle="Total Reservations" theme="elevated" className="self-start">
          <div className="text-center">
            <b>{totalReservationsCount}</b>
          </div>
        </Card>
        <Card cardTitle="Reservations per Customer" theme="elevated">
          <Grid items={reservationsCountPerCustomer.value}>
            <GridColumn path={'customer'} />
            <GridColumn path={'count'} />
          </Grid>
        </Card>
      </HorizontalLayout>
    </VerticalLayout>
  );
}

Every time a NewReservationEvent is received from the subscription, we wait 500 ms before refreshing, because there can be a small delay between the moment the event is published from the stream and the moment the state store is fully updated and queryable. Adding a small delay is a practical way to ensure that the frontend queries the state store after the update has been persisted.

Summary

Kafka Streams offers a large set of functionality for many different use cases, but not all of it is relevant for full-stack web development. Interactive Queries offer an interesting approach to consuming and aggregating streaming data in real time and to querying this data from a local state store. The Spring Boot based backend of a Hilla app makes it easy to integrate Kafka Streams and Interactive Queries. Together with Hilla’s support for reactive programming and its UI components, we can quickly create a useful frontend to visualize the materialized and aggregated data.

You can find the source code of the example Hilla app shown in this post on GitHub.
