Lire et écrire des données dans MongoDB à l'aide d'un Job Spark Streaming - 6.3

Composants Talend Guide de référence

EnrichVersion
6.3
EnrichProdName
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Data Integration
Talend Data Management Platform
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Open Studio for Big Data
Talend Open Studio for Data Integration
Talend Open Studio for Data Quality
Talend Open Studio for ESB
Talend Open Studio for MDM
Talend Real-Time Big Data Platform
task
Création et développement
Gouvernance de données
Qualité et préparation de données
EnrichPlatform
Studio Talend

Dans ce scénario, vous allez créer un Job Spark Streaming pour extraire des données concernant des réalisateurs de films à partir de MongoDB, utiliser ces données pour filtrer et compléter les informations relatives aux films et écrire les résultats dans une collection MongoDB.

L'échantillon de données relatif aux réalisateurs de présente comme suit :

1;Gregg Araki	
2;P.J. Hogan 
3;Alan Rudolph 
4;Alex Proyas
5;Alex Sichel

Ces données contiennent les noms des réalisateurs et l'ID leur ayant été attribué.

La structure de ces données dans MongoDB se présente comme suit :

{ "_id" : ObjectId("575546da3b1c7e22bc7b2189"), "person" : { "id" : 3, "name" : "Alan Rudolph" } }
{ "_id" : ObjectId("575546da3b1c7e22bc7b218b"), "person" : { "id" : 4, "name" : "Alex Proyas" } }
{ "_id" : ObjectId("575546da3b1c7e22bc7b218c"), "person" : { "id" : 5, "name" : "Alex Sichel" } }
{ "_id" : ObjectId("575546da3b1c7e22bc7b2188"), "person" : { "id" : 1, "name" : "Gregg Arakit" } }
{ "_id" : ObjectId("575546da3b1c7e22bc7b218a"), "person" : { "id" : 2, "name" : "P.J. Hogan" } }

Notez que les données d'exemple sont créées à des fins de démonstration uniquement.

Prérequis :

  • Le cluster Spark et la base de données MongoDB à utiliser doivent avoir été installés et être en cours de fonctionnement.

  • Les données susmentionnées ont été chargées dans la collection MongoDB à utiliser.

Pour reproduire ce scénario, procédez comme suit :

Relier les composants

  1. Dans la perspective Integration du Studio, créez un Job Spark Batch vide depuis le nœud Job Designs de la vue Repository.

    Pour plus d'informations concernant la création d'un Job Spark Streaming, consultez Guide de prise en main de Talend Big Data.

  2. Dans l'espace de modélisation graphique, saisissez le nom du composant à utiliser et sélectionnez ce composant dans la liste qui s'affiche. Dans ce scénario, les composants utilisés sont les suivants : tHDFSConfiguration, tMongoDBConfiguration, tFixedFlowInput, tMongoDBOutput, tMongoDBLookupInput, tMap et tLogRow.

    Les composants tFixedFlowInput sont utilisés pour charger les données relatives aux films dans le flux de données. Dans un scénario réel, vous pouvez utiliser d'autres composants, comme le tFileInputDelimited pour créer un processus sophistiqué afin de préparer vos données à traiter.

  3. Reliez le tFixedFlowInput au tMap à l'aide d'un lien Row > Main.

    Le flux principal du tMap est ainsi créé. Les informations des films sont envoyées via ce flux.

  4. Reliez le tMongoDBLookupInput au tMap à l'aide d'un lien Row > Main.

    Le flux de référence (lookup) vers le tMap est ainsi créé. Les informations des réalisateurs de films sont transmises via ce flux.

  5. Reliez le tMap au tMongoDBOutput à l'aide d'un lien Row > Main et nommez ce lien dans la boîte de dialogue qui s'ouvre. Par exemple, nommez-le out1.

  6. Répétez l'opération pour connecter le tMap au tLogRow et nommez ce lien reject.

  7. Laissez les composants tHDFSConfiguration et tMongoDBConfiguration sans lien.

