How to automatically enable parallelization of data flows for better performance

author: Talend Documentation Team
EnrichVersion: 6.4, 6.3, 6.2, 6.1, 6.0
EnrichProdName: Talend Big Data Platform
task: Design and Development > Designing Jobs
EnrichPlatform: Talend Studio

How to automatically enable parallelization of data flows for better performance

In Talend Studio, parallelization of data flows means partitioning the input data flow of a subJob into parallel processes and executing them simultaneously, so as to gain better performance. This feature simplifies the implementation of parallelization.
Environment

Note that the feature explained in this section is available only if you have subscribed to one of the Talend Platform solutions or Big Data solutions, V5.3.1 or later.

Resolution

When you need to develop a Job that processes a very large amount of data in Talend Studio, you can enable or disable parallelization with a single click, and the Studio then automates its implementation across the given Job.

The implementation of parallelization involves four key steps, explained as follows:

  1. Partitioning: In this step, the Studio splits the input records into a given number of threads.
  2. Collecting: In this step, the Studio collects the split threads and sends them to a given component for processing.
  3. Departitioning: In this step, the Studio groups the outputs of the parallel executions of the split threads.
  4. Recollecting: In this step, the Studio captures the grouped execution results and outputs them to a given component.
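
As a rough illustration of these four steps only, the following sketch runs them in plain Java. It is not the code the Studio generates: the record values, the thread count and the per-partition sort stand in for a real subJob.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelizationSketch {
        public static void main(String[] args) throws Exception {
            List<String> records = Arrays.asList("75001", "44000", "69001", "13001", "59000", "31000");
            int threads = 3;

            // 1. Partitioning: split the input records into a given number of threads (round-robin here).
            List<List<String>> partitions = new ArrayList<>();
            for (int i = 0; i < threads; i++) partitions.add(new ArrayList<String>());
            for (int i = 0; i < records.size(); i++) partitions.get(i % threads).add(records.get(i));

            // 2. Collecting: each split thread is handed to a worker for processing (a sort in this sketch).
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            List<Future<List<String>>> results = new ArrayList<>();
            for (List<String> partition : partitions) {
                results.add(pool.submit(() -> {
                    List<String> sorted = new ArrayList<>(partition);
                    Collections.sort(sorted);
                    return sorted;
                }));
            }

            // 3. Departitioning: group the outputs of the parallel executions.
            List<String> grouped = new ArrayList<>();
            for (Future<List<String>> result : results) grouped.addAll(result.get());
            pool.shutdown();

            // 4. Recollecting: capture the grouped results and output them to the next component
            // (here, simply print them).
            System.out.println(grouped);
        }
    }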

Once the automatic implementation is done, you can alter the default configuration by clicking the corresponding connection between components. The following scenario presents more configuration details using a sample Job:

Scenario: sorting a large volume of customer data in parallel

The Job in this scenario sorts 20 million customer records by running parallelized executions.

Linking the components
  1. In the Integration perspective of your Studio, create an empty Job from the Job Designs node in the Repository tree view. For further information, see How to create a Job.
  2. Drop the following components onto the workspace: tFileInputDelimited, tSortRow and tFileOutputDelimited.

    The tFileInputDelimited component (labeled test file in this example) reads the 20 million customer records from a .txt file generated by tRowGenerator.

  3. Connect the components using the Row > Main link.
Enabling parallelization
  • Right-click the start component of the Job (tFileInputDelimited in this scenario) and, from the contextual menu, select Set parallelization.

Then the parallelization is automatically implemented.

Splitting the input data flow

First, you need to configure the input flow:

  1. Double-click tFileInputDelimited to open its Component view.
  2. In the File name/Stream field, browse to, or enter the path to the file storing the customer records to be read.
  3. Click the [...] button next to Edit schema to open the schema editor, where you need to create the schema to reflect the structure of the customer data.
  4. Click the [+] button five times to add five rows and rename them as follows: FirstName, LastName, City, Address and ZipCode.

    In this scenario, we leave the data types at their default value, String. In real-world practice, you can change them depending on the data types of the data to be processed.

  5. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.
  6. If need be, complete the other fields of the Component view with values corresponding to the data to be processed. In this scenario, we leave them as they are.

Then you need to configure the partitioning step.

  1. Click the link representing the partitioning step to open its Component view and click the Parallelization tab.

    The Partition row option has been automatically selected in the Type area. If you select None, you are actually disabling parallelization for the data flow handled over this link. Note that depending on the link you are configuring, a Repartition row option may become available in the Type area to re-partition a data flow that has already been departitioned.

    In this Parallelization view, you need to define the following properties:

    Number of Child Threads: the number of threads you want to split the input records into. We recommend setting this to N-1, where N is the total number of CPUs or cores on the machine processing the data.
    Buffer Size: the number of rows to cache for each of the generated threads.
    Use a key hash for partitions: enables the hash mode to dispatch the input records into threads. Once you select it, the Key Columns table appears, in which you set the column(s) to apply the hash mode on. In hash mode, records meeting the same criteria are dispatched into the same thread. If you leave this check box clear, the dispatch mode is Round-robin: records are dispatched one by one to each thread, in a circular fashion, until the last record is dispatched. Be aware that this mode cannot guarantee that records meeting the same criteria go into the same thread. A minimal sketch contrasting the two dispatch modes is given at the end of this section.
  2. In the Number of Child Threads field, enter the number of threads you want to partition the data flow into. In this example, enter 3 because the machine running this Job has 4 processors.
  3. If required, change the value in the Buffer Size field to adapt the memory capacity. In this example, we keep the default value.

At the end of this link, the Studio automatically collects the split threads to accomplish the collecting step.
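
The Use a key hash for partitions option and the default Round-robin mode described above pick the target thread in different ways. The sketch below is illustrative Java only, not the Studio's generated code, and the ZipCode values are invented for the example:

    import java.util.Arrays;
    import java.util.List;

    public class DispatchModesSketch {
        static final int THREADS = 3;

        // Key hash mode: records with the same key value always go to the same thread.
        static int hashDispatch(String keyValue) {
            return Math.abs(keyValue.hashCode() % THREADS);
        }

        // Round-robin mode: records are dispatched one by one, in a circular fashion.
        static int roundRobinDispatch(int recordIndex) {
            return recordIndex % THREADS;
        }

        public static void main(String[] args) {
            List<String> zipCodes = Arrays.asList("75001", "44000", "75001", "69001", "44000");
            for (int i = 0; i < zipCodes.size(); i++) {
                String zip = zipCodes.get(i);
                System.out.printf("record %d (ZipCode=%s): key hash -> thread %d, round-robin -> thread %d%n",
                        i, zip, hashDispatch(zip), roundRobinDispatch(i));
            }
        }
    }

With the key hash mode, the two records sharing ZipCode 75001 always land in the same thread; with round-robin (records 0 and 2 here) they do not, which is why round-robin cannot guarantee that records meeting the same criteria end up in the same thread.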

Sorting the input records

First, you need to configure tSortRow:

  1. Double-click tSortRow to open its Component view.
  2. Under the Criteria table, click the [+] button three times to add three rows to the table.
  3. In the Schema column column, select, for each row, the schema column to be used as the sorting criterion. In this example, select ZipCode, City and Address, sequentially.
  4. In the Sort num or alpha? column, select alpha for all three rows.
  5. In the Order asc or desc column, select asc for all three rows.
  6. If the schema does not appear, click the Sync columns button to retrieve the schema from the preceding component.
  7. Click Advanced settings to open its view.
  8. Select Sort on disk. The Temp data directory path field and the Create temp data directory if not exists check box then appear.
  9. In the Temp data directory path field, enter or browse to the path of the folder you want to use to store the temporary data processed by tSortRow. This approach enables tSortRow to sort considerably more data.

    Because the threads would overwrite one another's temporary data if they wrote to the same directory, you need to create a separate folder for each thread, using its thread ID.

    To find the variable representing the thread IDs, click Code to open the Code view and search for thread_id. In this example, this variable is tCollector_1_THREAD_ID.

    Then you need to enter the path using this variable. This path reads like:

    "E:/Studio/workspace/temp"+((Integer)globalMap.get("tCollector_1_THREAD_ID"))

    A minimal sketch of how this expression resolves per thread is given after these steps.

  10. Ensure that the Create temp data directory if not exists check box is selected.
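
For reference, the expression above resolves to a different folder for each thread because each collector thread stores its own ID in globalMap. The following standalone sketch simulates this with a plain HashMap and three assumed thread IDs (0 to 2); it is an illustration, not the code the Studio generates:

    import java.util.HashMap;
    import java.util.Map;

    public class TempPathSketch {
        public static void main(String[] args) {
            // Simulates the Job-level globalMap that Talend components use to share values.
            Map<String, Object> globalMap = new HashMap<>();
            for (int threadId = 0; threadId < 3; threadId++) {
                globalMap.put("tCollector_1_THREAD_ID", threadId);
                // Same expression as entered in the Temp data directory path field:
                String tempDir = "E:/Studio/workspace/temp"
                        + ((Integer) globalMap.get("tCollector_1_THREAD_ID"));
                System.out.println(tempDir);
            }
        }
    }

Each thread therefore sorts on disk under its own folder (E:/Studio/workspace/temp0, temp1 and temp2 in this illustration), so the threads do not overwrite one another's temporary data.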

Then you need to configure the departitioning step.

  1. Click the link representing the departitioning step to open its Component view and click the Parallelization tab.

    The Departition row option has been automatically selected in the Type area. If you select None, you are actually disabling parallelization for the data flow handled over this link. Note that depending on the link you are configuring, a Repartition row option may become available in the Type area to repartition a data flow that has already been departitioned.

    In this Parallelization view, you need to define the following properties:

    Buffer Size: the number of rows to be processed before the memory is freed.
    Merge sort partitions: implements the Mergesort algorithm to ensure the consistency of the data in the departitioned flow. A minimal sketch of this kind of merge is given at the end of this section.
  2. If required, change the value in the Buffer Size field to adapt the memory capacity. In this example, we keep the default value.

At the end of this link, the Studio automatically accomplishes the recollecting step to group the execution results and output them to the next component.
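
The Merge sort partitions option keeps the departitioned flow consistently ordered even though each thread sorted only its own partition. The sketch below illustrates the kind of merge involved, using invented ZipCode values; it is not the code the Studio generates. Each partition arrives already sorted, and the merge repeatedly takes the smallest remaining value across all partitions.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class MergeSortPartitionsSketch {
        // Merges partitions that were each sorted by their own thread into one globally sorted flow.
        static List<String> merge(List<List<String>> sortedPartitions) {
            // Each cursor is {partition index, row index}; cursors are ordered by the row they point to.
            Comparator<int[]> byCurrentRow =
                    Comparator.comparing(c -> sortedPartitions.get(c[0]).get(c[1]));
            PriorityQueue<int[]> cursors = new PriorityQueue<>(byCurrentRow);
            for (int p = 0; p < sortedPartitions.size(); p++) {
                if (!sortedPartitions.get(p).isEmpty()) cursors.add(new int[]{p, 0});
            }
            List<String> merged = new ArrayList<>();
            while (!cursors.isEmpty()) {
                int[] cursor = cursors.poll();
                List<String> partition = sortedPartitions.get(cursor[0]);
                merged.add(partition.get(cursor[1]));
                if (cursor[1] + 1 < partition.size()) cursors.add(new int[]{cursor[0], cursor[1] + 1});
            }
            return merged;
        }

        public static void main(String[] args) {
            List<List<String>> partitions = Arrays.asList(
                    Arrays.asList("44000", "69001"),
                    Arrays.asList("13001", "75001"),
                    Arrays.asList("59000"));
            System.out.println(merge(partitions)); // [13001, 44000, 59000, 69001, 75001]
        }
    }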

Outputting the sorted data
  1. Double-click the tFileOutputDelimited component to open its Component view.
  2. In the File Name field, browse to, or enter the directory and name of, the file you want to write the sorted data to. At runtime, this file is created if it does not exist.
Executing the Job

Then you can press F6 to run this Job.

Once the Job is done, you can check the file holding the sorted data as well as the temporary folders tSortRow created to sort data on disk. These folders are emptied once the sorting is done.