tTop - 6.1

Talend Components Reference Guide


Warning

This component is available in the Palette of Talend Studio only if you have subscribed to one of the Talend solutions with Big Data.

Function

tTop sorts input records based on one or more of their schema columns and passes a given number of the first rows of the sorted data to the following component.

Purpose

This component sorts data and outputs a given number of rows, counting from the first row of the sorted data.

If you have subscribed to one of the Talend solutions with Big Data, this component is available in the following types of Jobs: MapReduce (see tTop properties in MapReduce Jobs) and Spark Batch (see tTop properties in Spark Batch Jobs).

tTop properties in MapReduce Jobs

Component family: Processing

Basic settings

Schema and Edit Schema

A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. The schema is either Built-In or stored remotely in the Repository.

Click Edit schema to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this option to view the schema only.

  • Change to built-in property: choose this option to change the schema to Built-in for local changes.

  • Update repository connection: choose this option to change the schema stored in the repository and decide whether to propagate the changes to all the Jobs upon completion. If you just want to propagate the changes to the current Job, you can select No upon completion and choose this schema metadata again in the [Repository Content] window.

Click Sync columns to retrieve the schema from the previous component connected in the Job.

Built-In: You create and store the schema locally for this component only. Related topic: see Talend Studio User Guide.

Repository: You have already created the schema and stored it in the Repository. You can reuse it in various projects and Job designs. Related topic: see Talend Studio User Guide.

Number of line selected

Enter the number of rows to be output. The component selects this number of rows, counting from the first row of the sorted data.

Criteria

Click [+] to add as many lines as required for the sort to be completed.

In the Schema column column, select the column of your schema on which the sort is based. Note that the order of the criteria matters, as it determines the sorting priority.

In the other columns, select how the data is to be sorted. For example, to sort the data in ascending alphabetical order (from A to Z), select alpha and asc in the corresponding columns.
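
The following plain Scala sketch is given only as an illustration of how the order of the criteria sets the sorting priority: it sorts hypothetical records first by a text column (alpha, asc) and then by a numeric column (num, desc), and keeps the first n rows. The record type, column names and value of n are assumptions standing in for the Criteria table and the Number of line selected field.

// Hypothetical records standing in for the rows processed by tTop.
case class Rec(name: String, amount: Int)

object TopCriteriaSketch {
  def main(args: Array[String]): Unit = {
    val rows = Seq(Rec("apple", 3), Rec("pear", 7), Rec("apple", 9), Rec("pear", 2))
    val n = 3 // stands in for "Number of line selected"
    // First criterion: name, alpha, asc; second criterion: amount, num, desc.
    // Sorting by a tuple applies the criteria in the order they are listed.
    val sorted = rows.sortBy(r => (r.name, -r.amount))
    sorted.take(n).foreach(println)
  }
}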

Usage in MapReduce Jobs

In a Talend Map/Reduce Job, this component is used as an intermediate step and other components used along with it must be Map/Reduce components, too. They generate native Map/Reduce code that can be executed directly in Hadoop.


Note that in this documentation, unless otherwise explicitly stated, a scenario presents only Standard Jobs, that is to say traditional Talend data integration Jobs, not Map/Reduce Jobs.

Log4j

If you are using a subscription-based version of the Studio, the activity of this component can be logged using the log4j feature. For more information on this feature, see Talend Studio User Guide.

For more information on the log4j logging levels, see the Apache documentation at http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html.

Hadoop Connection

You need to use the Hadoop Configuration tab in the Run view to define the connection to a given Hadoop distribution for the whole Job.

This connection is effective on a per-Job basis.

Related scenarios

No scenario is available for the Map/Reduce version of this component yet.

tTop properties in Spark Batch Jobs

Component family: Processing

Basic settings

Schema and Edit Schema

A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. The schema is either Built-In or stored remotely in the Repository.

Click Edit schema to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this option to view the schema only.

  • Change to built-in property: choose this option to change the schema to Built-in for local changes.

  • Update repository connection: choose this option to change the schema stored in the repository and decide whether to propagate the changes to all the Jobs upon completion. If you just want to propagate the changes to the current Job, you can select No upon completion and choose this schema metadata again in the [Repository Content] window.

Click Sync columns to retrieve the schema from the previous component connected in the Job.

Built-In: You create and store the schema locally for this component only. Related topic: see Talend Studio User Guide.

Repository: You have already created the schema and stored it in the Repository. You can reuse it in various projects and Job designs. Related topic: see Talend Studio User Guide.

Number of line selected

Enter the number of rows to be output. The component selects this number of rows, counting from the first row of the sorted data.

Criteria

Click [+] to add as many lines as required for the sort to be completed.

In the Schema column column, select the column of your schema on which the sort is based. Note that the order of the criteria matters, as it determines the sorting priority.

In the other columns, select how the data is to be sorted. For example, to sort the data in ascending alphabetical order (from A to Z), select alpha and asc in the corresponding columns.

Usage in Spark Batch Jobs

In a Talend Spark Batch Job, this component is used as an intermediate step and other components used along with it must be Spark Batch components, too. They generate native Spark Batch code that can be executed directly in a Spark cluster.

This component, along with the Spark Batch component Palette it belongs to, appears only when you are creating a Spark Batch Job.

Note that in this documentation, unless otherwise explicitly stated, a scenario presents only Standard Jobs, that is to say traditional Talend data integration Jobs.

Log4j

If you are using a subscription-based version of the Studio, the activity of this component can be logged using the log4j feature. For more information on this feature, see Talend Studio User Guide.

For more information on the log4j logging levels, see the Apache documentation at http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html.

Spark Connection

You need to use the Spark Configuration tab in the Run view to define the connection to a given Spark cluster for the whole Job. In addition, since the Job needs its dependent jar files to be transferred for execution, one and only one file-system-related component from the Storage family is required in the same Job so that Spark can use this component to connect to the file system to which these jar files are transferred.

This connection is effective on a per-Job basis.

Performing download analysis

In this scenario, you create a Spark Batch Job to analyze how often a given product is downloaded.

In this Job, you analyze the download preference of some specific customers known to your customer base.

The sample data used as the customer base is as follows:

10103|Herbert|Clinton|FR|SILVER|28-06-2011|herbert.clinton@msn.com|6571183
10281|Bill|Ford|BE|PLATINUM|13-04-2014|bill.ford@gmail.com|6360604
10390|George|Garfield|GB|SILVER|12-02-2011|george.garfield@gmail.com|7919508
10566|Abraham|Garfield|CN|SILVER|11-10-2012|abraham.garfield@msn.com|9155569
10691|John|Polk|GB|SILVER|05-11-2012|john.polk@gmail.com|6488579
10884|Herbert|Hayes|GB|SILVER|12-10-2007|herbert.hayes@gmail.com|8728181
11020|Chester|Roosevelt|BE|GOLD|28-06-2008|chester.roosevelt@yahoo.com|4172181
11316|Franklin|Madison|BR|SILVER|08-01-2014|franklin.madison@gmail.com|4711801
11707|James|Tyler|ES|GOLD|25-03-2010|james.tyler@gmail.com|7276942
11764|Theodore|McKinley|GB|GOLD|24-08-2013|theodore.mckinley@gmail.com|3224767
11777|Warren|Madison|BE|N/A|23-12-2008|warren.madison@msn.com|6695520
11857|Ronald|Arthur|SG|PLATINUM|01-04-2009|ronald.arthur@msn.fr|6704785
11936|Theodore|Buchanan|NL|SILVER|14-11-2014|theodore.buchanan@yahoo.fr|2783553
11940|Lyndon|Wilson|BR|PLATINUM|27-07-2010|lyndon.wilson@yahoo.com|1247110
12214|Gerald|Jefferson|SG|N/A|06-06-2007|gerald.jefferson@yahoo.com|5879162
12382|Herbert|Taylor|IT|GOLD|22-04-2012|herbert.taylor@msn.com|3873628
12475|Richard|Kennedy|FR|N/A|29-12-2014|richard.kennedy@yahoo.fr|7287388
12479|Calvin|Eisenhower|ES|N/A|06-11-2008|calvin.eisenhower@yahoo.fr|1792573
12531|Chester|Arthur|JP|PLATINUM|23-01-2009|chester.arthur@msn.fr|8772326
12734|Jimmy|Buchanan|IT|SILVER|09-03-2010|jimmy.buchanan@gmail.com|7007786

This data contains the ID numbers of the customers known to this customer base, their first and last names, country codes, support levels, registration dates, email addresses and phone numbers.

The sample web-click log of some of these customers reads as follows:

10103|/download/products/talend-open-studio
10281|/services/technical-support
10390|/services/technical-support
10566|/download/products/data-integration
10691|/services/training
10884|/download/products/integration-cloud
11020|/services/training
11316|/download/products/talend-open-studio
11707|/download/products/talend-open-studio
11764|/customers

This data contains the ID numbers of the customers who visited different Talend web pages and the pages they visited.

By reading this data, you can see that the visits come from customers of different support levels and for different purposes. The Job to be designed identifies the sources of these visits against the sample customer base and analyzes which product is most downloaded by the Silver-level customers.

Note that the sample data is created for demonstration purposes only.

To replicate this scenario, proceed as follows:

Linking the components

  1. In the Integration perspective of the Studio, create an empty Spark Batch Job from the Job Designs node in the Repository tree view.

    For further information about how to create a Spark Batch Job, see Talend Big Data Getting Started Guide.

  2. In the workspace, enter the name of each component to be used and select it from the list that appears. In this scenario, the components are tHDFSConfiguration, two tFixedFlowInput components (label one customer_base and the other web_data), tSqlRow, tCacheOut, tCacheIn, tMap, tExtractDelimitedFields, tAggregateRow, tTop and tLogRow.

    The tFixedFlowInput components are used to load the sample data into the data flow. In real-world practice, you can use other components, such as tMysqlInput, alone or with a tMap, in place of tFixedFlowInput to design a more sophisticated process for preparing your data.

  3. Connect customer_base (tFixedFlowInput), tSqlRow and tCacheOut using the Row > Main link. In this Subjob, the records about the Silver-level customers are selected and stored in cache.

  4. Connect web_data (tFixedFlowInput) to tMap using the Row > Main link. This is the main input flow to the tMap component.

  5. Do the same to connect tCacheIn to tMap. This is the lookup flow to tMap.

  6. Connect tMap to tExtractDelimitedFields using the Row > Main link and name this connection in the dialog box that is displayed. For example, name it output.

  7. Connect tExtractDelimitedFields, tAggregateRow, tTop and tLogRow using the Row > Main link.

  8. Connect customer_base to web_data using the Trigger > OnSubjobOk link.

  9. Leave the tHDFSConfiguration component alone without any connection.

Setting up Spark connection

  1. Click Run to open its view and then click the Spark Configuration tab to display its view for configuring the Spark connection.


  2. Select the type of the Spark cluster you need to connect to.

    • Local: the Studio builds the Spark environment in itself at runtime to run the Job locally within the Studio. With this mode, each processor of the local machine is used as a Spark worker to perform the computations. This mode requires minimum parameters to be set in this configuration view.

      Note this local machine is the machine in which the Job is actually run. The Local mode is the default mode and you need to clear its check box to display the drop-down list for you to select the other modes.

    • Standalone: the Studio connects to a Spark-enabled cluster to run the Job from this cluster.

    • Yarn client: the Studio runs the Spark driver to orchestrate how the Job should be performed and then sends the orchestration to the Yarn service of a given Hadoop cluster so that the Resource Manager of this Yarn service requests execution resources accordingly.

  3. If you are using the Yarn client mode, the Property type list is displayed to allow you to select an established Hadoop connection from the Repository, on the condition that you have created this connection in the Repository. Then the Studio will reuse that set of connection information for this Job.

    For further information about how to create a Hadoop connection in the Repository, see the chapter describing the Hadoop cluster node of the Talend Studio User Guide.

  4. Select the version of the Hadoop distribution you are using along with Spark.

    If you cannot find the distribution corresponding to yours from this drop-down list, this means the distribution you want to connect to is not officially supported by Talend. In this situation, you can select Custom, then select the Spark version of the cluster to be connected and click the button to display the dialog box in which you can alternatively:

    1. Select Import from existing version to import an officially supported distribution as base and then add other required jar files which the base distribution does not provide.

    2. Select Import from zip to import the configuration zip for the custom distribution to be used. This zip file should contain the libraries of the different Hadoop/Spark elements and the index file of these libraries.

      Note that custom versions are not officially supported by Talend. Talend and its community provide you with the opportunity to connect to custom versions from the Studio but cannot guarantee that the configuration of whichever version you choose will be easy. As such, you should only attempt to set up such a connection if you have sufficient Hadoop and Spark experience to handle any issues on your own.

  5. Configure the connection information to the principal services of the cluster to be used.

    If you are using the Yarn client mode, you need to enter the addresses of the following services in their corresponding fields (if you leave the check box of a service clear, then at runtime, the configuration of this parameter in the Hadoop cluster to be used will be ignored):

    • In the Resource manager field, enter the address of the ResourceManager service of the Hadoop cluster to be used.

    • Select the Set resourcemanager scheduler address check box and enter the Scheduler address in the field that appears.

    • Select the Set jobhistory address check box and enter the location of the JobHistory server of the Hadoop cluster to be used. This allows the metrics information of the current Job to be stored in that JobHistory server.

    • Select the Set staging directory check box and enter this directory defined in your Hadoop cluster for temporary files created by running programs. Typically, this directory can be found under the yarn.app.mapreduce.am.staging-dir property in the configuration files such as yarn-site.xml or mapred-site.xml of your distribution.

    • If you are accessing the Hadoop cluster running with Kerberos security, select this check box, then enter the Kerberos principal names for the ResourceManager service and the JobHistory service in the displayed fields. This enables you to use your user name to authenticate against the credentials stored in Kerberos. These principals can be found in the configuration files of your distribution. For example, in a CDH4 distribution, the Resource manager principal is set in the yarn-site.xml file and the Job history principal in the mapred-site.xml file.

      If you need to use a Kerberos keytab file to log in, select Use a keytab to authenticate. A keytab file contains pairs of Kerberos principals and encrypted keys. You need to enter the principal to be used in the Principal field and the access path to the keytab file itself in the Keytab field.

      Note that the user that executes a keytab-enabled Job is not necessarily the one a principal designates but must have the right to read the keytab file being used. For example, the user name you are using to execute a Job is user1 and the principal to be used is guest; in this situation, ensure that user1 has the right to read the keytab file to be used.

    • The User name field is available when you are not using Kerberos to authenticate. In the User name field, enter the login user name for your distribution. If you leave it empty, the user name of the machine hosting the Studio will be used.

      Since the Job needs to upload jar files to HDFS of the cluster to be used, you must ensure that this user name is the same as the one you have put in tHDFSConfiguration, the component used to provide HDFS connection information to Spark.

    If you are using the Standalone mode, you need to set the following parameters:

    • In the Spark host field, enter the URI of the Spark Master of the Hadoop cluster to be used.

    • In the Spark home field, enter the location of the Spark executable installed in the Hadoop cluster to be used.

  6. If you need to run the current Job on Windows, it is recommended to specify where the winutils.exe program to be used is stored.

    • If you know where to find your winutils.exe file and you want to use it, select the Define the Hadoop home directory check box and enter the directory where your winutils.exe is stored.

    • Otherwise, leave this check box clear; the Studio generates one by itself and automatically uses it for this Job.

  7. If the Spark cluster cannot recognize the machine in which the Job is launched, select the Define the driver hostname or IP address check box and enter the host name or the IP address of this machine. This allows the Spark master and its workers to recognize this machine and thus find the Job and its driver.

    Note that in this situation, you also need to add the name and the IP address of this machine to its hosts file.

  8. Select the Set Tuning properties check box to optimize the allocation of the resources to be used to run this Job. These properties are not mandatory for the Job to run successfully, but they are useful when Spark is bottlenecked by any resource issue in the cluster such as CPU, bandwidth or memory:

    • Driver memory and Driver core: enter the allocation size of memory and the number of cores to be used by the driver of the current Job.

    • Executor memory: enter the allocation size of memory to be used by each Spark executor.

    • Core per executor: select this check box and in the displayed field, enter the number of cores to be used by each executor. If you leave this check box clear, the default allocation defined by Spark is used, for example, all available cores are used by one single executor in the Standalone mode.

    • Set Web UI port: if you need to change the default port of the Spark Web UI, select this check box and enter the port number you want to use.

    • Broadcast factory: select the broadcast implementation to be used to cache variables on each worker machine.

    • Customize Spark serializer: if you need to import an external Spark serializer, select this check box and in the field that is displayed, enter the fully qualified class name of the serializer to be used.

    • Yarn resource allocation: select how you want Yarn to allocate resources among executors.

      • Auto: you leave Yarn to manage the allocation by itself.

      • Fixed: you need to enter the number of executors to be used in the Num executors field that is displayed.

      • Dynamic: Yarn adapts the number of executors to suit the workload. You need to define the scale of this dynamic allocation by defining the initial number of executors to run in the Initial executors field, the lowest number of executors in the Min executors field and the largest number of executors in the Max executors field.

      This feature is available to the Yarn client mode only.

  9. In the Spark "scratch" directory field, enter the directory in which the Studio stores, in the local system, the temporary files such as the jar files to be transferred. If you launch the Job on Windows, the default disk is C:. So if you leave /tmp in this field, this directory is C:/tmp.

  10. Add any Spark properties you need to use to override their default counterparts used by the Studio.

    For example, if you are using the Yarn client mode with a CDH distribution, you need to specify the Yarn classpath of your cluster for the Job. The property to be added is spark.hadoop.yarn.application.classpath. Please contact the administrator of your cluster to obtain related information.

    The following value from a Cloudera cluster is presented for demonstration purposes:

    /etc/hadoop/conf,/usr/lib/hadoop/*,/usr/lib/hadoop/lib/*,/usr/lib/hadoop-hdfs/*,/usr/lib/hadoop-hdfs/lib/*,/usr/lib/hadoop-mapreduce/*,/usr/lib/hadoop-mapreduce/lib/*,/usr/lib/hadoop-yarn/*,/usr/lib/hadoop-yarn/lib/*,/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/*

    If you want the Spark application logs of this Job to be persistent in the file system, add the related properties to this Advanced properties table. For example, the properties to be set for the Yarn client mode are:

    • spark.yarn.historyServer.address

    • spark.eventLog.enabled

    • spark.eventLog.dir

    The value of the spark.eventLog.enabled property should be true; for the values of the other two properties, contact the administrator of the Spark cluster to be used.

    For further information about the valid Spark properties, see Spark documentation at https://spark.apache.org/docs/latest/configuration.

Configuring the connection to the file system to be used by Spark

  1. Double-click tHDFSConfiguration to open its Component view. Note that tHDFSConfiguration is used because the Spark Yarn client mode is used to run Spark Jobs in this scenario.

    Spark uses this component to connect to the HDFS system to which the jar files dependent on the Job are transferred.

  2. In the Version area, select the Hadoop distribution you need to connect to and its version.

  3. In the NameNode URI field, enter the location of the machine hosting the NameNode service of the cluster.

  4. In the Username field, enter the authentication information used to connect to the HDFS system to be used. Note that this user name must be the same as the one you have entered in the Spark Configuration tab.

Loading the customer base

  1. Double-click the tFixedFlowInput component labeled customer_base to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. Click the [+] button to add the schema columns of the customer data: User_id, FirstName, LastName, Country, Support, SubscriptionDate, email and telephone.

  4. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  5. In the Mode area, select the Use Inline Content radio button and paste the above-mentioned sample customer base data into the Content field that is displayed.

  6. In the Field separator field, enter a vertical bar (|).

Selecting the Silver-level customer data

  1. Double-click tSqlRow to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. In the schema on the output side (the right side), change the column name Support to Silver_Support.

  4. In the SQL Query field, enter the query statement to be used to select the records about the Silver-level customers.

    select User_id, FirstName, LastName, Country, Support as Silver_Support, SubscriptionDate, email, telephone from row1 where Support = 'SILVER'

    Note that the input link row1 is used as the table against which this query is run, as the sketch below illustrates.
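
The sketch below is only an illustration of this idea, not the code the Studio generates: using the Spark SQL API (a Spark 2.x style session is assumed here), the incoming rows are registered as a temporary table named after the link, row1, and the same query is run against that table.

import org.apache.spark.sql.SparkSession

object SqlRowSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sqlrow-sketch").master("local[*]").getOrCreate()
    import spark.implicits._
    // A couple of rows from the sample customer base, carried by the link row1.
    val row1 = Seq(
      (10103, "Herbert", "Clinton", "FR", "SILVER", "28-06-2011", "herbert.clinton@msn.com", "6571183"),
      (10281, "Bill", "Ford", "BE", "PLATINUM", "13-04-2014", "bill.ford@gmail.com", "6360604")
    ).toDF("User_id", "FirstName", "LastName", "Country", "Support", "SubscriptionDate", "email", "telephone")
    row1.createOrReplaceTempView("row1") // the link name becomes the table name
    spark.sql("select User_id, FirstName, LastName, Country, Support as Silver_Support, " +
      "SubscriptionDate, email, telephone from row1 where Support = 'SILVER'").show()
    spark.stop()
  }
}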

Accessing the selected data

  1. Double-click tCacheOut to open its Component view.

    This component stores the selected data into the cache.

  2. Click the [...] button next to Edit schema to open the schema editor and verify that the schema is identical to the input one. If it is not, click Sync columns.

  3. On the output side of the schema editor, click the button to export the schema to the local file system and click OK to close the editor.

  4. From the Storage level list, select Memory only.

    For further information about each storage level, see https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.

  5. Double-click tCacheIn to open its Component view.

  6. Click the [...] button next to Edit schema to open the schema editor and click the button to import the schema you exported in the previous step. Then click OK to close the editor.

  7. From the Output cache list, select the tCacheOut component from which you need to read the cached data. At runtime, this data is loaded into the lookup flow of the Subjob that is used to process the web-click log.
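
As a rough, illustrative equivalent of this tCacheOut/tCacheIn pair (not the code the Studio generates), the sketch below persists an RDD in memory so that a later stage can reuse it without recomputation; the SparkContext setup and the sample pairs are assumptions made for the example.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object CacheSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("cache-sketch").setMaster("local[*]"))
    // (User_id, LastName) pairs standing in for the selected Silver-level customers.
    val silverCustomers = sc.parallelize(Seq((10103, "Clinton"), (10390, "Garfield")))
    // MEMORY_ONLY corresponds to the Memory only storage level selected in tCacheOut.
    silverCustomers.persist(StorageLevel.MEMORY_ONLY)
    // Later actions, such as the lookup join of the second Subjob, read the cached
    // partitions instead of recomputing them.
    println(silverCustomers.count())
    sc.stop()
  }
}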

Loading the web-click log

  1. Double-click the tFixedFlowInput component labeled web_data to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. Click the [+] button to add the schema columns of the web-click log: user_id and url.

  4. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  5. In the Mode area, select the Use Inline Content radio button and paste the above-mentioned sample data about the web-click log into the Content field that is displayed.

  6. In the Field separator field, enter a vertical bar (|).

Joining the data

  1. Double-click tMap to open its Map editor.

    On the input side (the left side), the main flow (labeled row3 in this example) and the lookup flow (labeled row4 in this example) are presented as two tables.

    On the output side (the right side), an empty table is present.

  2. Drop all of the columns of the lookup flow schema, except the User_id column, into the output flow table on the right side, and drop the user_id column and the url column from the schema of the main flow into the same output flow table.

  3. On the left side, drop the user_id column from the main flow table into the Expr.key column in the User_id row in the lookup flow table. This makes the ID numbers of the customers the key for the join of the two input flows.

  4. In the lookup flow table, click the wrench icon to display the panel for the lookup settings and select Inner Join for the Join model property.

  5. Click Apply to validate these changes and click OK to close this editor.
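
To illustrate what this tMap configuration amounts to (an illustration only, not the generated code), the sketch below joins the web-click pairs with the cached Silver-level customers on the customer ID; the Inner Join model corresponds to join(), which keeps only the IDs present in both flows. The sample values are taken from the data shown earlier.

import org.apache.spark.{SparkConf, SparkContext}

object JoinSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("join-sketch").setMaster("local[*]"))
    // Main flow: (user_id, url) pairs from the web-click log.
    val clicks = sc.parallelize(Seq(
      (10103, "/download/products/talend-open-studio"),
      (10281, "/services/technical-support")))
    // Lookup flow: (User_id, LastName) pairs from the cached Silver-level customers.
    val silver = sc.parallelize(Seq((10103, "Clinton"), (10390, "Garfield")))
    // Inner join on the customer ID: only customers found in both flows are kept.
    clicks.join(silver).collect().foreach(println)
    sc.stop()
  }
}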

Extracting the fields about the categories of the visited pages

  1. Double-click tExtractDelimitedFields to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. On the output side, click the [+] button four times to add four rows to the output schema table and rename these new schema columns to root, page, specialization and product, respectively. These columns are used to carry the fields extracted from the url column in the input flow.

  4. Click OK to validate these changes.

  5. From the Prev.Comp.Column.List list, select the column you need to extract data from. In this example, it is url from the input schema.

  6. In the Field separator field, enter a slash (/).
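
The following plain Scala sketch, given only for illustration, shows the extraction this component performs on a sample url value: splitting on the slash yields the root, page, specialization and product fields. The padTo call is an assumption used here to handle shorter URLs such as /customers.

object ExtractSketch {
  def main(args: Array[String]): Unit = {
    val url = "/download/products/talend-open-studio"
    // Split on "/" keeping trailing empty fields, and pad so that shorter URLs
    // still yield four values.
    val parts = url.split("/", -1).padTo(4, "")
    val (root, page, specialization, product) = (parts(0), parts(1), parts(2), parts(3))
    println(s"root='$root', page='$page', specialization='$specialization', product='$product'")
  }
}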

Counting the download of each product

  1. Double-click tAggregateRow to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. On the output side, click the [+] button two times to add two rows to the output schema table and rename these new schema columns to product and nb_download, respectively.

  4. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  5. In the Group by table, add one row by clicking the [+] button and select product for both the Output column column and the Input column position column. This passes data from the product column of the input schema to the product column of the output schema.

  6. In the Operations table, add one row by clicking the [+] button.

  7. In the Output column column, select nb_download, in the Function column, select count and in the Input column position column, select product.
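
As an illustrative counterpart of this aggregation (not the code the Studio generates), the sketch below counts the rows per product with a pair RDD; the product values come from the sample log and the Spark setup is an assumption.

import org.apache.spark.{SparkConf, SparkContext}

object AggregateSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("aggregate-sketch").setMaster("local[*]"))
    // product values extracted from the url column of the joined rows.
    val products = sc.parallelize(Seq("talend-open-studio", "data-integration", "talend-open-studio"))
    // Group by product and count the rows: one (product, nb_download) pair per product.
    val nbDownload = products.map(p => (p, 1)).reduceByKey(_ + _)
    nbDownload.collect().foreach(println)
    sc.stop()
  }
}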

Selecting the most downloaded product

  1. Double-click tTop to open its Component view.

  2. In the Number of line selected field, enter the number of rows to be output to the next component, counting from the first row of the data sorted by tTop. In this example, enter 1 to keep only the most downloaded product.

  3. In the Criteria table, add one row by clicking the [+] button.

  4. In the Schema column column, select nb_download, the column by which the data is sorted; in the sort num or alpha column, select num, because the data to be sorted is numeric; and in the Order asc or desc column, select desc to arrange the data in descending order.
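
For illustration only, and not as the code the Studio generates, the sketch below shows the equivalent of this tTop configuration with a pair RDD: the (product, nb_download) pairs are sorted by the numeric nb_download value in descending order and the first row is kept. The sample counts and the Spark setup are assumptions.

import org.apache.spark.{SparkConf, SparkContext}

object TopSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("top-sketch").setMaster("local[*]"))
    val nbDownload = sc.parallelize(Seq(("talend-open-studio", 3), ("data-integration", 1)))
    // Criteria: nb_download, num, desc; Number of line selected: 1.
    val top = nbDownload.sortBy(_._2, ascending = false).take(1)
    top.foreach(println)
    sc.stop()
  }
}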

Executing the Job

Then you can run this Job.

The tLogRow component is used to present the execution result of the Job.

  1. Double-click the tLogRow component to open the Component view.

  2. Select the Define a storage configuration component check box and select tHDFSConfiguration.

  3. Press F6 to run this Job.

Once done, in the console of the Run view, you can check the execution result.