Configurer la connexion à Spark

  1. Cliquez sur le bouton Run pour ouvrir sa vue, puis cliquez sur l'onglet Spark Configuration pour afficher sa vue et configurer la connexion à Spark.

    Cette vue doit ressembler à l'image ci-dessous :

  2. Sélectionnez le type de cluster Spark auquel vous connecter.

    • Local : le Studio construit l'environnement Spark en lui-même au moment de l'exécution locale du Job dans le Studio. Avec ce mode, chaque processeur de la machine locale est utilisé comme Worker Spark pour effectuer les calculs. Ce mode requiert la configuration d'un minimum de paramètres dans la vue de configuration.

      Notez que cette machine locale est la machine sur laquelle s'exécute le Job. Le mode Local est le mode par défaut. Vous devez décocher cette case pour afficher la liste déroulante dans laquelle sélectionner les autres modes.

    • Standalone : le Studio se connecte à un cluster compatible Spark pour exécuter le Job depuis ce cluster.

    • Yarn client : le Studio exécute le pilote Spark pour orchestrer la manière dont sera exécuté le Job puis envoie l'orchestration au service Yarn d'un cluster Hadoop donné, afin que le Resource Manager de ce service Yarn demande des ressources pour l'exécution.

  3. Si vous utilisez le mode Yarn client, la liste Property type s'affiche et vous permet de sélectionner une connexion à Hadoop établie depuis le Repository, si vous avez créé cette connexion dans le Repository. Le Studio réutilise l'ensemble des informations de connexion pour ce Job.

    Pour plus d'informations concernant la création d'une connexion Hadoop dans le Repository, consultez le chapitre décrivant le nœud Hadoop cluster dans le Guide utilisateur du Studio Talend.

  4. Sélectionnez la version de la distribution Hadoop à utiliser avec Spark.

    • Si vous sélectionnez Microsoft HD Insight 3.4, vous devez configurer les connexions au service Livy, au service HD Insight et au service Windows Azure Storage du cluster, dans les zones qui s'affichent. Une vidéo de démonstration relative à la configuration d'une connexion à un cluster Microsoft HD Insight est disponible à l'adresse suivante : https://www.youtube.com/watch?v=A3QTT6VsNoM (en anglais).

      Le nom de l'hôte (Hostname) de Livy utilise la syntaxe suivante : nom_de_votre_cluster_spark.azurehdinsight.net. Pour plus d'informations concernant le service Livy utilisé par HD Insight, consultez Submit Spark jobs using Livy (en anglais).

    • Si vous sélectionnez Amazon EMR, consultez l'article suivant concernant la configuration de la connexion : Amazon EMR - Getting Started (en anglais) sur Talend Help Center (https://help.talend.com). Il est recommandé d'installer votre JobServer Talend dans le cluster EMR. Pour plus d'informations concernant ce JobServer, consultez le Guide d'installation Talend.

    Si vous ne trouvez pas votre distribution dans la liste déroulante, cela signifie que la distribution à laquelle vous souhaitez vous connecter n'est pas officiellement supportée par Talend. Dans ce cas, vous pouvez sélectionner Custom puis la version de Spark, dans la liste Spark version, du cluster auquel vous connecter. Cliquez sur le bouton pour afficher une boîte de dialogue dans laquelle vous pouvez :

    1. Sélectionner Import from existing version pour importer une distribution officiellement supportée comme base et ajouter d'autres fichiers .jar requis que la distribution de base ne fournit pas.

    2. Sélectionner Import from zip pour importer le .zip de configuration pour la distribution personnalisée à utiliser. Ce fichier .zip doit contenir les bibliothèques des différents éléments Hadoop/Spark et le fichier d'index de ces bibliothèques.

      Notez que les versions personnalisées ne sont pas officiellement supportées par Talend. Talend et sa communauté fournissent l'opportunité de vous connecter à des versions personnalisées depuis le Studio mais ne peuvent vous garantir la simplicité de la configuration de la version que vous choisissez. Il est recommandé de configurer ces connexions si vous avez une expérience suffisante de Hadoop et de Spark pour gérer par vous-mêmes les problèmes pouvant survenir.

  5. Configurez les informations de connexion aux principaux services du cluster à utiliser.

    Si vous utilisez le mode Yarn client, vous devez saisir les adresses des différents services dans les champs correspondants (si vous laissez décochée la case d'un service, lors de l'exécution, la configuration du paramètre en question du cluster Hadoop sera ignorée) :

    • Dans le champ Resource manager, saisissez l'adresse du service ResourceManager du cluster Hadoop à utiliser.

    • Cochez la case Set resourcemanager scheduler address et saisissez l'adresse de l'ordonnanceur (Scheduler) dans le champ qui apparaît.

    • Cochez la case Set jobhistory address et saisissez l'emplacement du serveur JobHistory du cluster Hadoop à utiliser. Cela permet de stocker les informations relatives aux métriques du Job courant sur le serveur JobHistory.

    • Cochez la case Set staging directory et saisissez le chemin d'accès au répertoire défini dans votre cluster Hadoop pour les fichiers temporaires créés par l'exécution de programmes. Ce répertoire se trouve sous la propriété yarn.app.mapreduce.am.staging-dir dans les fichiers de configuration, notamment les fichiers yarn-site.xml et mapred-site.xml de votre distribution.

    • Si vous accédez au cluster Hadoop s'exécutant avec la sécurité Kerberos, cochez cette case. Saisissez les noms des principaux Kerberos pour le service du ResourceManager et le service du JobHistory dans les champs qui s'affichent. Cela vous permet d'utiliser votre identifiant pour vous authentifier par rapport aux informations stockées dans Kerberos. Ces principaux se trouvent dans les fichiers de configuration de votre distribution. Par exemple, dans une distribution CDH4, le Principal du ResourceManager est configuré dans le fichier yarn-site.xml et celui du JobHistory dans le fichier mapred-site.xml.

      • Si ce cluster est un cluster MapR de version 4.0.1 ou postérieure, vous pouvez paramétrer la configuration de l'authentification par ticket MapR en plus ou comme une alternative en suivant les explications dans Connexion sécurisée à MapR.

        Gardez à l'esprit que cette configuration génère un nouveau ticket de sécurité MapR pour le nom d'utilisateur défini dans le Job dans chaque exécution. Si vous devez réutiliser un ticket existant provenant du même utilisateur, laissez décochées les cases Force MapR ticket authentication et Use Kerberos authentication. MapR devrait pouvoir trouver automatiquement ce ticket à la volée.

      Si vous souhaitez utiliser un fichier Kerberos keytab pour vous identifier, cochez la case Use a keytab to authenticate. Un fichier keytab contient des paires de principaux et clés cryptées Kerberos. Vous devez saisir le principal à utiliser dans le champ Principal et le chemin d'accès au fichier keytab dans le champ Keytab.

      Notez que l'utilisateur qui exécute un Job utilisant un keytab n'est pas forcément celui désigné par le principal mais qu'il doit avoir le droit de lecture pour le fichier keytab utilisé. Par exemple, le nom d'utilisateur que vous utilisez pour exécuter le Job est user1 et le principal à utiliser est guest. Dans cette situation, assurez-vous que user1 a les droits de lecture pour le fichier keytab à utiliser.

    • Le champ User name est disponible lorsque vous n'utilisez pas Kerberos pour vous authentifier. Dans ce champ, saisissez votre identifiant pour cette distribution. Si vous laissez le champ vide, le nom de la machine hébergeant le Studio sera utilisé.

      Comme le Job doit charger dans HDFS des fichiers .jar du cluster à utiliser, vous devez vous assurer que le nom d'utilisateur est le même que celui défini dans le tHDFSConfiguration, le composant utilisé pour fournir à Spark les informations de connexion à HDFS.

    Si vous utilisez le mode Standalone, vous devez configurer les paramètres suivants :

    • Dans le champ Spark host, saisissez l'URI du Spark Master du cluster Hadoop utilisé.

    • Dans le champ Spark home, saisissez l'emplacement où l'exécutable Spark est installé dans le cluster Hadoop utilisé.

  6. Si vous devez exécuter le Job courant sous Windows, il est recommandé de spécifier où le programme winutils.exe à utiliser est stocké.

    • Si vous savez où se trouve le fichier winutils.exe et que vous souhaitez l'utiliser, cochez la case Define the Hadoop home directory et saisissez le répertoire dans lequel est stocké winutils.exe.

    • Sinon, laissez cette case décochée. Le Studio en génère un par lui-même et l'utilise pour le Job.

  7. Si le cluster Spark ne peut reconnaître la machine sur laquelle le Job est lancé, cochez la case Define the driver hostname or IP address et saisissez le nom de l'hôte ou l'adresse IP de cette machine. Cela permet au Master et à son Worker Spark de reconnaître la machine où se trouve le Job et donc son pilote.

    Notez que, dans cette situation, vous devez également ajouter le nom et l'adresse IP de cette machine dans son fichier host.

  8. Dans le champ Batch size, saisissez l'intervalle de temps à la fin duquel le Job revoit la source de données pour identifier les modifications et traite les nouveaux micro-batches.

  9. Si nécessaire, cochez la case Define a streaming timeout et, dans le champ qui s'affiche, saisissez la période de temps à la fin de laquelle le Job Streaming s'arrête automatiquement.

  10. Si vous souhaitez que le Job résiste aux échecs, cochez la case Activate checkpointing pour activer l'opération Spark de point de contrôle. Dans le champ qui s'affiche, saisissez le répertoire dans lequel Spark stocke, dans le système de fichiers du cluster, les données de contexte des calculs de Streaming, comme les métadonnées et les RDD générés par ce calcul.

    Pour plus d'informations concernant les points de contrôle Spark, consultez http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing (en anglais).

  11. Cochez la case Set Tuning properties pour optimiser l'allocation des ressources à utiliser pour exécuter le Job. Ces propriétés ne sont pas obligatoires pour que le Job s'exécute correctement, mais elles sont utiles lorsque Spark est congestionné par des problèmes de ressources dans le cluster, comme le processeur, la bande passante ou la mémoire :

    • Driver memory et Driver core : saisissez la taille de la mémoire et le nombre de cœurs à allouer au pilote du Job courant.

    • Executor memory : saisissez la taille de la mémoire à allouer à chaque exécuteur Spark.

    • Set executor memory : cochez cette case et, dans le champ qui s'affiche, saisissez le montant de mémoire hors tas (off-heap) en Mo à allouer pour chaque exécuteur. Il s'agit de la propriété spark.yarn.executor.memoryOverhead.

    • Core per executor : cochez cette case et, dans le champ affiché, saisissez le nombre de cœurs à utiliser par chaque exécuteur. Si vous laissez cette case décochée, l'allocation définie par défaut par Spark est utilisée. Par exemple, tous les cœurs disponibles sont utilisés par un exécuteur en mode Standalone.

    • Set Web UI port : si vous devez modifier le port par défaut de l'application Web de Spark, cochez cette case et saisissez le numéro du port à utiliser.

    • Broadcast factory : sélectionnez l'implémentation du broadcast à utiliser pour mettre les variables en cache sur chaque machine de Worker.

    • Customize Spark serializer : si vous devez importer un sérialiseur Spark externe, cochez cette case et, dans le champ qui s'affiche, saisissez le nom de la classe entièrement qualifié du sérialiseur à utiliser.

    • Yarn resource allocation : sélectionnez la manière dont vous souhaitez que Yarn alloue des ressources parmi les exécuteurs.

      • Auto : vous laissez Yarn utiliser son nombre d'exécuteurs par défaut, à savoir 2.

      • Fixed : vous devez saisir le nombre d'exécuteurs à utiliser dans le champ Num executors qui s'affiche.

      • Dynamic : Yarn modifie le nombre d'exécuteurs afin de s'adapter à la charge de travail. Vous devez définir l'échelle de cette allocation dynamique en définissant le nombre initial d'exécuteurs à exécuter dans le champ Initial executors, le nombre le plus faible d'exécuteurs dans le champ Min executors et le plus grand nombre d'exécuteurs dans le champ Max executors.

      Cette fonctionnalité est disponible en mode Yarn client uniquement.

    • Activate backpressure : cochez cette case pour activer la fonctionnalité Backpressure de Spark. Cette fonctionnalité est disponible à partir de la version 1.5 de Spark. Une fois activée, Spark trouve automatiquement le taux de réception optimal et adapte dynamiquement ce taux en fonction des retards d'ordonnancement et des temps de traitement batch, afin de recevoir les données au rythme auquel il peut les traiter.

  12. Dans le champ Spark "scratch" directory, saisissez le répertoire dans lequel le Studio stocke, dans le système local, les fichiers temporaires comme les fichiers .jar à transférer. Si vous lancez votre Job sous Windows, le disque par défaut est C:. Si vous laissez /tmp dans ce champ, le répertoire est C:/tmp.

  13. Si vous utilisez Hortonworks Data Platform V2.4.0 ou supérieure et que vous avez installé Atlas dans votre cluster, vous pouvez cocher la case Use Atlas, afin de permettre le lignage du Job au niveau des composants, notamment les modifications de schéma entre les composants.

    Lorsque cette option est activée, vous devez configurer les paramètres suivants :

    • Atlas URL : saisissez l'emplacement d'Atlas auquel se connecter. C'est généralement http://nom_de_votre_noeud_atlas:port

    • Dans les champs Username et Password, saisissez respectivement l'identifiant et le mot de passe d'authentification pour accéder à Atlas.

    • Set Atlas configuration folder : si votre cluster Atlas contient des propriétés personnalisées, comme le SSL ou le délai avant suspension de la lecture, cochez cette case et, dans le champ qui s'affiche, saisissez un chemin vers un répertoire de votre machine locale, puis placez le fichier atlas-application.properties de votre Atlas dans ce répertoire. Ainsi, votre Job peut utiliser ces propriétés personnalisées.

      Vous devez demander à l'administrateur de votre cluster ces fichiers de configuration. Pour plus d'informations concernant ce fichier, consultez la section Client Configs dans la page Atlas configuration (en anglais).

    • Die on error: cochez cette case pour arrêter l'exécution du Job lorsque des problèmes relatifs à Atlas surviennent, par exemple des problèmes de connexion à Atlas.

      Sinon, laissez cette case décochée pour que votre Job continue à s'exécuter.

    Si vous utilisez Hortonworks Data Platform V2.4, le Studio supporte uniquement Atlas 0.5. Si vous utilisez Hortonworks Data Platform.V2.5, le Studio supporte uniquement Atlas 0.7.

  14. Dans la table Advanced properties, ajoutez toute propriété Spark à utiliser pour écraser la propriété équivalente utilisée par le Studio.

    Les propriétés avancées requises par les différentes distributions Hadoop et leurs valeurs sont listées ci-dessous :

    • Hortonworks Data Platform V2.4 :

      • spark.yarn.am.extraJavaOptions : -Dhdp.version=2.4.0.0-169

      • spark.driver.extraJavaOptions : -Dhdp.version=2.4.0.0-169

      Vous devez également ajouter -Dhdp.version=2.4.0.0-169 dans la zone JVM settings, soit dans l'onglet Advanced settings de la vue Run, soit dans la vue Talend > Run/Debug de la fenêtre [Preferences]. La configuration de cet argument dans la fenêtre [Preferences] l'applique à tous les Jobs conçus dans le même Studio.

    • MapR V5.1 et V5.2 lorsque le cluster est utilisé avec la HBase ou les composants MapRDB :

      • spark.hadoop.yarn.application.classpath : saisissez la valeur de ce paramètre spécifique à votre cluster et ajoutez, s'il n'est pas renseigné, le classpath vers HBase pour vous assurer que le Job à utiliser trouve les classes et les packages dans le cluster.

        Par exemple, si la version HBase installée dans le cluster est 1.1.1, copiez-collez tous les chemins d'accès définis dans le paramètre spark.hadoop.yarn.application.classpath de votre cluster et ajoutez ensuite opt/mapr/hbase/hbase-1.1.1/lib/* et /opt/mapr/lib/* à ces chemins d'accès. Séparez les chemins d'accès par une virgule. Les chemins d'accès ajoutés indiquent les endroits où HBase est généralement installé dans le cluster MapR. Si votre HBase est installée autre part, contactez l'administrateur de votre cluster pour plus de détails et pour adapter ces chemins d'accès.

        Pour une explication détaillée relative à l'ajout de ce paramètre, consultez l'article Les Jobs HBase/MapR-DB ne peuvent pas être exécutés correctement avec MapR 5.1 ou 5.2 sur Talend Help Center.

    Pour plus d'informations concernant les propriétés Spark valides, consultez la documentation Spark à l'adresse https://spark.apache.org/docs/latest/configuration (en anglais).

