Artikelreihe
Dies ist der dritte Teil einer kleinen Artikelserie darüber, wie man Hilla-Anwendungen für Apache Kafka erstellt.
- Teil 1: Nachrichten konsumieren
- Teil 2: Nachrichten produzieren
- Teil 3: Kafka Streams
In diesem Blog-Post wird aufgezeigt, wie man Kafka Streams in Webanwendungen nutzen kann, die mit Hilla erstellt werden.
Kafka und Hilla-Projekt einrichten
Bitte lesen Sie den ersten Blog-Post dieser Artikelserie, um herauszufinden, wie man einen lokalen Kafka-Broker einrichtet und wie man ein neues Hilla-Projekt initialisiert.
Kafka Streams
Eine allgemeine Einführung in Kafka Streams würde den Rahmen dieses Blog-Posts sprengen. Stattdessen liegt der Fokus auf einer bestimmten Funktion, die bei der Entwicklung von Full-Stack-Webanwendungen mit Hilla und Kafka Streams nützlich sein kann.
Damit Kafka-Streams in einer Hilla-Anwendung verwendet werden kann, müssen die erforderlichen Abhängigkeiten in der pom.xml
hinzugefügt werden.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
Um Kafka-Streams nutzen zu können, müssen im Hilla-Backend einige Konfigurationen hinzufügt werden. Dafür könnte man die erforderlichen Beans erstellen, wie in der Dokumentation von Spring für Apache Kafka beschrieben, oder man kann die Autokonfigurationsfähigkeiten von Spring Boot nutzen (wie in der Spring Boot Dokumentation für Apache Kafka beschrieben), indem man die erforderliche Konfiguration in der application.properties
des Hilla-Backends bereitstellt.
# Set up Kafka Streams:
spring.kafka.streams.application-id=hilla-kafka-streams-example
spring.kafka.streams.bootstrap-servers=localhost:9092
spring.kafka.streams.client-id=reservation-client
spring.kafka.streams.properties[default.key.serde]=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.kafka.streams.properties[default.value.serde]=org.springframework.kafka.support.serializer.JsonSerde
spring.kafka.streams.properties[spring.json.value.default.type]=de.rwi.hillakafkastreamsexample.reservation.Reservation
Der Großteil der Konfiguration sollte selbsterklärend sein. Die empfangenen Nachrichten werden in das Reservation
-Modell deserialisiert. Zu diesem Zweck kann man den JsonSerde
Serializer verwenden, der von Spring bereitgestellt wird. Darüber hinaus wird Reservation
als Standardtyp für eingehende Nachrichten konfiguriert. Auf diese Weise muss man sich nicht um die Nachrichten-Header und deren Zuordnung kümmern.
Nun kann die KafkaStreamsConfig
erstellt werden, die für die Erstellung einer KStream
-Bean verantwortlich ist, die alle Nachrichten enthält, die im Topic reservations
empfangen werden.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean
public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
return streamsBuilder.stream("reservations");
}
}
Interactive Queries
Eine Funktion, die bei der Entwicklung von Webanwendungen auf Grundlage von Kafka Stream sehr nützlich ist, ist Interactive Queries. Interactive Queries ermöglichen es, den aktuellen Zustand einer Anwendung abzufragen. Dieser Zustand ist das Ergebnis einer (Stream-)Verarbeitung. Der vollständige Zustand ist in der Regel auf viele verteilte Instanzen der Anwendung und auf viele so genannte State Stores
verteilt, die lokal von diesen Anwendungsinstanzen verwaltet werden. Der Einfachheit halber wird im Folgenden davon ausgegangen, dass die Webanwendung nur eine Instanz hat und der gesamte relevante Zustand in einem lokalen State Store verfügbar ist. Die Abfrage von Remote-State-Stores für alle App-Instanzen wird in diesem Blog-Post nicht beschrieben (weitere Einzelheiten hierzu findet man in der entsprechenden Dokumentation).
Ein lokaler State Store kann als eine leichtgewichtige, eingebettete Datenbank betrachtet werden, die man abfragen kann und für die konfiguriert werden kann, welcher Zustand in einem State Store abgelegt werden soll.
Alle Reservierungen abfragen
Die Beispielanwendung, die in den ersten beiden Blog-Posts erstellt wurde, listet alle verfügbaren Reservierungen auf. Um dies zu erreichen, benötigt sie einen Listener auf das Topic reservations
, sie muss jede Reservierung lesen und sie in einer Senke im Arbeitsspeicher halten, um sie als Flux an einen Konsumenten zu liefern, der im vorliegenden Fall das Hilla-Frontend ist. Diese Vorgehensweise kann so angepasst werden, dass stattdessen alle Reservierungen aus dem lokalen State Store gelesen werden. Dies erfordert das Ablegen des Streams in einem lokalen State Store namens all-reservations-state-store
. Die Unterstützung von Spring für Interactive Queries basiert im Kern auf einer API namens KafkaStreamsInteractiveQueryService
, die eine Fassade für die Interactive Queries-APIs in der Kafka-Streams-Bibliothek darstellt. Die Hilla-Anwendung muss eine Instanz dieses Services als Bean erstellen, die den Abruf des State Stores anhand seines Namens ermöglicht.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
}
@Bean
public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
KStream<String, Reservation> stream = streamsBuilder.stream("reservations");
// materialize all reservations by key
stream
.groupByKey()
.reduce((Reservation aggregate, Reservation newValue) -> newValue,
Materialized.as("all-reservations-state-store"));
return stream;
}
}
Anschließend kann man einen Browser Callable Service namens KafkaStreamsService
erstellen, der alle Reservierungen aus dem lokalen State Store liest und sie als Liste zurückgibt.
@BrowserCallable
@AnonymousAllowed
public class KafkaStreamsService {
private final KafkaStreamsInteractiveQueryService interactiveQueryService;
public KafkaStreamsService(KafkaStreamsInteractiveQueryService interactiveQueryService) {
this.interactiveQueryService = interactiveQueryService;
}
public @NonNull List<@NonNull Reservation> list(Pageable pageable) {
List<Reservation> reservations = getAllReservationsFromStateStore();
reservations = applySorting(pageable.getSort(), reservations);
reservations = applyPagination(pageable, reservations);
return reservations;
}
private List<Reservation> getAllReservationsFromStateStore() {
ReadOnlyKeyValueStore<String, Reservation> allReservationsStateStore = interactiveQueryService
.retrieveQueryableStore("all-reservations-state-store", QueryableStoreTypes.keyValueStore());
List<Reservation> allReservations = new ArrayList<>();
Iterator<KeyValue<String, Reservation>> iterator = allReservationsStateStore.all();
while (iterator.hasNext()) {
allReservations.add(iterator.next().value);
}
return allReservations;
}
private List<Reservation> applySorting(Sort sort, List<Reservation> reservations) {
if (sort != null && sort.isSorted()) {
reservations.sort((r1, r2) -> {
for (Sort.Order order : sort) {
try {
var field = Reservation.class.getDeclaredField(order.getProperty());
field.setAccessible(true);
Comparable<Object> v1 = (Comparable<Object>) field.get(r1);
Object v2 = field.get(r2);
int cmp = v1.compareTo(v2);
if (cmp != 0) {
return order.isAscending() ? cmp : -cmp;
}
} catch (Exception e) {
// Ignore and continue to next sort property
}
}
return 0;
});
}
return reservations;
}
private List<Reservation> applyPagination(Pageable pageable, List<Reservation> reservations) {
int pageSize = pageable.getPageSize();
int pageNumber = pageable.getPageNumber();
int fromIndex = Math.min(pageNumber * pageSize, reservations.size());
int toIndex = Math.min(fromIndex + pageSize, reservations.size());
return reservations.subList(fromIndex, toIndex);
}
}
Die Methode list
empfängt ein Pageable
-Objekt und verwendet es, um eine Sortierung und Paginierung auf die aus dem lokalen State Store gelesenen Reservierungen anzuwenden. Damit ist diese Methode eine perfekte Grundlage für die Verwendung einer Grid-Komponente im Hilla-Frontend.
export default function ReservationsView() {
const dataProvider = useDataProvider<Reservation>({
list: (pageable) => KafkaStreamsService.list(pageable),
});
return (
<VerticalLayout theme="padding" className="h-full">
<ViewToolbar title="Reservations" />
<Grid dataProvider={dataProvider.dataProvider}>
<GridSortColumn path="id" />
<GridSortColumn path="date" />
<GridSortColumn path="customer" />
</Grid>
</VerticalLayout>
);
}
Verglichen mit dem Ansatz, der auf einem KafkaListener
basiert, so wie er im ersten Blog-Post dieser Artikelserie beschrieben wurde, bietet eine Lösung, die auf Interactive Queries basiert, einige Vorteile:
- Der Zustand kann im Dateisystem persistiert werden, anstatt nur im Arbeitsspeicher.
- Interactive Queries ermöglichen es, den neuesten Zustand direkt aus dem State Store abzurufen, ohne diesen zuvor in einer externen Datenbank abzulegen und ihn von dort abfragen zu müssen.
- State Stores werden durch Kafka Changelogs abgesichert, so dass der Zustand nach Ausfällen wiederhergestellt werden kann.
Alle Reservierungen persistent und abfragbar zu halten, könnte auch mit einer traditionellen Datenbank erreicht werden. Das Schreiben von Reservierungen in eine Tabelle, sobald sie mit einem KafkaListener
empfangen wurden, ist natürlich eine valide Option und das Ergebnis (aus der Frontend/UI-Perspektive) könnte das Gleiche sein. Es wird höchstwahrscheinlich von den konkreten Anforderungen abhängen, welcher Ansatz am besten passt. Wenn man eine große Menge an Daten verarbeiten muss oder eine Art von Echtzeitverarbeitung realisieren möchte, könnte ein State-Store jedoch eine bessere Leistung bieten.
Kafka-Streams und Interactive Queries können aber auch in anderen Bereichen helfen, z. B. wenn man Aggregationen für die Daten benötigt, die mit Kafka verarbeitet werden.
Dashboard für Reservierungen
Eine typische Anforderung an Webanwendungen für Unternehmen sind Dashboards. Dashboards fassen normalerweise Daten zusammen, um einen schnellen Überblick zu geben. In der Beispielanwendung könnte man ein Dashboard hinzufügen, das z. B. die Gesamtzahl der Reservierungen und die Anzahl der Reservierungen pro Kunde anzeigt.
Man kann die bestehende Kafka-Streams-Verarbeitung in der Datei KafkaStreamsConfig.java
erweitern, um zwei weitere State Stores hinzuzufügen, die die Gesamtzahl der Reservierungen und die Gesamtzahl der Reservierungen pro Kunde enthalten.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
}
@Bean
public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
KStream<String, Reservation> stream = streamsBuilder.stream("reservations");
// materialize all reservations by key
stream
.groupByKey()
.reduce((Reservation aggregate, Reservation newValue) -> newValue,
Materialized.as("all-reservations-state-store"));
// materialize total number of reservations (count)
stream
.map((key, value) -> new KeyValue<>("total", value))
.groupByKey()
.count(Materialized.as("reservations-count-state-store"));
// materialize total number of reservations per customer
stream
.groupBy((key, reservation) -> reservation.customer(), Grouped.with(null, null))
.count(Materialized.as("reservations-count-per-customer-state-store"));
return stream;
}
}
Die aggregierten Informationen können leicht für das Frontend bereitgestellt werden, indem die Datei KafkaStreamsService.java
entsprechend erweitert wird.
@BrowserCallable
@AnonymousAllowed
public class KafkaStreamsService {
// existing code omitted
public @NonNull Long totalReservationsCount() {
return getTotalReservationsCountFromStateStore();
}
private Long getTotalReservationsCountFromStateStore() {
ReadOnlyKeyValueStore<String, Long> reservationsCountStateStore = interactiveQueryService
.retrieveQueryableStore("reservations-count-state-store", QueryableStoreTypes.keyValueStore());
Long totalReservationsCount = reservationsCountStateStore.get("total");
return totalReservationsCount != null ? totalReservationsCount : 0;
}
public @NonNull List<@NonNull ReservationCountPerCustomer> reservationsCountPerCustomer() {
return getReservationsCountPerCustomer();
}
private List<ReservationCountPerCustomer> getReservationsCountPerCustomer() {
ReadOnlyKeyValueStore<String, Long> reservationsCountPerCustomerStateStore = interactiveQueryService
.retrieveQueryableStore("reservations-count-per-customer-state-store",
QueryableStoreTypes.keyValueStore());
List<ReservationCountPerCustomer> reservationsCountPerCustomer = new ArrayList<>();
Iterator<KeyValue<String, Long>> iterator = reservationsCountPerCustomerStateStore.all();
while (iterator.hasNext()) {
KeyValue<String, Long> element = iterator.next();
reservationsCountPerCustomer.add(new ReservationCountPerCustomer(element.key, element.value));
}
reservationsCountPerCustomer.sort((a, b) -> Long.compare(b.count(), a.count()));
return reservationsCountPerCustomer;
}
}
Während getTotalReservationsCountFromStateStore
nur den Wert total
aus dem entsprechenden State Store liest, liest getReservationsCountPerCustomer
die Kunden und ihre Reservierungen aus dem entsprechenden State Store und sortiert das Ergebnis auch nach der Anzahl der Reservierungen.
Die vom KafkaStreamsService
bereitgestellten Methoden können nun in einer neuen DashboardView
aufgerufen werden.
export default function DashboardView() {
const totalReservationsCount = useSignal(0);
const reservationsCountPerCustomer = useSignal<ReservationCountPerCustomer[]>();
useEffect(() => {
KafkaStreamsService.totalReservationsCount().then((result) => (totalReservationsCount.value = result));
KafkaStreamsService.reservationsCountPerCustomer().then((result) => (reservationsCountPerCustomer.value = result));
}, []);
return (
<VerticalLayout theme="padding" className="h-full">
<ViewToolbar title="Dashboard" />
<HorizontalLayout theme="spacing">
<Card cardTitle="Total Reservations" theme="elevated" className="self-start">
<div className="text-center">
<b>{totalReservationsCount}</b>
</div>
</Card>
<Card cardTitle="Reservations per Customer" theme="elevated">
<Grid items={reservationsCountPerCustomer.value}>
<GridColumn path={'customer'} />
<GridColumn path={'count'} />
</Grid>
</Card>
</HorizontalLayout>
</VerticalLayout>
);
}
Die View lädt die relevanten Daten beim ersten Rendering aus dem Backend. Sie verwendet die Card-Komponente, um die Informationen zu visualisieren.
Dashboard automatisch aktualisieren
Jedes Mal, wenn eine neue Reservierung im entsprechenden Topic verfügbar ist, wird die Stream-Verarbeitung in der kStream
-Bean ausgeführt, die Daten werden aggregiert und die vorhandenen lokalen State Stores aktualisiert. Es wäre schön, wenn sich das Dashboard in diesem Zusammenhang automatisch aktualisieren würde, ohne dass eine manuelle Aktualisierung im Browser erforderlich ist.
Im ersten Blog-Post dieser Artikelserie wurde bereits gezeigt, wie man reaktive Browser Callable Services erstellen kann, die das Frontend abonnieren kann. Der gleiche Ansatz wird nun auch für das Dashboard verwendet.
Zunächst wird KafkaStreamsConfig.java
so erweitert, dass jedes Mal, wenn eine neue Reservierung empfangen wird, ein neues Application Event veröffentlicht wird.
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(
StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
return new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
}
@Bean
public KStream<String, Reservation> kStream(StreamsBuilder streamsBuilder) {
KStream<String, Reservation> stream = streamsBuilder.stream("reservations");
// materialize all reservations by key
stream
.groupByKey()
.reduce((Reservation aggregate, Reservation newValue) -> newValue,
Materialized.as("all-reservations-state-store"));
// materialize total number of reservations (count)
stream
.map((key, value) -> new KeyValue<>("total", value))
.groupByKey()
.count(Materialized.as("reservations-count-state-store"));
// materialize total number of reservations per customer
stream
.groupBy((key, reservation) -> reservation.customer(), Grouped.with(null, null))
.count(Materialized.as("reservations-count-per-customer-state-store"));
// Publish event for each new reservation
stream.peek((key, reservation) -> applicationEventPublisher.publishEvent(new NewReservationEvent()));
return stream;
}
}
Das NewReservationEvent
ist ein einfacher Java-Record ohne Felder. Der KafkaStreamsService
empfängt das Application Event NewReservationEvent
und gibt es an eine Senke weiter. Die Senke wird als Flux für das Frontend bereitgestellt.
@BrowserCallable
@AnonymousAllowed
public class KafkaStreamsService {
private final KafkaStreamsInteractiveQueryService interactiveQueryService;
private final Many<NewReservationEvent> newReservationEventSink;
public KafkaStreamsService(KafkaStreamsInteractiveQueryService interactiveQueryService) {
this.interactiveQueryService = interactiveQueryService;
this.newReservationEventSink = Sinks.many().replay().latest();
}
// existing methods omitted
@EventListener
public void handleNewReservationEvent(NewReservationEvent newReservationEvent) {
newReservationEventSink.tryEmitNext(newReservationEvent);
}
public Flux<@NonNull NewReservationEvent> newReservationEvent() {
return newReservationEventSink.asFlux();
}
}
Anschließend kann die DashboardView
den Flux abonnieren und ihre Daten jedes Mal aktualisieren, wenn eine neue Reservierung im Backend verarbeitet wurde.
export default function DashboardView() {
const totalReservationsCount = useSignal(0);
const reservationsCountPerCustomer = useSignal<ReservationCountPerCustomer[]>();
const refreshData = useCallback(() => {
KafkaStreamsService.totalReservationsCount().then((result) => (totalReservationsCount.value = result));
KafkaStreamsService.reservationsCountPerCustomer().then((result) => (reservationsCountPerCustomer.value = result));
}, [totalReservationsCount, reservationsCountPerCustomer]);
useEffect(() => {
refreshData();
}, []);
useEffect(() => {
const newReservationEventSubscription = KafkaStreamsService.newReservationEvent().onNext(
(_newReservationEvent: NewReservationEvent) => setTimeout(refreshData, 500)
);
return () => newReservationEventSubscription.cancel();
}, []);
return (
<VerticalLayout theme="padding" className="h-full">
<ViewToolbar title="Dashboard" />
<HorizontalLayout theme="spacing">
<Card cardTitle="Total Reservations" theme="elevated" className="self-start">
<div className="text-center">
<b>{totalReservationsCount}</b>
</div>
</Card>
<Card cardTitle="Reservations per Customer" theme="elevated">
<Grid items={reservationsCountPerCustomer.value}>
<GridColumn path={'customer'} />
<GridColumn path={'count'} />
</Grid>
</Card>
</HorizontalLayout>
</VerticalLayout>
);
}
Jedes Mal, wenn ein NewReservationEvent
von der Subscription empfangen wurde, wird für 500ms gewartet, da es eine kleine Verzögerung zwischen der Veröffentlichung des Events aus dem Stream und der vollständigen Aktualisierung und Abfragbarkeit des State Store geben kann. Das Hinzufügen einer kleinen Verzögerung ist ein praktischer Weg, um sicherzustellen, dass das Frontend den State Store abfragt, nachdem die Aktualisierung persistiert wurde.
Fazit
Kafka-Streams bieten eine große Anzahl von Funktionen für viele verschiedene Anwendungsfälle. Nicht alle davon sind für die Full-Stack-Webentwicklung relevant. Interactive Queries bieten einen interessanten Ansatz, um Streaming-Daten in Echtzeit zu konsumieren und zu aggregieren und diese Daten von einem lokalen State Store abzufragen. Das Spring Boot basierte Backend einer Hilla-Anwendung macht es einfach, Kafka Streams und Interactive Queries zu integrieren. Zusammen mit Hillas Unterstützung für reaktive Programmierung und seinen UI-Komponenten kann man schnell ein nützliches Frontend zur Visualisierung der abgelegten und aggregierten Daten erstellen.
Den Quellcode der gezeigten Beispiel-Hilla-App findet man bei GitHub.
Bildnachweis:
- Cover-Bild Quelle: https://www.flickr.com/photos/mgaylard/54107308934
- Cover-Bild Lizenz: https://creativecommons.org/licenses/by/2.0/deed.en