tMahoutClustering - 6.1

Talend Components Reference Guide


Warning

This component is available in the Palette of the Studio only if you have subscribed to a Talend Platform product with Big Data.

Function

tMahoutClustering groups data into clusters based on similarity. The component offers several distance measures that can be used with different clustering algorithms.

tMahoutClustering uses clustering algorithms from Mahout libraries. All processes are run in a given distributed file system.

Note

Currently, the studio supports Mahout 0.9.

Purpose

tMahoutClustering helps you group unlabeled numerical data into clusters that can reveal interesting patterns or help identify abnormal data items in the data set.

If you have subscribed to one of the Talend solutions with Big Data, this component is available in the following types of Jobs:

tMahoutClustering in Talend Map/Reduce Jobs

Warning

The information in this section is only for users that have subscribed to one of the Talend solutions with Big Data and is not applicable to Talend Open Studio for Big Data users.

In a Talend Map/Reduce Job, tMahoutClustering, as well as the other Map/Reduce components preceding it, generates native Map/Reduce code. This section presents the specific properties of tMahoutClustering when it is used in that situation. For further information about a Talend Map/Reduce Job, see Talend Big Data Getting Started Guide.

Component family

MapReduce

This component is deprecated and hidden from the Palette by default, but it continues to work in Jobs you import from older releases. However, you must use JDK 7 to run migrated Jobs that contain tMahoutClustering.

For information about how to show a hidden component on the Palette, see Talend Studio User Guide.

The Spark Batch component tKMeansModel is recommended to replace tMahoutClustering to execute clustering algorithms on datasets.

Basic settings

Schema and Edit schema

A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. The schema is either Built-In or stored remotely in the Repository.

Click Edit schema to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this option to view the schema only.

  • Change to built-in property: choose this option to change the schema to Built-in for local changes.

  • Update repository connection: choose this option to change the schema stored in the repository and decide whether to propagate the changes to all the Jobs upon completion. If you just want to propagate the changes to the current Job, you can select No upon completion and choose this schema metadata again in the [Repository Content] window.

The output schema of tMahoutClustering provides one read-only column, ClusterID.

Built-In: You create and store the schema locally for this component only. Related topic: see Talend Studio User Guide.

Repository: You have already created the schema and stored it in the Repository. You can reuse it in various projects and Job designs. Related topic: see Talend Studio User Guide.

File configuration

Input HDFS file

Browse to the HDFS file that holds the numerical data to be processed.

 

Field separator

Enter a character, string or regular expression to separate fields in the input and output data.

 

Cluster columns

In the Input Column, select the column(s) from the main flow to which you want to apply the clustering algorithm. These columns are used to calculate the clusters.

You can add only numerical columns to this table.

Clustering Configuration

Clustering type

Select the relevant clustering algorithm from the list:

Canopy: this algorithm uses an approximate distance metric and two distance thresholds, T1 and T2, where T1 > T2. It starts with a set of data points in any order, picks one point as the centroid of a cluster and approximately measures its distance to all other points. It puts all points that are within distance threshold T1 into a canopy and removes from the main set all points that are within distance threshold T2. This way, points that are very close to the centroid avoid all further processing. The algorithm then chooses a second centroid among the data points remaining in the main set. It continues until the main set is empty, accumulating a set of canopies, each containing one or more points. A given point may occur in more than one canopy.

Canopy clustering is often used as an initial step in more rigorous clustering techniques, such as K-Means clustering. Starting with Canopy clustering can significantly reduce the number of more expensive distance measurements, because points outside the initial canopies are ignored.

K-Means: this algorithm sorts a given data set into k clusters, where k is a number you must define. It chooses k random points and uses them as the centroids of the k clusters.

The algorithm then associates each point of the data set with the nearest cluster center.

Fuzzy K-Means: also called Fuzzy C-Means, this algorithm belongs to the family of fuzzy-logic clustering algorithms. It works like K-Means but recomputes the cluster centers using the probability that a point belongs to each of two or more clusters.
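The Canopy procedure described above can be sketched in a few lines of Python. This is only an illustration of the description, not the Mahout implementation: the function name canopy, its parameters and the sample points are hypothetical, and the exact Euclidean distance stands in for the approximate metric.

```python
import math


def euclidean(a, b):
    """Straight-line distance between two points of equal dimension."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def canopy(points, t1, t2, distance=euclidean):
    """Group points into canopies using thresholds t1 > t2 (illustrative sketch)."""
    if t1 <= t2:
        raise ValueError("T1 must be greater than T2")
    remaining = list(points)
    canopies = []
    while remaining:
        # Pick the next available point as the centroid of a new canopy.
        center = remaining.pop(0)
        members = [center]
        still_remaining = []
        for p in remaining:
            d = distance(center, p)
            if d < t1:
                # Within T1: the point joins this canopy.
                members.append(p)
            if d >= t2:
                # Not within T2: the point stays available for later canopies.
                still_remaining.append(p)
        remaining = still_remaining
        canopies.append((center, members))
    return canopies


# Hypothetical sample data on a line.
sample_points = [(0, 0), (1, 0), (5, 0), (6, 0), (3, 0)]
sample_canopies = canopy(sample_points, t1=3.0, t2=1.5)
```

In this sample, the point (3, 0) lies between T2 and T1 of the second centroid, so it joins that canopy and later becomes the centroid of its own canopy, which shows how one point can occur in more than one canopy.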

 

Distance measure

Select from the list the distance measure you want to use for clustering:

Euclidean: defines the "ordinary" distance between two points, as if measured with a ruler.

Manhattan: defines the distance between two points if a grid-like path is followed.

Chebyshev: defines the maximum distance between two vectors taken on any of the coordinate dimensions.

Cosine: uses the cosine of the angle between the two vectors representing the points to be compared.
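The four measures can be written as plain Python functions to make the differences concrete. This is a minimal sketch with hypothetical function names. It assumes that the cosine measure is expressed as 1 minus the cosine of the angle, so that identical directions give a distance of 0; the component itself may scale the value differently.

```python
import math


def euclidean(a, b):
    # "Ordinary" straight-line distance.
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def manhattan(a, b):
    # Sum of the absolute differences along each coordinate (grid-like path).
    return sum(abs(x - y) for x, y in zip(a, b))


def chebyshev(a, b):
    # Largest absolute difference over all coordinates.
    return max(abs(x - y) for x, y in zip(a, b))


def cosine_distance(a, b):
    # Assumption: distance = 1 - cos(angle between a and b).
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine distance is undefined for a zero vector")
    return 1.0 - dot / (norm_a * norm_b)
```

For the points (0, 0) and (3, 4), the first three functions return 5.0, 7 and 4 respectively, which shows how much the choice of measure can change which points count as close.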

 

Canopy threshold1

The threshold of distance T1 used for the Canopy algorithm.

 

Canopy threshold2

The threshold of distance T2 used for the Canopy algorithm.

 

Number of clusters

Enter the maximum number of clusters that can be generated by a clustering algorithm. Some clusters may not have data.

 

Max iterations

Enter the maximum number of iterations to be carried out for a clustering algorithm.

 

Convergence delta

Enter a rate of convergence for the algorithm. It must be between 0.0 and 1.0. The greater the value, the sooner the algorithm finishes, but the less precise the results are.
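The interaction between Max iterations and Convergence delta can be illustrated with a small K-Means loop. This is a sketch under stated assumptions, not the Mahout code: it assumes the delta is used as a threshold on how far the cluster centers move between two iterations, it takes the initial centers as an argument instead of choosing them at random, and the names k_means, max_iterations and convergence_delta are hypothetical.

```python
import math


def euclidean(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def k_means(points, centers, max_iterations, convergence_delta):
    """Return (final centers, iterations used) for a basic K-Means run."""
    if max_iterations < 1:
        raise ValueError("max_iterations must be at least 1")
    centers = [tuple(c) for c in centers]
    for iteration in range(1, max_iterations + 1):
        # Assign every point to its nearest center.
        groups = [[] for _ in centers]
        for p in points:
            nearest = min(range(len(centers)),
                          key=lambda i: euclidean(p, centers[i]))
            groups[nearest].append(p)
        # Recompute each center as the mean of its assigned points.
        new_centers = []
        for old, group in zip(centers, groups):
            if group:
                new_centers.append(
                    tuple(sum(col) / len(group) for col in zip(*group)))
            else:
                new_centers.append(old)  # an empty cluster keeps its center
        moved = max(euclidean(o, n) for o, n in zip(centers, new_centers))
        centers = new_centers
        # Stop early once no center moved farther than the delta.
        if moved <= convergence_delta:
            break
    return centers, iteration


# Hypothetical sample data with two obvious groups.
sample = [(0.0, 0.0), (0.0, 2.0), (10.0, 0.0), (10.0, 2.0)]
start = [(0.0, 0.0), (10.0, 0.0)]
```

With this sample, a delta of 0.5 needs two iterations before the centers stop moving, while a delta of 1.0 stops after the first iteration, which is the trade-off between speed and precision described above.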

 

Fuzziness

Enter the fuzziness parameter for the Fuzzy K-Means algorithm. It must be greater than or equal to 1.0.

When the fuzziness is close to 1, then the cluster center closest to the point is given much more weight than the others, and the algorithm is similar to K-Means.
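The effect of the fuzziness value can be seen in the textbook Fuzzy C-Means membership formula, sketched below. This is an assumption about the general technique rather than a description of the component's internals; the function name memberships is hypothetical, and this particular formula is only defined for values strictly greater than 1.

```python
def memberships(distances, fuzziness):
    """Degree of membership of one point in each cluster.

    distances: distance from the point to every cluster center.
    fuzziness: the fuzziness parameter m (this formula needs m > 1).
    """
    if fuzziness <= 1.0:
        raise ValueError("this formula requires a fuzziness greater than 1")
    zero = [d == 0 for d in distances]
    if any(zero):
        # The point sits exactly on one or more centers: share the weight there.
        return [1.0 / sum(zero) if z else 0.0 for z in zero]
    exponent = 2.0 / (fuzziness - 1.0)
    return [
        1.0 / sum((d_i / d_k) ** exponent for d_k in distances)
        for d_i in distances
    ]
```

For a point at distances 1.0 and 3.0 from two centers, a fuzziness of 2.0 gives memberships of about 0.9 and 0.1, whereas a fuzziness of 1.1 gives the nearest center almost all of the weight, which is the K-Means-like behavior described above.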

Global Variables

ERROR_MESSAGE: the error message generated by the component when an error occurs. This is an After variable and it returns a string. This variable functions only if the Die on error check box is cleared, if the component has this check box.

A Flow variable functions during the execution of a component while an After variable functions after the execution of the component.

To fill up a field or expression with a variable, press Ctrl + Space to access the variable list and choose the variable to use from it.

For further information about variables, see Talend Studio User Guide.

Usage

tMahoutClustering must be the start component in a Job. You can select an input HDFS file from its basic settings.

Scenario: Grouping customer numerical data into clusters on HDFS

The scenario is inspired by a research paper on model-based clustering. Its data can be found at Wholesale customers Data Set. The research paper is available at Enhancing the selection of a model-based clustering with external categorical variables. This scenario is included in the Data Quality Demos project you can import into your Talend Studio. For further information, see the Talend Studio User Guide.

The Job in this scenario connects to a given Hadoop distributed file system (HDFS), groups customers of a "wholesale distributor" into clusters using the algorithms in tMahoutClustering and outputs data on a given HDFS.

The data set has 440 samples that refer to clients of a wholesale distributor. It includes the annual spending in monetary units on diverse product categories like fresh and grocery products or milk.

The data set refers to customers from different channels - Horeca (Hotel/Restaurant/Cafe) or Retail (sale of goods in small quantities) channel, and from different regions (Lisbon/Oporto/other).

This Job uses:

  • tMahoutClustering to compute the clusters for the input data set.

  • two tAggregateRow components to count the number of clients in both clusters based on the region and channel columns.

  • three tMap components to map the channel and region input flows into two separate output flows. The components are also used to map the single clusterID column received from tMahoutClustering to two-column data flows that feed the region and the channel clusters.

  • two tHDFSOutput components to write data to HDFS in two output files.

Prerequisites: Before being able to use the tMahoutClustering component, you must have a functional Hadoop system.

Setting up the Job

  1. Drop the following components from the Palette onto the design workspace: tMahoutClustering, three tMap, two tAggregateRow and two tHDFSOutput components.

  2. Set the components as shown in the capture and connect them together using Main links.

Setting up Hadoop connection

  1. Click Run to open its view and then click the Hadoop Configuration tab to display its view for configuring the Hadoop connection for this Job.

    This view looks like the image below:

  2. From the Property type list, select Built-in. If you have created the connection to be used in Repository, then select Repository and thus the Studio will reuse that set of connection information for this Job.

    For further information about how to create a Hadoop connection in Repository, see the chapter describing the Hadoop cluster node of Talend Studio User Guide.

  3. In the Version area, select the Hadoop distribution to be used and its version. If the list does not contain your distribution, select Custom to connect to a Hadoop distribution not officially supported in the Studio.

    For a step-by-step example about how to use this Custom option, see Connecting to a custom Hadoop distribution.

    Along with the evolution of Hadoop, please note the following changes:

    • If you use Hortonworks Data Platform V2.2, the configuration files of your cluster might be using environment variables such as ${hdp.version}. If this is your situation, you need to set the mapreduce.application.framework.path property in the Hadoop properties table with the path value explicitly pointing to the MapReduce framework archive of your cluster. For example:

      mapreduce.application.framework.path=/hdp/apps/2.2.0.0-2041/mapreduce/mapreduce.tar.gz#mr-framework

    • If you use Hortonworks Data Platform V2.0.0, the type of the operating system for running the distribution and a Talend Job must be the same, such as Windows or Linux. Otherwise, you have to use Talend Jobserver to execute the Job in the same type of operating system in which the Hortonworks Data Platform V2.0.0 distribution you are using is run. For further information about Talend Jobserver, see Talend Installation Guide.

  4. In the Name node field, enter the location of the master node, the NameNode, of the distribution to be used. For example, hdfs://tal-qa113.talend.lan:8020.

    If you are using a MapR distribution, you can simply leave maprfs:/// as it is in this field; then the MapR client will take care of the rest on the fly for creating the connection. The MapR client must be properly installed. For further information about how to set up a MapR client, see the following link in MapR's documentation: http://doc.mapr.com/display/MapR/Setting+Up+the+Client

  5. In the Job tracker field, enter the location of the JobTracker of your distribution. For example, tal-qa114.talend.lan:8050.

    Note that the word Job in the term JobTracker refers to the MR (MapReduce) jobs described in Apache's documentation on http://hadoop.apache.org/.

    If you use YARN in your Hadoop cluster, such as Hortonworks Data Platform V2.0.0 or Cloudera CDH4.3+ (YARN mode), you need to specify the location of the Resource Manager instead of the JobTracker. Then you can continue to set the following parameters depending on the configuration of the Hadoop cluster to be used (if you leave the check box of a parameter clear, then at runtime, the configuration of this parameter in the Hadoop cluster to be used will be ignored):

    • Select the Set resourcemanager scheduler address check box and enter the Scheduler address in the field that appears.

    • Select the Set jobhistory address check box and enter the location of the JobHistory server of the Hadoop cluster to be used. This allows the metrics information of the current Job to be stored in that JobHistory server.

    • Select the Set staging directory check box and enter this directory defined in your Hadoop cluster for temporary files created by running programs. Typically, this directory can be found under the yarn.app.mapreduce.am.staging-dir property in the configuration files such as yarn-site.xml or mapred-site.xml of your distribution.

    • Select the Use datanode hostname check box to allow the Job to access datanodes via their hostnames. This actually sets the dfs.client.use.datanode.hostname property to true. When connecting to an S3N filesystem, you must select this check box.

  6. If you are accessing a Hadoop cluster running with Kerberos security, select the check box for Kerberos authentication, then enter the Kerberos principal name for the NameNode in the field displayed. This enables you to use your user name to authenticate against the credentials stored in Kerberos.

    In addition, since this component performs Map/Reduce computations, you also need to authenticate the related services such as the Job history server and the Resource manager or Jobtracker depending on your distribution in the corresponding field. These principals can be found in the configuration files of your distribution. For example, in a CDH4 distribution, the Resource manager principal is set in the yarn-site.xml file and the Job history principal in the mapred-site.xml file.

    If you need to use a Kerberos keytab file to log in, select Use a keytab to authenticate. A keytab file contains pairs of Kerberos principals and encrypted keys. You need to enter the principal to be used in the Principal field and the access path to the keytab file itself in the Keytab field.

    Note that the user that executes a keytab-enabled Job is not necessarily the one a principal designates but must have the right to read the keytab file being used. For example, the user name you are using to execute a Job is user1 and the principal to be used is guest; in this situation, ensure that user1 has the right to read the keytab file to be used.

  7. In the User name field, enter the login user name for your distribution. If you leave it empty, the user name of the machine hosting the Studio will be used.

  8. In the Temp folder field, enter the path in HDFS to the folder where you store the temporary files generated during Map/Reduce computations.

  9. Leave the default value of the Path separator in server as it is, unless the host machine of your Hadoop distribution uses a separator other than a colon (:) for its PATH variable. In that situation, you must change this value to the one used on that host.

  10. Leave the Clear temporary folder check box selected, unless you want to keep those temporary files.

  11. Leave the Compress intermediate map output to reduce network traffic check box selected, to reduce the time needed to transfer the mapper task partitions to the multiple reducers.

    However, if the data transfer in the Job is negligible, it is recommended to clear this check box to deactivate the compression step, because this compression consumes extra CPU resources.

  12. If you need to use custom Hadoop properties, complete the Hadoop properties table with the property or properties to be customized. Then at runtime, these changes will override the corresponding default properties used by the Studio for its Hadoop engine.

    For further information about the properties required by Hadoop, see Apache's Hadoop documentation on http://hadoop.apache.org, or the documentation of the Hadoop distribution you need to use.

  13. If the Hadoop distribution to be used is Hortonworks Data Platform V1.2 or Hortonworks Data Platform V1.3, you need to set proper memory allocations for the map and reduce computations to be performed by the Hadoop system.

    In that situation, you need to enter the values you need in the Mapred job map memory mb and the Mapred job reduce memory mb fields, respectively. Both values default to 1000, which is normally appropriate for running the computations.

    If the distribution is YARN, then the memory parameters to be set become Map (in Mb), Reduce (in Mb) and ApplicationMaster (in Mb), accordingly. These fields allow you to dynamically allocate memory to the map and the reduce computations and the ApplicationMaster of YARN.

  14. If you are using Cloudera V5.5+, you can select the Use Cloudera Navigator check box to enable the Cloudera Navigator of your distribution to trace your Job lineage to the component level, including the schema changes between components.

    With this option activated, you need to set the following parameters:

    • Username and Password: these are the credentials you use to connect to your Cloudera Navigator.

    • Cloudera Navigator URL: enter the location of the Cloudera Navigator to be connected to.

    • Cloudera Navigator Metadata URL: enter the location of the Navigator Metadata.

    • Activate the autocommit option: select this check box to make Cloudera Navigator generate the lineage of the current Job at the end of the execution of this Job.

      Since this option actually forces Cloudera Navigator to generate lineages of all its available entities such as HDFS files and directories, Hive queries or Pig scripts, it is not recommended for the production environment because it will slow the Job.

    • Kill the job if Cloudera Navigator fails: select this check box to stop the execution of the Job when the connection to your Cloudera Navigator fails.

      Otherwise, leave it clear to allow your Job to continue to run.

    • Disable SSL validation: select this check box to make your Job connect to Cloudera Navigator without the SSL validation process.

      This feature is meant to facilitate the test of your Job but is not recommended to be used in a production cluster.

For further information about this Hadoop Configuration tab, see the section describing how to configure the Hadoop connection for a Talend Map/Reduce Job of the Talend Big Data Getting Started Guide.

For further information about the Resource Manager, its scheduler and the ApplicationMaster, see YARN's documentation such as http://hortonworks.com/blog/apache-hadoop-yarn-concepts-and-applications/.

For further information about how to determine YARN and MapReduce memory configuration settings, see the documentation of the distribution you are using, such as the following link provided by Hortonworks: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.0.6.0/bk_installing_manually_book/content/rpm-chap1-11.html.

Configuring the clustering process

  1. Double-click tMahoutClustering to open its Component view.

  2. From the Schema list, select Built-In and then click the [...] button next to Edit Schema and describe the data structure in the input file.

  3. Add eight rows to the schema dialog box and define the input data as shown in the above capture.

    The component has one read-only column, clusterID.

  4. Click OK.

  5. In the File Configuration area:

    • Click the [...] button next to the Input HDFS file and browse to the HDFS file on the Hadoop system that holds the input numerical data you want to cluster.

    • Set the field separator used to separate the columns in the clustered data.

    • In the Cluster columns table, add rows to the table and click in each row to select a column from the input schema.

  6. In the Clustering Configuration area:

    • From the Clustering Type list, select the algorithm you want to use to cluster the numerical data, Fuzzy K-Means in this example.

    • From the Distance Measure list, select the distance measure you want to use for clustering.

    • In the Number of clusters field, enter 3.

    • Leave the values in Max iterations and Convergence delta as they are.

Mapping data

  1. Double-click tMap to open the Map Editor.

  2. Drop the Region and the clusterID columns to the first output table that corresponds to the first tAggregateRow component.

    Drop the Channel and the clusterID columns to the second output table that corresponds to the second tAggregateRow component.

    Use the Schema editor section at the bottom of the editor to add necessary lines to the output tables.

  3. Click OK to validate changes.

Aggregating and calculating output data

  1. Double-click the first tAggregateRow to display its Basic settings view and define the component properties.

  2. Click the [...] button next to Edit schema and define the output flow.

  3. Move the columns in the input schema to the output schema and then use the [+] button to add a new column in the output schema. Call it count.

    When done, click OK to close the dialog box.

  4. In the Group by section, click the plus button to add as many lines as needed. Here you can define the group-by values.

    • Click in the first Output column row and select the output column that will hold the aggregated data, the region column in this example.

    • Click in the first Input column position row and select the input column from which you want to collect the values to be aggregated, the region column in this example.

  5. In the Operations section, click the plus button to add rows for the columns that will hold the aggregated data. Here you can define the calculation values.

    • Click in the Output column row and select the destination column from the list, the count column in this example.

    • Click in the Function column row and select any of the listed operations.

      In this example, the goal is to count the number of clients per region, with each region listed only once in the output column.

    • Click in the Input column position row and select the input column from which you want to collect the values to be aggregated, the region column in this example.

  6. Double-click the second tAggregateRow component and define, the same way, its basic settings to count the number of clients in the second cluster based on the channel column.
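The aggregation configured in the steps above amounts to a group-by followed by a count. The sketch below shows the same idea in Python. It is an illustration only: the rows are made-up sample values rather than records from the data set, the helper name count_by is hypothetical, and it assumes that the counts are grouped by both the region (or channel) column and the clusterID column.

```python
from collections import Counter


def count_by(rows, key_columns):
    """Count rows per distinct combination of the given columns."""
    return Counter(tuple(row[c] for c in key_columns) for row in rows)


# Hypothetical sample rows; the values are invented for illustration.
rows = [
    {"region": "Lisbon", "channel": "Horeca", "clusterID": 0},
    {"region": "Lisbon", "channel": "Retail", "clusterID": 0},
    {"region": "Oporto", "channel": "Horeca", "clusterID": 1},
    {"region": "Lisbon", "channel": "Horeca", "clusterID": 1},
]

# Equivalent of the first tAggregateRow: clients per region and cluster.
region_counts = count_by(rows, ["region", "clusterID"])

# Equivalent of the second tAggregateRow: clients per channel and cluster.
channel_counts = count_by(rows, ["channel", "clusterID"])
```

Each key appears only once in the result, paired with its count, which corresponds to the region (or channel), clusterID and count columns passed on to the following tMap components.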

Mapping output data

  1. Double-click the second tMap to open the Map Editor.

  2. Drop the region, the clusterID and the count columns to the output table that corresponds to the first HDFS file.

  3. Click OK to validate changes.

  4. Double-click the third tMap to open the Map Editor.

  5. Drop the channel, the clusterID and the count columns to the output table that corresponds to the second HDFS file.

  6. Click OK to validate changes.

Writing output data in HDFS

  1. Double-click the first tHDFSOutput to open its Component view.

  2. Click the [...] button next to the Folder field and browse to the folder in which you want to write the region data.

  3. From the Type list, select the data format for the records to be written. In this example, select Text file.

  4. From the Action list, select the operation you need to perform on the file in question. If the file already exists, select Overwrite, otherwise select Create.

  5. Select the Merge result to single file check box and enter the path, or browse to the file you need to write the merged output data in.

  6. If the file for the merged data exists, select the Override target file check box to overwrite that file.

  7. Double-click the second tHDFSOutput to open its Component view.

  8. Define the component settings similarly to write the data about the client channels from the second cluster to an output HDFS folder.

Finalizing and executing the Job

  • Save your Job and press F6 to execute it.

    The figure below shows part of the clustered data written to the HDFS folders.

    tMahoutClustering reads data from the given Hadoop system and groups customer records into clusters. The other components in the Job analyze the clustering results, show the number of customers in each cluster grouped by channels or regions and give the cluster identification.

Visualizing output data

You can build a bar chart on each of the clusters to visualize the number of customers grouped by different regions or channels.

  • Use tHDFSInput and tBarChart components in two Jobs to read the output HDFS files and generate a bar chart on the data to ease technical analysis.

    In the first Job, tHDFSInput reads the region output HDFS file and passes the flow to tBarChart. tBarChart reads data from the input flow and transforms it into a bar chart in a PNG image file.

    In the second Job, tHDFSInput reads the channel output HDFS file and passes the flow to tBarChart which transforms the input data into a bar chart in a PNG image file.

    Each bar chart has three columns; each column represents the number of records in one cluster.

    For further information about the tHDFSInput component, see tHDFSInput and for more information about the tBarChart, see tBarChart.