How to enable parallelization of data flows - 6.4

Talend Data Management Platform Studio User Guide

Note that this type of parallelization is available only if you have subscribed to one of the Talend Platform solutions or Big Data solutions.

You can use dedicated components or the Set parallelization option in the contextual menu within a Job to implement this type of parallel execution.

The dedicated components are tPartitioner, tCollector, tRecollector and tDepartitioner. For related information, see the documentation of these components at https://help.talend.com.

The following sections explain how to use the Set parallelization option and the related Parallelization vertical tab associated with a Row connection.

You can enable or disable parallelization with a single click, and the Studio then automates its implementation across the given Job.

The implementation of parallelization involves four key steps, as follows:

  1. Partitioning: in this step, the Studio splits the input records into a given number of threads.

  2. Collecting: in this step, the Studio collects the split threads and sends them to a given component for processing.

  3. Departitioning: in this step, the Studio groups the outputs of the parallel executions of the split threads.

  4. Recollecting: in this step, the Studio captures the grouped execution results and outputs them to a given component.

Once the automatic implementation is done, you can alter the default configuration by clicking the corresponding connection between components.
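
As an illustration of these four steps only, the following minimal Java sketch mimics them with plain worker threads. It is not the code the Studio generates; the record values and the thread count are invented for the example.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelizationSketch {

        public static void main(String[] args) throws Exception {
            // Partitioning: split the input records into a given number of threads (round-robin).
            List<String> records = List.of("zip_75012", "zip_10001", "zip_33101",
                                           "zip_90210", "zip_02139", "zip_44101");
            int numberOfChildThreads = 3;
            List<List<String>> partitions = new ArrayList<>();
            for (int i = 0; i < numberOfChildThreads; i++) {
                partitions.add(new ArrayList<>());
            }
            for (int i = 0; i < records.size(); i++) {
                partitions.get(i % numberOfChildThreads).add(records.get(i));
            }

            // Collecting: send each split thread's data to a component for processing (here, a sort).
            ExecutorService pool = Executors.newFixedThreadPool(numberOfChildThreads);
            List<Future<List<String>>> results = new ArrayList<>();
            for (List<String> partition : partitions) {
                results.add(pool.submit(() -> {
                    List<String> sorted = new ArrayList<>(partition);
                    sorted.sort(String::compareTo);
                    return sorted;
                }));
            }

            // Departitioning: group the outputs of the parallel executions.
            List<String> grouped = new ArrayList<>();
            for (Future<List<String>> result : results) {
                grouped.addAll(result.get());
            }
            pool.shutdown();

            // Recollecting: capture the grouped results and output them to the next component (here, the console).
            grouped.forEach(System.out::println);
        }
    }

In the Studio, these steps correspond to the tPartitioner, tCollector, tDepartitioner and tRecollector components, or to the Parallelization settings that the Set parallelization option configures on the links for you.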

The Parallelization tab

The Parallelization tab is available as one of the settings tabs you can use to configure a Row connection.

You define the parallelization properties of your row connections using the following fields and options.

Partition row

Select this option when you need to partition the input records into a specific number of threads.

Note: This option is not available for the last row connection of the flow.

Departition row

Select this option when you need to regroup the outputs of the processed parallel threads.

Note: This option is not available for the first row connection of the flow.

Repartition row

Select this option when you need to partition the input threads into a specific number of threads and regroup the outputs of the processed parallel threads.

Note: This option is not available for the first or the last row connection of the flow.

None

Default option. Select this option when you do not want to take any action on the input records.

Merge sort partitions

Select this check box to implement the Mergesort algorithm to ensure the consistency of data.

This check box appears when you select the Departition row or Repartition row option.

Number of Child Threads

Type in the number of threads into which you want to split the input records.

This field appears when you select the Partition row or Repartition row option.

Buffer Size

Type in the number of rows to cache for each of the threads generated.

This field does not appear if you select the None option.

Use a key hash for partitions

Select this check box to use the hash mode for dispatching the input records, which ensures that records meeting the same criteria are dispatched to the same threads. Otherwise, the dispatch mode is Round-robin.

This check box appears if you select the Partition row or Repartition row option.

In the Key Columns table that appears after you select the check box, set the columns on which you want to use the hash mode.
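
To make the difference between the two dispatch modes concrete, here is a small hypothetical Java sketch of key-hash dispatch. The hashing scheme, column names and thread count are assumptions chosen for illustration; they are not the Studio's actual implementation.

    import java.util.List;
    import java.util.Map;

    public class KeyHashDispatchSketch {

        // Key-hash dispatch: records whose key columns hold equal values always map to the same thread.
        static int hashDispatch(Map<String, String> record, List<String> keyColumns, int threadCount) {
            int hash = 0;
            for (String column : keyColumns) {
                hash = 31 * hash + record.get(column).hashCode();
            }
            return Math.floorMod(hash, threadCount);
        }

        public static void main(String[] args) {
            List<String> keyColumns = List.of("ZipCode");
            Map<String, String> r1 = Map.of("FirstName", "Ann", "ZipCode", "75012");
            Map<String, String> r2 = Map.of("FirstName", "Bob", "ZipCode", "75012");
            // Both records share the same ZipCode, so they are dispatched to the same thread index.
            System.out.println(hashDispatch(r1, keyColumns, 3));
            System.out.println(hashDispatch(r2, keyColumns, 3));
        }
    }

With Round-robin dispatch, by contrast, these two records could end up in different threads, because records are simply dealt out in arrival order.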

Scenario: sorting a large volume of customer data in parallel

The Job in this scenario sorts 20 million customer records by running parallelized executions.

Linking the components
  1. In the Integration perspective of your Studio, create an empty Job from the Job Designs node in the Repository tree view.

    For further information about how to create a Job, see Designing a Job.

  2. Drop the following components onto the workspace: tFileInputDelimited, tSortRow and tFileOutputDelimited.

    The tFileInputDelimited component (labeled test file in this example) reads the 20 million customer records from a .txt file generated by tRowGenerator.

  3. Connect the components using the Row > Main link.

Enabling parallelization
  • Right-click the start component of the Job, tFileInputDelimited in the scenario, and from the contextual menu, select Set parallelization.

    Then the parallelization is automatically implemented.

Splitting the input data flow

Configuring the input flow

  1. Double-click tFileInputDelimited to open its Component view.

  2. In the File name/Stream field, browse to, or enter the path to the file storing the customer records to be read.

  3. Click the [...] button next to Edit schema to open the schema editor, where you need to create the schema reflecting the structure of the customer data.

  4. Click the [+] button five times to add five rows and rename them as follows: FirstName, LastName, City, Address and ZipCode.

    In this scenario, we leave the data types at their default value, String. In real-world practice, you can change them depending on the types of the data to be processed.

  5. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  6. If need be, complete the other fields of the Component view with values corresponding to the data to be processed. In this scenario, we leave them as they are.

Configuring the partitioning step

  1. Click the link representing the partitioning step to open its Component view and click the Parallelization tab.

    The Partition row option has been automatically selected in the Type area. If you select None, you are actually disabling parallelization for the data flow to be handled over this link. Note that depending on the link you are configuring, a Repartition row option may become available in the Type area to repartition a data flow already departitioned.

    In this Parallelization view, you need to define the following properties:

    • Number of Child Threads: the number of threads into which you want to split the input records. We recommend setting this number to N-1, where N is the total number of CPUs or cores on the machine processing the data (the snippet after this procedure shows a quick way to check that number).

    • Buffer Size: the number of rows to cache for each of the threads generated.

    • Use a key hash for partitions: this allows you to use the hash mode to dispatch the input records into threads.

      Once you select it, the Key Columns table appears, in which you set the column(s) to which you want to apply the hash mode. In the hash mode, records meeting the same criteria are dispatched into the same threads.

      If you leave this check box clear, the dispatch mode is Round-robin, meaning records are dispatched one-by-one to each thread, in a circular fashion, until the last record is dispatched. Be aware that this mode cannot guarantee that records meeting the same criteria go into the same threads.

  2. In the Number of Child Threads field, enter the number of threads you want to partition the data flow into. In this example, enter 3 because we are using 4 processors to run this Job.

  3. If required, change the value in the Buffer Size field to adapt the memory capacity. In this example, we leave the default one.

At the end of this link, the Studio automatically collects the split threads to accomplish the collecting step.
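
If you are unsure how many CPUs or cores the machine running the Job exposes, a throwaway Java snippet such as the following prints the N-1 value recommended above. This is only a convenience check done outside the Studio; you still type the number into the Number of Child Threads field yourself.

    public class ChildThreadCountHint {
        public static void main(String[] args) {
            // N = number of CPUs/cores visible to the JVM on the machine that will run the Job.
            int n = Runtime.getRuntime().availableProcessors();
            // Recommended Number of Child Threads: N - 1 (for example, 3 on a 4-core machine).
            System.out.println("Suggested Number of Child Threads: " + Math.max(1, n - 1));
        }
    }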

Sorting the input records

Configuring tSortRow

  1. Double-click tSortRow to open its Component view.

  2. Under the Criteria table, click the [+] button three times to add three rows to the table.

  3. In the Schema column column, select, for each row, the schema column to be used as the sorting criterion. In this example, select ZipCode, City and Address, sequentially.

  4. In the Sort num or alpha? column, select alpha for all the three rows.

  5. In the Order asc or desc column, select asc for all the three rows.

  6. If the schema does not appear, click the Sync columns button to retrieve the schema from the preceding component.

  7. Click Advanced settings to open its view.

  8. Select Sort on disk. The Temp data directory path field and the Create temp data directory if not exists check box appear.

  9. In Temp data directory path, enter the path to, or browse to, the folder you want to use to store the temporary data processed by tSortRow. This approach enables tSortRow to sort considerably more data.

    Because the threads will overwrite each other's data if they write to the same directory, you need to create a separate folder for each thread to be processed, using its thread ID.

    To use the variable representing the thread IDs, click Code to open the code view and, in that view, find the variable by searching for thread_id. In this example, this variable is tCollector_1_THREAD_ID.

    Then you need to enter the path using this variable. This path reads as follows (the sketch after this procedure shows how it resolves for each thread):

    "E:/Studio/workspace/temp"+((Integer)globalMap.get("tCollector_1_THREAD_ID")).

  10. Ensure that the Create temp data directory if not exists check box is selected.
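
The following standalone Java sketch only illustrates how that expression resolves to a distinct folder per thread; the globalMap used here is a local stand-in for the one available in the generated Job code, and the thread ID values are invented.

    import java.util.HashMap;
    import java.util.Map;

    public class TempDirPerThreadSketch {
        public static void main(String[] args) {
            // Local stand-in for the Job's globalMap; in the generated code the Studio sets the thread ID.
            Map<String, Object> globalMap = new HashMap<>();

            for (int threadId = 0; threadId < 3; threadId++) {
                globalMap.put("tCollector_1_THREAD_ID", threadId);
                // Same expression as entered in the Temp data directory path field:
                String tempDir = "E:/Studio/workspace/temp"
                        + ((Integer) globalMap.get("tCollector_1_THREAD_ID"));
                // Each thread resolves to its own folder (temp0, temp1, temp2), so the sorted
                // chunks written to disk by one thread never overwrite those of another.
                System.out.println(tempDir);
            }
        }
    }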

Configuring the departitioning step

  1. Click the link representing the departitioning step to open its Component view and click the Parallelization tab.

    The Departition row option has been automatically selected in the Type area. If you select None, you are actually disabling parallelization for the data flow to be handled over this link. Note that depending on the link you are configuring, a Repartition row option may become available in the Type area to repartition a data flow already departitioned.

    In this Parallelization view, you need to define the following properties:

    • Buffer Size: the number of rows to be processed before the memory is freed.

    • Merge sort partitions: this allows you to implement the Mergesort algorithm to ensure the consistency of data (a conceptual sketch of this merge follows this procedure).

  2. If required, change the value in the Buffer Size field to adapt the memory capacity. In this example, we leave the default value.

At the end of this link, the Studio automatically accomplishes the recollecting step to group the execution results and output them to the next component.
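
Conceptually, the Merge sort partitions option performs a k-way merge of the partitions that the tSortRow threads have already sorted. The following Java sketch shows the idea with made-up zip code strings; it is a conceptual illustration only, not the Studio's implementation.

    import java.util.List;
    import java.util.PriorityQueue;

    public class MergeSortedPartitionsSketch {

        // k-way merge: repeatedly emit the smallest head element among the sorted partitions.
        static void merge(List<List<String>> sortedPartitions) {
            // Each heap entry is {partition index, position within that partition}.
            PriorityQueue<int[]> heads = new PriorityQueue<>(
                    (a, b) -> sortedPartitions.get(a[0]).get(a[1])
                            .compareTo(sortedPartitions.get(b[0]).get(b[1])));
            for (int p = 0; p < sortedPartitions.size(); p++) {
                if (!sortedPartitions.get(p).isEmpty()) {
                    heads.add(new int[]{p, 0});
                }
            }
            while (!heads.isEmpty()) {
                int[] head = heads.poll();
                List<String> partition = sortedPartitions.get(head[0]);
                System.out.println(partition.get(head[1]));     // next record in global order
                if (head[1] + 1 < partition.size()) {
                    heads.add(new int[]{head[0], head[1] + 1}); // advance within that partition
                }
            }
        }

        public static void main(String[] args) {
            // Three partitions, each already sorted by its own thread.
            merge(List.of(
                    List.of("10001", "75012"),
                    List.of("33101", "90210"),
                    List.of("02139", "44101")));
        }
    }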

Outputting the sorted data
  1. Double-click the tFileOutputDelimited component to open its Component view.