Was ist Data Lineage?
Bevor wir uns näher mit Spline beschäftigen klären wir kurz den Begriff der Data Lineage. Data Lineage ist ein Begriff der ursprünglich aus dem Data-Warehouse Umfeld stammt und der sich im Wesentlichen mit der Dokumentation der Abstammung von Datensätzen aus Transparenzgründen beschäftigt. Ziel ist es sowohl die Herkunft aggregierter Daten als auch das Datenmapping möglichst vollständig zu archivieren. Dabei liegt der Fokus auch auf der Visualisierung der Data Lineage, da eine reine Beschreibung der Datensatzherkunft zu intransparent wäre.
Praktische Bedeutung der Data Lineage
Die Nachvollziehbarkeit der Datensatzherkunft hat insbesondere in regulierten Branchen eine hohe Bedeutung. Bei Auditierungen müssen Unternehmen in kurzer Zeit nachweisen, welche Daten bzw. Kennzahlen auf welcher Basis entstanden sind. Ein Beispiel hierfür ist BCBS 239, ein regulatorisches Rahmenwerk bei dem „systemrelevante“ Banken die Herkunft ihrer Risikodaten offenlegen müssen. Eine vollständige und transparente Darstellung der Data Lineage ist dabei notwendig um die genannten Anforderungen zu erfüllen.
Aufgrund immer komplexer werdender ETL-Strecken dürfte das Thema für den Big Data Kontext auch in eher deregulierten Industrien an Bedeutung gewinnen. Gerade an den Schnittstellen zwischen Datenlieferanten – Data Engineering – Data Science - Business kann eine transparente Data Lineage einen wertvollen Beitrag leisten. Sie erhöht das Vertrauen in gelieferte (Teil-)Ergebnisse, kann als Diskussionsgrundlage dienen und verringert Verständnisprobleme.
Data Lineage wird somit zunehmend auch zu einem Thema für Big Data Processing Engines, die es in einem einzigen Job ermöglichen, die Datensatzherkunft sozusagen bis zur Unkenntlichkeit zu transformieren und/oder zu aggregieren. Ein Beispiel hierfür ist Apache Spark. Dessen Optimierungsengine sowie die daraus resultierenden Auswirkungen auf die Data Lineage, beleuchten wir im Folgenden etwas genauer.
Apache Spark die hochperformante Blackbox?
Spätestens mit der Einführung von Spark SQL und der high-level DataFrame API für strukturierte Daten festigte Spark seine Stellung als hochperformantes Big Data Framework für Cluster Computing. Die dadurch erzielten Performancezuwächse im Vergleich zur RDD API sind durch den sog. Catalyst Optimizer, einer Mischung aus regel- und kostenbasierten Abfrageoptimierer, begründet.
Etwas genauer betrachtet erfolgt die Optimierung des Catalyst Optimizers auf Basis von Directed Acyclic Graphs (DAGs), welche wiederum auf Grundlage der RDD Lineage konstruiert werden. Die RDD Lineage ergibt sich aus der Tatsache das RDDs unveränderlich sind. Jede auf einem RDD ausgeführte Transformation erzeugt ein neues RDD. Der sich daraus ergebende Lineage wird von den RDDs selbst in Form von Pointern gehalten. Der Pointer eines RDDs zeigt dabei auf ein oder mehrere Eltern-RDDs aus denen es erzeugt wurde.
Der Optimierungsvorgang an sich basiert auf Konstrukten funktionaler Programmierung und begründet die eigentliche Performanz. Das Ergebnis sind mehrere Execution Plans nach denen die Daten phasenweise verarbeitet werden.
Execution Plans des Catalyst Optimizers:
- Parsed Logical Plan
- Analyzed Logical Plan
- Optimized Logical Plan
- Physical Plan
github:6c45c2ed2648ae6acd6403bdf30530de
Wie an diesem Beispiel ersichtlich wird, führen diese verhältnismäßig einfachen Transformationen bereits zu einer erheblichen Intransparenz. Die Data Lineage ist nur sehr schwierig bzw. bei Jobs mit mehreren Operationen u. U. überhaupt nicht mehr nachvollziehbar. Eine geeignete Visualisierung fehlt gänzlich. Die Möglichkeit die Data Lineage über die Spark Web UI nachzuvollziehen entfällt ebenfalls, da diese den bereits optimierten DAG darstellt und keinerlei Bezug zu einzelnen Attributen nimmt.
Ein vielversprechendes Open Source Tool, das den genannten Problemen durch die Optimierungsengine von Spark begegnen soll ist Spline, welches wir im Folgenden nun genauer vorstellen.
Spark + Lineage = Spline
Der Name Spline setzt sich aus den Anfangsbuchstaben von Spark und Lineage zusammen und verfolgt zwei Ziele:
- Die Aufzeichnung und Speicherung der Data Lineage der prozessierten Daten auf Grundlage von Spark Jobs
- Die Visualisierung der gespeicherten Data Lineage mittels einer Web UI
Spline macht sich dabei genau zwei Dinge zu nutze. Zum einen sind dies die Execution Plans des Catalyst Optimizers, die die Data Lineage auf Basis der RDD Lineage halten. Zum anderen die Möglichkeit, für spezifische Anwendungsfälle individualisierte Spark Listeners einzufügen. Spark Listeners sind Event Handler, die Events einer Spark Anwendung die zwischen Driver und Executors durch einen internen Event Bus versendet werden, abfangen.
Funktionsweise Spline
Wird ein Spark Job auf einem Cluster gestartet, löst die erste Action eine Abfrageroutine aus. Diese Abfrageroutine entspricht im Wesentlichen den Phasen des Catalyst Optimizers und resultiert in einem QueryExecution Objekt. Das QueryExecution Objekt, welches die Execution Plans hält, wird wiederum an den Spline Listener, die SplineQueryExecutionListener Klasse, übergeben. Diese übergibt den Analyzed Logical Plan aus dem QueryExecution Objekt an die DataLineageBuilder Klasse. Innerhalb dieser Klasse wird die Lineage aus dem Analyzed Logical Plan auf Basis der Operationen endrekursiv erzeugt. Die Ablage der Data Lineage kann entweder auf MongoDB, HDFS oder Apache Atlas in JSON Format erfolgen und ist vor Initialisierung des Spline Listeners zu konfigurieren.
Initialisierung des Spline Listeners
Während die built-in Listeners von Spark automatisch durch die SparkSession initialisiert werden, muss der Spline Listener manuell hinzugefügt werden. Wichtig ist hierbei den Listener vor der ersten Action (Trigger für die Erzeugung des QueryExecution Objekts) einzuführen. Eine lückenlose Aufzeichnung der Data Lineage kann ansonsten nicht garantiert werden. Es empfiehlt sich also den Spline Listener direkt nach der Initialisierung der SparkSession einzufügen.
github:e8593a36cd5618d5f762e853ab6c0a45
Spline zeichnet die Data Lineage pro Spark Job auf. Der nachfolgende Abschnitt behandelt nun im Detail die Komponenten der Spline Data Lineage sowie deren Zusammenhänge.
Die Komponenten der Spline Data Lineage
Für diesen Beitrag wurde eine MongoDB zur Speicherung der Spline Data Lineage verwendet. Folgende Collections, die die Data Lineage beschreiben, werden nach einem erfolgreichen Durchlauf eines Spark Jobs abgelegt:
- attributes_v4
- dataTypes_v4
- datasets_v4
- lineages_v4
- operations_v4
- transformations_v4
lineages_v4
Die lineages_v4 Collection ist der zentrale Bestandteil der Spline Lineage. Sie beinhaltet u.a. folgende Felder:
- _id
- appId
- appName
- timestamp
- rootDataset
- rootOperation
Die _id identifiziert die Lineage eindeutig und setzt sich aus einem Prefix sowie einer UUID zusammen. Die UUID entstammt wiederum der case class MetaDataset, einem Descriptor der einzelnen Datasets. Die _id basiert somit genau genommen auf einer Metadatenbeschreibung der einzelnen Datasets die auf Grundlage der vorgenommenen Operationen erzeugt werden. appId als auch appName werden dem sparkContext entnommen. Der timestamp ist der Unix Zeitstempel des Betriebssystems zum Startzeitpunkt des Spark Jobs. rootDataset und rootOperation verweisen auf das Ursprungsdataset bzw. die Ursprungsoperation. Diese entsprechen den zeitlich gesehen letzten Datasets (z. B. Zieldatei) bzw. Operationen (z. B. write-Action) da die rekursiv gebildete Lineage am Ende invertiert wird. Die Nachverfolgung der Datenherkunft beginnt damit genau genommen am letzten erzeugten Dataset. rootDataset und rootOperation verweisen über eine Primär-/Fremdschlüsselbeziehung auf die Collections operations_v4 und datasets_v4. Diese halten alle durch die innerhalb eines Spark Jobs angewendeten Operationen sowie die dadurch erzeugten Datasets.
operations_v4
operations_v4 beinhaltet sämtliche auf den Datasets ausgeführten Operationen (Transformations + Actions). Die wesentlichen Felder sind:
- _lineageId
- _id
- name
- inputs
- output
- _index
Die _lineageID ist der Fremdschlüssel der lineages_v4 Collection. _id ist der binäre Identifier der Operation an sich. Analog dazu entspricht name dem Namen der Operation (z. B. Filter). inputs verweist auf das Eingangsdataset. Analog hierzu ist output das Ausgangsdataset der auf inputs angewendeten Operation. Der _index, welcher bei 0 beginnt und um 1 inkrementiert, spiegelt die umgekehrte Reihenfolge der Operationen wider.
Die operations_v4 Collection steht darüber hinaus über eine Primär-/Fremdschlüsselbeziehung in Verbindung mit der dataset_v4 Collection. inputs und output entsprechen dabei den Fremdschlüsseln und verweisen auf den Primärschlüssel id in der datasets_v4 Collection. Der _index der Operation ist der gleiche des Ausgangsdatasets aus der datasets_v4 Collection.
Des Weiteren besteht ein Zusammenhang zwischen der operations_v4 sowie der transformations_v4 Collection. Beide Collections stehen wiederum über eine Primär-/Fremdschlüsselbeziehung in Verbindung, wobei der Primärschlüssel _id der operations_v4 Collection dem Fremdschlüssel _opId der transformations_v4 Collection entspricht.
Außerdem beinhaltet der erste Eintrag der operations_v4 Collection (_index: 0) die Felder destinationType, path und append sofern der Spark Job mit einer write-Action endet. destinationType definiert das Ausgabeformat der Zieldatei. path entspricht dem Ausgabepfad und append signalisiert in Form eines Boolean ob der gleichnamige write-Mode angewendet wurde.
transformations_v4
transformations_v4 beinhaltet alle angewendeten Projektionen (select, drop, withColumn, withColumnRenamed) sowie Column Expressions. Die wesentlichen Felder sind:
- _id
- alias
- _opId
- _lineageId
- _index
Um operations_v4 und transformations_v4 besser unterscheiden zu können, müssen wir uns nochmals die Execution Plans etwas genauer ansehen. Der Übergang von Parsed Logical Plan zu Analyzed Logical Plan ist u.a. dadurch gekennzeichnet, dass mehrere Projektionen, soweit möglich zusammengefasst werden. Da zusammengefasste Projektionen zu einer intransparenten Data Lineage führen würden, werden diese innerhalb einer eigenen Collection, der transformations_v4, entflechtet. Zusammengefasste Projektionen finden sich in operations_v4 und sind durch die Felder name: „Project“ und einer _id (Primärschlüssel) eindeutig identifiziert. Die selbe ID findet sich in der transformation_v4 Collection unter opId (Fremdschlüssel) für jede individuelle Projektion wieder. Die individuellen, entflechteten Projektionen werden innerhalb von tranformation_v4 eindeutig durch _id und dem fortlaufenden _index identifiziert. Das Feld _lineageId entspricht dem Fremdschlüssel der lineages_v4 Collection. alias gibt selbstsprechend den Alias an.
Neben Projektionen enthält transformations_v4 darüber hinaus auch erzeugte Column Expressions. Dabei handelt es sich nicht um auf ganze Spalten angewendete Lambda Funktionen oder UDFs. Lambda Funktionen und UDFs werden von Spline nicht unterstützt und sind als Blackbox zu betrachten. Ein Beispiel für eine Column Expression ist das Hinzufügen einer Spalte, welche aus der Division zweier anderer Spalten resultiert:
github:375534ac2dc6fe90a0057c9b7b2f5916
Des Weiteren sind unter Column Expression in diesem Kontext auch keine built-in SQL-Funktionen (z. B. avg, count, etc.) zu verstehen. Built-in SQL-Funktionen werden in der operations_v4 Collection aufgeschlüsselt. Im Falle von Column Expressions enthalten die Einträge neben den oben Aufgeführten die Felder symbol und dataTypeId. symbol beschreibt dabei die auf die Spalte(n) angewendete Operationen an sich. Bei dem oben aufgeführten Beispiel entspricht dies symbol: “/“. dataTypeId ist ein Fremdschlüssel und verweist auf die dataTypes_v4 Collection (Primärschlüssel _id) und enthält den aus der Spaltenoperation resultierenden Datentyp.
datasets_v4
Die datasets_v4 Collection dokumentiert die einzelnen Datasets die sich aus den angewendeten Operationen ergeben. Die wesentlichen Felder sind:
- _id
- schema
- _lineageId
- _index
Jedes Dataset ist über _id eindeutig identifizierbar. Kern von datasets_v4 ist schema. schema beschreibt jede Spalte des jeweiligen Datasets mit einem binärem Identifier. Die Identifier stellen gleichzeitig die Fremdschlüssel der attributes_v4 Collection (Primärschlüssel _id) dar. _lineageId entspricht wiederum dem Fremdschlüssel der lineages_v4 Collection. _index dokumentiert die Reihenfolge in der die Datasets entstanden sind. Der höchste Index entspricht dabei dem Ursprungsdataset. Ein _index: 0 verweist somit auf das Zieldataset.
attributes_v4
Die attributes_v4 Collection beschreibt die Spalten der Datasets näher. Die wesentlichen Felder sind:
- _id
- name
- dataTypeId
- _lineageId
- _index
_id ist der Primärschlüssel von attributes_v4 und verweist auf datasets_v4. name enthält den sprechenden Namen der jeweiligen Spalte. _lineageId ist der Fremdschlüssel der lineages_v4 Collection. Der _index spiegelt wiederum die Reihenfolge der Spalten wider. dataTypeId ist der Fremdschlüssel der auf die dataTypes_v4 Collection verweist.
dataTypes_v4
Die dataTypes_v4 Collection beschreibt die Datentypen aus attributes_v4 bzw. die resultierenden Datentypen aus Column Expressions aus der transformation_v4 Collection. Die wesentlichen Felder sind:
- _id
- name
- _lineageId
- _index
_id entspricht dem Primärschlüssel der dataTypes_v4 Collection und verweist sowohl auf attributes_v4 als auch auf transformation_v4. name ist der sprechende Name des Datentyps. _lineageId entspricht dem Fremdschlüssel der lineages_v4 Collection. _index markiert die Reihenfolge der Datentypen in der diese auftreten.
Spline Web UI
Die Visualisierung der Data Lineage erfolgt über ein Web UI. Hierfür muss lediglich die entsprechende JAR Datei heruntergeladen werden. Mit
github:4f099bf6e96809b271edbcc63e029c1e
und unter Angabe der entsprechenden URL sowie des Datenbanknamens wird die Web UI gestartet. Anschließend steht die Spline Web UI unter localhost:8080 zur Verfügung.
Die nachfolgende Abbildung zeigt den höchstmöglichen Detailierungsgrad der Spline Web UI. Zu erkennen sind zum einen auf der linken Seite alle Operationen (z. B. Projektionen, Filter, OrderBy, Write) die der operations_v4 Collection entstammen. Zum anderen sieht man auf der rechten Seite detaillierte Informationen zu den einzelnen Operationen. In dem Beispiel wurde die Filter-Transformation markiert. Als Ergebnis liefert die Spline Web UI die Filterbedingung sowie das Eingangs- und Ausgangsdataset inkl. der jeweiligen Datentypen pro Spalte zurück.
Das nächste Beispiel zeigt die Darstellung einer Column Expression innerhalb einer Projektionsoperation in der Spline Web UI. Die Column Expression wird dabei wie oben beschrieben, in der transformations_v4 Collection gehalten. Auf der rechten Seite der Spline Web UI sind für diese Operation unter Transformations alle Projektionen und Column Expressions zu erkennen. Des Weiteren lassen sich die Column Expressions im Detail in einem separaten Fenster darstellen. Das hier gezeigte Beispiel fügt einem DataFrame eine weitere Spalte namens „Dummy“ hinzu und befüllt diese mit dem Ergebnis der Division der Spalten „Local_Authority_District“ und „Police_Force“.
github:d47e4c820a72164600daf76b00233f30
Des Weiteren sind wie im vorherigen Beispiel wiederum alle Eingangs- und Ausgangsdatasets inkl. der jeweiligen Datentypen pro Spalte zu erkennen.
Spline Data Lineage nicht für RDD API verfügbar
Da die RDD API nicht den Catalyst Optimizer, sondern eine andere Optimierungsroutine verwendet ist die Aufzeichnung der Data Lineage mittels Spline auf die DataFrame bzw. Dataset API beschränkt.
Ein interessanter Ansatz die Data Lineage unter Nutzung der RDD API dennoch aufzuzeichnen findet sich hier. Die Grundidee dabei ist die toDebugString() Funktion so zu reproduzieren, dass diese strukturierte Informationen über die RDD Lineage zurückliefert, die im Weiteren die Konstruktion der Data Lineage ermöglicht.
Ausblick
Nach Angaben von ABSA, den Entwicklern von Spline sind folgende Features geplant:
- Authentifizierung
- Autorisierung
- User Management
- Interoperabilität mit Cloudera Manager, Informatica und Apache Atlas
Fazit
Spline ist für sein relativ junges Alter schon sehr weit, hat aber definitiv noch Potenzial. Wie bereits erwähnt besteht derzeit keine Möglichkeit UDFs, Lambda Funktionen und die RDD API mit Spline zu tracken. Aus meiner Sicht ist dies für UDFs bzw. die RDD API auch nicht der entscheidende Punkt, da UDFs allgemein vermieden werden sollten und seit der Einführung der Dataset API die Bedeutung der RDD API zurückgeht. Umso wichtiger wäre dadurch allerdings die Unterstützung von Lambda Funktionen. Auch die im Ausblick genannten, noch fehlenden Features sind sicherlich notwendig um an etablierte kommerzielle Lösungen heranreichen zu können. Die einzig mir bekannte kommerzielle Lösung die die Data Lineage von Spark Jobs tracken kann ist der Cloudera Navigator. Dieser ist derzeit allerdings nach eigenen Angaben (Cloudera Navigator 6.1.1, Stand 26.03.19) nicht in der Lage Aggregationsfunktionen aufzuzeichnen. Mit Spline hingegen ist dies möglich. Das und die Tatsache, dass seit letztem Jahr Spark Structured Streaming unterstützt wird macht Spline damit schon heute zu einer echten Open Source Alternative für Data Lineage Tracking und Visualisierung.
Inwieweit ein Einsatz von Spline für eine konkrete Anwendung sinnvoll ist hängt am Ende davon ab welche konkreten regulatorischen Anforderungen vorliegen, welchen Komplexitätsgrad die Spark Jobs aufweisen und welcher Transparenzgrad notwendig ist.
Falls Du mehr zu Themen wie Data Lineage, Metadatenmanagement, Data Science oder Data Engineering erfahren willst, dann nimm doch einfach mit uns Kontakt auf. Übrigens bieten wir auch Apache Spark Trainings (s. Button unten) für euch an.
Setup für diesen Blogpost
- Spline 0.3.5
- Apache Spark 2.2.0
- Scala 2.11.12
- sbt-spark-package 0.2.6
- sbt-assembly 0.14.6
- MongoDB 3.4
Links
https://absaoss.github.io/spline/
https://www.bis.org/publ/bcbs239.pdf
https://www.bankingsupervision.europa.eu/ecb/pub/pdf/ssm.BCBS_239_report_201805.pdf
https://www.youtube.com/watch?v=T2vNPBCfA64
https://www.youtube.com/watch?v=953PcioD6tk&t
https://blog.octo.com/en/how-to-hack-spark-to-do-some-data-lineage/