Wo wird Data Stream Processing eingesetzt? Welche Infrastruktur ist dazu notwendig und welche Tools existieren? Dieser Artikel zeigt einige grundlegenden Herausforderungen und Konzepte. 

Definition von Big Data

In einem Blog Post definierte Gartner im Jahr 2001 den Begriff Big Data als Technologien zur Verarbeitung von Daten mit

  • sehr großem Volumen
  • sehr hoher Geschwindigkeit
  • und verschiedensten Formaten und aus verschiedensten Quellen.

Diese Definition ist auch bekannt als 3V nämlich Volume, Velocity, Variety.

Realtime Big Data Processing adressiert alle drei Aspekte.

Einsatzgebiete

Die Daten, die mit Data Stream Processing verarbeitet werden, stammen aus verschiedensten Quellen:

  • Logdaten zur Überwachung von Servern
  • Sensordaten aus Endgeräten, wie Smartphones (IoT)
  • Sensordaten aus Fahrzeugen und Geräten (IoT, Predictive Maintenance)
  • Überwachungsdaten
  • Social Media Daten
  • und viele andere mehr

Das Ziel ist eine möglichst zeitnahe Auswertung der Daten für Reports und Alert.

Anforderungen

  • Hoher Durchsatz – große Volumen von Daten sollen in kurzer Zeit verarbeitet werden.
  • Geringe Latenz – die Verarbeitung der Daten soll mit kleinster Verzögerung, also möglichst zeitnah, erfolgen.
  • Präzise Ergebnisse – die Berechnungen sollen präzise sein und möglichst nicht approximiert werden müssen.
  • Skalierbarkeit der Systeme – bei temporären Spitzenlasten sollen die Systeme möglichst einfach skalieren.

 

Klassische Architektur

Die Problemstellung ist nicht neu. Die klassische Architektur sieht folgendes vor:

  • Die Daten werden beispielsweise von einem Webserver in Empfang genommen.
  • Dieser schreibt die Daten in ein  (relationales) Datenbanksystem.
  • Von dort werden sie periodisch abgeholt und in ein Data Warehouse (DWH) gebracht. Dabei wird eine ETL-Verarbeitung (extract-transform-load) vorgenommen. Klassischerweise sind das Batch-Verarbeitungen.
  • Der Report und der Alert werden aufgrund der Ergebnisse von DWH-Abfragen gebildet.

Dieses Vorgehen stößt vielerorts an seine Grenzen, gerade weil die Datenvolumen zu groß werden, als dass die Batch-Verarbeitungen noch schnell genug durchlaufen könnten.

Die Realtime Stream Verarbeitung bietet eine mögliche Lösung.

Positionieren Realtime Stream Verarbeitung

Die Daten werden zuerst analysiert und anschließend gespeichert. So wird eine zeitnahe Reaktion auf Events ermöglicht.

Der Data Lake

Allerdings bedingt dies den Einsatz einer neuen Architektur.  In einem ersten Schritt baut man an Stelle eines Data Warehouses einen Data Lake auf. Der Unterschied liegt in der Art der Daten. Da wo das DWH aggregierte und bereinigte Daten enthält, da speichert man die Daten im Data Lake auch in der ursprünglichen Form. Dabei fallen sehr große Datenmengen an. Passen diese nicht mehr auf die üblichen Medien, dann bietet es sich heute an, sie in einem verteilten Filesystem zu speichern.

Die Idee der verteilten Filesysteme geht auf ein Google Paper The Google File System zurück . Als Open-Source-Pandant steht uns heute Apache Hadooop zur Verfügung.

Data Nodes

Ein verteiltes Filesystem besteht aus einer Vielzahl von Data Nodes. Ein Data Node ist ein Rechner in einem Rechenzentrum. Darauf sind Linux mit einem Linux-Filesystem installiert, sowie die Software für HDFS (Hadoop Distributed File System). Ein Big-Data-Filesystem umfasst eine Vielzahl an Data Nodes. Die Systeme sind gebaut im Wissen, dass diese Hardware ausfallen kann. Es soll also nicht gleich das ganze Filesystem lahm gelegt werden.

