Hilla und Kafka. Teil 1: Nachrichten konsumieren

René Wilby | 21.06.2025 Min. Lesezeit

Artikelreihe

Dies ist der erste Teil einer kleinen Artikelserie darüber, wie man Hilla-Anwendungen für Apache Kafka erstellt.

In diesem Blog-Post wird aufgezeigt, wie man Kafka-Nachrichten mit Spring Kafka im Hilla-Backend konsumiert und wie man diese Nachrichten im Hilla-Frontend auf reaktive Weise anzeigt.

Kafka einrichten

Das Einrichten eines Kafka-Brokers würde den Rahmen dieses Artikels sprengen. In diesem Artikel verwenden wir einen einfachen Kafka Docker-Container mit einer minimalen Standardkonfiguration. Man kann die folgende Docker Compose-Datei verwenden, um den Docker-Container mit Kafka zu starten und ein Topic zu erstellen.

name: hilla-kafka-example

services:
  broker:
    image: apache/kafka:latest
    hostname: broker
    container_name: broker
    ports:
      - 9092:9092
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:19092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:9092,INTERNAL://broker:19092,CONTROLLER://broker:9093
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_NUM_PARTITIONS: 1

  topic-creator:
    image: apache/kafka:latest
    container_name: topic-creator
    depends_on:
      broker:
        condition: service_started
    volumes:
      - ./kafka-init.sh:/tmp/kafka-init.sh
    command: ['sh', '-c', '/tmp/kafka-init.sh']

Die Docker Compose-Datei kann mit dem folgendem Befehl verwendet werden.

docker compose -f docker/docker-compose.yaml up|down

In der offiziellen Dokumentation des Kafka Docker-Images, kann man weitere Informationen über das Image und seine Konfiguration erhalten.

Das Skript kafka-init.sh erstellt automatisch ein Topic mit der Bezeichnung reservations in dem lokalen Kafka-Broker, das man später verwenden werden, um Nachrichten von ihm zu konsumieren.

#!/bin/bash
BROKER="broker:19092"

echo "Creating topic..."

/opt/kafka/bin/kafka-topics.sh --if-not-exists --create --topic reservations --bootstrap-server "$BROKER" --partitions 1 --replication-factor 1

echo "Topic created successfully!"

Man kann das Topic auch mit einer speziellen NewTopic Bean erstellen, so wie zum Beispiel in der Spring Boot Dokumentation beschrieben ist.

Neues Hilla-Projekt

Man kann ein neues Hilla-Projekt beispielsweise über start.vaadin.com oder mit npx create-vaadin erstellen.

Kafka-Nachrichten im Hilla-Backend konsumieren

Um Kafka-Nachrichten von dem Topic reservations im Backend der Hilla-Anwendung zu konsumieren, kommt hier Spring Kafka zum Einsatz. Daher muss die erforderliche Dependency in der pom.xml hinzugefügt werden.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

Als Nächstes benötigt man ein Datenmodell, das die Reservierungen darstellt, die empfangen werden.

package de.rwi.hillakafkaexample.reservation;

import java.time.LocalDate;
import jakarta.validation.constraints.NotBlank;
import jakarta.validation.constraints.NotNull;

public record Reservation(@NotBlank String id, @NotNull LocalDate date, @NotBlank String customer) { }

Um Kafka-Nachrichten konsumieren zu können, müssen im Hilla-Backend einige Konfigurationen hinzufügt werden. Dafür könnte man die erforderlichen Beans erstellen, wie in der Dokumentation von Spring für Apache Kafka beschrieben, oder man kann die Autokonfigurationsfähigkeiten von Spring Boot nutzen (wie in der Spring Boot Dokumentation für Apache Kafka beschrieben), indem man die erforderliche Konfiguration in der application.properties des Hilla-Backends bereitstellt.

# Set up Kafka:
spring.kafka.bootstrap-servers=localhost:9092

# Configure the consumer:
spring.kafka.consumer.client-id=reservation-consumer-client
spring.kafka.consumer.group-id=reservation-consumer-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=de.rwi.hillakafkaexample.reservation.Reservation

Der Großteil der Konfiguration sollte selbsterklärend sein. Die empfangenen Nachrichten werden in das Reservation-Modell deserialisiert. Zu diesem Zweck kann man den JsonDeserializer verwenden, der von Spring bereitgestellt wird. Darüber hinaus wird Reservation als Standardtyp für eingehende Nachrichten konfiguriert. Auf diese Weise muss man sich nicht um die Nachrichten-Header und deren Zuordnung kümmern.

Nun kann der KafkaConsumerService erstellt werden, der für den Empfang der Kafka-Nachrichten und die Bereitstellung der daraus resultierenden Reservierungen zuständig ist.

package de.rwi.hillakafkaexample.kafka;