Configurer la connexion au système de fichier à utiliser par Spark

  1. Double-cliquez sur le tHDFSConfiguration pour ouvrir sa vue Component. Notez que le tHDFSConfiguration est utilisé car le mode Spark Yarn client est utilisé pour exécuter des Jobs Spark dans ce scénario.

    Spark utilise ce composant pour se connecter au système HDFS auquel sont transférés les fichiers .jar dépendants du Job.

  2. Dans la zone Version, sélectionnez la distribution Hadoop et la version à laquelle vous connecter.

  3. Dans le champ NameNode URI, saisissez l'emplacement de la machine hébergeant le service NameNode du cluster.

  4. Dans le champ Username, saisissez les informations d'authentification utilisée pour vous connecter au système HDFS à utiliser. Notez que le nom d'utilisateur doit être le même que celui saisi dans l'onglet Spark configuration.

Configurer la connexion à la base de données MongoDB à utiliser par Spark

  1. Double-cliquez sur le tMongoDBConfiguration pour ouvrir sa vue Component.

  2. Dans la liste DB Version, sélectionnez la version de la base de données MongoDB à utiliser.

  3. Dans les champs Server et Port, saisissez les informations correspondantes de la base de données MongoDB.

  4. Dans le champ Database, saisissez le nom de la base de données. Cette base de données doit déjà exister.