Data Block

Diese Systeme sind nicht geeignet für kleine Files sondern sie sind ausgelegt auf sehr große Files, die Größen bis hin zu Petabytes erreichen können.

Die Files werden in Blöcke aufgeteilt. Übliche Blockgrößen sind 64MB, 128MB, 256MB. Jeder Block wird auf drei Data Nodes repliziert. So wird erreicht, dass das System verfügbar bleibt, auch wenn ein einzelner Data Node ausfällt.

Letztendlich ist HDFS nicht mehr als ein Filesystem mit der üblichen hierarchischen Verzeichnisstruktur.

MapReduce

Um Files von dieser Größe einigermaßen schnell analysieren zu können, wurde mit MapReduce ein neues Programmierparadigma geschaffen. Es geht zurück auf ein weiteres Google-Paper MapReduce: Simplified Data Processing on Large Clusters.

 

Das Bild zeigt ein Beispiel in Python. Es zählt die Anzahl der Vorkommnisse einzelner Wörter in einem Text. Das Programm kann nur mit Hilfe des entsprechenden Frameworks ausgeführt werden. Als Programmierer konzentriert man sich auf die Entwicklung der Map-Schritte und der Reduce-Schritte. Im Map-Schritt werden Operationen auf einzelne Datenelemente angewendet und es werden Key-Value-Paare ans Framework zurückgegeben. Im Reduce-Schritt wird eine Operation auf Gruppen von Key-Value-Paaren angewendet werden, die denselben Key haben.

Im Map Schritt im Beispiel oben, wird ein Input-Zeile in Wörter zerlegt und im Reduce-Schritt werden die gleichen Wörter gezählt.

Das Framework  schickt das Programm zu den Daten auf den einzelnen Data Nodes. Dort wird das Programm ausgeführt. Das Framework sorgt dafür, dass die Schritte in der vorgesehenen Reihenfolge verteilt auf den einzelnen Rechner ausgeführt werden.

Das Framework sortiert auch die Zwischenergebnisse der Mapper und verteilt die Daten wieder für den Reduce-Schritt.

Ein Mapper ist eine Art “Riesen-Select” und ein Reducer ist eine Art “Riesen Group By”.

Das Hadoop Ökosystem

MapReduce-Programmieren ist sehr aufwändig, sprich: wenig produktiv. So verwundert es nicht, dass eine Vielzahl von Tools entstanden sind und noch entstehen, die auf MapReduce und HDFS basieren, dem Programmierer jedoch das Aufwändige Programmieren mit MapReduce abnehmen oder dem Analysten gar Tools zur Datenauswertung an die Hand geben. Hier ein vertiefender Blog-Post zu diesem Thema.

Der Event Hub

Die Verarbeitung mit MapReduce ist vergleichsweise langsam und reicht nicht an die Anforderung der Latenzzeiten  Realtime Analysen heran.

Zudem sind die Datenblöcke im HDFS ziemlich groß. Die Default-Einstellung bei HDFS 3 ist 256MB. Die Events, die von den Datenquellen eintreffen, sind oft eher klein, so dass relativ viele Events zu einem Block zusammengefasst werden müssen, bevor dieser auf HDFS geflusht werden kann. Um keine Events zu verlieren und vor allem, um die Events auch vor dem Schreiben auf HDFS zu analysieren, setzt man einen Event-Hub ein.

Damit können die Events gespeichert und verschiedenen Analysen unterworfen werden, bevor man sie im Data Lake aufbewahrt. Treffen große Mengen an Events ein, dann wird auch dieser Event-Hub als verteiltes System gehalten.

Die Internet-Riesen sprechen von Event-Hubs mit Petabytes von Daten, die täglich in der Form anfallen und ausgewertet werden (Beispiel: Netflix Keystone).

