Cas d’utilisation : Créer un pipeline pour traiter des données financières - Cloud

Guide de prise en main de Talend Cloud Pipeline Designer

Version
Cloud
Language
Français
Product
Talend Cloud
Module
Talend Pipeline Designer
Content
Création et développement > Création de Pipelines
Déploiement > Déploiement > Exécution de Pipelines
Last publication date
2024-03-06

Créez un pipeline qui va enrichir et filtrer les données financières hiérarchiques (informations relatives aux IBAN, aux comptes et aux transactions, etc.) puis agréger et compter le nombre total de transactions effectuées.

Pipeline terminé dans l'espace de travail.

Procédure

  1. Cliquez sur Add pipeline (Ajouter un pipeline) dans la page Pipelines. Votre nouveau pipeline s’ouvre.
  2. Dans la barre d'outils en haut, cliquez sur l'icône représentant un crayon bleu, à côté du nom par défaut du pipeline et donnez-lui un nom explicite.

    Exemple

    Process financial data
  3. Cliquez sur ADD SOURCE pour ouvrir le panneau vous permettant de sélectionner vos données source, ici le jeu de données financial data précédemment créé.
  4. Sélectionnez votre jeu de données et cliquez sur Select (Sélectionner) pour l'ajouter au pipeline.
    Votre jeu de données est ajouté en tant que source et vous pouvez prévisualiser vos données JSON.
  5. Cliquez sur Add (Ajouter) et ajoutez un processeur Python 3 au pipeline. Ce processeur sera utilisé pour copier le code Python qui va traiter et enrichir les données d’entrée.
  6. Donnez un nom significatif au processeur.

    Exemple

    enrich with IBAN validation
  7. Dans la zone Python code, saisissez le code suivant.
    import string;
    
    ## IBAN Validation function;
    ALPHA = {c: str(ord(c) % 55) for c in string.ascii_uppercase};
    def reverse_iban(iban): return iban[4:] + iban[:4];
    def check_iban(iban): return int(''.join(ALPHA.get(c, c) for c in reverse_iban(iban))) % 97 == 1;
    
    output = input;
    transaction = input['transaction'];
    this_account = transaction["this_account"];
    account_routing = this_account["account_routing"];
    account_iban = account_routing["address"].replace(" ", "");
    output['iban_valid'] = check_iban(account_iban)
    Ce code vous permet :
    • de vérifier que la syntaxe de l’IBAN est valide
    • d’ajouter un nouveau champ nommé iban_valid pour les enregistrements existants avec la valeur true ou false selon le résultat de la vérification de l’IBAN
  8. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.
    Les données d’entrée sont traitées en conséquence et vous pouvez prévisualiser les modifications. Le champ iban_valid est ajouté à tous les enregistrements.
    Champ iban_valid dans l'aperçu de la sortie.
  9. Cliquez sur Add (Ajouter) et ajoutez un processeur Filter (Filtre) au pipeline. Ce processeur sera utilisé pour isoler les transactions acceptées (indiquées AC. Les transactions déclinées sont indiquées DC).
  10. Donnez un nom significatif au processeur.

    Exemple

    filter on accepted transactions
  11. Dans la zone Filters (Filtres) :
    1. Sélectionnez .transaction.details.type dans la liste Input, puisque vous souhaitez filtrer les clients en vous basant sur cette valeur.
    2. Sélectionnez None dans la liste Optionally select a function to apply, car vous ne souhaitez pas appliquer de fonction lors du filtre des enregistrements.
    3. Sélectionnez = = dans la liste Operator et saisissez AC dans la liste Value, car vous souhaitez filtrer sur les transactions acceptées.

      Vous pouvez utiliser la syntaxe d’avpath dans cette zone. Pour plus d’informations, consultez What is avpath and why use it?.

    4. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.
    Les données d’entrée sont traitées en conséquence et vous pouvez prévisualiser les modifications. Seuls les enregistrements contenant des transactions acceptées (AC) sont conservés dans la sortie.
    Aperçu de la sortie des transactions contenant la valeur AC pour l'attribut type.
  12. Cliquez sur Add (Ajouter) et ajoutez un processeur Aggregate (Agrégation) au pipeline. Ce processeur sera utilisé pour regrouper les transactions et calculer le nombre total de transactions.
  13. Donnez un nom significatif au processeur.

    Exemple

    count transaction amounts with valid IBAN
  14. Dans la zone Group by, spécifiez les champs à utiliser pour votre ensemble d'agrégation :
    1. Sélectionnez .transaction.details.description dans la liste Field path.
    2. Ajoutez un élément et sélectionnez .iban_valid dans la liste.
  15. Dans la zone Operations, ajoutez une opération d'agrégation :
    1. Sélectionnez .transaction.details.value.amount dans la liste Field path et Sum dans la liste Operation.
    2. Nommez le champ généré, total_amount par exemple.
    3. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.
    Les données d'entrée sont traitées en fonction. Vous pouvez voir un aperçu des données calculées après les opérations de filtre et de regroupement. Il y a 252 transactions ayant un IBAN valide et 81 transactions ayant un IBAN non valide.
    Somme calculée des transactions avec des valeurs d'IBAN valides et invalides dans l'aperçu de la sortie.
  16. Cliquez sur l'élément ADD DESTINATION du pipeline pour ouvrir le panneau et sélectionner votre jeu de données pour les données de sortie : le jeu de données financial data précédemment créé. Vous pouvez utiliser le même jeu de données pour l'entrée et la sortie car les jeux de données de test se comportent différemment dans la source et dans la destination. Lorsqu'ils sont utilisés dans une destination, les données sont ignorées.
  17. Donnez un nom significatif à la destination.

    Exemple

    processed data
  18. Cliquez sur Save (Sauvegarder) pour sauvegarder votre configuration.