Activer la parallélisation des flux de données - 6.5

Talend Real-Time Big Data Platform Studio Guide utilisateur

EnrichVersion
6.5
EnrichProdName
Talend Real-Time Big Data Platform
task
Création et développement
Qualité et préparation de données
EnrichPlatform
Studio Talend

Notez que la fonctionnalité décrite dans cette section est uniquement disponible si vous avez souscrit à l'une des solutions Talend Platform ou Big Data.

Vous pouvez utiliser des composants dédiés ou utiliser l'option Set parallelization dans le menu contextuel dans un Job afin d'implémenter ce type d'exécution parallèle.

Les composants dédiés sont le tPartitioner, le tCollector, le tRecollector et le tDepartitioner. Pour plus d'informations, consultez la documentation de ces composants à l'adresse https://help.talend.com.

Les sections suivantes expliquent comment utiliser l'option Set parallelization ainsi que l'onglet vertical Parallelization associé à la connexion Row.

Vous pouvez activer ou désactiver la parallélisation grâce à un simple clic. Le studio automatise ensuite l'implémentation à travers le Job.

L'implémentation de la parallélisation nécessite quatre étapes clé :

  1. Le partitionnement () : dans cette étape, le studio partitionne les enregistrements d'entrée en un nombre donné de process.

  2. La collecte () : dans cette étape, le studio collecte les process partitionnés et les envoie au composant pour qu'ils soient traités.

  3. Le dé-partitionnement () : dans cette étape, le studio regroupe les sorties des exécutions parallèles des process partitionnés.

  4. La re-collecte () : dans cette étape, le studio capture les résultats des exécutions groupées et les envoie vers un composant donné.

Une fois l'implémentation automatique effectuée, vous pouvez modifier la configuration par défaut en cliquant sur la connexion correspondante entre les composants.

L'onglet Parallelization

L'onglet Parallelization vous permet de configurer une connexion Row.

Configurez les propriétés de parallélisation de vos liens Row selon les options décrites dans le tableau suivant :

Champ/Option

Description

Partition row

Sélectionnez cette option si vous devez partitionner les enregistrements d'entrée dans un nombre spécifique de process.

Note

Cette option n'est pas disponible pour la dernière connexion Row du flux.

Departition row

Sélectionnez cette option si vous souhaitez regrouper les sorties des process traités en parallèle.

Note

Cette option n'est pas disponible pour la première connexion du flux Row.

Repartition row

Sélectionnez cette option si vous devez partitionner les process d'entrée en un certain nombre de process et regrouper les sorties des process traités en parallèle.

Note

Cette option n'est pas disponible pour la première et la dernière connexion du flux Row.

None

Option par défaut. Sélectionnez cette option lorsque vous ne souhaitez pas effectuer d'action sur les enregistrements d'entrée.

Merge sort partitions

Cochez cette case pour implémenter l'algorithme Mergesort afin d'assurer la cohérence des données.

Cette case apparaît lorsque vous sélectionnez l'option Departition row ou Repartition row.

Number of Child Threads

Saisissez le nombre de process que vous souhaitez obtenir en divisant les enregistrements d'entrée.

Ce champ apparaît lorsque vous sélectionnez l'option Partition row ou Departition row.

Buffer Size

Saisissez le nombre de lignes à mettre en cache pour chaque process généré.

Ce champ n'apparaît pas si vous sélectionnez l'option None.

Use a key hash for partitions

Cochez cette case pour utiliser le mode hachage pour répartir les enregistrements d'entrée, ce qui assure le regroupement des enregistrements répondant aux mêmes critères dans les mêmes process. Sinon, le mode de répartition des données est Round-Robin.

Cette case apparaît si vous sélectionnez l'option Partition row ou Repartition row.

Dans la table Key Columns qui apparaît après avoir coché la case, configurez les colonnes sur lesquelles vous souhaitez utiliser le mode hachage.

Scénario : Parallélisation du tri des données client volumineuses

Le Job de ce scénario trie 20 millions d'enregistrements clients en effectuant des exécutions parallèles.

Relier les composants
  1. Dans la perspective Integration de votre studio, créez un Job vide depuis le nœud Job Designs dans la vue Repository.

    Pour plus d'informations concernant la création d'un Job, consultez Conception de Jobs et de Routes.

  2. Déposez un composant tFileInputDelimited, un tSortRow et un tFileOutputDelimited dans l'espace de modélisation graphique.

    Le tFileInputDelimited (nommé test file dans cet exemple) lit les 20 millions d'enregistrements clients depuis un fichier .txt généré par le tRowGenerator.

  3. Connectez les composants entre eux à l'aide de liens Row > Main.

Activer la parallélisation
  • Cliquez-droit sur le composant de départ du Job, le tFileInputDelimited dans ce scénario. Dans le menu contextuel, sélectionnez Set parallelization.

    La parallélisation est automatiquement implémentée.

Partitionner le flux de données d'entrée

Configurer le flux d'entrée

  1. Double-cliquez sur le tFileInputDelimited pour ouvrir sa vue Component.

  2. Dans le champ File name/Stream, parcourez votre système, ou saisissez le chemin d'accès au fichier contenant les enregistrements clients à lire.

  3. Cliquez sur le bouton pour ouvrir l'éditeur de schéma dans lequel créer le schéma reflétant la structure des données clients.

  4. Cliquez cinq fois sur le bouton pour ajouter cinq lignes et renommez-les : FirstName, LastName, City, Address et ZipCode.

    Dans ce scénario, laissez la valeur par défaut des types de données, String. Dans un cas d'utilisation réelle, vous pouvez modifier les types selon les données à traiter.

  5. Cliquez sur OK pour valider les modifications et accepter la propagation proposée par la boîte de dialogue.

  6. Si nécessaire, renseignez les autres champs dans la vue Component avec les valeurs correspondant aux données à traiter. Dans ce scénario, laissez les paramètres tels qu'ils sont.

