Use Case: Creating a pipeline to process financial data - Cloud

Talend Cloud Pipeline Designer Getting Started Guide

This use case shows how to create a pipeline that enriches and filters hierarchical financial data (IBAN, account and transaction information, etc.), then aggregates the transactions and calculates their total amount.

Procedure

  1. Click ADD PIPELINE on the Pipelines page. Your new pipeline opens.
  2. On the top toolbar, click the pencil icon next to the pipeline default name and give a meaningful name to your pipeline.

    Example

    Process Financial Data
  3. Click ADD SOURCE to open the panel allowing you to select your source data, here the financial data dataset created previously.
  4. Select your dataset and click SELECT in order to add it to the pipeline.
    Your dataset is added as a source and you can already preview your data.
  5. Click and add a Python processor to the pipeline. This processor will run the Python code that processes and enriches the input data.
  6. Give a meaningful name to the processor.

    Example

    enrich with IBAN validation
  7. Select Map as Map type. This way, the Python processor will automatically emit an output record for each input.

    For more information on the differences between Map and Flatmap, see the Python processor documentation.
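As a rough illustration of the distinction, the plain-Python sketch below contrasts the two modes outside of Pipeline Designer. The helper names run_map and run_flatmap are hypothetical and are not part of the Python processor. The sketch assumes the usual meaning of the terms: a map step emits exactly one record per input, while a flat-map step may emit zero, one, or several.

```python
def run_map(records, fn):
    """Map: exactly one output record per input record."""
    return [fn(record) for record in records]


def run_flatmap(records, fn):
    """Flat map: fn returns a list, so each input yields zero or more outputs."""
    outputs = []
    for record in records:
        outputs.extend(fn(record))
    return outputs


# Illustrative records, not taken from the financial dataset.
records = [{"id": 1, "tags": ["a", "b", "c"]}, {"id": 2, "tags": []}]

# Map keeps the record count unchanged.
mapped = run_map(records, lambda r: {"id": r["id"], "tag_count": len(r["tags"])})

# Flat map can change the record count: one output per tag, none for an empty list.
flattened = run_flatmap(records, lambda r: [{"id": r["id"], "tag": t} for t in r["tags"]])

print(len(mapped), len(flattened))
```

Because this use case only adds a field to each record, the one-to-one behavior of Map is sufficient.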

  8. In the Python code area, type in the following code.
    import string

    # IBAN validation helpers
    # Map each uppercase letter to its numeric value: 'A' -> '10', ..., 'Z' -> '35'
    ALPHA = {c: str(ord(c) % 55) for c in string.ascii_uppercase}

    def reverse_iban(iban):
        # Move the country code and check digits (first four characters) to the end
        return iban[4:] + iban[:4]

    def check_iban(iban):
        # The IBAN is valid when the resulting number is 1 modulo 97
        return int(''.join(ALPHA.get(c, c) for c in reverse_iban(iban))) % 97 == 1

    # Enrich the input record with the result of the IBAN check
    output = input
    transaction = input['transaction']
    this_account = transaction['this_account']
    account_routing = this_account['account_routing']
    account_iban = account_routing['address'].replace(' ', '')
    output['iban_valid'] = check_iban(account_iban)

    This code allows you to:
    • check that the IBAN is valid (modulo 97 check)

    • add a new field named iban_valid to the existing records, set to true or false depending on the result of the IBAN check

  9. Click SAVE to save your configuration.
    Input data is processed accordingly and you can preview the modifications. The new iban_valid field is added to all records.
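To try the validation logic before pasting it into the processor, a standalone sketch such as the following may help. It reuses the same modulo 97 check, but the enrich and make_record helpers and the sample records are hypothetical: they only mimic the nested fields that the processor code reads, and the sample IBAN is the commonly published example value rather than data from the financial dataset.

```python
import string

# Same mapping as in the processor code: 'A' -> '10', ..., 'Z' -> '35'.
ALPHA = {c: str(ord(c) % 55) for c in string.ascii_uppercase}


def check_iban(iban):
    """Return True when the rearranged, digit-expanded IBAN is 1 modulo 97."""
    rearranged = iban[4:] + iban[:4]
    return int("".join(ALPHA.get(c, c) for c in rearranged)) % 97 == 1


def enrich(record):
    """Hypothetical stand-in for the processor body: adds 'iban_valid'."""
    address = record["transaction"]["this_account"]["account_routing"]["address"]
    record["iban_valid"] = check_iban(address.replace(" ", ""))
    return record


def make_record(address):
    """Build an illustrative record containing only the fields the code reads."""
    return {"transaction": {"this_account": {"account_routing": {"address": address}}}}


valid = enrich(make_record("GB82 WEST 1234 5698 7654 32"))
invalid = enrich(make_record("GB82 WEST 1234 5698 7654 33"))  # last digit altered
print(valid["iban_valid"], invalid["iban_valid"])  # prints: True False
```

Note that the check expects uppercase letters and digits only. An address containing lowercase letters or other characters would make the int() conversion fail.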
  10. Click and add a Filter processor to the pipeline. This processor will be used to isolate accepted transactions (tagged AC), as opposed to declined transactions (tagged DC).
  11. Give a meaningful name to the processor.

    Example

    filter on accepted transactions
  12. In the FILTERS area:
    1. Select .transaction.details.type in the Input list, as you want to filter records based on this value.
    2. Select NONE in the Optionally select a function to apply list, as you do not want to apply a function while filtering records.
    3. Select == in the Operator list and type AC in the Value field, as you want to keep only the transactions that were accepted.

      You can use the avpath syntax in this area. For more information, see What is avpath and why use it?

    4. Click SAVE to save your configuration.
    Input data is processed accordingly and you can preview the modifications. Only records containing accepted transactions (AC) are kept in the output.
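The effect of this filter can be approximated in plain Python as shown below. This is only a sketch of the behavior described above, not the avpath engine used by the Filter processor: the get_path and filter_records helpers are hypothetical, handle simple dotted paths through nested dictionaries only, and the sample records are illustrative.

```python
def get_path(record, path):
    """Follow a dotted path such as '.transaction.details.type' through nested dicts."""
    value = record
    for key in path.strip(".").split("."):
        value = value[key]
    return value


def filter_records(records, path, expected):
    """Keep the records whose value at `path` equals `expected`."""
    return [r for r in records if get_path(r, path) == expected]


# Illustrative records containing only the field used by the filter.
records = [
    {"transaction": {"details": {"type": "AC"}}},
    {"transaction": {"details": {"type": "DC"}}},
    {"transaction": {"details": {"type": "AC"}}},
]

accepted = filter_records(records, ".transaction.details.type", "AC")
print(len(accepted))  # prints: 2
```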
  13. Click and add an Aggregate processor to the pipeline. This processor will be used to group transactions and calculate the total amount of these transactions.
  14. Give a meaningful name to the processor.

    Example

    count transaction amounts with valid IBAN
  15. In the Group by area, specify the fields you want to use for your aggregation set:
    1. Select .transaction.details.description in the Field path list.
    2. Add a new element and select .iban_valid in the list.
  16. In the Operations area, add an aggregate operation:
    1. Select .transaction.details.value.amount in the Field path list and Sum in the Operation list.
    2. Name the generated field, total_amount for example.
    3. Click SAVE to save your configuration.
    Input data is processed accordingly and you can preview the calculated data after the filtering and grouping operations. There are 252 transactions with a valid IBAN and 81 transactions with an invalid IBAN.
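The grouping and summing configured in the two previous steps can be pictured with the following plain-Python sketch. It is an approximation under stated assumptions, not the Aggregate processor itself: the sum_by_group helper is hypothetical, the amounts are assumed to be numeric (or convertible with float), and the descriptions and amounts in the sample records are invented for illustration.

```python
from collections import defaultdict


def sum_by_group(records):
    """Group by (description, iban_valid) and sum the transaction amounts."""
    totals = defaultdict(float)
    for record in records:
        details = record["transaction"]["details"]
        key = (details["description"], record["iban_valid"])
        totals[key] += float(details["value"]["amount"])
    return [
        {"description": description, "iban_valid": iban_valid, "total_amount": total}
        for (description, iban_valid), total in totals.items()
    ]


def make_record(description, amount, iban_valid):
    """Build an illustrative record containing only the fields used here."""
    return {
        "iban_valid": iban_valid,
        "transaction": {"details": {"description": description, "value": {"amount": amount}}},
    }


records = [
    make_record("rent", 10.0, True),
    make_record("rent", 5.5, True),
    make_record("rent", 2.0, False),
    make_record("groceries", 4.0, True),
]

for row in sum_by_group(records):
    print(row)
```

Each output row corresponds to one combination of description and iban_valid, with total_amount holding the sum for that group.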
  17. Click the ADD DESTINATION item on the pipeline to open the panel where you select the dataset that will hold your output data, here the financial data dataset you created earlier. You can use the same dataset for input and output because test datasets behave differently as a source and as a destination: when a test dataset is used as a destination, the data is ignored.
  18. Give a meaningful name to the Destination.

    Example

    processed data (out)
  19. Click SAVE to save your configuration.