Schreiben von Daten in ein cloudbasiertes Data Warehouse (Snowflake) - Cloud

Prozessorhandbuch für Talend Cloud Pipeline Designer

Version
Cloud
Language
Deutsch
Product
Talend Cloud
Module
Talend Pipeline Designer
Content
Design und Entwicklung > Konzeption von Pipelines
Last publication date
2024-02-29

Vorbereitungen

  • Sie haben die Datei financial_transactions.avro heruntergeladen und sie in Ihr Amazon S3-Bucket hochgeladen.

  • Sie haben die unter Schreiben von Daten in einen Cloud-Speicher (S3) beschriebene Pipeline reproduziert und dupliziert und arbeiten jetzt mit dieser duplizierten Pipeline.
  • Sie haben eine Remote Engine Gen2 sowie das zugehörige Ausführungsprofil über Talend Management Console erstellt.

    In Talend Management Console sind standardmäßig die Cloud Engine for Design und ein entsprechendes Ausführungsprofil integriert. Dadurch können die Benutzer in kürzester Zeit ihre Arbeit mit der Anwendung aufnehmen. Es wird jedoch empfohlen, die sichere Remote Engine Gen2 zu installieren, die eine erweiterte Datenverarbeitung ermöglicht.

Prozedur

  1. Klicken Sie auf der Homepage von Talend Cloud Pipeline Designer auf Connections (Verbindungen) > Add connection (Verbindung hinzufügen).
  2. Wählen Sie in dem Fenster, das geöffnet wird, die Option Snowflake aus und klicken Sie auf Next (Weiter).
  3. Wählen Sie in der Liste Engine Ihre Remote Engine Gen2 aus.
  4. Wählen Sie in der Liste Connection type (Verbindungstyp) den Eintrag Database (Datenbank) und in der Liste Database (Datenbank) den Eintrag Snowflake aus.
  5. Geben Sie die JDBC-URL Ihrer Datenbank und die entsprechenden Anmeldedaten ein.
  6. Prüfen Sie bei Bedarf die Verbindung und klicken Sie dann auf Next (Weiter).
  7. Geben Sie einen Namen für die Verbindung an, z. B. Snowflake connection (Snowflake-Verbindung), und klicken Sie auf Validate (Validieren).
  8. Klicken Sie auf Add dataset (Datensatz hinzufügen) und geben Sie die Verbindungsinformationen für Ihre Snowflake-Tabelle ein:
    1. Geben Sie einen Anzeigenamen für den Datensatz ein, z. B. financial data on Snowflake (Finanzdaten in Snowflake).
    2. Wählen Sie in der Liste Type (Typ) den Eintrag Table or view name (Tabellen- oder Ansichtsname) aus.
    3. Wählen Sie in der Liste Table name (Tabellenname) den Namen der Snowflake-Tabelle aus bzw. geben Sie ihn ein.
    4. Wählen Sie im Feld Column selection (Tabellenauswahl) die Tabellenspalten aus, die abgerufen werden sollen, oder klicken Sie auf Select all (Alle auswählen), um alle vorhandenen Spalten abzurufen. In diesem Beispiel wurden 2 Felder ausgewählt: transaction_amount (Transaktion_Betrag) und transaction_code (Transaktion_Code).
  9. Klicken Sie auf View sample (Sample anzeigen), um zu prüfen, ob die Daten gültig sind und in der Vorschau angezeigt werden können.
    Vorschau des Datenbeispiels „Snowflake“.
  10. Klicken Sie auf Validate (Validieren), um den Datensatz zu speichern. Der neue Datensatz wird der Liste auf der Seite Datasets (Datensätze) hinzugefügt und kann jetzt in Ihrer Pipeline als Zieldatensatz verwendet werden.
    Eine Pipeline mit einer S3-Quelle, einem Prozessor vom Typ „Python 3“, einem Prozessor vom Typ „Filter“, einem Prozessor vom Typ „Aggregate (Aggregieren)“ und einem Snowflake-Ziel.

    Das Symbol „Datenmapping“ neben dem Ziel wird vorübergehend deaktiviert, da es sich bei dem Eingabeschema nicht um ein flaches Schema handelt.

  11. Klicken Sie auf das Symbol Plus und fügen Sie nach dem Prozessor vom Typ Aggregate (Aggregieren) einen Prozessor vom Typ Field selector (Feldauswahl) hinzu, um die Felder auszuwählen, die Sie beibehalten möchten, und um das Schema abzuflachen. Daraufhin wird das Konfigurationsfenster geöffnet.
  12. Klicken Sie im Auswahlmodus Simple (Einfach) auf Edit (Bearbeiten), um das Fenster Select fields (Felder auswählen) zu öffnen:
    1. Wählen Sie die Felder aus, die beibehalten und abgeflacht werden sollen: description (Beschreibung) und total_amount (Gesamtbetrag).
    2. Klicken Sie auf Edit (Bearbeiten), um das Fenster wieder zu schließen.
    3. Klicken Sie auf Save (Speichern), um die Konfiguration zu speichern und eine Vorschau der abgeflachten Felder anzuzeigen.
    Vorschau des Prozessors vom Typ „Field selector (Feldauswahl)“ nach der Umorganisation der Finanzdatensätze.
  13. Sie verfügen jetzt über ein flaches Eingabeschema. Somit wird das Symbol Add a Mapper (Mapper hinzufügen) aktiviert, d. h. Sie können einen Prozessor vom Typ Data mapping (Datenmapping) zur Pipeline hinzufügen. Daraufhin wird das Konfigurationsfenster geöffnet.
    Symbol zum Hinzufügen eines Prozessors vom Typ Data mapping (Datenmapping) auf der Arbeitsfläche.
  14. Klicken Sie auf der Registerkarte Configuration (Konfiguration) auf Open mapping (Mapping öffnen), um den Datenmapping-Prozessor zu öffnen.
    Einige der Eingabefelder werden automatisch einem auf ihrem Namen basierenden Ausgabefeld zugeordnet. Sie können diese Felder prüfen und dann das Mapping für das restliche Schema durchführen:
    1. Ordnen Sie das Eingabefeld total_amount (Gesamtbetrag) dem Ausgabefeld transaction_amount (Transaktion_Betrag) zu.
    2. Ordnen Sie das Eingabefeld description (Beschreibung) dem Ausgabefeld transaction_code (Transaktion_Code) zu.
    3. Klicken Sie auf Validate (Validieren), um das Mapping zu bestätigen.
    Datenmapping-Seite mit einer Vorschau der zugeordneten Datensätze.

    Der Inhalt des Eingabefelds total_amount (Gesamtbetrag) wird gemäß der für die Datenbank ausgewählten Operation (Einfügen, Aktualisieren, Upsert, Löschen) zum Inhalt des Ausgabefelds transaction_amount (Transaktion_Betrag) hinzugefügt.

    Der Inhalt des Eingabefelds description (Beschreibung) wird zum Inhalt des Ausgabefelds transaction_code (Transaktion_Code) hinzugefügt.

    Sie können das Ergebnis des Mappings im Bereich Data preview (Datenvorschau) überprüfen.
    Die gleiche Pipeline wie zuvor, mit einem vor dem Snowflake-Ziel hinzugefügten Prozessor vom Typ „Data mapper (Datenmapping)“.
  15. Wählen Sie vor der Ausführung der Pipeline Upsert auf der Konfigurationsregisterkarte des Snowflake-Datensatzes aus, um die Snowflake-Tabelle zu aktualisieren und die neuen Daten einzufügen. Legen Sie das Feld transaction_amount (Transaktion_Betrag) als Operationsschlüssel fest.
    Der Fensterbereich mit der Snowflake-Zielkonfiguration zeigt die ausgewählte Aktion „Upsert“.
  16. Klicken Sie in der oberen Symbolleiste von Talend Cloud Pipeline Designer auf die Schaltfläche Run (Ausführen), um das Fenster zur Auswahl des Ausführungsprofils zu öffnen.
  17. Wählen Sie Ihr Ausführungsprofil in der Liste aus (weitere Informationen finden Sie unter „Ausführungsprofile“) und klicken Sie dann auf Run (Ausführen), um die Pipeline auszuführen.

Ergebnisse

Sobald die Pipeline ausgeführt wird, werden die aktualisierten Daten in der Snowflake-Datenbanktabelle angezeigt.