Activer la parallélisation des flux de données - 6.5

Talend Data Services Platform Studio Guide utilisateur

EnrichVersion
6.5
EnrichProdName
Talend Data Services Platform
EnrichPlatform
Studio Talend
task
Création et développement
Qualité et préparation de données

Notez que la fonctionnalité décrite dans cette section est uniquement disponible si vous avez souscrit à l'une des solutions Talend Platform ou Big Data.

Vous pouvez utiliser des composants dédiés ou utiliser l'option Set parallelization dans le menu contextuel dans un Job afin d'implémenter ce type d'exécution parallèle.

Les composants dédiés sont le tPartitioner, le tCollector, le tRecollector et le tDepartitioner. Pour plus d'informations, consultez la documentation de ces composants sur https://help.talend.com.

Les sections suivantes expliquent comment utiliser l'option Set parallelization ainsi que l'onglet vertical Parallelization associé à la connexion Row.

Le Studio automatise ensuite l'implémentation à travers le Job.

L'implémentation de la parallélisation nécessite quatre étapes clé :

  1. Le partitionnement () : dans cette étape, le Studio partitionne les enregistrements d'entrée en un nombre donné de process.

  2. La collecte () : dans cette étape, le Studio collecte les process partitionnés et les envoie au composant pour qu'ils soient traités.

  3. Le dé-partitionnement () : dans cette étape, le Studio regroupe les sorties des exécutions parallèles des process partitionnés.

  4. La re-collecte () : dans cette étape, le Studio capture les résultats des exécutions groupées et les envoie vers un composant donné.

Une fois l'implémentation automatique effectuée, vous pouvez modifier la configuration par défaut en cliquant sur la connexion correspondante entre les composants.

L'onglet Parallelization

L'onglet Parallelization vous permet de configurer une connexion Row.

Configurez les propriétés de parallélisation de vos liens Row selon les options décrites dans le tableau suivant.

Champ/Option

Description

Partition row

Sélectionnez cette option si vous devez partitionner les enregistrements d'entrée dans un nombre spécifique de process.

Note

Cette option n'est pas disponible pour la dernière connexion Row du flux.

Departition row

Sélectionnez cette option si vous souhaitez regrouper les sorties des process traités en parallèle.

Note

Cette option n'est pas disponible pour la première connexion du flux Row.

Repartition row

Sélectionnez cette option si vous devez partitionner les process d'entrée en un certain nombre de process et regrouper les sorties des process traités en parallèle.

Note

Cette option n'est pas disponible pour la première et la dernière connexion du flux Row.

None

Option par défaut. Sélectionnez cette option lorsque vous ne souhaitez pas effectuer d'action sur les enregistrements d'entrée.

Merge sort partitions

Cochez cette case pour utiliser l'algorithme Mergesort afin d'assurer la cohérence des données.

Cette case apparaît lorsque vous sélectionnez l'option Departition row ou Repartition row.

Number of Child Threads

Saisissez le nombre de process que vous souhaitez obtenir en divisant les enregistrements d'entrée.

Ce champ apparaît lorsque vous sélectionnez l'option Partition row ou Departition row.

Buffer Size

Saisissez le nombre de lignes à mettre en cache pour chaque process généré.

Ce champ n'apparaît pas si vous sélectionnez l'option None.

Use a key hash for partitions

Cochez cette case pour utiliser le mode hachage pour répartir les enregistrements d'entrée, ce qui assure le regroupement des enregistrements répondant aux mêmes critères dans les mêmes process. Sinon, le mode de répartition des données est Round-Robin.

Cette case apparaît si vous sélectionnez l'option Partition row ou Repartition row.

Dans la table Key Columns qui apparaît après avoir coché la case, configurez les colonnes sur lesquelles vous souhaitez utiliser le mode hachage.

Scénario :

Le Job de ce scénario trie 20 millions d'enregistrements clients en effectuant des exécutions parallèles.

Relier les composants
  1. Dans la perspective Integration du Studio, créez un Job vide depuis le nœud Job Designs dans l'arborescence Repository.

    Pour plus d'informations concernant la création d'un Job, consultez Conception des Jobs et des Routes.

  2. Déposez un composant tFileInputDelimited, un tSortRow et un tFileOutputDelimited dans l'espace de modélisation graphique.

    Le composant tFileInputDelimited (nommé test file dans cet exemple) lit les 20 millions d'enregistrements clients depuis un fichier .txt généré par le tRowGenerator.

  3. Connectez les composants entre eux à l'aide de liens Row > Main.

Activer la parallélisation
  • Cliquez-droit sur le composant de départ du Job, le tFileInputDelimited dans ce scénario. Dans le menu contextuel, sélectionnez Set parallelization.

    Dans le menu contextuel, sélectionnez Set parallelization.

Partitionner le flux de données d'entrée