import org.jspecify.annotations.NonNull;
import org.springframework.kafka.annotation.KafkaListener;
import de.rwi.hillakafkaexample.reservation.Reservation;
import com.vaadin.flow.server.auth.AnonymousAllowed;
import com.vaadin.hilla.BrowserCallable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;

@BrowserCallable
@AnonymousAllowed
public class KafkaConsumerService {

    private final Many<Reservation> reservationSink;

    public KafkaConsumerService() {
        this.reservationSink = Sinks.many().replay().all();
    }

    @KafkaListener(topics = "reservations", groupId = "reservation-consumer-group")
    private void consume(Reservation reservation) {
        reservationSink.emitNext(reservation, (signalType, emitResult) -> emitResult == EmitResult.FAIL_NON_SERIALIZED);
    }

    public Flux<@NonNull Reservation> getLatestReservation() {
        return reservationSink.asFlux();
    }
}

In diesem Beispiel kommt die @KafkaListener Annotation als einfacher Nachrichten-Listener zum Einsatz. Jede empfangene Nachricht in dem Topic reservations wird in eine Reservation deserialisiert und an eine passende Sink übergeben. Die Senke ist so konfiguriert, dass sie sich alle Reservierungen merkt und alle Elemente, die an diese Senke gesendet werden, an neue Abonnenten weiterleitet. Um dem Hilla-Frontend zu ermöglichen, die Senke zu abonnieren und Updates zu erhalten, wenn neue Nachrichten empfangen werden, muss die Senke in einen Flux umgewandelt werden. Dies erfolgt in der Methode getLatestReservation. Hillas @BrowserCallable Annotation kümmert sich um den Rest. Sie erstellt automatisch einen reaktiven Browser Callable Service, um Nachrichten vom Backend zum Frontend zu pushen, wie in der entsprechenden Anleitung in der Doku beschrieben.

Kafka-Nachrichten im Hilla-Frontend anzeigen

Im nächsten Schritt kann nun eine einfache React-View im Hilla-Frontend erstellt werden, um den Flux zu abonnieren, der von der Methode getLatestReservation zurückgegeben wird.

import { useSignal } from '@vaadin/hilla-react-signals';
import { Grid, GridColumn, VerticalLayout } from '@vaadin/react-components';
import { ViewToolbar } from 'Frontend/components/ViewToolbar';
import Reservation from 'Frontend/generated/de/rwi/hillakafkaexample/reservation/Reservation';
import { KafkaConsumerService } from 'Frontend/generated/endpoints';
import { useEffect } from 'react';

export default function ReservationsView() {
  const reservations = useSignal<Reservation[]>();

  useEffect(() => {
    const reservationSubscription = KafkaConsumerService.getLatestReservation().onNext((reservation: Reservation) => {
      reservations.value = [...(reservations.value ?? []), reservation];
    });
    return () => reservationSubscription.cancel();
  }, []);

  return (
    <VerticalLayout theme="margin">
      <ViewToolbar title="Reservations" />
      <Grid items={reservations.value}>
        <GridColumn path={'id'} />
        <GridColumn path={'date'} />
        <GridColumn path={'customer'} />
      </Grid>
    </VerticalLayout>
  );
}

Die Erstellung der Subscription für den Flux erfolgt in einem useEffect Hook. Jede empfangene Reservierung wird dem lokalen Zustand reservations hinzugefügt, der von der Grid-Komponente verwendet wird, um alle verfügbaren Reservierungen anzuzeigen.

Die Clean-Up-Funktion des useEffect Hooks ruft die cancel-Funktion der Subscription auf, sobald die View aus dem DOM entfernt wurde.

Der Subscription-Mechanismus kann noch erweitert werden, indem zum Beispiel eine einfache Fehlerbehandlung ergänzt wird.

import { ActionOnLostSubscription } from '@vaadin/hilla-frontend';
import { useSignal } from '@vaadin/hilla-react-signals';
import {
  Button,
  Grid,
  GridColumn,
  HorizontalLayout,
  Icon,
  Notification,
  VerticalLayout,
} from '@vaadin/react-components';
import { ViewToolbar } from 'Frontend/components/ViewToolbar';
import Reservation from 'Frontend/generated/de/rwi/hillakafkaexample/reservation/Reservation';
import { KafkaConsumerService } from 'Frontend/generated/endpoints';
import { useEffect } from 'react';

