“Trade-Offs der Real-Time Analyse großer Datenströme”
Von der Theorie zur Anwendung
Von der Theorie zur Anwendung
Die Anforderungen an eine Enterprise Search sind schnell spezifiziert: Wir wollen eine Suchmaske mit einem Eingabefeld, einer Vorschlagsfunktion während des Tippens und einer Ergebnisliste, in der das wichtigste Dokument zuoberst steht.
Wir sind verwöhnt von Google und Co. Die Messlatte für die Standards liegt hoch. Jahrzehntelange Forschung und Entwicklung stehen hinter diesen Suchsystemen.
Wer eine Enterprise Search aufbauen will, muss sich auf eine längere Entwicklungszeit einstellen.
Der Aufbau einer Unternehmensweiten Suche ist mehr als ein Integrationsprojekt. Dokumente liegen in unterschiedlichen Systemen und sollen mit wenigen Klicks in einer einfachen Suchmaske auffindbar sein. Natürlich steht das beste Dokument ganz oben in der Trefferliste und die Trefferliste ist maßgeschneidert auf den Suchenden.
Ein ehrgeiziges Projekt. Hier die wichtigsten Aspekte, die es zu beachten gilt:
Autorin: Ursula Deriu
Qualität des Suchsystems
Erwartungen der Zielgruppen
Vertrauliche Daten in der Enterprise Search
Datenquellen und Mengengerüst
Suchsystem Lizenzierung
Architektur der Enterprise Search Plattform
Aufbau der Enterprise Search und Tuning
Ranking
Infrastruktur und Aktualiserungen
Laufender Betrieb und Unterhalt
Fazit
Die Erwartungen sind einfach:
“Ein gutes Suchsystem liefert mir das, was ich suche”.
Im Umkehrschluss: Liefert ein Suchsystem keine brauchbaren Ergebnisse, dann ist das System unbrauchbar. Und das gilt für jeden User und für jede Suchfrage.
Bei der Entwicklung einer Enterprise Search Plattform lohnt es sich, von Anfang an die Erwartungen der Zielgruppen abzuholen.
Gerade in einem größeren Unternehmen gibt es mehr als eine Zielgruppe. Diese richten sich nach dem Aufgabengebiet der Mitarbeitenden und nach deren Informationsbedarf.
Idealerweise werden noch vor Projektstart ausgewählte Vertreterinnen und Vertreter der einzelnen Zielgruppen ihre Erwartungen aufschreiben.
Und idealerweise wird diese Gruppe während des Projekts nach jeder Iteration gezielt Feedback zum Suchsystem geben.
Mit der Cranfield-Methode wurde dieses Vorgehen formalisiert und wird gerne in erfolgreichen Projekten angewendet. Denn ein Suchsystem ist dann gut, wenn es die Fragen der Suchenden beantwortet. Wir müssen also verstehen, was und wie die Suchenden fragen und mit welchen Antworten sie zufrieden sind.
Der Aufbau eines Suchsystems ist keine exakte Wissenschaft.
Eine wichtige Frage, die zu Beginn klar sein muss:
Gibt es Dokumente mit eingeschränkten Leserechten? Falls ja, dann ist diesem Merkmal besondere Beachtung zu schenken:
Ein solches Dokument darf nicht in der Trefferliste für alle sichtbar erscheinen.
Auch die Information, dass ein Dokument mit einem bestimmten Titel überhaupt existiert, ist eine Information.
Die Dokumente liegen oft verstreut auf vielen unterschiedlichen Systemen. Sharepoint, Confluent, Filesysteme, CMS-Systeme, Document Management Systeme, Archivsysteme – der Phantasie sind keine Grenzen gesetzt.
Es lohnt sich, zu Projektstart, eine Liste zu erstellen mit
Diese Liste wird helfen, ein passendes System zu evaluieren.
Im Verlauf des Projekts werden die Dokumente aus allen Systemen ausgelesen . Die Enterprise Search Plattform braucht Zugriff auf alle Systeme und muss mit einer Vielzahl von Authentifizierungsmechanismen zurechtkommen.
Auch erwarten wir, dass die Suchmaschine mit den Änderungen der Dokumente synchron ist.
Je nach Quellsystem wird diese Herausforderung anders zu lösen sein.
Make or Buy – diese Entscheidung muss gefällt werden.
Jeder Hersteller von Enterprise-Search Plattformen wird mit einem anderen Zauberkasten antreten und jeder Hersteller muss mit unserer Systemlandschaft zurechtkommen.
Entschließt man sich, die Suchplattform weitgehend selbst zu bauen, dann wird man dennoch nicht bei Null anfangen.
Man wird ein Suchsystem lizenzieren und in die eigene Systemlandschaft integrieren. Je nach Lizenzmodell wird die Datenmenge eine Rolle spielen.
Oder man basiert auf einem Open Source System – wie Apache Solr.
Dieses System wird seit 2006 von einer regen Community ständig weiterentwickelt. Es basiert auf der Open Source Library Apache Lucene.
Diese ist sehr bewährt und beliebt und wird gerne als Herzstück einer Suchmaschine verwendet.
Bleiben wir bei Apache Solr als Beispiel einer Suchmaschine, die wir als Herzstück unserer Enterprise Search Plattform einsetzen können.
Solr kommt mit einer Indexierungskomponente und mit einer Suchkomponente. Beide sind unglaublich flexibel konfigurierbar.
Es gibt Features für nahezu alle Anforderungen.
Reicht das nicht aus, dann können wir eigene Features dazu programmieren.
Die Lucene/Solr Community ist sehr aktiv und erweitert den Umfang laufend um die neuen Features, die dank KI möglich werden.
Die Indexierungskomponente baut den Invertierten Index und die Suchkomponente fragt diesen ab.
Je mehr Features wir konfigurieren und je mehr Dokumente wir Indexieren, umso größer wird der invertierte Index.
Früher als uns lieb ist, werden wir in die Big Data Welt, also in die Welt des verteilten Rechnens katapultiert, und war auch dann, wenn die Quelldokumente im Quellsystem bleiben und die User der Enterprise Search Plattform bei Klick auf einen Treffer das Dokument aus dem Quellsystem lesen.
Das folgende Diagramm zeigt die Architektur einer Enterprise Search.
Die Dokumente werden aus den einzelnen Quellsystemen abgeholt.
Sie werden in dasjenige Format transformiert, das Solr als Input verwendet. Das ist ein einfaches XML- oder JSON-Format. Pro Dokument definieren wir einzelne Suchfelder. Hier fängt das Index-Design schon an.
Je zielgenauer die Suchmaschine sein soll, desto raffinierter werden wir dieses Format strukturieren und desto aufwändiger wird diese Datenkonvertierung.
Die konvertierten Daten werden von Sorl zur Indexierung eingelesen. Wir konfigurieren, mit welchen Features indexiert und später gesucht wird.
Im Produktivbetrieb werden wir dafür sorgen, dass Änderungen an den Dokumenten in den Quellsystemen zeitnah indexiert werden.
Die Suchoberfläche sieht nur auf den ersten Blick einfach aus. Unter der Haube werden die User-Eingaben in Suchanfragen umgewandelt, die zum Index passen und die Features ausreizen.
Viel Aufmerksamkeit schenken wir dem Ranking: Wir sorgen also dafür, dass das relevanteste Dokument zuerst in der Trefferliste erscheint.
Heute gängig und erwartet: personalisierte Suchen – also ein Ranking, das sich nach dem Suchenden richtet.
Das ist viel Arbeit. Und mit jeder Datenquelle, die wir dazu nehmen, werden wir neue Erkenntnisse gewinnen.
Das Suchsystem wird schrittweise aufgebaut.
Jedes Quellsystem muss analysiert werden.
Die Struktur des Suchmaschinen Indexes wird ebenfalls schrittweise entwickelt. Zu Beginn wird man kaum die perfekten Entscheidungen treffen und muss das Design immer wieder überdenken.
Daten werden in den Quellsystemen abgeholt und konvertiert in ein Format, das von der Suchmaschine indexiert wird.
Vielleicht gibt es einen Konnektor, der das Suchsystem direkt mit dem Quellsystem verbindet und vielleicht funktioniert dieser sogar out-of-the-box und lässt sich nach unseren Anforderungen konfigurieren.
Falls nein, dann erstellt man diesen Prozess selbst. Python ist eine mächtige Sprache mit vielen guten Libraries, die uns die Arbeit erleichtern.
Das Tuning der Suche ist aufwändig und spannend. Immerhin soll die Suchmaschine nicht einfach ein Ergebnis liefern, sondern zielgenau den besten Treffer an erster Stelle zeigen.
Nicht nur das Index-Design selbst, sondern auch die Erschließung der Daten im Index wollen entwickelt werden.
Die grundlegendsten Schritte kommen in jedem Suchsystem standardmäßig mit:
Das ist der erste Schritt. Jetzt vergleichen wir die Erwartungen der Testuser mit den Ergebnissen.
Das Vorgehen können wir so standardisieren, dass wir nach jedem Optimierungsschritt aufgrund der von den User erwarteten Dokumente und vom Suchsystem gefundenen Treffer den F1-Score berechnen.
Dieser wird mit der out-of-the-box Konfiguration eher im niedrigen Bereich sein. Vielleicht bei 0,2 von 1,0.
Wobei wir 1,0 nicht erzielen wollen, weil wir damit das Suchsystem “overfitten”, also auf die Testuser und deren Fragen trimmen und alles andere außer Acht lassen.
Das erste Tuning wird sich mit der Frage befassen, ob jeder dieser Schritte verbessert werden kann. Beispiele:
Nach dem Tuning bauen wir den Index neu und errechnen den F1-Score neu.
Dieser wird stetig wachsen. Sinkt er, dann haben wir einen guten Hinweis, dass der letzte Optimierungsschritt nicht die erwarteten Ergebnisse brachte.
Nach und nach werden wir weitere Features einbauen. Hier eine Liste gängiger Möglichkeiten:
Es ist geschafft – wir vertrauen unserem Suchsystem und wollen eine erste Version ausrollen und einem ausgewählten Benutzerkreis zur Verfügung stellen.
Suchmaschinen Indexe werden sehr groß. Wir werden sehr viel Platz benötigen, sowohl Festplatte als auch RAM.
Und sehr wahrscheinlich werden wir mit einem verteilten System arbeiten, weil die schiere Menge der Daten nicht auf einem Rechner Platz hat.
Wir sollten uns schon lange bei Projektstart mit dem Aspekt der Infrastruktur befassen und in einer sehr frühen Projektphase berechnen, ob die Größe des Suchmaschinenindexes ein verteiltes System benötigt.
Ob wir damit in die Cloud gehen wollen? Auch diese Frage muss abgewogen werden.
Vielleicht kommt gar ein Real-Time-Aspekt dazu – und wir wollen Änderungen an Dokumenten sofort im Index nachführen.
Unternehmensweite Suche – die Anforderungen sind schnell formuliert. Der Aufbau eines Suchsystems jedoch ist sehr anspruchsvoll und aufwändig und benötigt vielfältige Skills.
Jedes System muss gepflegt werden. Verteilte Systeme sind aufwendiger.
Die Daten müssen aktuell gehalten werden – vielleicht sogar in Echtzeit.
Die Texte und Dokumente werden von Menschenhand erstellt und der Kreativität sind keine Grenzen gesetzt.
Wir brauchen Qualitätssichrungsmechanismen die bemerken, wenn neue Formate erfunden wurden, die in der Datenkonvertierung noch nicht erkannt werden.
Event-Time, Ingestion-Time, Processing-Time? Und was bedeutet da Real-Time? Der Artikel beleuchtet spezielle Herausforderungen der Echtzeitanalyse großer Datenströme besonders im Hinblick auf den Faktor Zeit.
Autorin: Ursula Deriu
Typische Anwendungsfälle des Data Stream Processings
Zeitkritische Verarbeitung von Data Streams
Zeitfenster (Windows) in der Data Stream Processing
Typen des Data Stream Processing
Zeitversatz (Time Skew)
Auswirkungen des zeitlichen Versatzes
Analysen nach Event-Zeit
Watermark
Trigger
Als Anwender der Data Stream Processing Engine
Streams sind überall:
Statt von ‘Records’ ist die Rede von ‘Events’ oder auch von ‘Messages’ oder ‘Nachrichten’. Die Events werden ununterbrochen generiert und gelangen in einem fortwährenden Strom in die Analytics-Pipeline.
Unaufhörlich treffen neue Events ein. Stream Processing verarbeitet und analysiert ‘unbeschränkten’ Datasets (engl. unbounded Datasets). Eigentlich sind es Warteschlangen (Queues) und die Events.
Sollen beispielsweise Herzpatienten von Ferne überwacht werden, dann ist eine sehr zeitnahe Verarbeitung und Analyse gefordert.
Verläßt der Puls eine gewisse Toleranzschwelle, dann wird ein Alarm ausgelöst. Diagramme werden zusätzlich erstellt und ermöglichen die Diagnose und Überwachung.
Das Szenario ist stark vereinfacht:
Ein kleines Gerät erhebt den Puls bei einem Herzpatienten. Für jeden Herzschlag schickt es ein Event via Internet zu einer Real-Time-Pipeline.
Nicht jedes Event nimmt denselben Weg durchs Internet – und so treffen die Events in nahezu beliebiger Reihenfolge in der Analytics Pipeline ein.
Wird jetzt der Puls unseres fiktiven Patienten ausgewertet, dann dürfen keine Events verloren gehen und es dürfen keine Events mehrfach verarbeitet werden.
Dies zu garantieren, ist in Big Data Systemen eine besonders komplexe Aufgabe. Immerhin sollen diese Systeme ja horizontal skalieren.
Vorkehrungen bei der Konfiguration des Gesamtsystems und bei der Programmierung Analyse sind notwendig, um die geforderte Exactly-Once Verarbeitung zu erreichen.
Für eine Echzteit-Analyse werden die laufend eintreffenden Events typischerweise nach Zeitintervallen zusammengefasst (aggregiert) und das Ergebnis wurd als Zeitreihe dargestellt.
Dazu teilt die verwendete Stream Analytics Engine die Events in Zeitfenster (engl. Windows) ein.
Man unterscheidet die folgenden Window-Typen:
Fixed Windows (heißen auch hopping Windows) unterteilen die Zeit in gleichmäßige Intervalle. Die klassische Batch-Verarbeitung mit den typischen nächtlichen Verarbeitungen, macht eigentlich genau das gleiche. In der real-time Verarbeitung sind die Intervalle jedoch sehr viel kleiner und die Analyse erfolgt sehr zeitnah zum Eintreffen der Daten.
Sliding Windows haben Überlappungen. Dies erlaubt es, einen Einblick in die eintreffenden Daten zu erhalten noch bevor die Verarbeitung eines Window fertig abgeschlossen ist.
Sessions sind auch Zeitfenster. Diese haben jedoch unterschiedliche Länge und überlappen. Selbst die Aktionen in einer Web-Session oder in einer Datenbank-Session bilden einen Event-Stream.
Die Herzschläge unseres fiktiven Patienten analysieren wir mit Fixed Windows – der Puls eines Patienten wird ja pro Minute erhoben.
Immer häufiger werden klassische ETL-Prozesse durch Data Stream Processing abgelöst. Hier kommt elementweise Verarbeitung zum Zuge, beispielsweise, um Daten zu bereinigen (Datumstransformation und ähnliches).
Gerade für Analysen sind Aggregationen interessant. In SQL entsprechen diese einem Group By.
Oft werden auch Daten aus verschiedenen Quellen zu einem Stream vereint – in SQL entspricht dies einem Join.
Machine Learning auf Streams wird immer beliebter. Anwendungsfälle sind Predictive Maintenance oder Fraud Detection.
Im Beispiel unseres fiktiven Herzpatienten aggregieren wir die Events und zählen sie pro Minute.
Konzentrieren wir uns auf die Zeit. Der Zeitpunkt eines einzelnen Herzschlags des Patienten ist die Event-Time. Trifft das Event bei der Pipeline ein, dann spricht man von Ingestion-Time.
Präziserweise würden wir von Timestamp statt von Time sprechen.
Zwischen diesen beiden Zeitpunkten kann theoretisch beliebig viel Zeit verstreichen. Und möglicherweise befindet sich der Patient gerade in einer anderen Zeitzone als die Streaming Pipeline.
Auch innerhalb der Pipeline wird es eine gewisse Zeit dauern, bis das Event verarbeitet wird. Der Zeitpunkt wird Processing Time genannt.
Die Abweichung zwischen der Processing Time und der Event Time bezeichnen wir als Zeitversatz (engl. Time Skew).
Der Begriff “Real-Time” suggeriert, dass der Zeitversatz gleich Null ist. Doch das ist unrealistisch. Eigentlich sollte von “Near-Real-Time” gesprochen werden.
Das hervorgehobene Event wurde um 12:00:xx generiert und wird ist erst um 12:05:yy von der Pipeline verarbeitet.
Soll nach Processing-Time analysiert werden, dann fällt das Event ins Zeitintervall der Minute 12:05 -12:06.
Soll hingegen nach Event-Time analysiert werden, dann gehört das Event ins Zeitintervall zwischen 12:00 und 12:01.
Die Diagonale zeigt den nie eintreffenden Idealfall: die Verarbeitungszeit ist gleich der Event-Zeit – das wäre Echtzeit im wörtlichen Sinn. Der zeitliche Versatz von der Ideallinie ist sowohl in der x-Achse als auch in der y-Achse sichtbar.
Dieser zeitliche Versatz nicht konstant sondern hängt von vielen technischen Faktoren ab:
Bedeutet “Real-Time” jetzt Verarbeitungszeit – dann verarbeitet die Analytics Engines die Events in der Reihenfolge ihres Eintreffens. Ganz unabhängig davon, wie lange die Events unterwegs und welchen Weg sie durchs Internet eingeschlagen haben.
Das Histogramm zeigt die Anzahl der Events pro Minute. Window-Länge ist also eine Minute. Wir betrachten die Achse “Processing Time” und zählen pro Minute die jeweils Events. Das Ergebnis zeichnen wir im Histogramm auf.
Anders sieht es aus, wenn mit “Real-Time” an die Event-Zeit gemeint ist, wie bei unserem Herzpatienten.
Rechnen wir das Ergebnis der Event-Time Analyse in unserem Beispiel aus: Diesmal zählen wir die Events pro Minute auf der Achse Event-Time. Dabei entsteht das folgende Histogramm:
Legen wir beide Histogramme übereinander, dann sehen wir große Abweichungen zwischen den beiden Analysen. Je nach Anwendungsfall sind solche Abweichungen nicht tragbar.
Bei unserem Beispiel: die beiden Analysen des Pulses unseres fiktiven Patienten weichen sehr stark voneinander ab.
Der Zeitversatz zwischen Processing-Time und Event-Time stellt hohe Anforderungen an die Stream Analytics Engine. Diese arbeitet ja auf dem Zeitstrahl der Processing-Time, erhält die Events in beliebiger Reihenfolge mit beliebigen Versatz und ist gefordert in Echtzeit nach Event-Time auszuwerten.
Untersuchen wir das Phänomen genauer.
Zur Verdeutlichung färben wir die Events, die in derselben Minute anfielen (Event-Zeit) mit je einer Farbe.
Daraus leiten wir den Event-Stream ab in der Reihenfolge der Processing-Time. In der Reihenfolge verarbeitet die Analytics Engine die Events ja in jedem Fall.
Die Analyse nach Minute ist für die Überwachung unseres Herzpatienten unbrauchbar.
Die Analytics Engine stellt jetzt die Reihenfolge wieder her, in der die Events ursprünglich angefallen ist.
Wir ergänzen das Diagramm durch den Event-Stream nach Event-Time. Die Pfeile zeigen, wie die Stream Analytics Engine die laufend verarbeiteten Events intern in die Zeitfenster der Event-Zeit zuordnet und so die urspüngliche Reihenfolge wieder herstellt.
Dabei reicht die Zuordnung zum Zeitfenster aus – immerhin werden die Daten anschließend pro Zeitfenster ausgewertet. Pro Zeitfenster verwaltet die Analytics Engine also einen internen Puffer.
Man beachte, dass es sich bei einem Stream nicht um ein abgeschlossenes Dataset handelt, das einfach umsortiert werden kann. Bei dem unbounded Dataset des Streams treffen laufend neue Events ein.
Moderne Real-Time Analytics Systeme sind in der Lage, diese Zuordnung vorzunehmen.
Bleibt die Frage, wie lange diese internen Puffer bestehen bleiben. Immerhin läuft die Pipeline ununterbrochen und potenziell unendlich lange. Diese Puffer verbrauchen Platz im Memory, der Platz ist endlich und muss frei gegeben werden.
Der zeitliche Versatz liegt in der Natur der Pipeline. Die Abweichung zwischen Event-Time und Processing-Time sollte für die meisten Events vergleichbar sein. Es wird immer Ausreisser geben, die mit erheblicher Verspätung eintreffen.
Als Watermark bezeichnet man die Toleranzgrenze für verspätetes Eintreffen. Setzen wir in unserem Beispiel die Watermark auf zwei Minuten, dann wird der Pupper für das erste Intervall, das ja um 12:01 endet, um 12:03 abgeschlossen, also zwei Minuten nach Ende des Zeitfensters.
Später eintreffende Events werden für die Analyse nicht mehr berücksichtigt.
Wir sprechen von Echtzeit und wollen sofort Ergebnisse sehen und nicht warten, bis das Zeitfester plus die Wassermarke verstrichen sind. In unserem Beispiel mit einem Zeitfenster von einer Minute un der Watermark von zwei Minuten, würde das ja bedeuten, dass wir drei Minuten warten müssen, um Ergebnisse zu sehen.
Nach drei Minuten werden wird das Endergebnis sehen. In einer gut funktionierenden Pipeline, sollten in den meisten Fällen ja relativ wenig verspätete Events eintreffen, so dass meistens die Auswertung für ein Zeitfenster unmittelbar aktuell ist.
Doch gerade bei längeren Zeitfenstern wäre es interessant, auch schon vorher einen Einblick zu erhalten und ein vorläufiges Ergebnis zu sehen.
Triggers dienen genau dazu.
Early Trigger bestimmen den Zeitpunkt, du dem ein Einblick in die Entwicklung der Analyse bei einem offenen Zeitfenster erlaubt. Der Zeitpunkt wird relativ zum Beginn des Zeitfensters bestimmt.
Ontime Trigger bestimmen den Einblick in die Analyse nach Abschluss des Zeitfensters.
Late Trigger bestimmen die Watermark, also die Zeitspanne, die nach dem Ontime Trigger noch durchlaufen wird, bevor die Ergebnisse der Auswertung für das betroffene Zeitfenster abschließend vorliegen und der betreffende Puffer im Memory frei gegeben wird.
In der Animation wurden die folgenden Werte angenommen:
Auf der Reise von der Quelle bis zur Verarbeitung durchläuft ein Event viele Stationen. Jede kann theoretisch einen Zeitstempel auf das Event schreiben und nach jedem dieser Zeitstempel könnte eine Auswertung erfolgen. Die Aussage des Ergebnisses wäre jedes Mal eine andere.
Als Anwender eines Real-Time Systems sorgen wir dafür, dass wir die Events nach dem richtigen Zeitstempel auswerten. Je nach Anwendungsfall berücksichtigen wir auch unterschiedliche Zeitzonen.
Jedes Real-Time Analytics System löst die in diesem Artikel ganannten Herausforderungen. Jedes System hat ein eigenes API und ein eigenes Wording. So wird beispielsweise der Trigger nicht in jedem System so benannt.
Die Darstellung in diesem Artikel beziehen sich auf das allgemeine Data Flow Model. Und mit Apache Beam ist auch eine Art Standard in Arbeit, um dem Endanwender die Einarbeitung zu erleichtern.
In den meisten Anwendungsfällen wird eine Auswertung nach Event-Time notwendig sein. Wir müssen also dafür sorgen, dass der Event-Timestamp auf den Events vorhanden ist.
Das unterliegende System kann dem Anwender die komplexe Handhabung der Zuordnung der Events zu den passenden Windows ab.
Moderne Big Data Stream Analytics Systeme ermöglichen dies für Datenströme, die so groß sind, dass sie auf einem verteilten System analysiert werden müssen. Und so werden auch die Data-Buffers die für diese Zuordnung notwendig sind, auf mehrere Systeme verteilt.
Weiterführende Quelle:
Akidau et. al: The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale Unbounded, Out-of-Order Data Processing, 2015, Proceedings of the VLDB Endowment
Event Hubs wie Apache Kafka geben eine Ordering Guarantee und versprechen damit, dass die Events in derselben Reihenfolge gelesen werden, wie sie in die Queue geschrieben wurde. Dieser Blog-Post beleuchtet die Details und zeigt die Einschränkungen der garantierten Reihenfolge in Event Hubs im Real-Time Big Data.
Autorin: Ursula Deriu
Sei es als Message Queue, sei es als Event Hub Apache Kafka ist sehr beliebt. Apache Kafka kommt in Realtime Big Data Stream Processing Systemen zum Einsatz. Als verteiltes System ist Kafka jedoch sehr komplex. Das folgende Bild verdeutlicht die Grundidee:
Der Producer schreibt laufend Nachrichten auf eine Topic, also eine Message Queue. Der Consumer holt diese bei Bedarf dort ab.
Zusammen mit der Aussage der Ordering Guarantee, also der garantierten Reihenfolge, suggeriert das Bild, dass der Consumer die Nachrichten in genau der Reihenfolge erhält, wie der Producer sie schreibt. Eine Queue, also Wartschlange, befolgt ja das FIFO-Prinzip – first in first out.
Das folgende Video geht mit einem simplen Test dieser Behauptung nach. Bald stellen wir fest, dass sie einen Trugschluss birgt.
Im ersten Durchlauf des Versuchs sendet der Producer die Events im Abstand von einer Sekunde an das Topic. Der Consumer erhält – wie erwartet – die Events in derselben Reihenfolge, in der der Producer sie versendet hat.
Im zweiten Durchlauf sendet der Producer die Events ohne Wartezeit und der Consumer erhält sie in einer abweichenden Reihenfolge.
Die Erklärung für das Phänomen ist einfach und basiert auf der Tatsache, dass das Topic eine verteilte Message Queue darstellt. Es wurde ja zu Beginn des Experiments mit fünf Partitionen definiert. Der Befehl im Video lautet ja:
kafka-topics.sh --create --topic nugget02 --partitions 5 --replication-factor 2 --bootstrap-server kafka-1:9092
Für das Experiment ist die Angabe –partitions 5 ausschlaggebend. Der Effekt des Experiments ist vergleichbar, wenn das Topic mit 2 oder mehr Partitionen definiert wird.
Apache Kafka ist ein komplexes System. Um das einfache API für den jeweiligen Anwendungsfall korrekt einzusetzen, benötigt man ein solides Verständnis des unterliegenden Systems.
Als Big-Data-System ist Apache Kafka ein verteiltes System. Die Daten der Topics werden also auf mehrere Broker verteilt, so dass insgesamt sehr große Datenmengen gehandhabt werden können. Je mehr Partitionen ein Topic umfasst, umso mehr Events kann das Topic enthalten.
Bei den Überlegungen zur Partitionierung ist zu berücksichtigen, dass eine Partition nicht über mehrere Festplatten hinweg gehalten werden kann. Der Festplattenplatz beschränkt also die Größe einer einzelnen Partition. Apache Pulsar ist ein Event-Hub, der diese Beschränkung nicht aufweist.
Bei der Entscheidung, wie viele Partitionen für ein Topic definiert werden sollen, spielt auch die Anzahl der vorhandenen Kafka-Broker eine Rolle. Die Partitionen sollten ja auf unterschiedliche Broker verteilt werden, um mehr Events halten zu können. Im Code oben wurden weder retention.ms noch retention.bytes definiert. Die Retention Time bestimmt die Dauer, während der ein Event in der Message Queue verbleibt, bevor es von Kafka automatisch gelöscht wird. Analog bestimmt die retention.bytes wie gross eine Partition maximal werden kann, bevor Kafka, die ältesten Events löscht, um diese Maximalgröße nicht zu überschreiten.
Topics in Event Hubs sind ja (unbeschränkte) unbounded Datasets. Das bedeutet nicht, dass sie selbst unendlich groß werden – denn der Platz auf der Harddisk ist ja immer beschränkt und auch die Anzahl der Broker ist beschränkt. ‘Unbeschränkt’ bedeutet in dem Fall, dass die Event Queue theoretisch unendlich lang laufen kann – ältere Events werden ja gelöscht, wie oben beschrieben. Bei passender Konfiguration und Pflege wird eine unbeschränkte Laufzeit erreicht.
Jede Topic Partition ist eine Queue, also eine Warteschlange, die nach dem FIFO-Prinzip funktioniert. Damit ist ja die Reihenfolge gegeben: die Events werden in derselben Reihenfolge aus der Queue gelesen, in der sie in die Queue geschrieben wurden.
In einer verteilten Queue verteilt der Producer die Events auf die einzelnen Partitionen. Ein Event wird in genau einer Partition gespeichert. Innerhalb einer Partition werden also die Events in genau derjenigen Reihenfolge aufbewahrt, in der sie geschrieben wurden.
In unserem Experiment werden die Events also auf fünf Partitionen verteilt. Welches Event in welcher Partition landen wird, ist nicht vorhersehbar.
Doch warum stimmt die Reihenfolge im ersten Durchlauf des Experiments und im zweiten Durchlauf nicht mehr?
Der Unterschied war die Geschwindigkeit, in der der Producer die Events auf die Topic-Partitionen schickte. Um den Effekt zu verstehen, schauen wir den Partitioner genauer an.
Das Kafka-Partitioner API ist einfach bedienbar. Der Python Code im Experiment sah wie folgt aus:
producer = KafkaProducer(bootstrap_servers='kafka-1') .... producer.send(topic="nugget02", value=data)
Damit wurde das Default-Verhalten von Kafka aufgerufen. Dieses setzt Vieles voraus, was vielleicht gar nicht gewünscht ist. Auch die Wahl des Partitioners.
Es stehen mehrere Partitioner-Algorithmen zur Auswahl.
Der Partitioner definiert einen Buffer für jede Partition, auf die er schreiben soll. Die eintreffenden Events verteilt er nach dem gewählten Algorithmus auf die Partitionen. Beim Round-Robin-Partitioner beispielsweise, werden die Events der Reihe nach auf die einzelnen Buffer verteilt.
Ist ein Buffer voll oder ist eine gewisse Zeit verstrichen, dann werden die Events an die jeweilige Partition gesendet. Beide Parameter können konfiguriert werden.
Dieser Timeout ist natürlich viel kürzer als die eine Sekunde, die wir im ersten Durchlauf des Experiments definiert hatten. Somit wird jedes Event einzeln an die jeweilige Partition gesandt. Im zweiten Durchlauf des Experimants wurden dann mehrere Events gleichzeitig an die Partition gesandt.
Das ist nur ein Teil der Erklärung für die abweichende Reihenfolge, in der der Consumer in den beiden Durchläufen die Events geliefert bekam. Der zweite Teil der Erklärung ist beim Consumer zu suchen.
Dieser pollt die einzelnen Partitionen in einer bestimmten (ebenfalls konfigurierbaren) Frequenz und erhält jeweils höchstens so viele Events geliefert, wie im vorgesehenen Buffer Platz finden (auch diese Parameter in konfigurierbar).
Im ersten Durchlauf des Experiments trafen die Events so langsam in den Partitionen ein, dass sie einzeln ausgeliefert wurden und so die scheinbar richtige Reihenfolge einhalten konnten. Im zweiten Durchlauf des Experiments war das nicht mehr der Fall und die globale Reihenfolge der Events geriet beim Consumer durcheinander.
Bei all diesen Betrachtungen dürfen wir eins nicht außer Acht lassen: Auch beim Consumer wird es sich meistens um ein verteiltes System handeln, beispielsweise um Apache Spark, oder ein anderes Tool zur Real-Time Analyse von Kafka-Topics.
Sollte für diese Verarbeitung oder Analyse der Daten die Reihenfolge eine Rolle spielen, dann wird sich die Analyse-Komponente in ihrer Rolle als Consumer darum kümmern. So wird Apache Spark für Real-Time Analysen die Events in Zeitfenster einteilen und auswerten (siehe Zeit im Big Data Stream Processing).
Verlangt unser Use Case, dass gewisse Events in der richtigen Reihenfolge aus der Queue gelesen werden, dann haben wir bei Apache Kafka die folgenden Möglichkeiten:
Diese Variante sollte mit Bedacht angewendet werden. Immerhin gibt es eine ganze Reihe verschiedener Partitioner-Algorithmen, die genau für diesen Zweck geschrieben wurden.
Damit sind wir auf der sicheren Seite: Wo nichts aufgeteilt wird, gerät nichts durcheinander. Doch wir verlieren die Möglichkeit, die Daten auf mehrere Broker zu verteilen und damit die Menge insgesamt zu skalieren.
Im Code-Beispiel oben, enthielt der Send-Befehl nur ein Value um die Variable zu benennen, in der die zu sendenden Daten hinterlegt sind. Der Send-Befehl kann auch einen Key enthalten. Dabei handelt es sich nicht um einen Primary-Key, sondern lediglich um ein Merkmal, das den Value indentifiziert.
Der Primary-Key könnte beispielsweise eine Konto-Nummer sein. Der zugehörende Value könnte die Veränderungen auf dem Konto enthalten. Für jede Veränderung auf dem Konto wird ein Event ins Topic geschrieben. So können wir die Veränderungen nachverfolgen – immerhin haben sie alle denselben Key, werden in dieselbe Partition geschrieben und erscheinen demnach in der historischen Reihenfolge.
Der Partitioner serialisiert zuerst den Key und bildet danach einen Hash-Wert bilden, dessen Ergebnis die Partition bezeichnet. Damit erwirken wir, dass alle Events mit demselben Key auf ein und dieselbe Partition geschrieben werden und das unter Einhaltung der Reihenfolge ihres Eintreffens. Eine Partition kann für mehrere Keys zuständig sein.
Damit erreichen wir eine sinnvolle Reihenfolge in Bezug auf ein Kriterium: Beispielsweise in Bezug auf die Messdaten eines einzelnen Sensors, wenn die SensorID als Kafka-Key dient.
Die Gefahr besteht, dass die Partitionen ungleichmäßig ausgelastet werden und damit die Consumer der nachfolgenden Komponente (z.B. Apache Spark) ungleichmäßig ausgelastet werden.
Mit Keys arbeiten ist auch sinnvoll, wenn zusätzlich die Topic Partitionen compacted werden. Bei der Compaction bleibt jeweils das jüngste Element eines Keys erhalten und die älteren werden gelöscht. Eine sinnvolle Art Daten mit relativ wenigen unterschiedlichen Keys in Topics zu verwalten.
Apache Kafka gibt eine Ordering Guarantee ab. Gleich wie andere verteilte Event-Hubs kann diese Garantie nur in Bezug auf eine Partition abgegeben werden.
Durch die Wahl eines Keys kann die Reihenfolge der Events im Topic in Bezug auf den Key garantiert werden.
(c) Video und Quiz: Tirsus GmbH / Ursula Deriu
Apache Kafka hat sich im Big-Data-Bereich dabei als Quasi-Standard für Event Hubs durchgesetzt. Mit hoher Geschwindigkeit persistiert es immer schneller eintreffende Events sicher in Topics. Höher, Schneller, Stärker – das Motto der Olympischen Spiele gilt auch für die Ansprüche an moderne Datenanalyse.
Immer schneller soll das Datengold in den Topics geschürft werden. Die Daten sollen gefiltert, angereichert, gejoined und aggregiert werden und das möglichst in Echtzeit.
Hoch sind die Anforderungen an die Werkzeuge. Die Thematik ist alles andere als trivial. Wir haben es zu tun mit (potenziell) gigantischen Datenmengen und erwarten dennoch:
Die Tool-Palette für Apache Kafka als Quelle wächst und damit die Qual der Wahl.
Begonnen hat Apache Kafka ja als hochperformanter und hoch skalierbarer Event-Hub, und wird je nach Sichtweise auch als Message Queue bezeichnet. Das Streaming-API kamen relativ spät dazu und noch jünger ist ksqlDB.
Das Kafka-Streams API hat mit seinen Features mächtig zugelegt. Kein zusätzliches Cluster wird benötigt. Input und Output der Analysen sind jeweils Topics. Das API gibt’s für Java und Scala. Wer nicht in ein Topic schreiben möchte, sucht nach einem Apache Kafka Connector für die gewünschte Datensenke.
Mit ksqlDB eröffnet sich gar die Möglichkeit, die Topics mit SQL-artigen Befehlen zu analysieren. ksqlDB ist relativ neu und gerade bei komplexen SQL-Abfragen mit vielen Joins und Aggregatfunktionen noch etwas umständlich zu bedienen. Ein Werkzeug, das sicher noch handlicher wird.
Ein Klassiker unter den Stream Analytics Engines ist Apache Spark. Es erlaubt sowohl Batch-Analysen auf einer Vielfalt von Formaten als auch Stream Analytics mit mannigfaltigen Datenquellen. Eine davon ist Apache Kafka.
Apache Spark ist sicher eine der ausgereiften Möglichkeiten. Die Integration von SQL, auch auf Streams, ist deutlich einfacher zu Handhaben. Machine Learning Modelle können mit Spark trainiert und/oder auf Streams deployes werden. Das ermöglicht beispielsweise für Predictive Maintanance, also vorausschauende Wartung. Spark GraphX ist gar eine Big-Data Graph Processing Engine. Zudem können die Ergebnisse der Stream-Analytics auf eine Vielzahl verschiedener Senken zur Weiterverarbeitung resp. Visualisierung geschrieben werden, Datenbanken, In-Memory-Stores und Topics.
Open Source Möglichkeiten zur Visualisierung findet man beispielsweise beim Apache Zeppelin oder beim Jupyter Lab. Spark bringt APIs für Scala, Java, Python und R.
Apache Flink entstand ursprünglich als Stream Analytics Engine, die Batch-Möglichkeiten kamen erst nachgelagert dazu. Die Unterschiede zu Spark liegen im Detail im Handling, in der Vielfalt der Features und auch in der Sprache der Foren. Flink wurde ursprünglich an der TU Berlin gestartet und ist mittlerweile von Alibaba aufgekauft worden. Die chinesisch-sprachigen Foren-Beiträge können ja glücklicherweise automatisch übersetzt werden. Interessant bei Flink ist auch die gute Integration mit Apache RocketMQ, einer Alternative zu Kafka als Event-Hub. Wie Flink wird auch die Open-Source-Community von RocketMQ von Alibaba getrieben.
Big Data Stream Analytics Engines eine ganze Reihe. Apache Beam ist so eine Art aufkommender Standard für Stream Analytics. So ähnlich, wie SQL ein Standard für relationale Datenbanken ist.
Apache Spark und Apache Flink sind Beam-Runner, sollen also den Beam-Standard erlauben. Ein weiterer Open-Source Beam-Runner ist Apache Samza, der auch Kafka-Topics lesen und schreiben kann. Im Vergleich zu Apache Spark bringt Samza deutlich weniger Features und ist vielleicht gerade deswegen für den einen oder anderen Use-Case eine interessante Alternative.
Neben diesen ‘klassischen’ Stream-Analytics Engines gibt es eine wachsende Zahl von real-time Analytics fähigen Datenbanken.
Apache Druid ist in der Entwicklung schon weit fortgeschritten. Die Daten können in statischen Quellen, wie auf HDFS oder Amazon S3 liegen oder eben in real-time Quellen wie Kafka oder Amazon Kinesis.
Die Analysen erfolgen batch oder real-time mit Hilfe von einer intuitiv bedienbaren GUI.
Auch andere Big-Data taugliche Datenbanken entwickeln sich in eine vergleichbare Richtung.
Apache Ignite versteht sich ist eine verteilte Datenbank für hochperformantes Rechnen. Das Data Streaming API erlaubt eine Anbindung an Kafka, aber auch an Flink, RocketMQ, Spark, Storm, MQTT-Quellen. In einer Evaluation ist es als Alternative ist Ignite sicher ein guter Kandidat in einer Evaluation.
Echtzeit-Auswertungen sind ja per se Zeitabhängig, erfolgen in Zeitintervallen. Und so liegt es nahe, dass Zeitreihen-Datenbanken für Echtzeit-Auswertungen heranzuziehen sind. Die Anbindung an Kafka kann mit dem Kafka Connect API erfolgen. Man benötigt ein Verbindungsstück zwischen Kafka und der Datenbank. Die Events werden aus dem Kafka-Topic in die Datenbank übertragen, meist als Key-Value Paare. Dort werden sie weiterverarbeitet. Redis ist ein beliebter Key-Value Store. Und kann als Zwischenspeicher für eine weitere Auswertung, resp. Visualisierung dienen.
InfluxDB wiederum ist beliebt als Zeitreihen-Datenbank. Echtzeitauswertungen erfolgen ja oft in Zeitintervallen und da bietet es sich an, die Auswertung in einer Time-Series-Database wie Influxdb vorzunehmen.
Auch für Hazelcast gibt es einen Apache Kafka Connector und das Stream Processing API wird in Java eingebettet.
Meine Nr. 1 zur Analyse von Real Time Big Data
Das Gespann Kafka + Spark ist aus meiner Sicht momentan das schlagkräftigste und facettenreichste, wenn es um Echtzeitanalysen geht. Andere holen auf und es bleibt spannend.