Veröffentlicht am Schreib einen Kommentar

Aufbau einer Enterprise Search Plattform

Die Anforderungen an eine Enterprise Search sind schnell spezifiziert: Wir wollen eine Suchmaske mit einem Eingabefeld, einer Vorschlagsfunktion während des Tippens und einer Ergebnisliste, in der das wichtigste Dokument zuoberst steht.

GUI Enterprise Search
Wir wollen eine einfache Suchmaske mit einem einzigen Eingabefeld, einer Vorschlagsfunktion während des Tippens und einer Ergebnisliste, in der das wichtigste Dokument zuoberst steht.

Wir sind verwöhnt von Google und Co. Die Messlatte für die Standards liegt hoch. Jahrzehntelange Forschung und Entwicklung stehen hinter diesen Suchsystemen.

Wer eine Enterprise Search aufbauen will, muss sich auf eine längere Entwicklungszeit einstellen.

Der Aufbau einer Unternehmensweiten Suche ist mehr als ein Integrationsprojekt. Dokumente liegen in unterschiedlichen Systemen und sollen mit wenigen Klicks in einer einfachen Suchmaske auffindbar sein. Natürlich steht das beste Dokument ganz oben in der Trefferliste und die Trefferliste ist maßgeschneidert auf den Suchenden.

Ein ehrgeiziges Projekt. Hier die wichtigsten Aspekte, die es zu beachten gilt:

Autorin: Ursula Deriu

Datenquellen und Mengengerüst

Die Dokumente liegen oft verstreut auf vielen unterschiedlichen Systemen. Sharepoint, Confluent, Filesysteme, CMS-Systeme, Document Management Systeme, Archivsysteme – der Phantasie sind keine Grenzen gesetzt.

Es lohnt sich, zu Projektstart, eine Liste zu erstellen mit

  • den einzelnen Datenquellen,
  • jeweils der ungefähren Anzahl Dokumente,
  • den Fileformaten,
  • den Aktualisierungshäufigkeiten und dem geschätzten Wachstum
  • und den Authentifizierungsmethoden.

Diese Liste  wird helfen, ein passendes System zu evaluieren.

Im Verlauf des Projekts werden die Dokumente aus allen Systemen ausgelesen . Die Enterprise Search Plattform braucht Zugriff auf alle Systeme und muss mit einer Vielzahl von Authentifizierungsmechanismen zurechtkommen.

Auch erwarten wir, dass die Suchmaschine mit den Änderungen der Dokumente synchron ist.

Je nach Quellsystem wird diese Herausforderung anders zu lösen sein.

Fazit

Unternehmensweite Suche – die Anforderungen sind schnell formuliert. Der Aufbau eines Suchsystems jedoch ist sehr anspruchsvoll und aufwändig und benötigt vielfältige Skills.

Laufender Betrieb und Unterhalt

Jedes System muss gepflegt werden. Verteilte Systeme sind aufwendiger.

Die Daten müssen aktuell gehalten werden – vielleicht sogar in Echtzeit.

Die Texte und Dokumente werden von Menschenhand erstellt und der Kreativität sind keine Grenzen gesetzt.

Wir brauchen Qualitätssichrungsmechanismen die bemerken, wenn neue Formate erfunden wurden, die in der Datenkonvertierung noch nicht erkannt werden.

  • Abo: Data Engineering und Analytics
  •  

Veröffentlicht am Schreib einen Kommentar

Zeit im Big Data Stream Processing

Zeit im Big Data Stream Processing

Event-Time, Ingestion-Time, Processing-Time? Und was bedeutet da Real-Time? Der Artikel beleuchtet spezielle Herausforderungen der Echtzeitanalyse großer Datenströme besonders im Hinblick auf den Faktor Zeit.

Autorin: Ursula Deriu

Typische Anwendungsfälle des Data Stream Processings

Real-Time Analytics - Use Cases
Anwendungsfälle der Real-Time Stream Analytics: Eine Analyse-Pipeline verarbeitet als Quelle (Source) unbounded Datasets und leitet die Ergebnisse an die geeignete Senke (Sink) weiter.

Streams sind überall:

  • Logdaten zur Überwachung von Servern
  • Sensordaten aus Endgeräten, wie Smartphones (IoT)
  • Sensordaten aus Fahrzeugen und Geräten (IoT, Predictive Maintenance)
  • Überwachungsdaten- z.B. Webcams
  • Social Media Daten
  • Echtzeitanalysen in Data Warehouses, resp. Data Lakes
  • und viele andere mehr

Statt von ‘Records’ ist die Rede von ‘Events’ oder auch von ‘Messages’ oder ‘Nachrichten’. Die Events werden ununterbrochen generiert und gelangen in einem fortwährenden Strom in die Analytics-Pipeline.

Unaufhörlich treffen neue Events ein. Stream Processing verarbeitet und analysiert ‘unbeschränkten’ Datasets (engl. unbounded Datasets). Eigentlich sind es Warteschlangen (Queues) und die Events.

Zeitkritische Verarbeitung von Data Streams

Sollen beispielsweise Herzpatienten von Ferne überwacht werden, dann ist eine sehr zeitnahe Verarbeitung und Analyse gefordert.

Verläßt der Puls eine gewisse Toleranzschwelle, dann wird ein Alarm ausgelöst. Diagramme werden zusätzlich erstellt und ermöglichen die Diagnose und Überwachung.

Ein konstruiertes Beispiel erleichtert die Erläuterung: Bei einem Herzpatienten soll der Puls gemessen werden. Für jeden Pulsschlag wird via Internet ein Event an die Real-Time Data Pipeline in einem entfernten Rechenzentrum gesandt.
Ein konstruiertes Beispiel erleichtert die Erläuterung: Bei einem Herzpatienten soll der Puls gemessen werden. Für jeden Pulsschlag wird via Internet ein Event an die Real-Time Data Pipeline in einem entfernten Rechenzentrum gesendet.

Das Szenario ist stark vereinfacht:

Ein kleines Gerät erhebt den Puls bei einem Herzpatienten. Für jeden Herzschlag schickt es ein Event via Internet zu einer Real-Time-Pipeline.

Nicht jedes Event nimmt denselben Weg durchs Internet – und so treffen die Events in nahezu beliebiger Reihenfolge in der Analytics Pipeline ein.

Wird jetzt der Puls unseres fiktiven Patienten ausgewertet, dann dürfen keine Events verloren gehen und es dürfen keine Events mehrfach verarbeitet werden.

Dies zu garantieren, ist in Big Data Systemen eine besonders komplexe Aufgabe. Immerhin sollen diese Systeme ja horizontal skalieren.

Vorkehrungen bei der Konfiguration des Gesamtsystems und bei der Programmierung Analyse sind notwendig, um die geforderte Exactly-Once Verarbeitung zu erreichen.

Zeitfenster (Windows) in der Data Stream Processing

Für eine Echzteit-Analyse werden die laufend eintreffenden Events typischerweise nach Zeitintervallen zusammengefasst (aggregiert) und das Ergebnis wurd als Zeitreihe dargestellt.

Beispiel Histogram als Ergebnis einer Aggregatsfunktion
Das Ergebnis einer Real-Time Analyse ist typischerweise eine Zeitreihe. Die Events werden in sinnvollen Zeitintervallen aggregiert.

Dazu teilt die verwendete Stream Analytics Engine  die Events in Zeitfenster (engl. Windows) ein.

Man unterscheidet die folgenden Window-Typen:

Typen verschiedner Zeitfenster
Schematische Verdeutlichung der verschiedenen Typen von Zeitfenstern. Nicht jede Analytics Engine bietet alle Typen an.

Fixed Windows (heißen auch hopping Windows) unterteilen die Zeit in gleichmäßige Intervalle. Die klassische Batch-Verarbeitung mit den typischen nächtlichen Verarbeitungen, macht eigentlich genau das gleiche. In der real-time Verarbeitung sind die Intervalle jedoch sehr viel kleiner und die Analyse erfolgt sehr zeitnah zum Eintreffen der Daten.

Sliding Windows haben Überlappungen. Dies erlaubt es, einen Einblick in die eintreffenden Daten zu erhalten noch bevor die Verarbeitung eines Window fertig abgeschlossen ist.

Sessions sind auch Zeitfenster. Diese haben jedoch unterschiedliche Länge und überlappen. Selbst die Aktionen in einer Web-Session oder in einer Datenbank-Session bilden einen Event-Stream.

Die Herzschläge unseres fiktiven Patienten analysieren wir mit Fixed Windows – der Puls eines Patienten wird ja pro Minute erhoben.

Typen des Data Stream Processing

Typen Stream Analytics Operationen
Schematische Darstellung verschiedener Typen von Real-Time Analytics Operationen.

Immer häufiger werden klassische ETL-Prozesse durch Data Stream Processing abgelöst. Hier kommt elementweise Verarbeitung zum Zuge, beispielsweise, um Daten zu bereinigen (Datumstransformation und ähnliches).

Gerade für Analysen sind Aggregationen interessant. In SQL entsprechen diese einem Group By.

Oft werden auch Daten aus verschiedenen Quellen zu einem Stream vereint – in SQL entspricht dies einem Join.

Machine Learning auf Streams wird immer beliebter. Anwendungsfälle sind Predictive Maintenance oder Fraud Detection.

Im Beispiel unseres fiktiven Herzpatienten aggregieren wir die Events und zählen sie pro Minute.

Zeitversatz (Time Skew)

Konzentrieren wir uns auf die Zeit. Der Zeitpunkt eines einzelnen Herzschlags des Patienten ist die Event-Time. Trifft das Event bei der Pipeline ein, dann spricht man von Ingestion-Time.

Präziserweise würden wir von Timestamp statt von Time sprechen.

Zwischen diesen beiden Zeitpunkten kann theoretisch beliebig viel Zeit verstreichen. Und möglicherweise befindet sich der Patient gerade in einer anderen Zeitzone als die Streaming Pipeline.

Event-Time vs. Processing-Time
Event-Time vs. Processing-Time

Auch innerhalb der Pipeline wird es eine gewisse Zeit dauern, bis das Event verarbeitet wird. Der Zeitpunkt wird Processing Time genannt.

Die Abweichung zwischen der Processing Time und der Event Time bezeichnen wir als Zeitversatz (engl. Time Skew).

Der Begriff “Real-Time” suggeriert, dass der Zeitversatz gleich Null ist. Doch das ist unrealistisch. Eigentlich sollte von “Near-Real-Time” gesprochen werden.

Beispiel Event-Time vs. Processing-Time
Darstellung der Abweichung zwischen Event-Time (x-Achse) und Processing-Time (y-Achse). Jeder schwarze Punkt markiert ein einzelnes Event. Der zeitliche Versatz der beiden Zeitpunkte wird als Abweichung von der Diagonalen. Die Ideallinie bedeutet Real-Time im wörtlichen Sinn.

Das hervorgehobene Event wurde um 12:00:xx generiert und wird ist erst um 12:05:yy von der Pipeline verarbeitet.

Soll nach Processing-Time analysiert werden, dann fällt das Event ins Zeitintervall der Minute 12:05 -12:06.

Soll hingegen nach Event-Time analysiert werden, dann gehört das Event ins Zeitintervall zwischen 12:00 und 12:01.

Die Diagonale  zeigt den nie eintreffenden Idealfall: die Verarbeitungszeit ist gleich der Event-Zeit – das wäre Echtzeit im wörtlichen Sinn. Der zeitliche Versatz von der Ideallinie ist sowohl in der x-Achse als auch in der y-Achse sichtbar.

Dieser zeitliche Versatz nicht konstant sondern hängt von vielen technischen Faktoren ab:

  • Vom Zustand der Systeme außerhalb der Stream-Analytics-Pipeline, beispielsweise von der Übertragungsgeschwindigkeit im Internet.
  • Vom Zustand der Stream-Analytics-Pipeline.

Auswirkungen des zeitlichen Versatzes

Bedeutet “Real-Time” jetzt Verarbeitungszeit – dann verarbeitet die Analytics Engines die Events in der Reihenfolge ihres Eintreffens. Ganz unabhängig davon, wie lange die Events unterwegs und welchen Weg sie durchs Internet eingeschlagen haben.

Auswertung nach Processing-Time
Auswertung des Beispiels nach Processing-Time. Hier werden die Pulsschläge pro Minute gezählt. Berücksichtigt wird die Processing-Time.

Das Histogramm zeigt die Anzahl der Events pro Minute. Window-Länge ist also eine Minute. Wir betrachten die Achse “Processing Time” und zählen pro Minute die jeweils Events. Das Ergebnis zeichnen wir im Histogramm auf.

Anders sieht es aus, wenn mit “Real-Time” an die Event-Zeit gemeint ist, wie bei unserem Herzpatienten.

Rechnen wir das Ergebnis der Event-Time Analyse in unserem Beispiel aus:  Diesmal zählen wir die Events pro Minute auf der Achse Event-Time. Dabei entsteht das folgende Histogramm:

Auswertung nach Event-Time
Auswertung derselben Daten, doch diesmal werden die Pulsschläge pro Minute nach Event-Time gezählt.

Legen wir beide Histogramme übereinander, dann sehen wir große Abweichungen zwischen den beiden Analysen. Je nach Anwendungsfall sind solche Abweichungen nicht tragbar.

Bei unserem Beispiel: die beiden Analysen des Pulses unseres fiktiven Patienten weichen sehr stark voneinander ab.

Vergleich der Auswertung nach Event-Time und nach Processing-Time
Vergleich der Auswertung nach Event-Time (schwarz) und nach Processing-Time (orange).

Der Zeitversatz zwischen Processing-Time und Event-Time stellt hohe Anforderungen an die Stream Analytics Engine. Diese arbeitet ja auf dem Zeitstrahl der Processing-Time, erhält die Events in beliebiger Reihenfolge mit beliebigen Versatz und ist gefordert in Echtzeit nach Event-Time auszuwerten.

Analysen nach Event-Zeit

Untersuchen wir das Phänomen genauer.

Zur Verdeutlichung färben wir die Events, die in derselben Minute anfielen (Event-Zeit) mit je einer Farbe.

Beispiel der zeitkritischen Real-Time Analyse nach Minute Event-Time
Beispiel der zeitkritischen Real-Time Analyse - die Events einer Minute sind jeweils gleich eingefärbt.

Daraus leiten wir den Event-Stream ab in der Reihenfolge der Processing-Time. In der Reihenfolge verarbeitet die Analytics Engine die Events ja in jedem Fall.

Processing-Stream
Die Events sortiert nach Processing-Time - in dieser Reihenfolge treffen die Events bei der Analytics-Pipeline ein.

Die Analyse nach Minute ist für die Überwachung unseres Herzpatienten unbrauchbar.

Die Analytics Engine stellt jetzt die Reihenfolge wieder her, in der die Events ursprünglich angefallen ist.

Wir ergänzen das Diagramm durch den Event-Stream nach Event-Time. Die Pfeile zeigen, wie die Stream Analytics Engine die laufend verarbeiteten Events intern in die Zeitfenster der Event-Zeit zuordnet und so die urspüngliche Reihenfolge wieder herstellt.

Dabei reicht die Zuordnung zum Zeitfenster aus – immerhin werden die Daten anschließend pro Zeitfenster ausgewertet. Pro Zeitfenster verwaltet die Analytics Engine also einen internen Puffer.

Event-Stream - rekonstruiert
Die Analytics Pipeline rekonstruiert den Stream nach Event-Time.

Man beachte, dass es sich bei einem Stream nicht um ein abgeschlossenes Dataset handelt, das einfach umsortiert werden kann. Bei dem unbounded Dataset des Streams treffen laufend neue Events ein.

Moderne Real-Time Analytics Systeme sind in der Lage, diese Zuordnung vorzunehmen.

Watermark

Bleibt die Frage, wie lange diese internen Puffer bestehen bleiben. Immerhin läuft die Pipeline ununterbrochen und potenziell unendlich lange. Diese Puffer verbrauchen Platz im Memory, der Platz ist endlich und muss frei gegeben werden.

Beispiel für ein spät eintreffendes Event
Das hervorgehobene Event wurde kurz nach 12:00 generiert, trifft aber erst nach 12:05 in der Pipeline ein.

Der zeitliche Versatz liegt in der Natur der Pipeline. Die Abweichung zwischen Event-Time und Processing-Time sollte für die meisten Events vergleichbar sein. Es wird immer Ausreisser geben, die mit erheblicher Verspätung eintreffen.

Als Watermark bezeichnet man die Toleranzgrenze für verspätetes Eintreffen. Setzen wir in unserem Beispiel die Watermark auf zwei Minuten, dann wird der Pupper für das erste Intervall, das ja um 12:01 endet, um 12:03 abgeschlossen, also zwei Minuten nach Ende des Zeitfensters.

Später eintreffende Events werden für die Analyse nicht mehr berücksichtigt.

Trigger

Wir sprechen von Echtzeit und wollen sofort Ergebnisse sehen und nicht warten, bis das Zeitfester plus die Wassermarke verstrichen sind. In unserem Beispiel mit einem Zeitfenster von einer Minute un der Watermark von zwei Minuten, würde das ja bedeuten, dass wir drei Minuten warten müssen, um Ergebnisse zu sehen.

Nach drei Minuten werden wird das Endergebnis sehen. In einer gut funktionierenden Pipeline, sollten in den meisten Fällen ja relativ wenig verspätete Events eintreffen, so dass meistens die Auswertung für ein Zeitfenster unmittelbar aktuell ist.

Doch gerade bei längeren Zeitfenstern wäre es interessant, auch schon vorher einen Einblick zu erhalten und ein vorläufiges Ergebnis zu sehen.

Triggers dienen genau dazu.

Early Trigger bestimmen den Zeitpunkt, du dem ein Einblick in die Entwicklung der Analyse bei einem offenen Zeitfenster erlaubt. Der Zeitpunkt wird relativ zum Beginn des Zeitfensters bestimmt.

Ontime Trigger bestimmen den Einblick in die Analyse nach Abschluss des Zeitfensters.

Late Trigger bestimmen die Watermark, also die Zeitspanne, die nach dem Ontime Trigger noch durchlaufen wird, bevor die Ergebnisse der Auswertung für das betroffene Zeitfenster abschließend vorliegen und der betreffende Puffer im Memory frei gegeben wird.

In der Animation wurden die folgenden Werte angenommen:

  • Ontime Trigger = Dauer eines Zeitfensters = 1 Minute
  • Early Trigger nach 20 Sekunden (diese Wahl passt zum Beispiel und ist nicht praktischer Natur)
  • Late Trigger = Watermark = 2 Minuten
Real-Time Auswertung nach Event-Time
Animierte Darstellung der Entwicklung der Real-Time Auswertung nach Event-Zeit. Early-Trigger nach 20 sec, Ontime Trigger nach 1 Minute (=Window), Late Trigger nach 2 Minuten (=Watermark).

Als Anwender der Data Stream Processing Engine

Auf der Reise von der Quelle bis zur Verarbeitung durchläuft ein Event viele Stationen. Jede kann theoretisch einen Zeitstempel auf das Event schreiben und nach jedem dieser Zeitstempel könnte eine Auswertung erfolgen. Die Aussage des Ergebnisses wäre jedes Mal eine andere.

Als Anwender eines Real-Time Systems sorgen wir dafür, dass wir die Events nach dem richtigen Zeitstempel auswerten. Je nach Anwendungsfall berücksichtigen wir auch unterschiedliche Zeitzonen.

Jedes Real-Time Analytics System löst die in diesem Artikel ganannten Herausforderungen. Jedes System hat ein eigenes API und ein eigenes Wording. So wird beispielsweise der Trigger nicht in jedem System so benannt.

Die Darstellung in diesem Artikel beziehen sich auf  das allgemeine Data Flow Model. Und mit Apache Beam ist auch eine Art Standard in Arbeit, um dem Endanwender die Einarbeitung zu erleichtern.

In den meisten Anwendungsfällen wird eine Auswertung nach Event-Time notwendig sein. Wir müssen also dafür sorgen, dass der Event-Timestamp auf den Events vorhanden ist.

Das unterliegende System kann dem Anwender die komplexe Handhabung der Zuordnung der Events zu den passenden Windows ab.