export default function ReservationsView() {
  const reservations = useSignal<Reservation[]>();
  const subscriptionNotificationOpened = useSignal<boolean>(false);
  const subscriptionNotificationMessage = useSignal<string | undefined>(undefined);

  useEffect(() => {
    const reservationSubscription = KafkaConsumerService.getLatestReservation()
      .onNext((reservation: Reservation) => {
        reservations.value = [...(reservations.value ?? []), reservation];
      })
      .onError((message: string) => {
        subscriptionNotificationOpened.value = true;
        subscriptionNotificationMessage.value = message;
      })
      .onSubscriptionLost(() => ActionOnLostSubscription.RESUBSCRIBE);
    return () => reservationSubscription.cancel();
  }, []);

  const closeSubscriptionNotification = () => {
    subscriptionNotificationOpened.value = false;
    subscriptionNotificationMessage.value = undefined;
  };

  const retrySubscription = () => {
    window.location.reload();
  };

  return (
    <VerticalLayout theme="margin">
      <ViewToolbar title="Reservations" />
      <Grid items={reservations.value}>
        <GridColumn path={'id'} />
        <GridColumn path={'date'} />
        <GridColumn path={'customer'} />
      </Grid>
      <Notification
        theme="warning"
        duration={0}
        position="middle"
        opened={subscriptionNotificationOpened.value}
        onOpenedChanged={(event) => {
          subscriptionNotificationOpened.value = event.detail.value;
        }}>
        <HorizontalLayout theme="spacing" style={{ alignItems: 'center' }}>
          <div>{subscriptionNotificationMessage.value ?? 'Failed to subscribe to reactive reservation endpoint'}</div>
          <Button theme="tertiary-inline" style={{ marginLeft: 'var(--lumo-space-xl)' }} onClick={retrySubscription}>
            Retry
          </Button>
          <Button theme="tertiary-inline icon" onClick={closeSubscriptionNotification} aria-label="Close">
            <Icon icon="lumo:cross" />
          </Button>
        </HorizontalLayout>
      </Notification>
    </VerticalLayout>
  );
}

Im Falle eines Subscription-Fehlers wird die Fehlermeldung gespeichert und ein Dialog geöffnet, um den Benutzer über den Fehler zu informieren. Zudem wird auch ein einfacher Wiederholungsmechanismus implementiert, der die Seite neu lädt, um eine neue Subscription zu erzeugen, wenn diese zuvor verloren gegangen ist.

Im Falle einer verlorenen Subscription (z.B. bei einer kurzen Netzwerkunterbrechung) wird Hilla sich erneut anmleden, indem es die gleiche Server-Methode erneut aufruft. Weitere Informationen über diese Art der Fehlerbehandlung gibt es ebenfalls in der Dokumentation.

Manueller Test

Nun ist es an der Zeit, alles zusammenzufügen und die Hilla-Anwendung zu testen. Der Kafka-Broker sollte (weitenhin) ausgeführt werden, die Hilla-Anwendung muss gestartet werden (zum Beispiel mit ./mvnw spring-boot:run) und die URL http://localhost:8080 muss im Browser geöffnet werden.

Der nächste Schritt besteht darin, einige Kafka-Nachrichten in dem Topic reservations zu erstellen. Eine Möglichkeit, dies zu tun, ist die Verwendung der Kafka-CLI. Sie ist Teil der Kafka-Binaries, die hier heruntergeladen werden können. Nachdem diese heruntergeladen und extrahiert wurden, kann man das Skript kafka-console-producer.sh aufrufen und dem Topic Nachrichten wie folgt hinzufügen:

./bin/kafka-console-producer.sh --topic reservations --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=="
>R00000001={"id":"R00000001", "date": "2025-06-16", "customer":"Leif"}
>R00000002={"id":"R00000002", "date": "2025-06-17", "customer":"Miikka"}
>R00000003={"id":"R00000003", "date": "2025-06-18", "customer":"Marcus"}
>R00000004={"id":"R00000004", "date": "2025-06-19", "customer":"Sami"}
>R00000005={"id":"R00000005", "date": "2025-06-20", "customer":"Matti"}
>R00000006={"id":"R00000006", "date": "2025-06-21", "customer":"Rolf"}
>R00000007={"id":"R00000007", "date": "2025-06-22", "customer":"Petter"}
>R00000008={"id":"R00000008", "date": "2025-06-23", "customer":"Seb"}
>R00000009={"id":"R00000009", "date": "2025-06-24", "customer":"Joonas"}
>R00000010={"id":"R00000010", "date": "2025-06-25", "customer":"Vesa"}

Jede Nachricht, die man dem Topic hinzufügt, sollte sofort in der Grid-Komponente in der ReserverationsView erscheinen.

Reservierungen

Fazit

Durch die Verwendung von Spring Kafka und den Autokonfigurationsfähigkeiten von Spring Boot ist es einfach, ein Kafka-Topic zu abonnieren. Die Anzeige der Nachrichten des Topics in einer Benutzeroberfläche ist dank Hillas hervorragender Unterstützung für reaktive Browser Callable Services und dem gezeigten Subscription-Mechanismus im Frontend ebenfalls einfach.

Der nächste Artikel dieser Serie wird zeigen, wie man eine neue Kafka-Nachricht aus einer Benutzereingabe in einem Hilla-Frontend erstellt.

Den Quellcode der gezeigten Beispiel-Hilla-App findet man bei GitHub.

Bildnachweis: