Cas d’utilisation : Créer un pipeline pour traiter des données financières - Cloud

Guide de prise en main de Talend Cloud Pipeline Designer

Version
Cloud
Language
Français (France)
Product
Talend Cloud
Module
Talend Pipeline Designer
Content
Création et développement > Création de Pipelines
Déploiement > Déploiement > Exécution de Pipelines

Créez un pipeline qui va enrichir et filtrer les données financières hiérarchiques (informations relatives aux IBAN, aux comptes et aux transactions, etc.) puis agréger et compter le nombre total de transactions effectuées.

Procédure

  1. Cliquez sur ADD PIPELINE dans la page Pipelines. Votre nouveau pipeline s’ouvre.
  2. Dans la barre d'outils en haut, cliquez sur l'icône représentant un crayon bleu, à côté du nom par défaut du pipeline et donnez-lui un nom explicite.

    Exemple

    Process Financial Data
  3. Cliquez sur ADD SOURCE pour ouvrir le panneau vous permettant de sélectionner vos données source, ici le jeu de données financial data précédemment créé.
  4. Sélectionnez votre jeu de données et cliquez sur SELECT pour l’ajouter au pipeline.
    Votre jeu de données est ajouté en tant que source et vous pouvez prévisualiser vos données.
  5. Cliquez sur et ajoutez un processeur Python au pipeline. Ce processeur sera utilisé pour copier le code Python qui va traiter et enrichir les données d’entrée.
  6. Donnez un nom significatif au processeur.

    Exemple

    enrich with IBAN validation
  7. Sélectionnez Map comme type de mapping. Ainsi, le processeur Python émet automatiquement un enregistrement de sortie pour chaque entrée.

    Pour plus d’informations concernant les différences entre Map et Flatmap, consultez la documentation du processeur Python.

  8. Dans la zone Python code, saisissez le code suivant.
    import string;
    
    ## IBAN Validation function;
    ALPHA = {c: str(ord(c) % 55) for c in string.ascii_uppercase};
    def reverse_iban(iban): return iban[4:] + iban[:4];
    def check_iban(iban): return int(''.join(ALPHA.get(c, c) for c in reverse_iban(iban))) % 97 == 1;
    
    output = input;
    transaction = input['transaction'];
    this_account = transaction["this_account"];
    account_routing = this_account["account_routing"];
    account_iban = account_routing["address"].replace(" ", "");
    output['iban_valid'] = check_iban(account_iban)
    Ce code vous permet :
    • de vérifier que la syntaxe de l’IBAN est valide

    • d’ajouter un nouveau champ nommé iban_valid pour les enregistrements existants avec la valeur true ou false selon le résultat de la vérification de l’IBAN

  9. Cliquez sur SAVE afin de sauvegarder votre configuration.
    Les données d’entrée sont traitées en conséquence et vous pouvez prévisualiser les modifications. Le champ iban_valid est ajouté à tous les enregistrements.
  10. Cliquez sur et ajoutez un processeur Filter au pipeline. Ce processeur sera utilisé pour isoler les transactions acceptées (indiquées AC. Les transactions déclinées sont indiquées DC).
  11. Donnez un nom significatif au processeur.

    Exemple

    filter on accepted transactions
  12. Dans la zone FILTERS :
    1. Sélectionnez .transaction.details.type dans la liste Input, puisque vous souhaitez filtrer les clients en vous basant sur cette valeur.
    2. Sélectionnez NONE dans la liste Optionnaly select a function to apply car vous ne souhaitez pas appliquer de fonction tout en filtrant les enregistrements.
    3. Sélectionnez = = dans la liste Operator et saisissez AC dans la liste Value, car vous souhaitez filtrer sur les transactions acceptées.

      Vous pouvez utiliser la syntaxe d’avpath dans cette zone. Pour plus d’informations, consultez What is avpath and why use it?.

    4. Cliquez sur SAVE pour sauvegarder votre configuration.
    Les données d’entrée sont traitées en conséquence et vous pouvez prévisualiser les modifications. Seuls les enregistrements contenant des transactions acceptées (AC) sont conservés dans la sortie.
  13. Cliquez sur et ajoutez un processeur Aggregate au pipeline. Ce processeur sera utilisé pour regrouper les transactions et calculer le nombre total de transactions.
  14. Donnez un nom significatif au processeur.

    Exemple

    count transaction amounts with valid IBAN
  15. Dans la zone Group by, spécifiez les champs à utiliser pour votre ensemble d'agrégation :
    1. Sélectionnez .transaction.details.description dans la liste Field path.
    2. Ajoutez un élément et sélectionnez .iban_valid dans la liste.
  16. Dans la zone Operations, ajoutez une opération d'agrégation :
    1. Sélectionnez .transaction.details.value.amount dans la liste Field path et Sum dans la liste Operation.
    2. Nommez le champ généré, total_amount par exemple.
    3. Cliquez sur SAVE pour sauvegarder votre configuration.
    Les données d'entrée sont traitées en fonction. Vous pouvez voir un aperçu des données calculées après les opérations de filtre et de regroupement. Il y a 252 transactions ayant un IBAN valide et 81 transactions ayant un IBAN non valide.
  17. Cliquez sur l'élément ADD DESTINATION du pipeline pour ouvrir le panneau et sélectionner votre jeu de données pour les données de sortie : le jeu de données financial data précédemment créé. Vous pouvez utiliser le même jeu de données pour l'entrée et la sortie car les jeux de données personnalisés se comportent différemment dans la Source et dans la Destination. Lorsqu'ils sont utilisés dans une Destination, les donnés sont ignorées.
  18. Donnez un nom significatif à la Destination.

    Exemple

    processed data (out)
  19. Cliquez sur SAVE pour sauvegarder votre configuration.