Anwendungsfall: Erstellen einer Pipeline zur Verarbeitung von Finanzdaten - Cloud

Einführungshandbuch für Talend Cloud Pipeline Designer

Version
Cloud
Language
Deutsch (Deutschland)
Product
Talend Cloud
Module
Talend Pipeline Designer
Content
Design und Entwicklung > Konzeption von Pipelines
Implementierung > Implementierung > Pipeline-Ausführung

Vorgehensweise zur Erstellung einer Pipeline, die hierarchische Finanzdaten anreichert und filtert (IBAN, Konto- und Transaktionsdaten usw.) und anschließend den Gesamtbetrag aller durchgeführten Transaktionen zählt.

Prozedur

  1. Klicken Sie auf der Seite Pipelines auf Add pipeline (Pipeline hinzufügen). Ihre neue Pipeline wird geöffnet.
  2. Klicken Sie in der oberen Symbolleiste auf das Stiftsymbol neben dem Pipeline-Standardnamen und geben Sie einen aussagekräftigen Namen für Ihre Pipeline ein.

    task_stepxmp

    Process Financial Data (Finanzdaten verarbeiten)
  3. Klicken Sie auf ADD SOURCE (QUELLE HINZUFÜGEN), um einen Fensterbereich zu öffnen, in dem Sie Ihre Quelldaten auswählen können. Hier der zuvor erstellte Datensatz financial data (Finanzdaten).
  4. Wählen Sie den Datensatz aus und klicken Sie auf SELECT (AUSWÄHLEN), um ihn zur Pipeline hinzuzufügen.
    Der Datensatz wird als Quelle hinzugefügt, und Sie können bereits eine Vorschau Ihrer Daten anzeigen.
  5. Klicken Sie auf und fügen Sie einen Python-Prozessor zur Pipeline hinzu. Dieser Prozessor ermöglicht die Kopie des Python-Codes, der die Eingabedaten verarbeitet und anreichert.
  6. Geben Sie einen sinnvollen Namen für den Prozessor an.

    task_stepxmp

    enrich with IBAN validation (Mit IBAN-Validierung anreichern)
  7. Wählen Sie Map als Map-Typ aus. Dadurch gibt der Python-Prozessor für jede Eingabe automatisch einen Ausgabedatensatz aus.

    Weitere Informationen zu den Unterschieden zwischen MAP und FLATMAP finden Sie in der Dokumentation des Python-Prozessors.

  8. Geben Sie im Bereich Phyton code (Python-Code) folgenden Code ein:
    import string;
    
    ## IBAN Validation function;
    ALPHA = {c: str(ord(c) % 55) for c in string.ascii_uppercase};
    def reverse_iban(iban): return iban[4:] + iban[:4];
    def check_iban(iban): return int(''.join(ALPHA.get(c, c) for c in reverse_iban(iban))) % 97 == 1;
    
    output = input;
    transaction = input['transaction'];
    this_account = transaction["this_account"];
    account_routing = this_account["account_routing"];
    account_iban = account_routing["address"].replace(" ", "");
    output['iban_valid'] = check_iban(account_iban)
    Dieser Code ermöglicht Ihnen Folgendes:
    • Prüfen, ob die IBAN-Syntax gültig ist.

    • Hinzufügen eines neuen Felds mit der Bezeichnung iban_valid zu den vorhandenen Datensätzen mit dem Wert True oder False, je nach Ergebnis der IBAN-Prüfung.

  9. Klicken Sie auf SAVE (SPEICHERN), um die Konfiguration zu speichern.
    Die Eingabedaten werden entsprechend verarbeitet, und Sie können eine Vorschau der Änderungen anzeigen. Das neue Feld iban_valid wird in allen Datensätzen hinzugefügt.
  10. Klicken Sie auf und fügen Sie einen Filter-Prozessor zur Pipeline hinzu. Dieser Prozessor ermöglicht die Isolierung angenommener Transaktionen (gekennzeichnet durch das Tag AC, im Gegensatz zu abgelehnten Transaktionen mit dem Tag DC).
  11. Geben Sie einen sinnvollen Namen für den Prozessor an.

    task_stepxmp

    filter on accepted transactions (Nach angenommenen Transaktionen filtern)
  12. Führen Sie im Bereich FILTERS (FILTER) Folgendes durch:
    1. Wählen Sie .transaction.details.type (Typ der Transaktionsdetails) in der Liste Input (Eingabe) aus, da Sie die Kunden basierend auf diesem Wert filtern möchten.
    2. Wählen Sie NONE (KEINE) in der Liste Optionally select a function to apply (Optional eine anzuwendende Funktion auswählen) aus, da bei der Filterung der Datensätze keine Funktion angewendet werden soll.
    3. Wählen Sie = = in der Liste Operator aus und geben Sie AC in der Liste Value (Wert) ein, da Sie nach den angenommenen Transaktionen filtern möchten.

      Sie können in diesem Bereich die avpath-Syntax verwenden. Weitere Informationen hierzu finden Sie unter Beschreibung und Funktion von avpath.

    4. Klicken Sie auf SAVE (SPEICHERN), um die Konfiguration zu speichern.
    Die Eingabedaten werden entsprechend verarbeitet, und Sie können eine Vorschau der Änderungen anzeigen. Nur Datensätze mit angenommenen Transaktionen (AC) werden in der Ausgabe beibehalten.
  13. Klicken Sie auf und fügen Sie einen Aggregationsprozessor (Aggregate) zur Pipeline hinzu. Dieser Prozessor ermöglicht die Gruppierung von Transaktionen und die Berechnung des Gesamtbetrags dieser Transaktionen.
  14. Geben Sie einen sinnvollen Namen für den Prozessor an.

    task_stepxmp

    count transaction amounts with valid IBAN (Transaktionsbeträge mit gültiger IBAN zählen)
  15. Geben Sie im Bereich Group by (Gruppieren nach) die Felder an, die für die Aggregationsgruppe verwendet werden sollen:
    1. Wählen Sie .transaction.details.description in der Liste Field path (Feldpfad) aus.
    2. Fügen Sie ein neues Element hinzu und wählen Sie in der Liste den Eintrag .iban_valid aus.
  16. Fügen Sie im Bereich Operations (Operationen) eine Aggregationsfunktion hinzu:
    1. Wählen Sie .transaction.details.value.amount in der Liste Field path (Feldpfad) und Sum (Summe) in der Liste Operation aus.
    2. Legen Sie für das generierte Feld einen Namen fest, beispielsweise total_amount (Gesamtbetrag).
    3. Klicken Sie auf SAVE (SPEICHERN), um die Konfiguration zu speichern.
    Die Eingabedaten werden entsprechend verarbeitet, und Sie können nach der Filter- und Gruppierungsoperation eine Vorschau der berechneten Daten anzeigen. Mit einer gültigen IBAN-Nummer können 252, mit einen ungültigen IBAN-Nummer 81 Transaktionen durchgeführt werden.
  17. Klicken Sie auf das Element ADD DESTINATION (ZIEL HINZUFÜGEN) der Pipeline, um das Fenster zur Auswahl des Datensatzes für die Ausgabedaten zu öffnen: den zuvor von Ihnen erstellten Datensatz financial data (Finanzdaten). Sie können denselben Datensatz für die Ein- und Ausgabe verwenden, da sich die Testdatensätze in Quelle und Ziel unterschiedlich verhalten. Bei einer Verwendung in einem Ziel werden die Daten außerdem ignoriert.
  18. Legen Sie für das Ziel einen aussagekräftigen Namen fest.

    task_stepxmp

    processed data (out) (Verarbeitete Daten (Aus))
  19. Klicken Sie auf Save (Speichern), um die Konfiguration zu speichern.