Creating a classification model to filter spam - 6.1

Talend Components Reference Guide

In this scenario, you create two Spark Batch Jobs. The key components to be used are as follows:

  • tModelEncoder: several tModelEncoder components are used to transform given SMS text messages into feature sets.

  • tRandomForestModel: it analyzes the features coming from tModelEncoder to build a classification model that learns what a junk message or a normal message could look like.

  • tClassify: in a new Job, it applies this classification model to a new set of SMS text messages in order to classify the spam and the normal messages. In this scenario, the result of this classification is used to evaluate the accuracy of the model, since the classification of the messages processed by tClassify is already known and explicitly marked.

  • A configuration component such as tHDFSConfiguration in each Job: this component is used to connect to the file system to which the jar files the Job depends on are transferred during the execution of the Job.

    This file-system-related configuration component is required unless you run your Spark Jobs in the Local mode.

Prerequisites:

  • Two sets of SMS text messages: one is used to train classification models and the other is used to evaluate the created models. You can download the training set from trainingSet.zip and the test set from testSet.zip.

    Talend created these two sets out of the dataset downloadable from https://archive.ics.uci.edu/ml/datasets/SMS+Spam+Collection, by using this dataSet_preparation Job to add 3 feature columns (number of currency symbols, number of numeric values and number of exclamation marks) to the raw dataset and proportionally split the dataset.

    An example of the junk messages reads as follows:

    Free entry in 2 a wkly comp to win FA Cup final tkts 21st May 2005. Text FA to 87121 to receive entry question(std txt rate)T&C's apply 08452810075over18's

    An example of the normal messages reads as follows:

    Ahhh. Work. I vaguely remember that! What does it feel like? Lol

    Note that the new features added to the raw dataset were identified by observing the junk messages used specifically in this scenario (these junk messages often contain prices and/or exclamation marks), so they cannot be generalized to any junk messages you may want to analyze; a minimal sketch of this kind of feature counting is given after the prerequisites list. In addition, the dataset was randomly split into two sets and used as is, but in real-world practice you can continue to preprocess it using many different methods, such as dataset balancing, in order to better train your classification model.

  • The two sets must be stored in the machine where the Job is going to be executed, for example in the HDFS system of your Yarn cluster if you use the Spark Yarn client mode to run Talend Spark Jobs, and you must have the appropriate rights and permissions to read data from and write data to this system.

    In this scenario, the Spark Yarn client mode is used and the datasets are stored in the associated HDFS system.

  • The Spark cluster to be used must have been properly set up and be running.
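
The dataSet_preparation Job itself is not detailed here, but the three extra feature columns amount to simple character and pattern counts over each message. The following Scala sketch is only a hypothetical illustration of such counting; the object and helper names are not part of the Talend Job.

    // Hypothetical sketch of how the three feature columns could be derived from a raw
    // SMS message; the helpers are illustrative and not part of the dataSet_preparation Job.
    object FeaturePreparation {
      private val currencySymbols = Set('$', '£', '€', '¥')

      def numCurrency(msg: String): Int = msg.count(c => currencySymbols.contains(c)) // number of currency symbols
      def numNumeric(msg: String): Int = "\\d+".r.findAllIn(msg).length               // number of numeric values
      def numExclamation(msg: String): Int = msg.count(_ == '!')                      // number of exclamation marks

      def main(args: Array[String]): Unit = {
        val sms = "Free entry in 2 a wkly comp to win FA Cup final tkts!"
        println((numCurrency(sms), numNumeric(sms), numExclamation(sms)))             // prints (0,1,1)
      }
    }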

Creating a classification model using Random Forest

Linking the components
  1. In the Integration perspective of the Studio, create an empty Spark Batch Job, named rf_model_creation for example, from the Job Designs node in the Repository tree view.

    For further information about how to create a Spark Batch Job, see the Getting Started Guide of the Studio.

  2. In the workspace, enter the name of the component to be used and select this component from the list that appears. In this scenario, the components are tHDFSConfiguration, tFileInputDelimited, tRandomForestModel, and four tModelEncoder components.

    It is recommended to give the four tModelEncoder components different labels so that you can easily recognize the task each of them is used to complete. In this scenario, they are labelled Tokenize, tf, tf_idf and features_assembler, respectively.

  3. Except tHDFSConfiguration, connect the other components using Row > Main links as displayed in the image above.

Configuring the connection to the file system to be used by Spark
  1. Double-click tHDFSConfiguration to open its Component view. Note that tHDFSConfiguration is used because the Spark Yarn client mode is used to run Spark Jobs in this scenario.

    Spark uses this component to connect to the HDFS system to which the jar files dependent on the Job are transferred.

  2. In the Version area, select the Hadoop distribution you need to connect to and its version.

  3. In the NameNode URI field, enter the location of the machine hosting the NameNode service of the cluster.

  4. In the Username field, enter the authentication information used to connect to the HDFS system to be used. Note that the user name must be the same as the one you have entered in the Spark configuration tab.

Loading the training set into the Job
  1. Double-click tFileInputDelimited to open its Component view.

  2. Select the Define a storage configuration component check box and select the tHDFSConfiguration component to be used.

    tFileInputDelimited uses this configuration to access the training set to be used.

  3. Click the [...] button next to Edit schema to open the schema editor.

  4. Click the [+] button five times to add five rows and in the Column column, rename them to label, sms_contents, num_currency, num_numeric and num_exclamation, respectively.

    The label and the sms_contents columns carry the raw data, which is composed of the SMS text messages in the sms_contents column and the labels indicating whether a message is spam in the label column.

    The other columns are used to carry the features added to the raw datasets as explained previously in this scenario. These three features are the number of currency symbols, the number of numeric values and the number of exclamation marks found in each SMS message.

  5. In the Type column, select Integer for the num_currency, num_numeric and num_exclamation columns.

  6. Click OK to validate these changes.

  7. In the Folder/File field, enter the directory where the training set to be used is stored.

  8. In the Field separator field, enter \t, which is the separator used by the datasets you can download for use in this scenario.
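
For reference, this input step corresponds roughly to reading the tab-separated training set into a DataFrame with the five-column schema defined above. The following Spark (Scala) sketch illustrates the equivalent logic only; it is not the code Talend generates, and the HDFS path is a placeholder.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    // Read the tab-separated training set with the schema defined in the steps above.
    // The path is a placeholder for the directory entered in the Folder/File field.
    val spark = SparkSession.builder().appName("rf_model_creation_sketch").getOrCreate()

    val schema = StructType(Seq(
      StructField("label", StringType),
      StructField("sms_contents", StringType),
      StructField("num_currency", IntegerType),
      StructField("num_numeric", IntegerType),
      StructField("num_exclamation", IntegerType)
    ))

    val trainingSet = spark.read
      .option("sep", "\t")         // Field separator: \t
      .schema(schema)
      .csv("hdfs://namenode:8020/user/talend/trainingSet")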

Transforming SMS text messages to feature vectors using tModelEncoder

This step is meant to implement the feature engineering process.

Transforming messages to words

  1. Double-click the tModelEncoder component labelled Tokenize to open its Component view. This component tokenizes the SMS messages into words.

  2. Click the Sync columns button to retrieve the schema from the preceding component.

  3. Click the [...] button next to Edit schema to open the schema editor.

  4. On the output side, click the [+] button to add one row and in the Column column, rename it to sms_tokenizer_words. This column is used to carry the tokenized messages.

  5. In the Type column, select Object for this sms_tokenizer_words row.

  6. Click OK to validate these changes.

  7. In the Transformations table, add one row by clicking the [+] button and then proceed as follows:

    • In the Input column column, select the column that provides data to be transformed to features. In this scenario, it is sms_contents.

    • In the Output column column, select the column that carries the features. In this scenario, it is sms_tokenizer_words.

    • In the Transformation column, select the algorithm to be used for the transformation. In this scenario, it is Regex tokenizer.

    • In the Parameters column, enter the parameters you want to customize for use in the algorithm you have selected. In this scenario, enter pattern=\\W;minTokenLength=3.

    Using this transformation, tModelEncoder splits each input message on non-word characters, keeps only the words that contain at least 3 characters and puts the result of the transformation in the sms_tokenizer_words column. Thus currency symbols, numeric values, punctuation marks and short words such as a, an or to are excluded from this column.
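
Under the hood, the Regex tokenizer transformation corresponds to Spark ML's RegexTokenizer. The following minimal sketch, which assumes the trainingSet DataFrame from the earlier loading sketch, shows the equivalent call; it is not the code Talend actually generates.

    import org.apache.spark.ml.feature.RegexTokenizer

    // Equivalent of the Tokenize tModelEncoder: split each message on non-word characters
    // and keep only tokens of at least 3 characters.
    val tokenizer = new RegexTokenizer()
      .setInputCol("sms_contents")
      .setOutputCol("sms_tokenizer_words")
      .setPattern("\\W")          // pattern=\\W
      .setMinTokenLength(3)       // minTokenLength=3

    val tokenized = tokenizer.transform(trainingSet)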

Calculating the weight of a word in each message

  1. Double-click the tModelEncoder component labelled tf to open its Component view.

  2. Repeat the operations described previously for the tModelEncoder labelled Tokenize to add the sms_tf_vect column of the Vector type to the output schema and define the transformation as displayed in the image above.

    In this transformation, tModelEncoder uses HashingTF to convert the already tokenized SMS messages into fixed-length (15 in this scenario) feature vectors to reflect the importance of a word in each SMS message.
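
Assuming the tokenized DataFrame from the previous sketch, the equivalent Spark ML call is roughly the following; the use of HashingTF matches the description above, but the sketch is illustrative only.

    import org.apache.spark.ml.feature.HashingTF

    // Equivalent of the tf tModelEncoder: hash each tokenized message into a
    // fixed-length (15) term-frequency vector.
    val hashingTF = new HashingTF()
      .setInputCol("sms_tokenizer_words")
      .setOutputCol("sms_tf_vect")
      .setNumFeatures(15)          // fixed vector length of 15, as in this scenario

    val tfVectors = hashingTF.transform(tokenized)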

Downplaying the weight of the irrelevant words in each message

  1. Double-click the tModelEncoder component labelled tf_idf to open its Component view. In this process, tModelEncoder reduces the weight of the words that appear very often but across too many messages, because such a word often brings no meaningful information for text analysis, for example the word the.

  2. Repeat the operations described previously for the tModelEncoder labelled Tokenize to add the sms_tf_idf_vect column of the Vector type to the output schema and define the transformation as displayed in the image above.

    In this transformation, tModelEncoder uses Inverse Document Frequency to downplay the weight of the words that appear in 5 or more messages.
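
In Spark ML terms this corresponds to the IDF estimator. In the minimal sketch below, which continues from the previous one, the document-frequency threshold of 5 is assumed from the description above; it is not necessarily how Talend parameterizes the generated code.

    import org.apache.spark.ml.feature.IDF

    // Equivalent of the tf_idf tModelEncoder: rescale the term-frequency vectors with
    // inverse document frequency; minDocFreq=5 is assumed from the scenario description.
    val idf = new IDF()
      .setInputCol("sms_tf_vect")
      .setOutputCol("sms_tf_idf_vect")
      .setMinDocFreq(5)

    val tfIdfVectors = idf.fit(tfVectors).transform(tfVectors)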

Combining feature vectors

  1. Double-click the tModelEncoder component labelled features_assembler to open its Component view.

  2. Repeat the operations described previously for the tModelEncoder labelled Tokenize to add the features_vect column of the Vector type to the output schema and define the transformation as displayed in the image above.

    Note that the parameter to be put in the Parameters column is inputCols=sms_tf_idf_vect,num_currency,num_numeric,num_exclamation.

    In this transformation, tModelEncoder combines all feature vectors into one single feature column.
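
This transformation corresponds to Spark ML's VectorAssembler. A minimal sketch, continuing from the previous ones:

    import org.apache.spark.ml.feature.VectorAssembler

    // Equivalent of the features_assembler tModelEncoder: combine the TF-IDF vector and
    // the three numeric feature columns into the single features_vect column.
    val assembler = new VectorAssembler()
      .setInputCols(Array("sms_tf_idf_vect", "num_currency", "num_numeric", "num_exclamation"))
      .setOutputCol("features_vect")

    val assembled = assembler.transform(tfIdfVectors)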

Training the model using Random Forest
  1. Double-click tRandomForestModel to open its Component view.

  2. From the Label column list, select the column that provides the classes to be used for classification. In this scenario, it is label, which contains two class names: spam for junk messages and ham for normal messages.

  3. From the Features column list, select the column that provides the feature vectors to be analyzed. In this scenario, it is features_vect, which combines all features.

  4. Select the Save the model on file system check box and in the HDFS folder field that is displayed, enter the directory you want to use to store the generated model.

  5. In the Number of trees in the forest field, enter the number of decision trees you want tRandomForestModel to build. You need to run the current Job several times with different numbers to create several classification models; after comparing the evaluation results of the model created on each run, you can decide the number you need to use. In this scenario, enter 20.

    An evaluation Job will be presented in one of the following sections.

  6. Leave the other parameters as is.
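
To make the mapping to plain Spark ML concrete, the sketch below chains the transformers from the previous sketches and a RandomForestClassifier into a single Pipeline, fits it on the training set and persists the fitted model. Packaging the feature transformations together with the classifier in one saved model is an assumption made so that the model can later be applied directly to raw rows, as tClassify does in the evaluation Job; the code Talend generates may be organized differently, and the save path is a placeholder for the directory entered in the HDFS folder field.

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.feature.StringIndexer

    // Plain Spark ML needs a numeric label column, so the spam/ham strings are indexed
    // first; Talend handles this internally.
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexed_label")
    val indexedTraining = labelIndexer.fit(trainingSet).transform(trainingSet)

    // Rough equivalent of tRandomForestModel building 20 decision trees.
    val rf = new RandomForestClassifier()
      .setLabelCol("indexed_label")
      .setFeaturesCol("features_vect")
      .setNumTrees(20)             // Number of trees in the forest: 20

    // Chain the feature transformers defined in the previous sketches with the classifier,
    // then fit the pipeline and persist the resulting model.
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, idf, assembler, rf))
    val model = pipeline.fit(indexedTraining)

    model.write.overwrite().save("hdfs://namenode:8020/user/talend/sms_rf_model")   // placeholder path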

Setting up Spark connection
  1. Click Run to open its view and then click the Spark Configuration tab to display its view for configuring the Spark connection.

    This view looks like the image below:

  2. Select the type of the Spark cluster you need to connect to.

    • Local: the Studio builds the Spark environment in itself at runtime to run the Job locally within the Studio. With this mode, each processor of the local machine is used as a Spark worker to perform the computations. This mode requires minimum parameters to be set in this configuration view.

      Note that this local machine is the machine on which the Job is actually run. The Local mode is the default mode and you need to clear its check box to display the drop-down list for you to select the other modes.

    • Standalone: the Studio connects to a Spark-enabled cluster to run the Job from this cluster.

    • Yarn client: the Studio runs the Spark driver to orchestrate how the Job should be performed and then sends the orchestration to the Yarn service of a given Hadoop cluster so that the Resource Manager of this Yarn service requests execution resources accordingly.

  3. If you are using the Yarn client mode, the Property type list is displayed, allowing you to select an established Hadoop connection from the Repository, provided that you have created this connection in the Repository. The Studio then reuses that set of connection information for this Job.

    For further information about how to create a Hadoop connection in the Repository, see the chapter describing the Hadoop cluster node of the Talend Studio User Guide.

  4. Select the version of the Hadoop distribution you are using along with Spark.

    If you cannot find the distribution corresponding to yours in this drop-down list, the distribution you want to connect to is not officially supported by Talend. In this situation, you can select Custom, then select the Spark version of the cluster to be connected to and click the button to display the dialog box in which you can alternatively:

    1. Select Import from existing version to import an officially supported distribution as base and then add other required jar files which the base distribution does not provide.

    2. Select Import from zip to import the configuration zip for the custom distribution to be used. This zip file should contain the libraries of the different Hadoop/Spark elements and the index file of these libraries.

      Note that custom versions are not officially supported by Talend. Talend and its community provide you with the opportunity to connect to custom versions from the Studio but cannot guarantee that the configuration of whichever version you choose will be easy. As such, you should only attempt to set up such a connection if you have sufficient Hadoop and Spark experience to handle any issues on your own.

  5. Configure the connection information to the principal services of the cluster to be used.

    If you are using the Yarn client mode, you need to enter the addresses of the following services in their corresponding fields (if you leave the check box of a service clear, then at runtime, the configuration of this parameter in the Hadoop cluster to be used will be ignored):

    • In the Resource manager field, enter the address of the ResourceManager service of the Hadoop cluster to be used.

    • Select the Set resourcemanager scheduler address check box and enter the Scheduler address in the field that appears.

    • Select the Set jobhistory address check box and enter the location of the JobHistory server of the Hadoop cluster to be used. This allows the metrics information of the current Job to be stored in that JobHistory server.

    • Select the Set staging directory check box and enter the staging directory defined in your Hadoop cluster for temporary files created by running programs. Typically, this directory can be found under the yarn.app.mapreduce.am.staging-dir property in configuration files such as yarn-site.xml or mapred-site.xml of your distribution.

    • If you are accessing the Hadoop cluster running with Kerberos security, select this check box, then, enter the Kerberos principal names for the ResourceManager service and the JobHistory service in the displayed fields. This enables you to use your user name to authenticate against the credentials stored in Kerberos. These principals can be found in the configuration files of your distribution. For example, in a CDH4 distribution, the Resource manager principal is set in the yarn-site.xml file and the Job history principal in the mapred-site.xml file.

      If you need to use a Kerberos keytab file to log in, select Use a keytab to authenticate. A keytab file contains pairs of Kerberos principals and encrypted keys. You need to enter the principal to be used in the Principal field and the access path to the keytab file itself in the Keytab field.

      Note that the user that executes a keytab-enabled Job is not necessarily the one the principal designates, but must have the right to read the keytab file being used. For example, the user name you are using to execute a Job is user1 and the principal to be used is guest; in this situation, ensure that user1 has the right to read the keytab file to be used.

    • The User name field is available when you are not using Kerberos to authenticate. In the User name field, enter the login user name for your distribution. If you leave it empty, the user name of the machine hosting the Studio will be used.

      Since the Job needs to upload jar files to the HDFS of the cluster to be used, you must ensure that this user name is the same as the one you have put in tHDFSConfiguration, the component used to provide HDFS connection information to Spark.

    If you are using the Standalone mode, you need to set the following parameters:

    • In the Spark host field, enter the URI of the Spark Master of the Hadoop cluster to be used.

    • In the Spark home field, enter the location of the Spark executable installed in the Hadoop cluster to be used.

  6. If you need to run the current Job on Windows, it is recommended to specify where the winutils.exe program to be used is stored.

    • If you know where to find your winutils.exe file and you want to use it, select the Define the Hadoop home directory check box and enter the directory where your winutils.exe is stored.

    • Otherwise, leave this check box clear; the Studio generates one by itself and automatically uses it for this Job.

  7. If the Spark cluster cannot recognize the machine on which the Job is launched, select the Define the driver hostname or IP address check box and enter the host name or the IP address of this machine. This allows the Spark master and its workers to recognize this machine and thus find the Job and its driver.

    Note that in this situation, you also need to add the name and the IP address of this machine to its hosts file.

  8. Select the Set Tuning properties check box to optimize the allocation of the resources to be used to run this Job. These properties are not mandatory for the Job to run successfully, but they are useful when Spark is bottlenecked by any resource issue in the cluster such as CPU, bandwidth or memory:

    • Driver memory and Driver core: enter the allocation size of memory and the number of cores to be used by the driver of the current Job.

    • Executor memory: enter the allocation size of memory to be used by each Spark executor.

    • Core per executor: select this check box and in the displayed field, enter the number of cores to be used by each executor. If you leave this check box clear, the default allocation defined by Spark is used, for example, all available cores are used by one single executor in the Standalone mode.

    • Set Web UI port: if you need to change the default port of the Spark Web UI, select this check box and enter the port number you want to use.

    • Broadcast factory: select the broadcast implementation to be used to cache variables on each worker machine.

    • Customize Spark serializer: if you need to import an external Spark serializer, select this check box and in the field that is displayed, enter the fully qualified class name of the serializer to be used.

    • Yarn resource allocation: select how you want Yarn to allocate resources among executors.

      • Auto: you leave Yarn to manage the allocation by itself.

      • Fixed: you need to enter the number of executors to be used in the Num executors field that is displayed.

      • Dynamic: Yarn adapts the number of executors to suit the workload. You need to define the scale of this dynamic allocation by defining the initial number of executors to run in the Initial executors field, the lowest number of executors in the Min executors field and the largest number of executors in the Max executors field.

      This feature is available to the Yarn client mode only.

  9. In the Spark "scratch" directory field, enter the directory in the local system in which the Studio stores the temporary files such as the jar files to be transferred. If you launch the Job on Windows, the default disk is C:; so if you leave /tmp in this field, this directory is C:/tmp.

  10. Add any Spark properties you need to use to override their default counterparts used by the Studio.

    For example, if you are using the Yarn client mode with a CDH distribution, you need to specify the Yarn classpath of your cluster for the Job. The property to be added is spark.hadoop.yarn.application.classpath. Please contact the administrator of your cluster to obtain related information.

    The following value from a Cloudera cluster is presented for demonstration purposes:

    /etc/hadoop/conf,/usr/lib/hadoop/*,/usr/lib/hadoop/lib/*,/usr/lib/hadoop-hdfs/*,/usr/lib/hadoop-hdfs/lib/*,/usr/lib/hadoop-mapreduce/*,/usr/lib/hadoop-mapreduce/lib/*,/usr/lib/hadoop-yarn/*,/usr/lib/hadoop-yarn/lib/*,/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/*

    If you want the Spark application logs of this Job to be persistent in the file system, add the related properties to this Advanced properties table. For example, the properties to be set for the Yarn client mode are:

    • spark.yarn.historyServer.address

    • spark.eventLog.enabled

    • spark.eventLog.dir

    The value of the spark.eventLog.enabled property should be true; for the values of the other two properties, contact the administrator of the Spark cluster to be used.

    For further information about the valid Spark properties, see the Spark documentation at https://spark.apache.org/docs/latest/configuration.html.
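
    For illustration only, entries in the Advanced properties table could look like the following; the values shown are hypothetical placeholders and must be replaced with the ones provided by your cluster administrator.

      spark.yarn.historyServer.address   http://sparkhistory.example.com:18080        (placeholder)
      spark.eventLog.enabled             true
      spark.eventLog.dir                 hdfs://namenode:8020/user/spark/eventlogs    (placeholder)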

Executing the Job to create the classification model

Then you can run this Job.

  • Press F6 to run this Job.

Once done, the model file is created in the directory you have specified in tRandomForestModel.

Evaluating the classification model

Linking the components
  1. In the Integration perspective of the Studio, create another empty Spark Batch Job, named classify_and_evaluation for example, from the Job Designs node in the Repository tree view.

  2. In the workspace, enter the name of the component to be used and select this component from the list that appears. In this Job, the components are tHDFSConfiguration, tFileInputDelimited, tClassify, tReplicate, tJava, tFilterColumns and tLogRow.

  3. Except tHDFSConfiguration, connect them using the Row > Main link as displayed in the image above.

  4. Double-click tHDFSConfiguration to open its Component view and configure it as explained previously in this scenario.

Loading the test set into the Job
  1. Double-click tFileInputDelimited to open its Component view.

  2. Select the Define a storage configuration component check box and select the tHDFSConfiguration component to be used.

    tFileInputDelimited uses this configuration to access the test set to be used.

  3. Click the [...] button next to Edit schema to open the schema editor.

  4. Click the [+] button five times to add five rows and in the Column column, rename them to reallabel, sms_contents, num_currency, num_numeric and num_exclamation, respectively.

    The reallabel and the sms_contents columns carry the raw data, which is composed of the SMS text messages in the sms_contents column and the labels indicating whether a message is spam in the reallabel column.

    The other columns are used to carry the features added to the raw datasets as explained previously in this scenario. They contain the number of currency symbols, the number of numeric values and the number of exclamation marks found in each SMS message.

  5. In the Type column, select Integer for the num_currency, num_numeric and num_exclamation columns.

  6. Click OK to validate these changes.

  7. In the Folder/File field, enter the directory where the test set to be used is stored.

  8. In the Field separator field, enter \t, which is the separator used by the datasets you can download for use in this scenario.

Applying the classification model
  1. Double-click tClassify to open its Component view.

  2. Select the Model on filesystem radio button and enter the directory in which the classification model to be used is stored.

    The tClassify component contains a read-only column called label in which the model provides the classes to be used in the classification process, while the reallabel column retrieved from the input schema contains the classes to which each message actually belongs. The model will be evaluated by comparing the actual label of each message with the label the model determines.
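
In plain Spark ML terms, what tClassify does here is roughly the following: load the persisted model and apply it to the test set read by tFileInputDelimited. The sketch assumes a testSet DataFrame read the same way as the training set earlier, and the PipelineModel saved in the model-creation sketch, so the feature transformations are replayed automatically on the raw columns; the load path is a placeholder.

    import org.apache.spark.ml.PipelineModel

    // Load the persisted classification model and classify the test set. The prediction
    // column holds the predicted class index, which an IndexToString stage could map back
    // to the spam/ham class names.
    val model = PipelineModel.load("hdfs://namenode:8020/user/talend/sms_rf_model")   // placeholder path
    val classified = model.transform(testSet)

Comparing the predicted class with the reallabel column of each row then measures the accuracy of the model, which is what the rest of this Job does.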

Replicating the classification result
  1. Double-click tReplicate to open its Component view.