Moderne Big Data Stream Analytics Systeme ermöglichen dies für Datenströme, die so groß sind, dass sie auf einem verteilten System analysiert werden müssen. Und so werden auch die Data-Buffers die für diese Zuordnung notwendig sind, auf mehrere Systeme verteilt.

Weiterführende Quelle:

Akidau et. al: The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale Unbounded, Out-of-Order Data Processing, 2015, Proceedings of the VLDB Endowment

  • Abo: Data Engineering und Analytics
  •  

Veröffentlicht am Schreib einen Kommentar

Garantierte Reihenfolge in Apache Kafka

Kafka Ordering Guarantee

Garantierte Reihenfolge in Apache Kafka

Event Hubs wie Apache Kafka geben eine Ordering Guarantee und versprechen damit, dass die Events in derselben Reihenfolge gelesen werden, wie sie in die Queue geschrieben wurde. Dieser Blog-Post beleuchtet die Details und zeigt die Einschränkungen der garantierten Reihenfolge in Event Hubs im Real-Time Big Data.

Autorin: Ursula Deriu

1 Reihenfolge in Topics

Sei es als Message Queue, sei es als Event Hub Apache Kafka ist sehr beliebt. Apache Kafka kommt in Realtime Big Data Stream Processing Systemen zum Einsatz. Als verteiltes System ist Kafka jedoch sehr komplex. Das folgende Bild verdeutlicht die Grundidee:

High-Level-Betrachung eines Kafka-Topics
Das Producer-API sendet Events auf ein Topic. Das Consumer API liest Events von einem Topic. Ein Topic hat die Datenstruktur einer Queue, die Events haben eine Offsetnummer.

Der Producer schreibt laufend Nachrichten auf eine Topic, also eine Message Queue. Der Consumer holt diese bei Bedarf dort ab.

Zusammen mit der Aussage der Ordering Guarantee, also der garantierten Reihenfolge, suggeriert das Bild, dass der Consumer die Nachrichten in genau der Reihenfolge erhält, wie der Producer sie schreibt. Eine Queue, also Wartschlange, befolgt ja das FIFO-Prinzip – first in first out.

Das folgende Video geht mit einem simplen Test dieser Behauptung nach. Bald stellen wir fest, dass sie einen Trugschluss birgt.

Im ersten Durchlauf des Versuchs sendet der Producer die Events im Abstand von einer Sekunde an das Topic. Der Consumer erhält – wie erwartet – die Events in derselben Reihenfolge, in der der Producer sie versendet hat.

Im zweiten Durchlauf sendet der Producer die Events ohne Wartezeit und der Consumer erhält sie in einer abweichenden Reihenfolge.

Die Erklärung für das Phänomen ist einfach und basiert auf der Tatsache, dass das Topic eine verteilte Message Queue darstellt. Es wurde ja zu Beginn des Experiments mit fünf Partitionen definiert. Der Befehl im Video lautet ja:

kafka-topics.sh --create --topic nugget02 --partitions 5  --replication-factor 2 --bootstrap-server kafka-1:9092

Für das Experiment ist die Angabe –partitions 5 ausschlaggebend. Der Effekt des Experiments ist vergleichbar, wenn das Topic mit 2 oder mehr Partitionen definiert wird.

Apache Kafka ist ein komplexes System. Um das einfache API für den jeweiligen Anwendungsfall korrekt einzusetzen, benötigt man ein solides Verständnis des unterliegenden Systems.

2 Topics und Partitionen

Als Big-Data-System ist Apache Kafka ein verteiltes System. Die Daten der Topics werden also auf mehrere Broker verteilt, so dass insgesamt sehr große Datenmengen gehandhabt werden können. Je mehr Partitionen ein Topic umfasst, umso mehr Events kann das Topic enthalten.

Bei den Überlegungen zur Partitionierung ist zu berücksichtigen, dass eine Partition nicht über mehrere Festplatten hinweg gehalten werden kann. Der Festplattenplatz beschränkt also die Größe einer einzelnen Partition. Apache Pulsar ist ein Event-Hub, der diese Beschränkung nicht aufweist.

Bei der Entscheidung, wie viele Partitionen für ein Topic definiert werden sollen, spielt auch die Anzahl der vorhandenen Kafka-Broker eine Rolle. Die Partitionen sollten ja auf unterschiedliche Broker verteilt werden, um mehr Events halten zu können. Im Code oben wurden weder retention.ms noch retention.bytes definiert. Die Retention Time bestimmt die Dauer, während der ein Event in der Message Queue verbleibt, bevor es von Kafka automatisch gelöscht wird. Analog bestimmt die retention.bytes wie gross eine Partition maximal werden kann, bevor Kafka, die ältesten Events löscht, um diese Maximalgröße nicht zu überschreiten.

Topic Partitonen
Topics werden partitioniert. Die Events werden auf die einzelnen Partitionen verteilt.

3 Partitionen und Reihenfolge

Topics in Event Hubs sind ja (unbeschränkte) unbounded Datasets. Das bedeutet nicht, dass sie selbst unendlich groß werden – denn der Platz auf der Harddisk ist ja immer beschränkt und auch die Anzahl der Broker ist beschränkt. ‘Unbeschränkt’ bedeutet in dem Fall, dass die Event Queue theoretisch unendlich lang laufen kann – ältere Events werden ja gelöscht, wie oben beschrieben. Bei passender Konfiguration und Pflege wird eine unbeschränkte Laufzeit erreicht.