Auch in einem Event Hub werden die Daten auf mehrere Server im Rechenzentrum verteilt. Beispiele für Event Hubs sind Apache Kafka und Apache Pulsar.

Natürlich wird man die Daten auch in einem Event Hub organisieren. Bei Apache Kafka ist die Rede von Topics.

Die Systeme, die Events entgegennehmen, also beispielsweise Sensor-Daten empfangen, schreiben die Daten in Topics und werden so zu einem Publisher für das Topic.

Andere Systeme werden diese Topics lesen und analysieren oder an andere Systeme weiterreichen – zum Beispiel im Data Lake aufbewahren. Diese Systeme werden zu Subscribern des Topics. Die Daten werden eine zu definierende Zeit im Topic aufbewahrt, bevor der Platz für andere Events freigegeben wird.

Verteilte real time Analysen

Setzen wir die bisher vorgestellten Architekturelemente zu einer Gesamtarchitektur zusammen, dann stellen wir fest, dass noch die Systeme für die verteilte Analyse der Daten zu ergänzen sind. Diese sind Subscriber des Event-Hubs.

Auch die Analyse kann einer sehr hohen Last unterworfen werden. Um die oft geforderte geringe Latenz zu erreichen, werden auch die Analysen verteilt auf einer Vielzahl von Worker-Nodes im Rechenzentrum ausgeführt.

Data Stream Processing

Schauen wir die Herausforderungen an die Analyse und Verarbeitung solcher Data Streams an.

Die Verarbeitung kann elementweise erfolgen, Immer häufiger werden klassische ETL-Prozesse durch Data Stream Processing abgelöst. Hier kommt elementweise Verarbeitung zum Zuge, beispielsweise, um Daten zu bereinigen (Datumstransformation und ähnliches).

Gerade für Analysen sind Aggregationen interessant. In SQL entsprechen diese einem Group By.

Oft werden auch Daten aus verschiedenen Quellen zu einem Stream vereint – in SQL entspricht dies einem Join.

Immer häufiger setzt man Machine Learning auf Streams ein. Anwendungsfälle sind Predictive Maintenance oder Fraud Detection.

Im Folgenden Konzentrieren wir uns auf die Aggregationen.

Aggregationen

In einer real time Analyse werden die eintreffenden Daten typischerweise nach Zeitintervallen zusammengefasst (aggregiert) und dargestellt.

Zeitfenster können verschiedene Eigenschaften haben. Man unterscheidet die folgenden Window-Typen

 

Fixed Windows (werden auch hopping Windows genannt) unterteilen die Zeit in gleichmäßige Intervalle. Die klassische Batch-Verarbeitung mit den typischen nächtlichen Verarbeitungen, macht genau das gleiche. In der real time Verarbeitung sind die Intervalle jedoch sehr viel kleiner und die Analyse erfolgt sehr zeitnah zum Eintreffen der Daten.

Sliding Windows haben Überlappungen. Dies erlaubt es, einen Einblick in die eintreffenden Daten zu erhalten noch bevor die Verarbeitung eines Window fertig abgeschlossen ist.

Sessions sind auch Zeitfenster. Diese haben jedoch unterschiedliche Länge und überlappen. Die Aktionen, die während einer Web-Session oder einer Datenbank-Session vorgenommen werden, können als Event-Stream aufgefasst werden.

Zeitkritische Verarbeitung

Nicht jede Streamverarbeitung ist gleichermaßen zeitkritisch. Sollen beispielsweise Herzpatienten überwacht werden, dann ist eine sehr zeitnahe Verarbeitung gefordert.

Soll beispielsweise der Puls gemessen werden, dann dürfen auch keine Events übersprungen werden und es dürfen keine Events mehrfach verarbeitet werden. In diesem Zusammenhang spricht man von Delivery Garantien: exactly once, at least once, at most once. Nicht jedes System bring exactly once Garantie mit.

Faktor Zeit