Charger les données des films

  1. Double-cliquez sur le composant tFixedFlowIput pour ouvrir sa vue Component.

  2. Cliquez sur le bouton [...] à côté de Edit schema pour ouvrir l'éditeur de schéma.

  3. Cliquez sur le bouton [+] pour ajouter les colonnes au schéma, comme suit :

  4. Cliquez sur OK pour valider ces modifications et acceptez la propagation proposée par la boîte de dialogue.

  5. Dans le champ Input repetition interval, saisissez l'intervalle de temps à la fin duquel le tFixedFlowInput envoie les données des films une nouvelle fois. Cela vous permet de générer un flux de données.

  6. Dans la zone Mode, sélectionnez l'option Use Inline Content et collez les données suivantes dans le champ Content.

    691;Dark City;1998;http://us.imdb.com/M/title-exact?imdb-title-118929;4
    1654;Chairman of the Board;1998;http://us.imdb.com/Title?Chairman+of+the+Board+(1998);6
    903;Afterglow;1997;http://us.imdb.com/M/title-exact?imdb-title-118566;3
    255;My Best Friend's Wedding;1997;http://us.imdb.com/M/title-exact?My+Best+Friend%27s+Wedding+(1997);2
    1538;All Over Me;1997;http://us.imdb.com/M/title-exact?All%20Over%20Me%20%281997%29;5
  7. Dans le champ Field separator, saisissez un point-virgule (;).