Jede Topic Partition ist eine Queue, also eine Warteschlange, die nach dem FIFO-Prinzip funktioniert. Damit ist ja die Reihenfolge gegeben: die Events werden in derselben Reihenfolge aus der Queue gelesen, in der sie in die Queue geschrieben wurden.

In einer verteilten Queue verteilt der Producer die Events auf die einzelnen Partitionen. Ein Event wird in genau einer Partition gespeichert. Innerhalb einer Partition werden also die Events in genau derjenigen Reihenfolge aufbewahrt, in der sie geschrieben wurden.

In unserem Experiment werden die Events also auf fünf Partitionen verteilt. Welches Event in welcher Partition landen wird, ist nicht vorhersehbar.

Doch warum stimmt die Reihenfolge im ersten Durchlauf des Experiments und im zweiten Durchlauf nicht mehr?

Der Unterschied war die Geschwindigkeit, in der der Producer die Events auf die Topic-Partitionen schickte. Um den Effekt zu verstehen, schauen wir den Partitioner genauer an.

4 Der Partitioner

Das Kafka-Partitioner API ist einfach bedienbar. Der Python Code im Experiment sah wie folgt aus:

producer = KafkaProducer(bootstrap_servers='kafka-1')
....
producer.send(topic="nugget02", value=data)

Damit wurde das Default-Verhalten von Kafka aufgerufen. Dieses setzt Vieles voraus, was vielleicht gar nicht gewünscht ist. Auch die Wahl des Partitioners.

Es stehen mehrere Partitioner-Algorithmen zur Auswahl.

Der Partitioner definiert einen Buffer für jede Partition, auf die er schreiben soll. Die eintreffenden Events verteilt er nach dem gewählten Algorithmus auf die Partitionen. Beim Round-Robin-Partitioner beispielsweise, werden die Events der Reihe nach auf die einzelnen Buffer verteilt.

Ist ein Buffer voll oder ist eine gewisse Zeit verstrichen, dann werden die Events an die jeweilige Partition gesendet. Beide Parameter können konfiguriert werden.

Dieser Timeout ist natürlich viel kürzer als die eine Sekunde, die wir im ersten Durchlauf des Experiments definiert hatten. Somit wird jedes Event einzeln an die jeweilige Partition gesandt. Im zweiten Durchlauf des Experimants wurden dann mehrere Events gleichzeitig an die Partition gesandt.

Das ist nur ein Teil der Erklärung für die abweichende Reihenfolge, in der der Consumer in den beiden Durchläufen die Events geliefert bekam. Der zweite Teil der Erklärung ist beim Consumer zu suchen.

Dieser pollt die einzelnen Partitionen in einer bestimmten (ebenfalls konfigurierbaren) Frequenz und erhält jeweils höchstens so viele Events geliefert, wie im vorgesehenen Buffer Platz finden (auch diese Parameter in konfigurierbar).

Im ersten Durchlauf des Experiments trafen die Events so langsam in den Partitionen ein, dass sie einzeln ausgeliefert wurden und so die scheinbar richtige Reihenfolge einhalten konnten. Im zweiten Durchlauf des Experiments war das nicht mehr der Fall und die globale Reihenfolge der Events geriet beim Consumer durcheinander.

Kafka Partitioner
Der Round-Robin-Partitioner verteilt die Events abwechslungsweise auf die Partitionen. Ist der Buffer voll, oder ist eine konfigurierte Zeitspanne verstrichen, dann werden die Events aus dem Buffer auf die Partition geflushed.

5 Partition beeinflussen

Bei all diesen Betrachtungen dürfen wir eins nicht außer Acht lassen: Auch beim Consumer wird es sich meistens um ein verteiltes System handeln, beispielsweise um Apache Spark, oder ein anderes Tool zur Real-Time Analyse von Kafka-Topics.

Sollte für diese Verarbeitung oder Analyse der Daten die Reihenfolge eine Rolle spielen, dann wird sich die Analyse-Komponente in ihrer Rolle als Consumer darum kümmern. So wird Apache Spark für Real-Time Analysen die Events in Zeitfenster einteilen und auswerten (siehe Zeit im Big Data Stream Processing).

Verlangt unser Use Case, dass gewisse Events in der richtigen Reihenfolge aus der Queue gelesen werden, dann haben wir bei Apache Kafka die folgenden Möglichkeiten:

5.1 Beim Senden der Events die Partition angeben

Diese Variante sollte mit Bedacht angewendet werden. Immerhin gibt es eine ganze Reihe verschiedener Partitioner-Algorithmen, die genau für diesen Zweck geschrieben wurden.

5.2 Mit nur einer Partition arbeiten

Damit sind wir auf der sicheren Seite: Wo nichts aufgeteilt wird, gerät nichts durcheinander. Doch wir verlieren die Möglichkeit, die Daten auf mehrere Broker zu verteilen und damit die Menge insgesamt zu skalieren.

5.3 Mit Keys arbeiten

