Création d'un script de Job pour filtrer les enregistrements de données - 6.4

Talend Real-time Big Data Platform Studio Guide utilisateur

EnrichVersion
6.4
EnrichProdName
Talend Real-Time Big Data Platform
task
Création et développement
Qualité et préparation de données
EnrichPlatform
Studio Talend

Cet exemple explique comment définir un Job qui lira un fichier CSV et qui filtrera les enregistrements de données selon des conditions données. Le Job affichera ensuite le résumé des informations : le nombre total d'enregistrements lus à partir du fichier source, le nombre d'enregistrements passant le filtre (acceptés) et le nombre d'enregistrements rejetés.

Le Job contiendra les composants suivants :

  • un composant tFileInputDelimited pour lire le fichier CSV source contenant des informations sur des personnes. Le fichier source contient cinq colonnes, présentées comme suit :

    name;gender;age;city;marriageStatus
    Van Buren;M;73;Chicago;married
    Adams;M;40;Albany;single
    Jefferson;F;66;New York;married
    Adams;M;9;Albany;-
    Jefferson;M;30;Chicago;single
    Carter;F;26;Chicago;married
    Harrison;M;40;New York;married
    Roosevelt;F;15;Chicago;
    Monroe;M;8;Boston;-
    Arthur;M;20;Albany;married
    Pierce;M;18;New York;-
    Quincy;F;83;Albany;married
    McKinley;M;70;Boston;married
    Coolidge;M;4;Chicago;-
    Monroe;M;60;Chicago;single
    ----- end of file --------
  • un composant tReplicate, pour dupliquer les données d'entrée en deux flux de sortie, dont un affiché dans la console comme données non traitées et l'autre traité par un composant filtrant les colonnes.

  • un composant tFilterColumns, pour supprimer la ou les colonnes souhaitées, marriageStatus.

  • un composant tFilterRow, pour filtrer les deux tables de sortie des données :

    • une table liste tous les hommes âgés entre 10 et 80 ans dont le nom fait moins de neuf caractères.

    • l'autre table liste tous les enregistrements rejetés et affiche un message d'erreur expliquant la raison du rejet de chaque enregistrement.

  • trois composants tLogRow : le premier pour afficher les données non traitées, le deuxième pour afficher les enregistrements acceptés et le troisième pour afficher les enregistrements rejetés et les messages d'erreurs correspondants.

  • un composant tJava, pour afficher le résumé des informations.

La procédure présentée ci-dessous explique comment écrire ce script de Job dans l'éditeur de scripts de Jobs, en commençant par l'ajout des composants requis. Pour plus d'informations concernant la création d'un script de Job vide, consultez Créer un Job script.

Définir un composant d'entrée

Suivez les étapes ci-dessous pour ajouter et configurer le composant utilisé dans cet exemple.

  1. Dans l'éditeur de scripts de Jobs, saisissez une fonction addComponent {} et sa sous-fonction setComponentDefinition {} pour ajouter le composant d'entrée, tFileInputDelimited.

    // read input data
    addComponent {
    	setComponentDefinition {
    		TYPE: "tFileInputDelimited",
    		NAME: "tFileInputDelimited_1",
    		POSITION: 160, 192
    	}
    }
  2. À côté de la fonction setComponentDefinition {}, saisissez la fonction setSettings {} pour spécifier le chemin d'accès au fichier source, le nombre d'en-têtes et de pieds de page à ignorer et éventuellement le libellé du composant affiché dans l'espace de modélisation graphique.

    Comme présenté ci-dessous, ce composant lira un fichier CSV nommé sampleRecords.csv avec une ligne d'en-tête et une ligne de pied de page à ignorer. Le composant est renommé source_data.

    	setSettings {
    		FILENAME : "\"E:/Talend/Data/Input/sampleRecords.csv\"",
    		HEADER : "1",
    		FOOTER : "1",
    		LABEL : "source_data"
    	}

    Avertissement

    Assurez-vous d'utiliser la barre oblique inversée (\) lorsque vous spécifiez un métacaractère.

  3. À côté de la fonction setSettings {}, saisissez la fonction addSchema {} et ses sous-colonnes addColumn {} pour définir le schéma du composant.

    Dans cet exemple, le fichier source contient cinq colonnes, toutes non-nullables :

    • name, de type String

    • gender, de type String

    • age, de type Integer, d'une longueur de deux caractères

    • city, de type String

    • marriageStatus, de type String

    	addSchema {
    		NAME: "tFileInputDelimited_1",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "marriageStatus",
    			TYPE: "id_String"
    		}
    	}