Konzentrieren wir uns auf den Zeitfaktor. Zwischen dem Zeitpunkt des Aufzeichnens und Abschickens eines Events von der Datenquelle und dessen Verarbeitung im Analysesystem kann eine variable Zeitspanne liegen. Je nach Anwendungsfall ist diese Zeitspanne von fundamentaler Bedeutung.

Die rote Linie zeigt den nie eintreffenden Idealfall: die Verarbeitungszeit ist gleich der Event-Zeit. Die violetten Punkte zeigen mögliche Abweichungen der Verarbeitungszeit von der Eventzeit.

Bedeutet “Realtime” jetzt Verarbeitungszeit – dann ist die Präzision der Ergebnisse einfach erfüllbar.

 

Das gelbe Diagramm (oben links) zeigt die Aggregation der Events (hier werden die Events gezählt) pro Minute. Window-Länge ist also eine Minute. Wir betrachten die Achse “Processing Time” und zählen pro Minute die Events. Das Ergebnis zeichnen wir im Histogramm auf.

Anders sieht es aus, wenn sich der Anspruch von “Realtime” an die Event-Zeit richtet. Hier ist wegen der Abweichung der beiden Zeitpunkte die Präzision viel schwieriger zu erreichen. Diesmal zählen wir die Events pro Minute auf der Achse Event-Time und zeichnen die Ergebnisse im grünen Histogramm unten rechts auf.

Vergleichen wir die beiden Diagramme (unten links), dann sehen wir große Abweichungen zwischen den beiden Analysen. Je nach Anwendungsfall sind solche Abweichungen nicht tragbar.

Eventzeit verarbeiten

Untersuchen wir das Phänomen genauer.

Zur Verdeutlichung färben wir die Events, die in derselben Minute anfielen mit je einer Farbe. Wir zeichnen einen Eventstream auf:

Oben stellen wir die Events dar in derjenigen Reihenfolge in der sie verarbeitet werden. Im unteren Stream stellen wir die Events in der Reihenfolge dar, wie sie produziert wurden. Die Pfeile zeigen die Zuordnungen, die das verarbeitende System vornehmen muss, um eine korrekte Analyse zu produzieren.

Trigger

Data Stream Processing Systeme stellen Triggers zur Verfügung. Pro Zeitintervall (hier eine Minute) kann ein Anwendungsprogrammierer drei Trigger absetzen:

  1. Early Trigger – beispielsweise nach 1/3 des Windows erhält das Analyseprogramm einen Zwischenstand für ein Zeitfenster.
  2. On Time Trigger – nach Ablauf des Zeitfenster wird das Ergebnis geliefert.
  3. Late Trigger – beispielsweise nach 2 Minuten wird noch einmal ein Ergebnis geliefert, diese enthält jetzt auch die inzwischen noch eingetroffenen Ergebnisse. Danach wird das Zeitfenster geschlossen. Später eintreffende Events werden nicht mehr analysiert.

Die Animation zeigt die Entwicklung der Events, wie sie anfallen und wie die Statistik sich entwickelt.

Wir sehen, dass wir in diesem Fall ein Event verlieren. Als Gegenmaßnahme können wir den Late-Trigger später abholen. Dann werden wir jedoch insgesamt mehr Ressourcen benötigen, weil mehr offene Fenster zu verwalten sind. Oder wir verzichten darauf und richten einen weiteren Subscriber auf unseren Event-Hub ein. Damit holen wir die Daten aus dem Event-Hub ab, bringen sie in den Data Lake und können nachgelagert noch ganz präzise Auswertungen erstellen. Der Use Case und die Ressourcen bestimmen das Szenario.

Open Source Data Stream Processing

Einige Beispiele:

Die Bilder zeigen die Logos der entsprechenden Produkte, resp. Apache Projekte.

 

Kommender Standard: Apache Beam – An advanced unified programming model

 

Quelle Bild: Apache Beam

 

Quelle des Bildes des Data Centers: Das Bild zeigt das Google Data Center und es wird häufig verwendet. Das ursprüngliche Bild konnte ich nicht finden – möglicherweise stammt es aus diesem Video.