Écrire des données dans un data warehouse Cloud (Snowflake) - Cloud

Guide des processeurs de Talend Cloud Pipeline Designer

Version
Cloud
Language
Français
Product
Talend Cloud
Module
Talend Pipeline Designer
Content
Création et développement > Création de Pipelines
Last publication date
2024-02-27

Avant de commencer

  • Vous avez récupéré le fichier financial_transactions.avro et vous l'avez chargé dans votre bucket Amazon S3.

  • Vous avez reproduit et dupliqué le pipeline décrit dans Écrire des données dans un stockage Cloud (S3) et vous allez travailler sur ce pipeline dupliqué.
  • Vous avez créé un Moteur distant Gen2 et son profil d'exécution depuis Talend Management Console.

    Le Moteur Cloud pour le design et son profil d'exécution correspondant sont embarqués par défaut dans Talend Management Console pour permettre aux utilisateurs et utilisatrices de prendre l'application en main rapidement, mais il est recommandé d'installer le Moteur distant Gen2 sécurisé pour le traitement avancé des données.

Procédure

  1. Dans la page d'accueil de Talend Cloud Pipeline Designer, cliquez sur Connections (Connexions) > Add connection (Ajouter une connexion).
  2. Dans le panneau qui s'ouvre, sélectionnez Snowflake, puis cliquez sur Next (Suivant).
  3. Sélectionnez votre Moteur distant Gen2 dans la liste Engine.
  4. Sélectionnez Database (Base de données) dans la liste Connection type (Type de connexion) et Snowflake dans la liste Database (Base de données).
  5. Saisissez l'URL JDBC de votre base de données, ainsi que vos identifiants.
  6. Vérifiez votre connexion si nécessaire et cliquez sur Next (Suivant).
  7. Donnez un nom à votre connexion, par exemple Snowflake connection et cliquez sur Validate (Valider).
  8. Cliquez sur Add dataset (Ajouter un jeu de données) et saisissez les informations de connexion à votre table Snowflake :
    1. Donnez un nom d’affichage à votre jeu de données, financial data on Snowflake par exemple.
    2. Dans la liste Type, sélectionnez Table or view name (Nom de la table ou de la vue).
    3. Dans la liste Table name (Nom de la table), sélectionnez ou saisissez le nom de votre table Snowflake.
    4. Dans le champ Column selection (Sélection des colonnes), sélectionnez les colonnes spécifiques à récupérer, ou cliquez sur Select all (Sélectionner tout) pour récupérer toutes les colonnes existantes. Dans cet exemple, deux champs sont sélectionnés : transaction_amount et transaction_code.
  9. Cliquez sur View sample (Voir l'échantillon) pour vérifier que vos données sont valides et peuvent être prévisualisées.
    Aperçu de l'échantillon de données Snowflake.
  10. Cliquez sur Validate (Valider) pour sauvegarder votre jeu de données. Dans la page Datasets (Jeux de données), le nouveau jeu de données est ajouté à la liste et peut être utilisé comme jeu de données de destination dans votre pipeline.
    Un pipeline avec une source S3, un processeur Python 3, un processeur Filter, un processeur Aggregate et une destination Snowflake.

    L'icône de mapping de données (Data Mapping) près de la destination est désactivée pour le moment, car le schéma d'entrée n'est pas plat.

  11. Cliquez sur l'icône + et ajoutez un processeur Field selector (Sélecteur de champs) après le processeur Aggregate (Agrégation) afin de sélectionner les champs à conserver et d'aplatir le schéma. Le panneau de configuration s'ouvre.
  12. Dans le mode de sélection Simple, cliquez sur Edit (Modifier) pour ouvrir la fenêtre Select fields (Sélectionner des champs) :
    1. Sélectionnez les champs à conserver et aplatir : description et total_amount.
    2. Cliquez sur Edit (Modifier) pour fermer la fenêtre.
    3. Cliquez sur Save (Sauvegarder) pour enregistrer la configuration et prévisualiser les champs aplatis.
    Aperçu du processeur Field selector après réorganisation des enregistrements financiers.
  13. À présent que le schéma d'entrée est plat, l'icône Add a Mapper (Ajouter un mappeur) est activée et vous permet d'ajouter un processeur Data mapping (Mapping de données) au pipeline. Le panneau de configuration s'ouvre.
    Icône permettant d'ajouter un processeur Data mapping (Mapping de données) dans l'espace de travail.
  14. Dans l'onglet Configuration, cliquez sur Open mapping (Ouvrir le mapping) pour ouvrir le processeur Data mapping.
    Certains champs d'entrée sont mappés automatiquement à un champ de sortie, en fonction de leur nom. Vous pouvez examiner ces mappings et commencer à mapper le reste de votre schéma :
    1. Mappez le champ d'entrée total_amount au champ de sortie transaction_amount.
    2. Mappez le champ d'entrée description au champ de sortie transaction_code.
    3. Cliquez sur Validate pour confirmer vos mappings.
    Page du processeur Data Mapper avec aperçu des enregistrements mappés.

    Le contenu du champ d'entrée total_amount sera ajouté au contenu du champ de sortie transaction_amount, en fonction de l'opération définie pour votre base de données (insertion, mise à jour, upsert ou suppression).

    Le contenu du champ d'entrée description sera ajouté au contenu du champ de sortie transaction_code.

    Vous pouvez examiner les résultats du mapping dans la zone Data preview (Aperçu des données).
    Même pipeline que précédemment, avec un processeur Data mapper ajouté avant la destination Snowflake.
  15. Avant d'exécuter ce pipeline, sélectionnez Upsert dans l'onglet de configuration du jeu de données Snowflake pour mettre à jour et insérer les nouvelles données dans la table Snowflake. Définissez le champ transaction_amount comme clé de l'opération.
    Panneau de configuration de la destination Snowflake affichant l'action Upsert sélectionnée.
  16. Dans la barre d'outils en haut de Talend Cloud Pipeline Designer, cliquez sur le bouton Run (Exécuter) pour ouvrir le panneau vous permettant de sélectionner votre profil d'exécution.
  17. Sélectionnez dans la liste votre profil d'exécution (pour plus d'informations, consultez Profils d'exécution), puis cliquez sur Run (Exécuter) pour exécuter votre pipeline.

Résultats

Une fois que votre pipeline est exécuté, les données mises à jour sont visibles dans la table de la base de données Snowflake.