Im Code-Beispiel oben, enthielt der Send-Befehl nur ein Value um die Variable zu benennen, in der die zu sendenden Daten hinterlegt sind. Der Send-Befehl kann auch einen Key enthalten. Dabei handelt es sich nicht um einen Primary-Key, sondern lediglich um ein Merkmal, das den Value indentifiziert.

Der Primary-Key könnte beispielsweise eine Konto-Nummer sein. Der zugehörende Value könnte die Veränderungen auf dem Konto enthalten. Für jede Veränderung auf dem Konto wird ein Event ins Topic geschrieben. So können wir die Veränderungen nachverfolgen – immerhin haben sie alle denselben Key, werden in dieselbe Partition geschrieben und erscheinen demnach in der historischen Reihenfolge.

Der Partitioner serialisiert zuerst den Key und bildet danach einen Hash-Wert bilden, dessen Ergebnis die Partition bezeichnet. Damit erwirken wir, dass alle Events mit demselben Key auf ein und dieselbe Partition geschrieben werden und das unter Einhaltung der Reihenfolge ihres Eintreffens. Eine Partition kann für mehrere Keys zuständig sein.

Damit erreichen wir eine sinnvolle Reihenfolge in Bezug auf ein Kriterium: Beispielsweise in Bezug auf die Messdaten eines einzelnen Sensors, wenn die SensorID als Kafka-Key dient.

Die Gefahr besteht, dass die Partitionen ungleichmäßig ausgelastet werden und damit die Consumer der nachfolgenden Komponente (z.B. Apache Spark) ungleichmäßig ausgelastet werden.

Mit Keys arbeiten ist auch sinnvoll, wenn zusätzlich die Topic Partitionen compacted werden. Bei der Compaction bleibt jeweils das jüngste Element eines Keys erhalten und die älteren werden gelöscht. Eine sinnvolle Art Daten mit relativ wenigen unterschiedlichen Keys in Topics zu verwalten.

6 Fazit

Apache Kafka gibt eine Ordering Guarantee ab. Gleich wie andere verteilte Event-Hubs kann diese Garantie nur in Bezug auf eine Partition abgegeben werden.

Durch die Wahl eines Keys kann die Reihenfolge der Events im Topic in Bezug auf den Key garantiert werden.

  • Abo: Data Engineering und Analytics
  •  

Credits:

(c) Video und Quiz: Tirsus GmbH / Ursula Deriu

Veröffentlicht am Schreib einen Kommentar

10 Tools zur Real-Time Analytics von Apache Kafka-Topics

Big Data Analyse für Apache Kafka

10 Tools zur Real-Time Analytics von Apache Kafka-Topics

Apache Kafka hat sich im Big-Data-Bereich dabei als Quasi-Standard für Event Hubs durchgesetzt. Mit hoher Geschwindigkeit persistiert es immer schneller eintreffende Events sicher in Topics. Höher, Schneller, Stärker – das Motto der Olympischen Spiele gilt auch für die Ansprüche an moderne Datenanalyse.

Immer schneller soll das Datengold in den Topics geschürft werden. Die Daten sollen gefiltert, angereichert, gejoined und aggregiert werden und das möglichst in Echtzeit.

Anforderungen an Real-Time-Analytics Tools


Hoch sind die Anforderungen an die Werkzeuge. Die Thematik ist alles andere als trivial. Wir haben es zu tun mit (potenziell) gigantischen Datenmengen und erwarten dennoch:

  • Horizontale Skalierbarkeit bis in Big-Data-Bereiche
  • Ausfallsicheren 7×24 Betrieb
  • Echtzeit-Analysen
  • Exactly Once Garantie
  • Event-Time Analysen
  • Windowing
  • Watermarks
  • Handling von Out-of-Order Events
  • Aggregationsfunktionen
  • Machine Learning auf Streams beispielsweise für predictive Maintenance.

Die Tool-Palette für Apache Kafka als Quelle wächst und damit die Qual der Wahl.

Real-Time Analytics Tools der Kafka Community

Begonnen hat Apache Kafka ja als hochperformanter und hoch skalierbarer Event-Hub, und wird  je nach Sichtweise auch als Message Queue bezeichnet. Das Streaming-API kamen relativ spät dazu und noch jünger ist ksqlDB.

Das Kafka-Streams API hat mit seinen Features mächtig zugelegt. Kein zusätzliches Cluster wird benötigt. Input und Output der Analysen sind jeweils Topics.  Das API gibt’s für Java und Scala. Wer nicht in ein Topic schreiben möchte, sucht nach einem Apache Kafka Connector für die gewünschte Datensenke.

Mit ksqlDB eröffnet sich gar die Möglichkeit, die Topics mit SQL-artigen Befehlen zu analysieren. ksqlDB ist relativ neu und gerade bei komplexen SQL-Abfragen mit vielen Joins und Aggregatfunktionen noch etwas umständlich zu bedienen. Ein Werkzeug, das sicher noch handlicher wird.

Die flexiblen Klassiker unter den Real-Time Analytics Tools

Ein Klassiker unter den Stream Analytics Engines ist Apache Spark. Es erlaubt sowohl Batch-Analysen auf einer Vielfalt von Formaten als auch Stream Analytics mit mannigfaltigen Datenquellen. Eine davon ist Apache Kafka.