Configurer l'étape de partitionnement

  1. Cliquez sur le lien représentant l'étape de partitionnement pour ouvrir sa vue Component. Cliquez ensuite sur l'onglet Parallelization.

    L'option Partition row a été automatiquement sélectionnée dans la zone Type. Si vous sélectionnez None, vous désactivez la parallélisation du flux de données sur ce lien. Notez que selon le lien que vous configurez, une option Repartition row peut être disponible dans la zone Type afin de re-partitionner un flux de données déjà dé-partitionné.

    Dans cette vue Parallelization, vous devez définir les propriétés suivantes :

    • Number of Child Threads : le nombre de process que vous souhaitez obtenir en divisant les enregistrements d'entrée. Il est recommandé que ce nombre soit égal à N-1, où N représente le nombre total de processeurs ou de cœurs de la machine traitant les données.

    • Buffer Size : le nombre de lignes à mettre en cache pour chacun des process générés.

    • Use a key hash for partitions : cela vous permet d'utiliser le mode Hash pour répartir les enregistrements dans les process.

      Une fois la case cochée, la table Key Columns apparaît. Vous pouvez y configurer les colonnes sur lesquelles appliquer le mode Hash. En mode Hash, les enregistrements répondant aux critères sont répartis dans les mêmes process.

      Si vous laissez cette case décochée, le mode de répartition est Round-robin. Cela signifie que les enregistrements sont répartis un par un dans chaque process, de manière circulaire, jusqu'à ce que le dernier enregistrement soit distribué. Ce mode ne peut garantir que les enregistrements répondant aux critères vont bien dans les mêmes process.

  2. Dans le champ Number of Child Threads, saisissez le nombre de process que vous souhaitez obtenir en divisant les enregistrements d'entrée. Dans cet exemple, saisissez 3 car quatre processeurs sont utilisés pour exécuter le Job.

  3. Si nécessaire, modifiez la valeur dans le champ Buffer Size afin d'adapter la capacité de la mémoire. Dans cet exemple, laissez la valeur par défaut.

A la fin de ce lien, le studio collecte automatiquement les process partitionnés afin de réaliser l'étape de collecte.

Trier les enregistrements d'entrée

Configurer le tSortRow

  1. Double-cliquez sur le tSortRow pour ouvrir sa vue Component.

  2. Sous la table Criteria, cliquez trois fois sur le bouton pour ajouter trois lignes à la table.

  3. Dans la colonne Schema column, sélectionnez, pour chaque ligne, la colonne du schéma à utiliser comme critère de tri. Dans cet exemple, sélectionnez ZipCode, City et Address.

  4. Dans la colonne Sort num or alpha?, sélectionnez alpha pour les trois lignes.

  5. Dans la colonne Order asc or desc, sélectionnez asc pour les trois lignes.

  6. Si le schéma n'apparaît pas, cliquez sur le bouton Sync columns pour récupérer le schéma du composant précédent.

  7. Cliquez sur Advanced settings pour ouvrir la vue correspondante.

  8. Sélectionnez Sort on disk. Le champ Temp data directory path et la case Create temp data directory if not exist apparaissent.

  9. Dans le champ Temp data directory path, saisissez le chemin d'accès, ou parcourez votre système jusqu'au dossier dans lequel stocker les données temporaires traitées par le tSortRow. Ainsi, le tSortRow permet de traiter considérablement plus de données.

    Comme les process s'écrasent s'ils sont écrits dans le même répertoire, vous devez créer le dossier pour chaque process à traiter, via l'ID du process.

    Pour utiliser la variable représentant l'ID du process, cliquez sur l'onglet Code afin d'ouvrir cette vue. Dans cette vue, recherchez thread_id. Dans cet exemple, la variable est tCollector_1_THREAD_ID.

    Saisissez le chemin d'accès à l'aide de la variable. Le chemin d'accès se présente comme suit :

    "E:/Studio/workspace/temp"+((Integer)globalMap.get("tCollector_1_THREAD_ID")).

  10. Vérifiez que la case Create temp data directory if not exists est cochée.

Configurer l'étape de dé-partitionnement

  1. Cliquez sur le lien représentant l'étape de dé-partitionnement pour ouvrir sa vue Component. Cliquez ensuite sur l'onglet Parallelization.

    L'option Departition row a été automatiquement sélectionnée dans la zone Type. Si vous sélectionnez None, vous désactivez la parallélisation du flux de données sur ce lien. Notez que selon le lien que vous configurez, une option Repartition row peut être disponible dans la zone Type afin de re-partitionner un flux de données déjà dé-partitionné.

    Dans cette vue Parallelization, vous devez définir les propriétés suivantes :

    • Buffer Size : le nombre de lignes qui sont traitées avant que la mémoire soit libérée.

    • Merge sort partitions : cette option permet d'implémenter l'algorithme Mergesort afin d'assurer la cohérence des données.

  2. Si nécessaire, modifiez les valeurs dans le champ Buffer Size, pour adapter la capacité de la mémoire. Dans cet exemple, laissez la valeur par défaut.

A la fin de ce lien, le studio effectue automatiquement l'étape de re-collecte afin de grouper les résultats de l'exécution et de les passer au composant suivant.

Ecrire en sortie les données triées
  1. Double-cliquez sur le composant tFileOutputDelimited pour ouvrir sa vue Component.