Creating a Job script to filter data records - 6.4

Talend Big Data Platform Studio User Guide


This example shows how to define a Job that will read a CSV file and filter the data records based on given conditions. Then, the Job will display the summary information: the total number of records read from the source file, the number of accepted records, and the number of rejected records.

The Job will contain the following components:

  • a tFileInputDelimited component, to read the source CSV file, which holds information about people in five columns, as shown below:

    name;gender;age;city;marriageStatus
    Van Buren;M;73;Chicago;married
    Adams;M;40;Albany;single
    Jefferson;F;66;New York;married
    Adams;M;9;Albany;-
    Jefferson;M;30;Chicago;single
    Carter;F;26;Chicago;married
    Harrison;M;40;New York;married
    Roosevelt;F;15;Chicago;
    Monroe;M;8;Boston;-
    Arthur;M;20;Albany;married
    Pierce;M;18;New York;-
    Quincy;F;83;Albany;married
    McKinley;M;70;Boston;married
    Coolidge;M;4;Chicago;-
    Monroe;M;60;Chicago;single
    ----- end of file --------
  • a tReplicate component, to duplicate the input data into two output flows, one of which is displayed on the console as unprocessed data, and the other goes to a column filter for processing.

  • a tFilterColumns component, to remove an unwanted column, marriageStatus.

  • a tFilterRow component, to filter the data and output two tables:

    • one lists all male persons with a last name shorter than nine characters and aged between 10 and 80 years.

    • the other lists all rejected records, with an error message for each rejected record to explain why the record has been rejected.

  • three tLogRow components: the first one to display the unprocessed data, the second one to display the accepted records, and the third one to display the rejected records and the corresponding error messages.

  • a tJava component, to display the summary information.

The procedures below demonstrate how to write this Job script in the Job script editor, starting from adding the required components. For how to create an empty Job script, see How to create a Job script.

Defining the input component

Follow the steps below to add and configure the component used in this example.

  1. In the Job script editor, enter an addComponent {} function and its setComponentDefinition {} sub-function to add the input component, tFileInputDelimited.

    // read input data
    addComponent {
    	setComponentDefinition {
    		TYPE: "tFileInputDelimited",
    		NAME: "tFileInputDelimited_1",
    		POSITION: 160, 192
    	}
    }
  2. Next to the setComponentDefinition {} function, enter the setSettings {} function to specify the path to the source file, the number of header and footer rows to skip, and optionally the label of the component displayed in the design workspace.

    As shown below, this component will read a CSV file named sampleRecords.csv, which has a header row and a footer row to skip, and the component will be labelled source_data.

    	setSettings {
    		FILENAME : "\"E:/Talend/Data/Input/sampleRecords.csv\"",
    		HEADER : "1",
    		FOOTER : "1",
    		LABEL : "source_data"
    	}

    Warning

    Be sure to use a backslash (\) to escape metacharacters, such as the double quotes that enclose the file path.

  3. Next to the setSettings {} function, enter an addSchema {} function and its addColumn {} sub-functions to define the component schema.

    In this example, the source file has five columns, all being non-nullable:

    • name, type String

    • gender, type String

    • age, type Integer, two characters long

    • city, type String

    • marriageStatus, type String

    	addSchema {
    		NAME: "tFileInputDelimited_1",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "marriageStatus",
    			TYPE: "id_String"
    		}
    	}
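
To make the effect of these settings concrete, here is a minimal plain-Java sketch of what reading the file amounts to under the configuration above: one header row and one footer row are skipped, and each remaining row is split on semicolons and typed according to the schema. The class and method names (DelimitedReaderSketch, Person, parse) are hypothetical and serve only as an illustration; this is not the code that the Studio generates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: DelimitedReaderSketch and Person are hypothetical names,
// not code generated by the Studio.
final class DelimitedReaderSketch {

    // One row, typed according to the schema defined in the Job script.
    static final class Person {
        final String name;
        final String gender;
        final Integer age;
        final String city;
        final String marriageStatus;

        Person(String name, String gender, Integer age, String city, String marriageStatus) {
            this.name = name;
            this.gender = gender;
            this.age = age;
            this.city = city;
            this.marriageStatus = marriageStatus;
        }
    }

    // Skips the first `header` and the last `footer` lines, then splits each row on ';'.
    static List<Person> parse(List<String> lines, int header, int footer) {
        List<Person> rows = new ArrayList<>();
        for (int i = header; i < lines.size() - footer; i++) {
            // A limit of -1 keeps trailing empty fields, as in "Roosevelt;F;15;Chicago;".
            String[] f = lines.get(i).split(";", -1);
            rows.add(new Person(f[0], f[1], Integer.valueOf(f[2]), f[3], f[4]));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "name;gender;age;city;marriageStatus",
            "Van Buren;M;73;Chicago;married",
            "Roosevelt;F;15;Chicago;",
            "----- end of file --------");
        for (Person p : parse(lines, 1, 1)) {
            System.out.println(p.name + " | " + p.age + " | [" + p.marriageStatus + "]");
        }
    }
}
```

The sketch passes -1 as the split limit so that a row ending with an empty marriageStatus value still yields five fields.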

Replicating the input data flow

  1. Next to the tFileInputDelimited component definition, enter the following functions to add and label the tReplicate component.

    // replicate input data
    addComponent {
    	setComponentDefinition {
    		TYPE: "tReplicate",
    		NAME: "tReplicate_1",
    		POSITION: 288, 192
    	}
    	setSettings { 
    		LABEL : "replicate_flows"
    	}
    }
  2. Next to the setSettings {} function, enter an addSchema {} function and its addColumn {} sub-function to define the data structure of the output flows.

    As this component only passes on the schema of the input flow to the output flows, just copy the column definitions from the tFileInputDelimited component definition.

    	addSchema {
    		NAME: "tReplicate_1",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "marriageStatus",
    			TYPE: "id_String"
    		}
    	}

Defining the column filter component

  1. Next to the tReplicate component definition, enter the addComponent {} function and its setComponentDefinition {} and setSettings {} functions and parameters to add and label the tFilterColumns component.

    // filter data columns - remove 'marriageStatus'
    addComponent {
    	setComponentDefinition {
    		TYPE: "tFilterColumns",
    		NAME: "tFilterColumns_1",
    		POSITION: 512, 224
    	}
    	setSettings {
    		LABEL : "filter_columns"
    	}
    }
  2. Next to the setSettings {} function, enter an addSchema {} function and its addColumn {} sub-functions to define the schema for the output flow.

    In this example, just copy the column settings defined in the previous component, with the marriageStatus column removed.

    	addSchema {
    		NAME: "tFilterColumns_1",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    	}
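
As a rough illustration of what this output schema implies, the sketch below projects a row onto the four remaining columns. It assumes that columns are matched by name between the input and output schemas; ColumnFilterSketch and project are hypothetical names, not part of the Studio.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a by-name column projection, assumed to resemble
// what the column filter does with the schemas defined above.
final class ColumnFilterSketch {

    // Builds the output row by looking up each output column in the input schema.
    static List<String> project(List<String> inputColumns, List<String> row,
                                List<String> outputColumns) {
        List<String> out = new ArrayList<>();
        for (String column : outputColumns) {
            int index = inputColumns.indexOf(column);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown column: " + column);
            }
            out.add(row.get(index));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("name", "gender", "age", "city", "marriageStatus");
        List<String> output = Arrays.asList("name", "gender", "age", "city");
        List<String> row = Arrays.asList("Adams", "M", "40", "Albany", "single");
        System.out.println(project(input, row, output)); // prints [Adams, M, 40, Albany]
    }
}
```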

Defining the record filter component

  1. Next to the tFilterColumns component definition, enter another addComponent {} function and its setComponentDefinition {} sub-function to add the tFilterRow component.

    // filter records - accept men between 10 and 80 whose names are shorter than 9 characters
    addComponent {
    	setComponentDefinition {
    		TYPE: "tFilterRow",
    		NAME: "tFilterRow_1",
    		POSITION: 640, 224
    	}
    }
  2. Next to the setComponentDefinition {} function, enter the setSettings {} function to define the filter conditions and label the component.

    	setSettings {
    		LOGICAL_OP : "&&",
    		CONDITIONS {
    			INPUT_COLUMN : "name",
    			FUNCTION : "$source == null? false : $source.length() $operator $target",
    			OPERATOR : "<",
    			RVALUE : "9",
    			INPUT_COLUMN : "gender",
    			FUNCTION : "",
    			OPERATOR : "==",
    			RVALUE : "\"M\"",
    			INPUT_COLUMN : "age",
    			FUNCTION : "",
    			OPERATOR : ">",
    			RVALUE : "10",
    			INPUT_COLUMN : "age",
    			FUNCTION : "",
    			OPERATOR : "<",
    			RVALUE : "80"
    		},
    		LABEL : "filter_records"
    	}

    Warning

    Be sure to use a backslash (\) to escape metacharacters, such as the double quotes around the M value in RVALUE.

  3. Next to the setSettings {} function, enter the following script code to define the schemas for the output flows.

    In this example, the tFilterRow component has two output flows. The flow for accepted records has the same schema structure as defined in the previous component, tFilterColumns. The flow for rejected records has one more column, errorMessage. The errorMessage column is mandatory for the reject flow and has fixed properties. Even if you do not define this column, the Studio automatically adds it when generating the Job.

    	// define the schema for the accepted records
    	addSchema {
    		NAME: "ACCEPT",
    		CONNECTOR: "FILTER"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    	}
    	
    	// define the schema for the rejected records
    	addSchema {
    		NAME: "REJECT",
    		CONNECTOR: "REJECT"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "errorMessage",
    			TYPE: "id_String",
    			NULLABLE: true,
    			LENGTH: 255,
    			PRECISION: 0
    		}
    	}
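
The sketch below restates the four conditions and the && operator from the setSettings {} function in plain Java, to show how a record ends up in one flow or the other. It is an approximation under stated assumptions: null values are assumed to fail a condition, and the error message format is hypothetical, so the text that the Studio actually writes to errorMessage may differ. RowFilterSketch and its methods are illustrative names only.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a plain-Java restatement of the filter conditions.
final class RowFilterSketch {

    // Returns the conditions that failed; an empty list means the record is accepted,
    // because LOGICAL_OP "&&" requires every condition to hold.
    static List<String> failedConditions(String name, String gender, Integer age) {
        List<String> failed = new ArrayList<>();
        // Mirrors "$source == null? false : $source.length() $operator $target" with "<" and 9.
        if (!(name == null ? false : name.length() < 9)) {
            failed.add("name.length() < 9");
        }
        if (!"M".equals(gender)) {
            failed.add("gender == \"M\"");
        }
        // Assumption: a null age fails both comparisons.
        if (age == null || !(age > 10)) {
            failed.add("age > 10");
        }
        if (age == null || !(age < 80)) {
            failed.add("age < 80");
        }
        return failed;
    }

    // Hypothetical message format, not the one produced by the Studio.
    static String errorMessage(List<String> failed) {
        return String.join("; ", failed) + " failed";
    }

    public static void main(String[] args) {
        System.out.println(failedConditions("Adams", "M", 40).isEmpty()); // prints true
        System.out.println(errorMessage(failedConditions("Quincy", "F", 83)));
    }
}
```

With the sample data, Adams (M, 40) satisfies all four conditions, whereas Van Buren (M, 73) is rejected because the name is exactly nine characters long.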

Defining the output components

In this example, three tLogRow components are used to display the unprocessed records, the accepted records and the rejected records respectively, in the form of tables, and a tJava component is used to display the summary information. Use the procedure below to define these components.

  1. Enter the following script code to add and define the first tLogRow component. This component must have the same schema structure as the tReplicate component.

    // print unprocessed records
    addComponent {
    	setComponentDefinition {
    		TYPE: "tLogRow",
    		NAME: "tLogRow_1",
    		POSITION: 512, 160
    	}
    	setSettings {
    		BASIC_MODE : "false",
    		TABLE_PRINT : "true",
    		VERTICAL : "false",
    		PRINT_UNIQUE : "true",
    		LABEL : "unprocessed_records"
    	}
    	addSchema {
    		NAME: "tLogRow_1",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "marriageStatus",
    			TYPE: "id_String"
    		}
    	}
    }
  2. Enter the following script code to add and define the second tLogRow component. This component must have the same schema structure as the ACCEPT flow of the tFilterRow component.

    // print accepted records
    addComponent {
    	setComponentDefinition {
    		TYPE: "tLogRow",
    		NAME: "tLogRow_2",
    		POSITION: 832, 224
    	}
    	setSettings {
    		BASIC_MODE : "false",
    		TABLE_PRINT : "true",
    		VERTICAL : "false",
    		PRINT_UNIQUE : "true",
    		LABEL : "accepted_records"
    	}
    	addSchema {
    		NAME: "tLogRow_2",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    	}
    }
  3. Enter the following script code to add and define the third tLogRow component. This component must have the same schema structure as the REJECT flow of the tFilterRow component.

    Warning

    The errorMessage column is required and it has fixed properties for the REJECT output flow.

    // print rejected records
    addComponent {
    	setComponentDefinition {
    		TYPE: "tLogRow",
    		NAME: "tLogRow_3",
    		POSITION: 832, 288
    	}
    	setSettings {
    		BASIC_MODE : "false",
    		TABLE_PRINT : "true",
    		VERTICAL : "false",
    		PRINT_UNIQUE : "true",
    		LABEL : "rejected_records"
    	}
    	addSchema {
    		NAME: "tLogRow_3",
    		CONNECTOR: "FLOW"
    		addColumn {
    			NAME: "name",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "gender",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "age",
    			TYPE: "id_Integer",
    			LENGTH: 2
    		}
    		addColumn {
    			NAME: "city",
    			TYPE: "id_String"
    		}
    		addColumn {
    			NAME: "errorMessage",
    			TYPE: "id_String",
    			NULLABLE: true,
    			LENGTH: 255,
    			PRECISION: 0
    		}
    	}
    }
  4. Enter the following script code to add and define the tJava component. This component will read the global variables of the tLogRow components to show the number of rows each of them has handled. Although this component has no schema columns to define, you must provide, in the setSettings {} function, the Java code to execute.

    // print summary
    addComponent {
    	setComponentDefinition {
    		TYPE: "tJava",
    		NAME: "tJava_1",
    		POSITION: 160, 416
    	}
    	setSettings {
    		CODE : "System.out.println(\"\\n\" +
    \"Total number of records    : \"+globalMap.get(\"tLogRow_1_NB_LINE\") + \"\\n\" +
    \"Number of records accepted : \"+globalMap.get(\"tLogRow_2_NB_LINE\") + \"\\n\" +
    \"Number of records rejected : \"+globalMap.get(\"tLogRow_3_NB_LINE\") + \"\\n\" + \"\\n\" );",
    		LABEL : "print_summary"
    	}
    	addSchema {
    		NAME: "tJava_1",
    		CONNECTOR: "FLOW"
    	}
    }

    Warning

    Be sure to use a backslash (\) to escape metacharacters, such as the double quotes and the backslash of each \n in the Java code.
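
The escaping used in the CODE, RVALUE and FILENAME values follows one pattern: the value is a piece of Java source placed inside a double-quoted script string. The sketch below shows that transformation, assuming that only backslashes and double quotes need escaping. ScriptEscapeSketch and escape are hypothetical helper names, not a Studio API.

```java
// Illustrative only: turns a Java expression into the escaped form used
// inside a double-quoted Job script value.
final class ScriptEscapeSketch {

    // Escapes backslashes first, then double quotes, so that the backslashes
    // added for the quotes are not escaped a second time.
    static String escape(String javaExpression) {
        return javaExpression.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        // The Java literal "M", as used for RVALUE.
        System.out.println(escape("\"M\""));    // prints \"M\"
        // The Java literal "\n", as used in the tJava code.
        System.out.println(escape("\"\\n\""));  // prints \"\\n\"
    }
}
```

Escaping the backslashes before the quotes matters: in the reverse order, the backslash inserted in front of each quote would itself be doubled.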

Defining the connections linking the components

Follow the steps below to add and define the connections that link the components and subjobs in the Job.

  1. Enter the following script code to define a Main row connection between the tFileInputDelimited component and the tReplicate component.

    // add connections
    addConnection {
    	TYPE: "FLOW",
    	NAME: "row1",
    	LINESTYLE: 0,
    	SOURCE: "tFileInputDelimited_1",
    	TARGET: "tReplicate_1"
    }
  2. Enter the following script code to define a Main row connection between the tReplicate component and the first tLogRow component.

    addConnection {
    	TYPE: "FLOW",
    	NAME: "row2",
    	LINESTYLE: 0,
    	SOURCE: "tReplicate_1",
    	TARGET: "tLogRow_1"
    }
  3. Enter the following script code to define a Main row connection between the tReplicate component and the tFilterColumns component.

    addConnection {
    	TYPE: "FLOW",
    	NAME: "row3",
    	LINESTYLE: 0,
    	SOURCE: "tReplicate_1",
    	TARGET: "tFilterColumns_1"
    }
  4. Enter the following script code to define a Main row connection between the tFilterColumns component and the tFilterRow component.

    addConnection {
    	TYPE: "FLOW",
    	NAME: "row4",
    	LINESTYLE: 0,
    	SOURCE: "tFilterColumns_1",
    	TARGET: "tFilterRow_1"
    }
  5. Enter the following script code to define a Filter row connection between the tFilterRow component and the second tLogRow component.

    addConnection {
    	TYPE: "FILTER",
    	NAME: "row5",
    	LINESTYLE: 0,
    	SOURCE: "tFilterRow_1",
    	TARGET: "tLogRow_2"
    }
  6. Enter the following script code to define a Reject row connection between the tFilterRow component and the third tLogRow component.

    addConnection {
    	TYPE: "REJECT",
    	NAME: "row6",
    	LINESTYLE: 0,
    	SOURCE: "tFilterRow_1",
    	TARGET: "tLogRow_3"
    }
  7. Enter the following script code to define an OnSubjobOk trigger connection between the tFileInputDelimited component and the tJava component to link the two subjobs so that the successful execution of the first subjob triggers the execution of the second one.

    addConnection {
    	TYPE: "SUBJOB_OK",
    	NAME: "OnSubjobOk",
    	LINESTYLE: 1,
    	SOURCE: "tFileInputDelimited_1",
    	TARGET: "tJava_1"
    }

Labelling the subjobs

Optionally, you can give a title to each of the subjobs that compose your Job to better identify their roles.

  1. Enable the title display for the first subjob and give it the title Data processing.

    // label subjobs
    addSubjob {
    	NAME: "tFileInputDelimited_1"
    	SHOW_SUBJOB_TITLE : "true",
    	SUBJOB_TITLE : "Data processing"
    }
  2. Enable the title display for the second subjob and give it the title Summary.

    addSubjob {
    	NAME: "tJava_1"
    	SHOW_SUBJOB_TITLE : "true",
    	SUBJOB_TITLE : "Summary"
    }

Generating and executing the Job

Now that you have set up all the components and connections required for the Job, you can generate your Job from the Job script and execute it in your Studio.

  1. Save your Job script, then right-click the Job script in the Repository tree view, and select Generate Job from the contextual menu.

  2. If needed, double-click the generated Job in the Repository tree view and check the Job design in the Designer tab view.

  3. Press F6 or click the Run button on the Run console to execute the Job.

    Upon successful execution of the Job, the Run console should display the unprocessed records, the accepted records, the rejected records along with the rejection reasons, and the summary information.
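
As a cross-check for the summary, the following plain-Java sketch applies the same four filter conditions to the 15 sample records and counts the results. It is an independent re-implementation for illustration, not the generated Job, and FilterSummarySketch is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: recomputes the summary counts for the sample data.
final class FilterSummarySketch {

    // The data rows of the sample file, without the header and footer rows.
    static final List<String> SAMPLE = Arrays.asList(
        "Van Buren;M;73;Chicago;married", "Adams;M;40;Albany;single",
        "Jefferson;F;66;New York;married", "Adams;M;9;Albany;-",
        "Jefferson;M;30;Chicago;single", "Carter;F;26;Chicago;married",
        "Harrison;M;40;New York;married", "Roosevelt;F;15;Chicago;",
        "Monroe;M;8;Boston;-", "Arthur;M;20;Albany;married",
        "Pierce;M;18;New York;-", "Quincy;F;83;Albany;married",
        "McKinley;M;70;Boston;married", "Coolidge;M;4;Chicago;-",
        "Monroe;M;60;Chicago;single");

    // Same conditions as in the Job script, combined with a logical AND.
    static boolean accepted(String name, String gender, int age) {
        return name.length() < 9 && "M".equals(gender) && age > 10 && age < 80;
    }

    // Returns {total, accepted, rejected}.
    static int[] counts(List<String> records) {
        int acceptedCount = 0;
        for (String record : records) {
            String[] f = record.split(";", -1);
            if (accepted(f[0], f[1], Integer.parseInt(f[2]))) {
                acceptedCount++;
            }
        }
        return new int[] { records.size(), acceptedCount, records.size() - acceptedCount };
    }

    public static void main(String[] args) {
        int[] c = counts(SAMPLE);
        System.out.println("Total number of records    : " + c[0]); // 15
        System.out.println("Number of records accepted : " + c[1]); // 6
        System.out.println("Number of records rejected : " + c[2]); // 9
    }
}
```

Under these conditions the sketch accepts six records (Adams aged 40, Harrison, Arthur, Pierce, McKinley, and Monroe aged 60) and rejects the other nine.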