Apache Spark ist sicher eine der ausgereiften Möglichkeiten. Die Integration von SQL, auch auf Streams, ist deutlich einfacher zu Handhaben. Machine Learning Modelle können mit Spark trainiert und/oder auf Streams deployes werden. Das ermöglicht beispielsweise für Predictive Maintanance, also vorausschauende Wartung. Spark GraphX ist gar eine Big-Data Graph Processing Engine. Zudem können die Ergebnisse der Stream-Analytics auf eine Vielzahl verschiedener Senken zur Weiterverarbeitung resp. Visualisierung geschrieben werden, Datenbanken, In-Memory-Stores und Topics.

Open Source Möglichkeiten zur Visualisierung findet man beispielsweise beim Apache Zeppelin oder beim Jupyter Lab. Spark bringt APIs für Scala, Java, Python und R.

 

Apache Flink entstand ursprünglich als Stream Analytics Engine, die Batch-Möglichkeiten kamen erst nachgelagert dazu. Die Unterschiede zu Spark liegen im Detail im Handling, in der Vielfalt der Features und auch in der Sprache der Foren. Flink wurde ursprünglich an der TU Berlin gestartet und ist mittlerweile von Alibaba aufgekauft worden. Die chinesisch-sprachigen Foren-Beiträge können ja glücklicherweise automatisch übersetzt werden. Interessant bei Flink ist auch die gute Integration mit Apache RocketMQ, einer Alternative zu Kafka als Event-Hub. Wie Flink wird auch die Open-Source-Community von  RocketMQ von Alibaba getrieben.

Big Data Stream Analytics Engines eine ganze Reihe. Apache Beam ist so eine Art aufkommender Standard für Stream Analytics. So ähnlich, wie SQL ein Standard für relationale Datenbanken ist.

Apache Spark und Apache Flink sind Beam-Runner, sollen also den Beam-Standard erlauben. Ein weiterer Open-Source Beam-Runner ist Apache Samza, der auch Kafka-Topics lesen und schreiben kann. Im Vergleich zu Apache Spark bringt Samza deutlich weniger Features und ist vielleicht gerade deswegen für den einen oder anderen Use-Case eine interessante Alternative.

Streaming-taugliche Big Data Datenbanken

Neben diesen ‘klassischen’ Stream-Analytics Engines gibt es eine wachsende Zahl von real-time Analytics fähigen Datenbanken.

Apache Druid ist in der Entwicklung schon weit fortgeschritten. Die Daten können in statischen Quellen, wie auf HDFS oder Amazon S3 liegen oder eben in real-time Quellen wie Kafka oder Amazon Kinesis.

Die Analysen erfolgen batch oder real-time mit Hilfe von einer intuitiv bedienbaren GUI.

Auch andere Big-Data taugliche Datenbanken entwickeln sich in eine vergleichbare Richtung.

Apache Ignite versteht sich ist eine verteilte Datenbank für hochperformantes Rechnen. Das Data Streaming API erlaubt eine Anbindung an Kafka, aber auch an Flink, RocketMQ, Spark, Storm, MQTT-Quellen. In einer Evaluation ist es als Alternative ist Ignite sicher ein guter Kandidat in einer Evaluation.

Echtzeit-Auswertungen sind ja per se Zeitabhängig, erfolgen in Zeitintervallen. Und so liegt es nahe, dass Zeitreihen-Datenbanken für Echtzeit-Auswertungen heranzuziehen sind. Die Anbindung an Kafka kann mit dem Kafka Connect API erfolgen. Man benötigt ein Verbindungsstück zwischen Kafka und der Datenbank. Die Events werden aus dem Kafka-Topic in die Datenbank übertragen, meist als Key-Value Paare. Dort werden sie weiterverarbeitet. Redis ist ein beliebter Key-Value Store. Und kann als Zwischenspeicher für eine weitere Auswertung, resp. Visualisierung dienen.

InfluxDB wiederum ist beliebt als Zeitreihen-Datenbank. Echtzeitauswertungen erfolgen ja oft in Zeitintervallen und da bietet es sich an, die Auswertung in einer Time-Series-Database wie Influxdb vorzunehmen.

Auch für Hazelcast gibt es einen Apache Kafka Connector und das Stream Processing API wird in Java eingebettet.


Zusammengefasst


  • Apache Spark ist das bei weitem flexibelste Tool für Real-Time-Analysen. Dicht gefolgt von Apache Flink
  • Die Apache Kafka-Community holt mit Kafka Streams und ksqlDB rasch auf in Bezug auf Datenbanalyse-Tools. Diese verarbeiten ausschließlich Kafka-Topics.
  • Wer auf Apache Druid setzen möchte, baut damit eine real-time fähige Analyse Datenbank auf.
  • Dank Kafka-Connect können auch weitere Tools wie Apache Ignite, Redis, InfluxDB und Hazelcast für die Analyse von Topics verwendet werden.

Meine Nr. 1 zur Analyse von Real Time Big Data

Das Gespann Kafka + Spark ist aus meiner Sicht momentan das schlagkräftigste und facettenreichste, wenn es um Echtzeitanalysen geht. Andere holen auf und es bleibt spannend.

  • Abo: Data Engineering und Analytics
  •