Extraire les données des réalisateurs depuis MongoDB

  1. Double-cliquez sur le tMongoDBLookupInput pour ouvrir sa vue Component.

  2. Cliquez sur le bouton [...] à côté du champ Edit schema pour ouvrir l'éditeur de schéma.

  3. Cliquez sur le bouton [+] pour ajouter les colonnes au schéma, comme suit :

  4. Dans le champ Collection, saisissez le nom de la collection à partir de laquelle le tMongoDBLookupInput extrait les données.

  5. Dans le champ Query, saisissez la requête suivante.

    "{'person.id':" + row2.directorID +"}"

    Dans cette instruction, row2 représente le flux principal du tMap et row2.directorID la colonne directorID de ce flux. Vous devez adapter row2 au nom du lien du flux principal de votre Job.

    L'instruction complète signifie sélectionner tous les enregistrements dans lesquels le champ id dans le champ person a la même valeur que celle de la colonne directorID.

    L'exemple ci-dessus montre comment utiliser le schéma du flux principal pour construire l'instruction SQL afin de charger uniquement les enregistrements correspondants dans le flux de référence. Cette approche vous assure qu'aucun enregistrement redondant n'est stocké en mémoire avant d'être envoyé au tMap.

  6. Dans la table Mapping, les colonnes id et name ont été automatiquement ajoutées. Saisissez, entre guillemets doubles, person dans la colonne Parent node path, pour chaque ligne.

    Cette table définit la manière dont la construction hiérarchique des données de MongoDB doit être interprétée afin de correspondre au schéma du tMongoDBLookupInput.

