Article series
This is the third part of a small article series about how to build Hilla web apps for Apache Kafka.
- Part 1: Consuming messages
- Part 2: Producing messages
- Part 3: Kafka Streams
In this blog post, we will learn how to use Kafka Streams in web apps built with Hilla.
Set up Kafka and Hilla project
Please read the first blog post of this article series to find out how to set up a local Kafka broker and how to initialize a new Hilla project.
Kafka Streams
A general introduction to Kafka Streams is out of scope for this blog post. Instead, we will focus on specific features that can be useful when building full-stack web apps with Hilla and Kafka Streams.
To enable Kafka Streams support in our Hilla application, we need to add the required dependencies to our pom.xml.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>${kafka.version}</version>
</dependency>
To use Kafka Streams, we need to add some configuration to our Hilla backend. We could create the required beans ourselves, as described in the Spring for Apache Kafka documentation, or we could use Spring Boot’s auto-configuration capabilities (as described in the Spring Boot documentation for Apache Kafka) by providing the required configuration in the application.properties of our Hilla backend.
# Set up Kafka Streams:
spring.kafka.streams.application-id=hilla-kafka-streams-example
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.client-id=reservation-client
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties[default.value.serde]=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.streams.properties[spring.json.value.default.type]=de.rwi.hillakafkastreamsexample.reservation.Reservation
Most of the configuration should be self-explanatory. When receiving messages, we want to deserialize them into our Reservation model. For this purpose, we can use the JsonSerde (a combined JSON serializer/deserializer) provided by Spring for Apache Kafka. In addition, we configure our Reservation record as the default type for incoming messages. This way, we don’t have to care about message headers and their mapping.
Now, we create a KafkaStreamsConfig class that is responsible for creating a KStream bean containing all messages received in the reservations topic.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
        return streamsBuilder.stream("reservations");
    }
}
Interactive Queries
One feature that is very useful when building web apps based on Kafka Streams is Interactive Queries. Interactive Queries let us query the latest state of our application, that is, the state that results from some kind of (stream) processing. The full state is typically split across many distributed instances of our app, and across many so-called state stores that are managed locally by these application instances. For simplicity, we assume that our web app has just one instance and that all relevant state is available in one local state store. Querying remote state stores across all app instances is out of scope here (please take a look at the corresponding documentation for more details).
A local state store can be seen as a lightweight embedded database that we can query, and we can configure which state should be materialized in it.
Query all reservations
The example application we created in the first two blog posts lists all available reservations. To achieve this, it has to listen to the reservations topic, read every reservation, and hold it in memory in a sink to provide it as a Flux to a consumer, which is the Hilla frontend in our case. We can change this to read all reservations from the local state store instead. This requires materializing the stream into a local state store, which we call all-reservations-state-store. Spring’s Interactive Query support is centered around an API called KafkaStreamsInteractiveQueryService, which is a facade around the Interactive Queries APIs of the Kafka Streams library. Our Hilla app needs to create an instance of this service as a bean, which then lets us retrieve a state store by its name.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
            StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    }

    @Bean
    public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Reservation> stream = streamsBuilder.stream("reservations");

        // materialize all reservations by key
        stream
                .groupByKey()
                .reduce((Reservation aggregate, Reservation newValue) -> newValue,
                        Materialized.as("all-reservations-state-store"));

        return stream;
    }
}
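To make the semantics of this reducer concrete: groupByKey() collects the messages per key, and because `(aggregate, newValue) -> newValue` discards the previous aggregate, all-reservations-state-store ends up holding exactly the latest reservation per key, much like an upsert into a table. The following plain-Java sketch simulates that behavior without Kafka. The simplified Reservation record (with String fields) and the Message wrapper are assumptions for illustration, and a TreeMap merely stands in for the state store.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Kafka involved) of what
// groupByKey().reduce((aggregate, newValue) -> newValue) keeps in its state store.
class ReduceLatestSketch {

    // Simplified stand-in for the article's Reservation record (field types are assumptions).
    record Reservation(String id, String date, String customer) {}

    // Stand-in for a consumed Kafka record: message key plus deserialized value.
    record Message(String key, Reservation value) {}

    // Processes the messages in arrival order; the TreeMap plays the role of the
    // local key-value state store.
    static Map<String, Reservation> materialize(List<Message> messages) {
        Map<String, Reservation> store = new TreeMap<>();
        for (Message m : messages) {
            // merge() only calls the reducer when the key already has a value;
            // the reducer ignores the old aggregate and keeps the newest value.
            store.merge(m.key(), m.value(), (aggregate, newValue) -> newValue);
        }
        return store;
    }
}
```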
We can now create a browser-callable service called KafkaStreamsService that reads all reservations from the local state store and returns them as a list.
@BrowserCallable
@AnonymousAllowed
public class KafkaStreamsService {

    private final KafkaStreamsInteractiveQueryService interactiveQueryService;

    public KafkaStreamsService(KafkaStreamsInteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
    }

    public @NonNull List<@NonNull Reservation> list(Pageable pageable) {
        List<Reservation> reservations = getAllReservationsFromStateStore();
        reservations = applySorting(pageable.getSort(), reservations);
        reservations = applyPagination(pageable, reservations);
        return reservations;
    }

    private List<Reservation> getAllReservationsFromStateStore() {
        ReadOnlyKeyValueStore<String, Reservation> allReservationsStateStore = interactiveQueryService
                .retrieveQueryableStore("all-reservations-state-store", QueryableStoreTypes.keyValueStore());
        List<Reservation> allReservations = new ArrayList<>();
        // a KeyValueIterator holds resources of the underlying store and must be closed
        try (KeyValueIterator<String, Reservation> iterator = allReservationsStateStore.all()) {
            while (iterator.hasNext()) {
                allReservations.add(iterator.next().value);
            }
        }
        return allReservations;
    }

    @SuppressWarnings("unchecked")
    private List<Reservation> applySorting(Sort sort, List<Reservation> reservations) {
        if (sort != null && sort.isSorted()) {
            reservations.sort((r1, r2) -> {
                for (Sort.Order order : sort) {
                    try {
                        // look up the record field by the property name sent from the Grid
                        var field = Reservation.class.getDeclaredField(order.getProperty());
                        field.setAccessible(true);
                        Comparable<Object> v1 = (Comparable<Object>) field.get(r1);
                        Object v2 = field.get(r2);
                        int cmp = v1.compareTo(v2);
                        if (cmp != 0) {
                            return order.isAscending() ? cmp : -cmp;
                        }
                    } catch (Exception e) {
                        // Ignore and continue to next sort property
                    }
                }
                return 0;
            });
        }
        return reservations;
    }

    private List<Reservation> applyPagination(Pageable pageable, List<Reservation> reservations) {
        int pageSize = pageable.getPageSize();
        int pageNumber = pageable.getPageNumber();
        // clamp both bounds so that a page past the end yields an empty list
        int fromIndex = Math.min(pageNumber * pageSize, reservations.size());
        int toIndex = Math.min(fromIndex + pageSize, reservations.size());
        return reservations.subList(fromIndex, toIndex);
    }
}
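The sorting and pagination helpers do not depend on Kafka at all, so they can be tried out in isolation. The following plain-Java sketch reproduces the index arithmetic of applyPagination and, as an alternative to the reflection-based applySorting, maps each sortable property name to an explicit Comparator. The simplified Reservation record, the hypothetical SORTABLE map, and the single property/ascending parameters (used instead of Spring’s Sort) are assumptions made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

class PagingSketch {

    // Simplified stand-in for the article's Reservation record (field types are assumptions).
    record Reservation(String id, String date, String customer) {}

    // Hypothetical whitelist: each sortable Grid column gets an explicit comparator,
    // so no reflection or unchecked casts are needed.
    static final Map<String, Comparator<Reservation>> SORTABLE = Map.of(
            "id", Comparator.comparing(Reservation::id),
            "date", Comparator.comparing(Reservation::date),
            "customer", Comparator.comparing(Reservation::customer));

    // Sorts a copy of the list by one property; unknown properties keep the original order.
    static List<Reservation> sort(List<Reservation> reservations, String property, boolean ascending) {
        List<Reservation> sorted = new ArrayList<>(reservations);
        Comparator<Reservation> comparator = SORTABLE.get(property);
        if (comparator != null) {
            sorted.sort(ascending ? comparator : comparator.reversed());
        }
        return sorted;
    }

    // Same index arithmetic as applyPagination: both bounds are clamped to the list size,
    // so a page past the end yields an empty list instead of an exception.
    static <T> List<T> page(List<T> items, int pageNumber, int pageSize) {
        int fromIndex = Math.min(pageNumber * pageSize, items.size());
        int toIndex = Math.min(fromIndex + pageSize, items.size());
        return items.subList(fromIndex, toIndex);
    }
}
```

With 25 items and a page size of 10, page 2 contains items 20 to 24 and page 3 is empty. A design note: with the comparator map, the compiler checks the accessor references and no unchecked cast is needed; unknown property names are still ignored, as in the original.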
The list method receives a Pageable object and uses it to apply sorting and pagination to the reservations read from the local state store. This makes the method a perfect match for a Grid component in the Hilla frontend.
export default function ReservationsView() {
  const dataProvider = useDataProvider<Reservation>({
    list: (pageable) => KafkaStreamsService.list(pageable),
  });

  return (
    <VerticalLayout theme="padding" className="h-full">
      <ViewToolbar title="Reservations" />
      <Grid dataProvider={dataProvider.dataProvider}>
        <GridSortColumn path="id" />
        <GridSortColumn path="date" />
        <GridSortColumn path="customer" />
      </Grid>
    </VerticalLayout>
  );
}
Compared to the approach based on a KafkaListener, as described in the first blog post of this article series, a solution based on Interactive Queries offers a few advantages:
- State can be persisted in state stores on the file system instead of in memory.
- Interactive Queries allow us to fetch the latest state directly from the state store, without having to persist it in, and query it from, an external database.
- State stores are backed by Kafka changelogs, so state can be recovered after failures.
Keeping all reservations persisted and queryable could also be achieved with a traditional database. Writing reservations into a table once they have been received by a KafkaListener is, of course, a valid option, and the result (from a frontend/UI perspective) might be the same. Which approach fits best will most likely depend on the concrete requirements. When you have to process large amounts of data or need some kind of real-time processing, a state store might offer better performance.
But Kafka Streams and Interactive Queries can also help in other areas, for example when you need to aggregate the data you process with Kafka.
Reservation dashboard
Dashboards are a typical requirement for business web apps. They usually aggregate some kind of data to give a quick overview. In our sample application, we could, for example, add a dashboard that shows the total number of reservations and the number of reservations per customer.
We can extend the existing Kafka Streams processing in our KafkaStreamsConfig.java and add two more state stores that materialize the total number of reservations and the number of reservations per customer.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
            StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    }

    @Bean
    public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Reservation> stream = streamsBuilder.stream("reservations");

        // materialize all reservations by key
        stream
                .groupByKey()
                .reduce((Reservation aggregate, Reservation newValue) -> newValue,
                        Materialized.as("all-reservations-state-store"));

        // materialize total number of reservations (count)
        stream
                .map((key, value) -> new KeyValue<>("total", value))
                .groupByKey()
                .count(Materialized.as("reservations-count-state-store"));

        // materialize total number of reservations per customer
        stream
                .groupBy((key, reservation) -> reservation.customer(), Grouped.with(null, null))
                .count(Materialized.as("reservations-count-per-customer-state-store"));

        return stream;
    }
}
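Note that both new stores count messages, not distinct reservations: if a message with an already known key arrives again (for example, an updated reservation), all-reservations-state-store still holds a single entry for that key, while both counts go up. The following plain-Java sketch mirrors the three materializations over an in-memory list to make that difference visible. No Kafka Streams API is involved; the simplified Reservation record and the Message wrapper are assumptions for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java mirror (no Kafka involved) of the three materializations in kStream().
class DashboardCountsSketch {

    // Simplified stand-ins; field types are assumptions for illustration.
    record Reservation(String id, String date, String customer) {}
    record Message(String key, Reservation value) {}

    // Like groupByKey().reduce((aggregate, newValue) -> newValue): one entry per key.
    static Map<String, Reservation> allReservations(List<Message> messages) {
        Map<String, Reservation> store = new TreeMap<>();
        for (Message m : messages) {
            store.put(m.key(), m.value());
        }
        return store;
    }

    // Like map(... new KeyValue<>("total", value)).groupByKey().count():
    // every message increments the single "total" counter.
    static long totalCount(List<Message> messages) {
        Map<String, Long> store = new TreeMap<>();
        for (Message m : messages) {
            store.merge("total", 1L, Long::sum);
        }
        return store.getOrDefault("total", 0L);
    }

    // Like groupBy((key, reservation) -> reservation.customer()).count():
    // every message increments the counter of its customer.
    static Map<String, Long> countPerCustomer(List<Message> messages) {
        Map<String, Long> store = new TreeMap<>();
        for (Message m : messages) {
            store.merge(m.value().customer(), 1L, Long::sum);
        }
        return store;
    }
}
```

For example, sending a reservation with the same key twice leaves one entry for it in the first map but counts it twice in both counters. If updates are expected and only distinct reservations should be counted, one option is to derive the counts from the KTable returned by reduce(), because KTable.groupBy(...).count() subtracts the old value of an updated key before adding the new one.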
The aggregated information can easily be provided to the frontend by extending KafkaStreamsService.java accordingly.
@BrowserCallable
@AnonymousAllowed
public class KafkaStreamsService {

    // existing code omitted

    public @NonNull Long totalReservationsCount() {
        return getTotalReservationsCountFromStateStore();
    }

    private Long getTotalReservationsCountFromStateStore() {
        ReadOnlyKeyValueStore<String, Long> reservationsCountStateStore = interactiveQueryService
                .retrieveQueryableStore("reservations-count-state-store", QueryableStoreTypes.keyValueStore());
        Long totalReservationsCount = reservationsCountStateStore.get("total");
        // the key "total" does not exist until the first reservation has been counted
        return totalReservationsCount != null ? totalReservationsCount : 0L;
    }

    public @NonNull List<@NonNull ReservationCountPerCustomer> reservationsCountPerCustomer() {
        return getReservationsCountPerCustomer();
    }

    private List<ReservationCountPerCustomer> getReservationsCountPerCustomer() {
        ReadOnlyKeyValueStore<String, Long> reservationsCountPerCustomerStateStore = interactiveQueryService
                .retrieveQueryableStore("reservations-count-per-customer-state-store",
                        QueryableStoreTypes.keyValueStore());
        List<ReservationCountPerCustomer> reservationsCountPerCustomer = new ArrayList<>();
        // close the iterator to release the resources of the underlying store
        try (KeyValueIterator<String, Long> iterator = reservationsCountPerCustomerStateStore.all()) {
            while (iterator.hasNext()) {
                KeyValue<String, Long> element = iterator.next();
                reservationsCountPerCustomer.add(new ReservationCountPerCustomer(element.key, element.value));
            }
        }
        // sort by number of reservations, highest first
        reservationsCountPerCustomer.sort((a, b) -> Long.compare(b.count(), a.count()));
        return reservationsCountPerCustomer;
    }
}
While getTotalReservationsCountFromStateStore only reads the value stored under the key total from the corresponding state store, getReservationsCountPerCustomer reads all customers and their reservation counts from its state store and sorts the result by the number of reservations in descending order.
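The comparator `(a, b) -> Long.compare(b.count(), a.count())` orders by count, highest first, but leaves customers with equal counts in whatever order the store iterator returned them. A variant with a deterministic tie-breaker (alphabetical by customer) could look like the following sketch. The shape of ReservationCountPerCustomer (a String customer and a long count, inferred from how the service constructs it and from the Grid column paths) is an assumption.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class CountSortingSketch {

    // Assumed shape of the article's ReservationCountPerCustomer record.
    record ReservationCountPerCustomer(String customer, long count) {}

    // Highest count first; customers with the same count are ordered alphabetically,
    // so the Grid shows a stable order between refreshes.
    static final Comparator<ReservationCountPerCustomer> BY_COUNT_DESC =
            Comparator.comparingLong(ReservationCountPerCustomer::count).reversed()
                    .thenComparing(ReservationCountPerCustomer::customer);

    // Returns a sorted copy and leaves the input list untouched.
    static List<ReservationCountPerCustomer> sorted(List<ReservationCountPerCustomer> input) {
        List<ReservationCountPerCustomer> result = new ArrayList<>(input);
        result.sort(BY_COUNT_DESC);
        return result;
    }
}
```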
Let’s have a look at the corresponding DashboardView, which calls the new methods provided by KafkaStreamsService.
export default function DashboardView() {
  const totalReservationsCount = useSignal(0);
  const reservationsCountPerCustomer = useSignal<ReservationCountPerCustomer[]>();

  useEffect(() => {
    KafkaStreamsService.totalReservationsCount().then((result) => (totalReservationsCount.value = result));
    KafkaStreamsService.reservationsCountPerCustomer().then((result) => (reservationsCountPerCustomer.value = result));
  }, []);

  return (
    <VerticalLayout theme="padding" className="h-full">
      <ViewToolbar title="Dashboard" />
      <HorizontalLayout theme="spacing">
        <Card cardTitle="Total Reservations" theme="elevated" className="self-start">
          <div className="text-center">
            <b>{totalReservationsCount}</b>
          </div>
        </Card>
        <Card cardTitle="Reservations per Customer" theme="elevated">
          <Grid items={reservationsCountPerCustomer.value}>
            <GridColumn path={'customer'} />
            <GridColumn path={'count'} />
          </Grid>
        </Card>
      </HorizontalLayout>
    </VerticalLayout>
  );
}
The view fetches the relevant data from the backend on its initial rendering and uses the Card component to visualize it.
Update dashboard automatically
Every time a new reservation message arrives in the corresponding topic, the stream processing defined in our kStream bean runs, aggregates the data, and updates the local state stores. It would be nice if the dashboard updated itself accordingly, without a manual refresh in the browser.
The first blog post of this article series already showed how we can create reactive browser callables that the frontend can subscribe to. We will use the same approach for the dashboard.
First, we extend KafkaStreamsConfig.java to publish a new application event every time we receive a new reservation message.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Bean
    public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
            StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    }

    @Bean
    public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
        KStream<String, Reservation> stream = streamsBuilder.stream("reservations");

        // materialize all reservations by key
        stream
                .groupByKey()
                .reduce((Reservation aggregate, Reservation newValue) -> newValue,
                        Materialized.as("all-reservations-state-store"));

        // materialize total number of reservations (count)
        stream
                .map((key, value) -> new KeyValue<>("total", value))
                .groupByKey()
                .count(Materialized.as("reservations-count-state-store"));

        // materialize total number of reservations per customer
        stream
                .groupBy((key, reservation) -> reservation.customer(), Grouped.with(null, null))
                .count(Materialized.as("reservations-count-per-customer-state-store"));

        // Publish event for each new reservation
        stream.peek((key, reservation) -> applicationEventPublisher.publishEvent(new NewReservationEvent()));

        return stream;
    }
}
The NewReservationEvent is just a simple Java record without any fields. Our KafkaStreamsService receives the NewReservationEvent application event and emits it to a sink, which is provided to the frontend as a Flux.
@BrowserCallable
@AnonymousAllowed
public class KafkaStreamsService {

    private final KafkaStreamsInteractiveQueryService interactiveQueryService;
    private final Many<NewReservationEvent> newReservationEventSink;

    public KafkaStreamsService(KafkaStreamsInteractiveQueryService interactiveQueryService) {
        this.interactiveQueryService = interactiveQueryService;
        this.newReservationEventSink = Sinks.many().replay().latest();
    }

    // existing methods omitted

    @EventListener
    public void handleNewReservationEvent(NewReservationEvent newReservationEvent) {
        newReservationEventSink.tryEmitNext(newReservationEvent);
    }

    public Flux<@NonNull NewReservationEvent> newReservationEvent() {
        return newReservationEventSink.asFlux();
    }
}
Finally, the DashboardView can subscribe to the Flux and refresh its data every time a new reservation has been processed in the backend.
export default function DashboardView() {
  const totalReservationsCount = useSignal(0);
  const reservationsCountPerCustomer = useSignal<ReservationCountPerCustomer[]>();

  const refreshData = useCallback(() => {
    KafkaStreamsService.totalReservationsCount().then((result) => (totalReservationsCount.value = result));
    KafkaStreamsService.reservationsCountPerCustomer().then((result) => (reservationsCountPerCustomer.value = result));
  }, [totalReservationsCount, reservationsCountPerCustomer]);

  useEffect(() => {
    refreshData();
  }, []);

  useEffect(() => {
    const newReservationEventSubscription = KafkaStreamsService.newReservationEvent().onNext(
      (_newReservationEvent: NewReservationEvent) => setTimeout(refreshData, 500)
    );
    return () => newReservationEventSubscription.cancel();
  }, []);

  return (
    <VerticalLayout theme="padding" className="h-full">
      <ViewToolbar title="Dashboard" />
      <HorizontalLayout theme="spacing">
        <Card cardTitle="Total Reservations" theme="elevated" className="self-start">
          <div className="text-center">
            <b>{totalReservationsCount}</b>
          </div>
        </Card>
        <Card cardTitle="Reservations per Customer" theme="elevated">
          <Grid items={reservationsCountPerCustomer.value}>
            <GridColumn path={'customer'} />
            <GridColumn path={'count'} />
          </Grid>
        </Card>
      </HorizontalLayout>
    </VerticalLayout>
  );
}
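Every time a NewReservationEvent is received from the subscription, we wait for 500 ms, because there can be a small delay between the moment the event is published from the stream and the moment the state stores are fully updated and queryable. The two count stores, for example, are fed through repartition topics, because map and groupBy change the record key before counting. Adding a small delay is a pragmatic way to make it very likely that the frontend queries the state stores after the update has been applied, although it is not a strict guarantee.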
Summary
Kafka Streams offers a large set of functionality for many different use cases, not all of which are relevant for full-stack web development. Interactive Queries offer an interesting approach to consuming and aggregating streaming data in real time and querying this data from a local state store. The Spring Boot based backend of a Hilla app makes it easy to integrate Kafka Streams and Interactive Queries. Together with Hilla’s support for reactive programming and its UI components, we can quickly create a useful frontend to visualize the materialized and aggregated data.
You can find the source code of the example Hilla app on GitHub.
Image Credits:
- Cover image source: https://www.flickr.com/photos/mgaylard/54107308934
- Cover image license: https://creativecommons.org/licenses/by/2.0/deed.en