Répliquer un flux de données d'entrée

  1. À côté de la définition du composant tFileInputDelimited, saisissez les fonctions suivantes pour ajouter et renommer le composant tReplicate.

    // replicate input data
    addComponent {
    	setComponentDefinition {
    		TYPE: "tReplicate",
    		NAME: "tReplicate_1",
    		POSITION: 288, 192
    	}
    	setSettings { 
    		LABEL : "replicate_flows"
    	}
    }
  2. À côté de la fonction setSettings {}, saisissez une fonction addSchema {} et sa sous-fonction addColumn {} pour définir la structure des données des flux de sortie.

    Comme ce composant passe uniquement le schéma du flux d'entrée aux flux de sortie, il vous suffit de copier les définitions de colonnes à partir de la définition du composant tFileInputDelimited.

    	addSchema {
    		NAME: "tReplicate_1",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "marriageStatus",
    			TYPE: "id_String"
    		}
    	}

Définir un composant filtrant les colonnes

  1. À côté de la définition du composant tReplicate, saisissez la fonction addComponent {} et ses fonctions setComponentDefinition {} et setSettings {} ainsi que ses paramètres pour ajouter et renommer le composant tFilterColumns.

    // filter data columns - remove 'marriageStatus'
    addComponent {
    	setComponentDefinition {
    		TYPE: "tFilterColumns",
    		NAME: "tFilterColumns_1",
    		POSITION: 512, 224
    	}
    	setSettings {
    		LABEL : "filter_columns"
    }
  2. À côté de la fonction setSettings {}, saisissez une fonction addSchema {} et ses sous-fonctions addColumn {} pour définir le schéma du flux de sortie.

    Dans cet exemple, copiez les paramètres des colonnes définis dans le composant précédent, après avoir supprimé la colonne marriageStatus.

    	addSchema {
    		NAME: "tFilterColumns_1",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    	}

Définir un composant filtrant les enregistrements

  1. À côté de la définition du composant tFilterColumns, saisissez une autre fonction addComponent {} et sa sous-fonction setComponentDefinition {} pour ajouter le composant tFilterRow.

    // filter records - accept men between 10 and 80 whose names are shorter than 9 characters
    addComponent {
    	setComponentDefinition {
    		TYPE: "tFilterRow",
    		NAME: "tFilterRow_1",
    		POSITION: 640, 224
    	}
    }
  2. À côté de la fonction setComponentDefinition {}, saisissez la fonction setSettings {} pour définir les conditions de filtre et renommer le composant.

    	setSettings {
    		LOGICAL_OP : "&&",
    		CONDITIONS {
    			INPUT_COLUMN : "name",
    			FUNCTION : "$source == null? false : $source.length() $operator $target",
    			OPERATOR : "<",
    			RVALUE : "9",
    			INPUT_COLUMN : "gender",
    			FUNCTION : "",
    			OPERATOR : "==",
    			RVALUE : "\"M\"",
    			INPUT_COLUMN : "age",
    			FUNCTION : "",
    			OPERATOR : ">",
    			RVALUE : "10",
    			INPUT_COLUMN : "age",
    			FUNCTION : "",
    			OPERATOR : "<",
    			RVALUE : "80"
    		},
    		LABEL : "filter_records"
    	}

    Avertissement

    Assurez-vous d'utiliser la barre oblique inversée (\) lorsque vous spécifiez un métacaractère.

  3. À côté de la fonction setSettings {}, saisissez le code du script suivant afin de définir les schémas pour les flux de sortie.

    Dans cet exemple, le composant tFilterRow a deux flux de sortie, un pour les enregistrements acceptés, qui possède la même structure du schéma que celle définie dans le composant précédent, tFilterColumns et l'autre pour les enregistrements rejetés, qui possède une colonne supplémentaire, errorMessage. La colonne errorMessage est obligatoire pour le flux de rejet et possède des propriétés fixes : même si vous ne définissez pas cette colonne, le Studio l'ajoutera automatiquement lors de la génération du Job.

    	// define the schema for the accepted records
    	addSchema {
    		NAME: "ACCEPT",
    		CONNECTOR: "FILTER"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    	}
    	
    	// define the schema for the rejected records
    	addSchema {
    		NAME: "REJECT",
    		CONNECTOR: "REJECT"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "errorMessage",
    			TYPE: "id_String",
    			NULLABLE: true,
    			LENGTH: 255,
    			PRECISION: 0
    		}
    	}

Définir des composants de sortie

Dans cet exemple, trois composants tLogRow sont utilisés pour afficher respectivement les enregistrements non traités, les enregistrements acceptés et les enregistrements rejetés, sous la forme de tables et un composant tJava est utilisé pour afficher le résumé des informations. Suivez la procédure présentée ci-dessous pour définir ces composants.

  1. Saisissez le code du script suivant pour ajouter et définir le premier composant tLogRow. Ce composant doit posséder la même structure de schéma que celle du composant tReplicate.

    // print unprocessed records
    addComponent {
    	setComponentDefinition {
    		TYPE: "tLogRow",
    		NAME: "tLogRow_1",
    		POSITION: 512, 160
    	}
    	setSettings {
    		BASIC_MODE : "false",
    		TABLE_PRINT : "true",
    		VERTICAL : "false",
    		PRINT_UNIQUE : "true",
    		LABEL : "unprocessed_records"
    	}
    	addSchema {
    		NAME: "tLogRow_1",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "marriageStatus",
    			TYPE: "id_String"
    		}
    	}
    }
  2. Saisissez le code du script pour ajouter et définir le deuxième composant tLogRow. Ce composant doit posséder la même structure de schéma que celle du flux ACCEPT du composant tFilterRow.

    // print accepted records
    addComponent {
    	setComponentDefinition {
    		TYPE: "tLogRow",
    		NAME: "tLogRow_2",
    		POSITION: 832, 224
    	}
    	setSettings {
    		BASIC_MODE : "false",
    		TABLE_PRINT : "true",
    		VERTICAL : "false",
    		PRINT_UNIQUE : "true",
    		LABEL : "accepted_records"
    	}
    	addSchema {
    		NAME: "tLogRow_2",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    	}
    }
  3. Saisissez le script suivant pour ajouter et définir le troisième composant tLogRow. Ce composant doit posséder la même structure de schéma que celle du flux REJECTED du composant tFilterRow.

    Avertissement

    La colonne errorMessage est requise et possède des propriétés fixes pour le flux de sortie REJECTED.

    // print rejected records
    addComponent {
    	setComponentDefinition {
    		TYPE: "tLogRow",
    		NAME: "tLogRow_3",
    		POSITION: 832, 288
    	}
    	setSettings {
    		BASIC_MODE : "false",
    		TABLE_PRINT : "true",
    		VERTICAL : "false",
    		PRINT_UNIQUE : "true",
    		LABEL : "rejected_records"
    	}
    	addSchema {
    		NAME: "tLogRow_3",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "errorMessage",
    			TYPE: "id_String",
    			NULLABLE: true,
    			LENGTH: 255,
    			PRECISION: 0
    		}
    	}
    }
  4. Saisissez le code du script suivant pour ajouter et définir le composant tJava. Ce composant appellera les variables globales des composants tLogRow pour afficher le nombre de lignes géré par chacun des composants. Lorsque ce composant n'a pas de colonne de schéma à définir, vous devez fournir le code Java à exécuter dans la fonction setSettings {}.

    // print summary
    addComponent {
    	setComponentDefinition {
    		TYPE: "tJava",
    		NAME: "tJava_1",
    		POSITION: 160, 416
    	}
    	setSettings {
    		CODE : "System.out.println(\"\\n\" +
    \"Total number of records    : \"+globalMap.get(\"tLogRow_1_NB_LINE\") + \"\\n\" +
    \"Number of records accepted : \"+globalMap.get(\"tLogRow_2_NB_LINE\") + \"\\n\" +
    \"Number of records rejected : \"+globalMap.get(\"tLogRow_3_NB_LINE\") + \"\\n\" + \"\\n\" );",
    		LABEL : "print_summary"
    	}
    	addSchema {
    		NAME: "tJava_1",
    		CONNECTOR: "FLOW"
    	}
    }

    Avertissement

    Assurez-vous d'utiliser la barre oblique inversée (\) lorsque vous spécifiez un métacaractère.

Définir des connexions reliant les composants

Suivez les étapes indiquées ci-dessous pour ajouter et définir les connexions reliant les composants et les sous-Jobs au Job.

  1. Saisissez le code du script suivant pour définir la connexion Row > Main entre le composant tFileInputDelimited et le composant tReplicate.

    // add connections
    addConnection {
    	TYPE: "FLOW",
    	NAME: "row1",
    	LINESTYLE: 0,
    	SOURCE: "tFileInputDelimited_1",
    	TARGET: "tReplicate_1"
    }
  2. Saisissez le code du script suivant pour définir une connexion Row > Main entre le composant tReplicate et le premier composant tLogRow.

    addConnection {
    	TYPE: "FLOW",
    	NAME: "row2",
    	LINESTYLE: 0,
    	SOURCE: "tReplicate_1",
    	TARGET: "tLogRow_1"
    }
  3. Saisissez le code du script suivant pour définir la connexion Row > Main entre le composant tReplicate et le composant tFilterColumns.

    addConnection {
    	TYPE: "FLOW",
    	NAME: "row3",
    	LINESTYLE: 0,
    	SOURCE: "tReplicate_1",
    	TARGET: "tFilterColumns_1"
    }
  4. Saisissez le code du script suivant pour définir la connexion Row > Main entre le composant tFilterColumns et le composant tFilterRow.

    addConnection {
    	TYPE: "FLOW",
    	NAME: "row4",
    	LINESTYLE: 0,
    	SOURCE: "tFilterColumns_1",
    	TARGET: "tFilterRow_1"
    }
  5. Saisissez le code du script suivant pour définir une connexion Row > Filter entre le composant tFilterRow et le deuxième composant tLogRow.

    addConnection {
    	TYPE: "FILTER",
    	NAME: "row5",
    	LINESTYLE: 0,
    	SOURCE: "tFilterRow_1",
    	TARGET: "tLogRow_2"
    }
  6. Saisissez le code du script suivant pour définir une connexion Row > Reject entre le composant tFilterRow et le troisième composant tLogRow.

    addConnection {
    	TYPE: "REJECT",
    	NAME: "row6",
    	LINESTYLE: 0,
    	SOURCE: "tFilterRow_1",
    	TARGET: "tLogRow_3"
    }
  7. Saisissez le code du script suivant pour définir une connexion Trigger > OnSubjobOk entre le composant tFileInputDelimited et le composant tJava pour relier les deux sous-Jobs afin que l'exécution réussie du premier sous-Job déclenche l'exécution du second.

    addConnection {
    	TYPE: "SUBJOB_OK",
    	NAME: "OnSubjobOk",
    	LINESTYLE: 1,
    	SOURCE: "tFileInputDelimited_1",
    	TARGET: "tJava_1"
    }

Renommer des sous-Jobs

De manière facultative, vous pouvez donner un titre à chacun des sous-Jobs qui composent votre Job afin de mieux identifier leur rôle.

  1. Affichez le titre du premier sous-Job et nommez-le Data processing.

    // label subjobs
    addSubjob {
    	NAME: "tFileInputDelimited_1"
    	SHOW_SUBJOB_TITLE : "true",
    	SUBJOB_TITLE : "Data processing"
    }
  2. Affichez le titre du second sous-Job et nommez-le Summary.

    addSubjob {
    	NAME: "tJava_1"
    	SHOW_SUBJOB_TITLE : "true",
    	SUBJOB_TITLE : "Summary"
    }

Générer et exécuter le Job

Vous devez maintenant définir tous les composants et toutes les connexions requis pour le Job, vous pouvez générer votre Job à partir du script de Job et l'exécuter dans votre Studio.

  1. Sauvegardez votre script de Job, puis cliquez-droit sur le script de Job dans l'arborescence Repository avant de sélectionner Generate Job dans le menu contextuel.

  2. Si nécessaire, double-cliquez sur le Job généré dans l'arborescence Repository et vérifiez la conception du Job dans l'onglet Designer.

  3. Appuyez sur F6 ou cliquez sur le bouton Run dans la console Run pour exécuter le Job.

    Après réussite de l'exécution du Job, la console Run devrait afficher les enregistrements non traités, les enregistrements acceptés, les enregistrements rejetés avec les raisons des rejets et le résumé des informations.