Configurer la transformation dans le tMap

  • Double-cliquez sur le tMap pour ouvrir son éditeur Map Editor.

Créer le schéma de sortie

  1. Du côté de l'entrée (gauche) de l'éditeur Map Editor, les deux tables représentent le flux d'entrée, la table en haut représente le flux principal et celle du dessous représente le flux de référence.

    Du côté de la sortie (droite), les deux tables représentent le flux de sortie, composé des tables précédemment nommées out1 et reject.

    De la table d'entrée principale, déposez les lignes movieID, title, release et url dans chacune des tables de sortie.

  2. Déposez également la ligne directorID de la table principale dans la table de sortie reject.

  3. De la table de référence, déposez la ligne name dans chaque table de sortie.

    Dans la vue Schema editor, vous pouvez voir que les schémas des deux côtés ont été renseignés.

Configurer les conditions de mapping

  1. De la table principale, déposez la colonne directorID dans la table de référence, dans la colonne Expr. key de la ligne id.

    Cela définit la colonne utiliser pour fournir les clés de jointure.

  2. Dans la table de référence, cliquez sur l'icône pour ouvrir les paramètres de la table.

  3. Cliquez dans la colonne Value de la ligne Lookup model pour afficher le bouton [...] et cliquez sur ce bouton pour ouvrir la fenêtre [Options].

  4. Sélectionnez Reload at each row et cliquez sur OK pour valider votre choix.

  5. Répétez l'opération dans la ligne Join model pour ouvrir la fenêtre [Options] correspondante.

  6. Sélectionnez Inner Join pour vous assurer que seuls les enregistrements correspondants entre le flux principal et le flux de référence sont écrits en sortie.

  7. Dans la table de sortie reject, cliquez sur l'icône pour ouvrir les paramètres.

  8. Dans la ligne Catch lookup inner join reject, cliquez dans la colonne Value pour afficher le bouton [...], puis cliquez sur ce bouton pour ouvrir la fenêtre [Options].

  9. Sélectionnez true pour envoyer les enregistrements filtrés par la jointure Inner Join dans le flux reject, puis cliquez sur OK pour valider votre choix.

  10. Cliquez sur Apply, puis sur OK afin de valider vos modifications. Acceptez la propagation proposée par la boîte de dialogue qui s'ouvre.

