Zeit im Big Data Stream Processing
Event-Time, Ingestion-Time, Processing-Time? Und was bedeutet da Real-Time? Der Artikel beleuchtet spezielle Herausforderungen der Echtzeitanalyse großer Datenströme besonders im Hinblick auf den Faktor Zeit.
Inhalt
Typische Anwendungsfälle des Data Stream Processings
Zeitkritische Verarbeitung von Data Streams
Zeitfenster (Windows) in der Data Stream Processing
Typen des Data Stream Processing
Zeitversatz (Time Skew)
Auswirkungen des zeitlichen Versatzes
Analysen nach Event-Zeit
Watermark
Trigger
Als Anwender der Data Stream Processing Engine
Typische Anwendungsfälle des Data Stream Processings
Streams sind überall:
- Logdaten zur Überwachung von Servern
- Sensordaten aus Endgeräten, wie Smartphones (IoT)
- Sensordaten aus Fahrzeugen und Geräten (IoT, Predictive Maintenance)
- Überwachungsdaten- z.B. Webcams
- Social Media Daten
- Echtzeitanalysen in Data Warehouses, resp. Data Lakes
- und viele andere mehr
Statt von ‘Records’ ist die Rede von ‘Events’ oder auch von ‘Messages’ oder ‘Nachrichten’. Die Events werden ununterbrochen generiert und gelangen in einem fortwährenden Strom in die Analytics-Pipeline.
Unaufhörlich treffen neue Events ein. Stream Processing verarbeitet und analysiert ‘unbeschränkten’ Datasets (engl. unbounded Datasets). Eigentlich sind es Warteschlangen (Queues) und die Events.
Zeitkritische Verarbeitung von Data Streams
Sollen beispielsweise Herzpatienten von Ferne überwacht werden, dann ist eine sehr zeitnahe Verarbeitung und Analyse gefordert.
Verläßt der Puls eine gewisse Toleranzschwelle, dann wird ein Alarm ausgelöst. Diagramme werden zusätzlich erstellt und ermöglichen die Diagnose und Überwachung.
Das Szenario ist stark vereinfacht:
Ein kleines Gerät erhebt den Puls bei einem Herzpatienten. Für jeden Herzschlag schickt es ein Event via Internet zu einer Real-Time-Pipeline.
Nicht jedes Event nimmt denselben Weg durchs Internet – und so treffen die Events in nahezu beliebiger Reihenfolge in der Analytics Pipeline ein.
Wird jetzt der Puls unseres fiktiven Patienten ausgewertet, dann dürfen keine Events verloren gehen und es dürfen keine Events mehrfach verarbeitet werden.
Dies zu garantieren, ist in Big Data Systemen eine besonders komplexe Aufgabe. Immerhin sollen diese Systeme ja horizontal skalieren.
Vorkehrungen bei der Konfiguration des Gesamtsystems und bei der Programmierung Analyse sind notwendig, um die geforderte Exactly-Once Verarbeitung zu erreichen.
Zeitfenster (Windows) in der Data Stream Processing
Für eine Echzteit-Analyse werden die laufend eintreffenden Events typischerweise nach Zeitintervallen zusammengefasst (aggregiert) und das Ergebnis wurd als Zeitreihe dargestellt.
Dazu teilt die verwendete Stream Analytics Engine die Events in Zeitfenster (engl. Windows) ein.
Man unterscheidet die folgenden Window-Typen:
Fixed Windows (heißen auch hopping Windows) unterteilen die Zeit in gleichmäßige Intervalle. Die klassische Batch-Verarbeitung mit den typischen nächtlichen Verarbeitungen, macht eigentlich genau das gleiche. In der real-time Verarbeitung sind die Intervalle jedoch sehr viel kleiner und die Analyse erfolgt sehr zeitnah zum Eintreffen der Daten.
Sliding Windows haben Überlappungen. Dies erlaubt es, einen Einblick in die eintreffenden Daten zu erhalten noch bevor die Verarbeitung eines Window fertig abgeschlossen ist.
Sessions sind auch Zeitfenster. Diese haben jedoch unterschiedliche Länge und überlappen. Selbst die Aktionen in einer Web-Session oder in einer Datenbank-Session bilden einen Event-Stream.
Die Herzschläge unseres fiktiven Patienten analysieren wir mit Fixed Windows – der Puls eines Patienten wird ja pro Minute erhoben.
Typen des Data Stream Processing
Immer häufiger werden klassische ETL-Prozesse durch Data Stream Processing abgelöst. Hier kommt elementweise Verarbeitung zum Zuge, beispielsweise, um Daten zu bereinigen (Datumstransformation und ähnliches).
Gerade für Analysen sind Aggregationen interessant. In SQL entsprechen diese einem Group By.
Oft werden auch Daten aus verschiedenen Quellen zu einem Stream vereint – in SQL entspricht dies einem Join.
Machine Learning auf Streams wird immer beliebter. Anwendungsfälle sind Predictive Maintenance oder Fraud Detection.
Im Beispiel unseres fiktiven Herzpatienten aggregieren wir die Events und zählen sie pro Minute.
Zeitversatz (Time Skew)
Konzentrieren wir uns auf die Zeit. Der Zeitpunkt eines einzelnen Herzschlags des Patienten ist die Event-Time. Trifft das Event bei der Pipeline ein, dann spricht man von Ingestion-Time.
Präziserweise würden wir von Timestamp statt von Time sprechen.
Zwischen diesen beiden Zeitpunkten kann theoretisch beliebig viel Zeit verstreichen. Und möglicherweise befindet sich der Patient gerade in einer anderen Zeitzone als die Streaming Pipeline.
Auch innerhalb der Pipeline wird es eine gewisse Zeit dauern, bis das Event verarbeitet wird. Der Zeitpunkt wird Processing Time genannt.
Die Abweichung zwischen der Processing Time und der Event Time bezeichnen wir als Zeitversatz (engl. Time Skew).
Der Begriff “Real-Time” suggeriert, dass der Zeitversatz gleich Null ist. Doch das ist unrealistisch. Eigentlich sollte von “Near-Real-Time” gesprochen werden.
Das hervorgehobene Event wurde um 12:00:xx generiert und wird ist erst um 12:05:yy von der Pipeline verarbeitet.
Soll nach Processing-Time analysiert werden, dann fällt das Event ins Zeitintervall der Minute 12:05 -12:06.
Soll hingegen nach Event-Time analysiert werden, dann gehört das Event ins Zeitintervall zwischen 12:00 und 12:01.
Die Diagonale zeigt den nie eintreffenden Idealfall: die Verarbeitungszeit ist gleich der Event-Zeit – das wäre Echtzeit im wörtlichen Sinn. Der zeitliche Versatz von der Ideallinie ist sowohl in der x-Achse als auch in der y-Achse sichtbar.
Dieser zeitliche Versatz nicht konstant sondern hängt von vielen technischen Faktoren ab:
- Vom Zustand der Systeme außerhalb der Stream-Analytics-Pipeline, beispielsweise von der Übertragungsgeschwindigkeit im Internet.
- Vom Zustand der Stream-Analytics-Pipeline.
Auswirkungen des zeitlichen Versatzes
Bedeutet “Real-Time” jetzt Verarbeitungszeit – dann verarbeitet die Analytics Engines die Events in der Reihenfolge ihres Eintreffens. Ganz unabhängig davon, wie lange die Events unterwegs und welchen Weg sie durchs Internet eingeschlagen haben.
Das Histogramm zeigt die Anzahl der Events pro Minute. Window-Länge ist also eine Minute. Wir betrachten die Achse “Processing Time” und zählen pro Minute die jeweils Events. Das Ergebnis zeichnen wir im Histogramm auf.
Anders sieht es aus, wenn mit “Real-Time” an die Event-Zeit gemeint ist, wie bei unserem Herzpatienten.
Rechnen wir das Ergebnis der Event-Time Analyse in unserem Beispiel aus: Diesmal zählen wir die Events pro Minute auf der Achse Event-Time. Dabei entsteht das folgende Histogramm:
Legen wir beide Histogramme übereinander, dann sehen wir große Abweichungen zwischen den beiden Analysen. Je nach Anwendungsfall sind solche Abweichungen nicht tragbar.
Bei unserem Beispiel: die beiden Analysen des Pulses unseres fiktiven Patienten weichen sehr stark voneinander ab.
Der Zeitversatz zwischen Processing-Time und Event-Time stellt hohe Anforderungen an die Stream Analytics Engine. Diese arbeitet ja auf dem Zeitstrahl der Processing-Time, erhält die Events in beliebiger Reihenfolge mit beliebigen Versatz und ist gefordert in Echtzeit nach Event-Time auszuwerten.
Analysen nach Event-Zeit
Untersuchen wir das Phänomen genauer.
Zur Verdeutlichung färben wir die Events, die in derselben Minute anfielen (Event-Zeit) mit je einer Farbe.
Daraus leiten wir den Event-Stream ab in der Reihenfolge der Processing-Time. In der Reihenfolge verarbeitet die Analytics Engine die Events ja in jedem Fall.
Die Analyse nach Minute ist für die Überwachung unseres Herzpatienten unbrauchbar.
Die Analytics Engine stellt jetzt die Reihenfolge wieder her, in der die Events ursprünglich angefallen ist.
Wir ergänzen das Diagramm durch den Event-Stream nach Event-Time. Die Pfeile zeigen, wie die Stream Analytics Engine die laufend verarbeiteten Events intern in die Zeitfenster der Event-Zeit zuordnet und so die urspüngliche Reihenfolge wieder herstellt.
Dabei reicht die Zuordnung zum Zeitfenster aus – immerhin werden die Daten anschließend pro Zeitfenster ausgewertet. Pro Zeitfenster verwaltet die Analytics Engine also einen internen Puffer.
Man beachte, dass es sich bei einem Stream nicht um ein abgeschlossenes Dataset handelt, das einfach umsortiert werden kann. Bei dem unbounded Dataset des Streams treffen laufend neue Events ein.
Moderne Real-Time Analytics Systeme sind in der Lage, diese Zuordnung vorzunehmen.
Watermark
Bleibt die Frage, wie lange diese internen Puffer bestehen bleiben. Immerhin läuft die Pipeline ununterbrochen und potenziell unendlich lange. Diese Puffer verbrauchen Platz im Memory, der Platz ist endlich und muss frei gegeben werden.
Der zeitliche Versatz liegt in der Natur der Pipeline. Die Abweichung zwischen Event-Time und Processing-Time sollte für die meisten Events vergleichbar sein. Es wird immer Ausreisser geben, die mit erheblicher Verspätung eintreffen.
Als Watermark bezeichnet man die Toleranzgrenze für verspätetes Eintreffen. Setzen wir in unserem Beispiel die Watermark auf zwei Minuten, dann wird der Pupper für das erste Intervall, das ja um 12:01 endet, um 12:03 abgeschlossen, also zwei Minuten nach Ende des Zeitfensters.
Später eintreffende Events werden für die Analyse nicht mehr berücksichtigt.
Trigger
Wir sprechen von Echtzeit und wollen sofort Ergebnisse sehen und nicht warten, bis das Zeitfester plus die Wassermarke verstrichen sind. In unserem Beispiel mit einem Zeitfenster von einer Minute un der Watermark von zwei Minuten, würde das ja bedeuten, dass wir drei Minuten warten müssen, um Ergebnisse zu sehen.
Nach drei Minuten werden wird das Endergebnis sehen. In einer gut funktionierenden Pipeline, sollten in den meisten Fällen ja relativ wenig verspätete Events eintreffen, so dass meistens die Auswertung für ein Zeitfenster unmittelbar aktuell ist.
Doch gerade bei längeren Zeitfenstern wäre es interessant, auch schon vorher einen Einblick zu erhalten und ein vorläufiges Ergebnis zu sehen.
Triggers dienen genau dazu.
Early Trigger bestimmen den Zeitpunkt, du dem ein Einblick in die Entwicklung der Analyse bei einem offenen Zeitfenster erlaubt. Der Zeitpunkt wird relativ zum Beginn des Zeitfensters bestimmt.
Ontime Trigger bestimmen den Einblick in die Analyse nach Abschluss des Zeitfensters.
Late Trigger bestimmen die Watermark, also die Zeitspanne, die nach dem Ontime Trigger noch durchlaufen wird, bevor die Ergebnisse der Auswertung für das betroffene Zeitfenster abschließend vorliegen und der betreffende Puffer im Memory frei gegeben wird.
In der Animation wurden die folgenden Werte angenommen:
- Ontime Trigger = Dauer eines Zeitfensters = 1 Minute
- Early Trigger nach 20 Sekunden (diese Wahl passt zum Beispiel und ist nicht praktischer Natur)
- Late Trigger = Watermark = 2 Minuten
Als Anwender der Data Stream Processing Engine
Auf der Reise von der Quelle bis zur Verarbeitung durchläuft ein Event viele Stationen. Jede kann theoretisch einen Zeitstempel auf das Event schreiben und nach jedem dieser Zeitstempel könnte eine Auswertung erfolgen. Die Aussage des Ergebnisses wäre jedes Mal eine andere.
Als Anwender eines Real-Time Systems sorgen wir dafür, dass wir die Events nach dem richtigen Zeitstempel auswerten. Je nach Anwendungsfall berücksichtigen wir auch unterschiedliche Zeitzonen.
Jedes Real-Time Analytics System löst die in diesem Artikel ganannten Herausforderungen. Jedes System hat ein eigenes API und ein eigenes Wording. So wird beispielsweise der Trigger nicht in jedem System so benannt.
Die Darstellung in diesem Artikel beziehen sich auf das allgemeine Data Flow Model. Und mit Apache Beam ist auch eine Art Standard in Arbeit, um dem Endanwender die Einarbeitung zu erleichtern.
In den meisten Anwendungsfällen wird eine Auswertung nach Event-Time notwendig sein. Wir müssen also dafür sorgen, dass der Event-Timestamp auf den Events vorhanden ist.
Das unterliegende System kann dem Anwender die komplexe Handhabung der Zuordnung der Events zu den passenden Windows ab.
Moderne Big Data Stream Analytics Systeme ermöglichen dies für Datenströme, die so groß sind, dass sie auf einem verteilten System analysiert werden müssen. Und so werden auch die Data-Buffers die für diese Zuordnung notwendig sind, auf mehrere Systeme verteilt.
Weiterführende Quelle:
Akidau et. al: The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale Unbounded, Out-of-Order Data Processing, 2015, Proceedings of the VLDB Endowment