Configurer le flux d'entrée

  1. Double-cliquez sur le tFileInputDelimited pour ouvrir sa vue Component.

  2. Dans le champ File name/Stream, parcourez votre système, ou saisissez le chemin d'accès au fichier contenant les enregistrements clients à lire.

  3. Cliquez sur l'icône pour ouvrir l'éditeur de schéma dans lequel créer le schéma reflétant la structure des données clients.

  4. Cliquez sur l'icône Cliquez cinq fois sur le bouton pour ajouter cinq lignes et renommez-les : FirstName, LastName, City, Address et ZipCode.

    Dans ce scénario, laissez la valeur par défaut des types de données, String. Dans un cas d'utilisation réelle, vous pouvez modifier les types selon les données à traiter.

  5. Cliquez sur OK pour valider ces modifications et acceptez la propagation proposée par la boîte de dialogue qui s'ouvre.

  6. Si nécessaire, renseignez les autres champs dans la vue Component avec les valeurs correspondant aux données à traiter. Dans ce scénario, laissez les paramètres tels qu'ils sont.

Configurer l'étape de partitionnement

  1. Cliquez sur le lien représentant l'étape de partitionnement pour ouvrir sa vue Component. Cliquez ensuite sur l'onglet Parallelization.

    L'option Partition row a été automatiquement sélectionnée dans la zone Type. Si vous sélectionnez None, vous désactivez la parallélisation du flux de données sur ce lien. Notez que selon le lien que vous configurez, une option Repartition row peut être disponible dans la zone Type afin de re-partitionner un flux de données déjà dé-partitionné.

    Dans cette vue Parallelization, vous devez définir les propriétés suivantes :

    • Number of Child Threads : le nombre de process que vous souhaitez obtenir en divisant les enregistrements d'entrée. Il est recommandé de saisir un nombre N-1 où N est le nombre total de CPU ou cœurs de la machine traitant les données.

    • Buffer Size : le nombre de lignes à mettre en cache pour chacun des process générés.

    • Use a key hash for partitions : cela vous permet d'utiliser le mode Hash pour répartir les enregistrements dans les process.

      Une fois la case cochée, la table Key Columns s'affiche. Vous pouvez y configurer les colonnes sur lesquelles appliquer le mode Hash. En mode Hash, les enregistrements répondant aux critères sont répartis dans les mêmes process.

      Si vous laissez cette case décochée le mode de répartition est Round-robin, ce qui signifie que les enregistrements sont répartis un par un dans chaque process, de manière circulaire, jusqu'à ce que le dernier enregistrement soit distribué. Ce mode ne peut garantir que les enregistrements répondant aux critères vont bien dans les mêmes process.

  2. Dans le champ Number of Child Threads, saisissez le nombre de process (threads) que vous souhaitez obtenir en divisant les enregistrements d'entrée. Dans cet exemple, saisissez 3 car quatre processeurs sont utilisés pour exécuter le Job.

  3. Si nécessaire, modifiez la valeur dans le champ Buffer Size afin d'adapter la capacité de la mémoire. Dans cet exemple, laissez la valeur par défaut.

A la fin de ce lien, le Studio collecte automatiquement les process partitionnés afin de réaliser l'étape de collecte.

Trier les enregistrements d'entrée

Configurer le tSortRow

  1. Double-cliquez sur le tSortRow pour ouvrir sa vue Component.

  2. Sous la table Criteria, cliquez sur le bouton pour ajouter trois lignes à la table.

  3. Dans la colonne Schema column, sélectionnez, pour chaque ligne, la colonne du schéma à utiliser comme critère de tri. Dans cet exemple, sélectionnez ZipCode, City et Address.

  4. Dans la colonne Sort num or alpha?, sélectionnez alpha pour les trois lignes.

  5. Dans la colonne Order asc or desc, sélectionnez asc pour les trois lignes.

  6. Si le schéma ne s'affiche pas, cliquez sur le bouton Sync columns pour récupérer le schéma du composant précédent.

  7. Cliquez sur Advanced settings pour ouvrir cette vue.

  8. Cochez la case Sort on disk. Le champ Temp data directory path et la case Create temp data directory if not exist s'affichent.

  9. Dans le champ Temp data directory path, saisissez le chemin d'accès, ou parcourez votre système jusqu'au dossier dans lequel stocker les données temporaires traitées par le tSortRow. Ainsi, le tSortRow permet de traiter considérablement plus de données.

    Comme les process s'écrasent s'ils sont écrits dans le même répertoire, vous devez créer le dossier pour chaque process à traiter, via l'ID du process.

    Pour utiliser la variable représentant l'ID du process, cliquez sur l'onglet Code afin d'ouvrir cette vue. Dans cette vue, recherchez thread_id. Dans cet exemple, la variable est tCollector_1_THREAD_ID.

    Saisissez le chemin d'accès à l'aide de la variable. Le chemin d'accès se présente comme suit :

    "E:/Studio/workspace/temp"+((Integer)globalMap.get("tCollector_1_THREAD_ID")).

  10. Vérifiez que la case Create temp data directory if not exists est cochée.

Configurer l'étape de dé-partitionnement

  1. Cliquez sur le lien représentant l'étape de dé-partitionnement pour ouvrir sa vue Component. Cliquez ensuite sur l'onglet Parallelization.