Écrire les données traités dans MongoDB

  1. Double-cliquez sur le tMongoDBOutput pour ouvrir sa vue Component.

  2. Si ce composant n'a pas le même schéma que le composant précédent, une icône d'avertissement s'affiche. Dans cette situation, cliquez sur le bouton Sync columns afin de récupérer le schéma du composant précédent. Cela fait, l'icône d'avertissement disparaît.

  3. Dans le champ Collection, saisissez le nom de la collection dans laquelle écrire les données. Si cette collection n'existe pas, elle sera automatiquement créée lors de l'exécution.

  4. Dans la liste Action on data, sélectionnez l'opération à effectuer sur les données. Dans cet exemple, sélectionnez Insert, pour créer des documents dans MongoDB. Que ces documents existent déjà ou non, l'insertion génère un nouvel ID technique pour chaque nouveau document.

  5. Laissez la table Mapping comme elle est, pour ajouter chaque enregistrement à la racine de chaque document.

Écrire les données rejetées dans le tLogRow

  1. Double-cliquez sur le tLogRow pour ouvrir sa vue Component.

  2. Si ce composant n'a pas le même schéma que le composant précédent, une icône d'avertissement s'affiche. Dans cette situation, cliquez sur le bouton Sync columns afin de récupérer le schéma du composant précédent. Cela fait, l'icône d'avertissement disparaît.

  3. Sélectionnez le bouton radio Table, pour afficher les résultats d'exécution sous forme de tableau.

Exécuter le Job

Vous pouvez appuyer sur F6 pour exécuter le Job.

Cela fait, dans la console de la vue Run, vous pouvez voir les données rejetées par la jointure Inner Join.

Les données sont affichées plusieurs fois car le tFixedFlowInput a créé un flux de données en envoyant régulièrement les mêmes enregistrements.

Vous pouvez gérer le niveau d'informations relatives à l'exécution à écrire en sortie dans la console en cochant la case log4jLevel dans l'onglet Advanced settings et en sélectionnant le niveau d'informations à afficher.

Pour plus d'informations sur les niveaux de logs du log4j, consultez la documentation d'Apache : http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html (en anglais).

Dans la base de données MongoDB default, vous pouvez vérifier que les documents ont bien été créés dans la collection movie.

{ "_id" : ObjectId("57559a613b1c7e2e6497b2bb"), "movieID" : 691, "title" : "Dark City", "release" : "1998", "url" : "http://us.imdb.com/M/title-exact?imdb-title-118929", "director_name" : "Alex Proyas" }
{ "_id" : ObjectId("57559a613b1c7e2e6497b2bc"), "movieID" : 903, "title" : "Afterglow", "release" : "1997", "url" : "http://us.imdb.com/M/title-exact?imdb-title-118566", "director_name" : "Alan Rudolph " }
{ "_id" : ObjectId("57559a613b1c7e2e6497b2be"), "movieID" : 255, "title" : "My Best Friend's Wedding", "release" : "1997", "url" : "http://us.imdb.com/M/title-exact?My+Best+Friend%27s+Wedding+(1997)", "director_name" : "P.J. Hogan " }
{ "_id" : ObjectId("57559a613b1c7e2e6497b2c0"), "movieID" : 1538, "title" : "All Over Me", "release" : "1997", "url" : "http://us.imdb.com/M/title-exact?All%20Over%20Me%20%281997%29", "director_name" : "Alex Sichel" }
{ "_id" : ObjectId("57559a613b1c7e2e6497b2ba"), "movieID" : 691, "title" : "Dark City", "release" : "1998", "url" : "http://us.imdb.com/M/title-exact?imdb-title-118929", "director_name" : "Alex Proyas" }
{ "_id" : ObjectId("57559a613b1c7e2e6497b2bd"), "movieID" : 903, "title" : "Afterglow", "release" : "1997", "url" : "http://us.imdb.com/M/title-exact?imdb-title-118566", "director_name" : "Alan Rudolph " }
{ "_id" : ObjectId("57559a613b1c7e2e6497b2bf"), "movieID" : 255, "title" : "My Best Friend's Wedding", "release" : "1997", "url" : "http://us.imdb.com/M/title-exact?My+Best+Friend%27s+Wedding+(1997)", "director_name" : "P.J. Hogan " }
{ "_id" : ObjectId("57559a613b1c7e2e6497b2c1"), "movieID" : 1538, "title" : "All Over Me", "release" : "1997", "url" : "http://us.imdb.com/M/title-exact?All%20Over%20Me%20%281997%29", "director_name" : "Alex Sichel" }

Les informations des films contiennent à présent le nom des réalisateurs à la place de leur ID. Les mêmes enregistrements ont été écrits plusieurs fois dans la collection mais leurs ID techniques (champ _id field) sont tous différents.