In this scenario, you create a Spark Batch Job to analyze how often a given product is downloaded.
The Job analyzes the download preferences of specific customers recorded in your customer base.
The sample data used as the customer base is as follows:
10103|Herbert|Clinton|FR|SILVERemail@example.com|6571183
10281|Bill|Ford|BE|PLATINUMfirstname.lastname@example.org|6360604
10390|George|Garfield|GB|SILVERemail@example.com|7919508
10566|Abraham|Garfield|CN|SILVERfirstname.lastname@example.org|9155569
10691|John|Polk|GB|SILVERemail@example.com|6488579
10884|Herbert|Hayes|GB|SILVERfirstname.lastname@example.org|8728181
11020|Chester|Roosevelt|BE|GOLDemail@example.com|4172181
11316|Franklin|Madison|BR|SILVERfirstname.lastname@example.org|4711801
11707|James|Tyler|ES|GOLDemail@example.com|7276942
11764|Theodore|McKinley|GB|GOLDfirstname.lastname@example.org|3224767
11777|Warren|Madison|BE|N/Aemail@example.com|6695520
11857|Ronald|Arthur|SG|PLATINUMfirstname.lastname@example.org|6704785
11936|Theodore|Buchanan|NL|SILVERemail@example.com|2783553
11940|Lyndon|Wilson|BR|PLATINUMfirstname.lastname@example.org|1247110
12214|Gerald|Jefferson|SG|N/Aemail@example.com|5879162
12382|Herbert|Taylor|IT|GOLDfirstname.lastname@example.org|3873628
12475|Richard|Kennedy|FR|N/Aemail@example.com|7287388
12479|Calvin|Eisenhower|ES|N/Afirstname.lastname@example.org|1792573
12531|Chester|Arthur|JP|PLATINUMemail@example.com|8772326
12734|Jimmy|Buchanan|IT|SILVERfirstname.lastname@example.org|7007786
Each record contains a customer's ID number, first name, last name, country code, support level, registration date, email address and phone number.
The sample web-click log of some of these customers reads as follows:
10103|/download/products/talend-open-studio
10281|/services/technical-support
10390|/services/technical-support
10566|/download/products/data-integration
10691|/services/training
10884|/download/products/integration-cloud
11020|/services/training
11316|/download/products/talend-open-studio
11707|/download/products/talend-open-studio
11764|/customers
This data contains the ID numbers of the customers who visited different Talend web pages and the pages they visited.
This data shows that the visits come from customers of different support levels and serve different purposes. The Job matches these visits against the sample customer base and determines which product the Silver-level customers download most.
Note that the sample data is created for demonstration purposes only.
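The logic of the Job can be sketched in plain Python, outside the Studio. This is an illustration of the analysis only, not the code the Studio generates: the function name top_silver_download is hypothetical, and the customer base is trimmed to the ID and support level of the ten customers that appear in the web-click log.

```python
from collections import Counter

# Trimmed customer base: customer ID -> support level, taken from the sample
# records above. All other fields are omitted in this sketch.
SUPPORT_LEVELS = {
    "10103": "SILVER", "10281": "PLATINUM", "10390": "SILVER",
    "10566": "SILVER", "10691": "SILVER", "10884": "SILVER",
    "11020": "GOLD", "11316": "SILVER", "11707": "GOLD", "11764": "GOLD",
}

# The sample web-click log, one "customer ID|page" record per line.
WEB_CLICKS = """\
10103|/download/products/talend-open-studio
10281|/services/technical-support
10390|/services/technical-support
10566|/download/products/data-integration
10691|/services/training
10884|/download/products/integration-cloud
11020|/services/training
11316|/download/products/talend-open-studio
11707|/download/products/talend-open-studio
11764|/customers
"""

DOWNLOAD_PREFIX = "/download/products/"


def top_silver_download(levels, clicks):
    """Return (product, count) for the product Silver customers download most."""
    # Select the Silver-level customers.
    silver_ids = {cid for cid, level in levels.items() if level == "SILVER"}
    counts = Counter()
    for line in clicks.splitlines():
        customer_id, url = line.split("|", 1)
        # Keep only download pages visited by Silver-level customers.
        if customer_id in silver_ids and url.startswith(DOWNLOAD_PREFIX):
            # The product name is the last segment of the URL.
            counts[url[len(DOWNLOAD_PREFIX):]] += 1
    # Keep the single most frequent product.
    return counts.most_common(1)[0]


print(top_silver_download(SUPPORT_LEVELS, WEB_CLICKS))
# prints ('talend-open-studio', 2)
```

The select, join, extract, aggregate and top stages of this function correspond roughly to the components assembled in the following steps.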
To replicate this scenario, proceed as follows:
In the Integration perspective of the Studio, create an empty Spark Batch Job from the Job Designs node in the Repository tree view.
For further information about how to create a Spark Batch Job, see Talend Big Data Getting Started Guide.
In the workspace, enter the name of the component to be used and select this component from the list that appears. In this scenario, the components are tHDFSConfiguration, two tFixedFlowInput components (label one customer_base and the other web_data), tSqlRow, tCacheOut, tCacheIn, tMap, tExtractDelimitedFields, tAggregateRow, tTop and tLogRow.
The tFixedFlowInput components are used to load the sample data into the data flow. In real-world practice, you can replace tFixedFlowInput with other components such as tMysqlInput, alone or combined with a tMap, to design a more sophisticated process that prepares your data for processing.
Connect customer_base (tFixedFlowInput), tSqlRow and tCacheOut using the Row > Main link. In this Subjob, the records about the Silver-level customers are selected and stored in cache.
Connect web_data (tFixedFlowInput) to tMap using the Row > Main link. This is the main input flow to the tMap component.
Do the same to connect tCacheIn to tMap. This is the lookup flow to tMap.
Connect tMap to tExtractDelimitedFields using the Row > Main link and name this connection in the dialog box that is displayed. For example, name it output.
Connect tExtractDelimitedFields, tAggregateRow, tTop and tLogRow using the Row > Main link.
Connect customer_base to web_data using the Trigger > OnSubjobOk link.
Leave the tHDFSConfiguration component alone without any connection.
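The connections described in the steps above can be summarized as a list of links. The following is only a sketch for checking the wiring on paper; the names LINKS, COMPONENTS and unconnected are hypothetical and do not correspond to anything in the Studio.

```python
# (source, target, link type) for every connection described in the steps above.
LINKS = [
    ("customer_base", "tSqlRow", "Row > Main"),
    ("tSqlRow", "tCacheOut", "Row > Main"),
    ("web_data", "tMap", "Row > Main"),                  # main input flow of tMap
    ("tCacheIn", "tMap", "Row > Main"),                  # lookup flow of tMap
    ("tMap", "tExtractDelimitedFields", "Row > Main"),   # connection named "output"
    ("tExtractDelimitedFields", "tAggregateRow", "Row > Main"),
    ("tAggregateRow", "tTop", "Row > Main"),
    ("tTop", "tLogRow", "Row > Main"),
    ("customer_base", "web_data", "Trigger > OnSubjobOk"),
]

COMPONENTS = [
    "tHDFSConfiguration", "customer_base", "web_data", "tSqlRow", "tCacheOut",
    "tCacheIn", "tMap", "tExtractDelimitedFields", "tAggregateRow", "tTop",
    "tLogRow",
]


def unconnected(components, links):
    """Return the components that appear in no link at all."""
    linked = {name for source, target, _ in links for name in (source, target)}
    return [name for name in components if name not in linked]


print(unconnected(COMPONENTS, LINKS))
# prints ['tHDFSConfiguration']
```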
Click Run to open its view, then click the Spark Configuration tab to configure the Spark connection.
This view looks like the image below:
Select the type of the Spark cluster you need to connect to.
Local: the Studio builds the Spark environment within itself at runtime to run the Job locally. In this mode, each processor of the local machine is used as a Spark worker to perform the computations. This mode requires the fewest parameters to be set in this configuration view.
Note that the local machine is the machine on which the Job actually runs. Local is the default mode; clear its check box to display the drop-down list from which you select the other modes.
Standalone: the Studio connects to a Spark-enabled cluster to run the Job from this cluster.
Yarn client: the Studio runs the Spark driver to orchestrate how the Job should be performed and then sends the orchestration to the Yarn service of a given Hadoop cluster so that the Resource Manager of this Yarn service requests execution resources accordingly.
If you are using the Yarn client mode, the Property type list is displayed to allow you to select an established Hadoop connection from the Repository, on the condition that you have created this connection in the Repository. Then the Studio will reuse that set of connection information for this Job.
For further information about how to create a Hadoop connection in the Repository, see the chapter describing the Hadoop cluster node of the Talend Studio User Guide.
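For readers who know the spark-submit command line, the three modes correspond roughly to the following master settings. This is a sketch for orientation only: the Studio sets up the connection itself, the function name master_args is hypothetical, and the Yarn form shown assumes the Spark 2.x or later syntax.

```python
def master_args(mode, host=None, port=7077):
    """Return the spark-submit arguments that roughly match a Studio mode."""
    if mode == "local":
        # local[*] runs Spark in-process with one worker thread per logical core.
        return ["--master", "local[*]"]
    if mode == "standalone":
        if not host:
            raise ValueError("the Standalone mode needs the Spark Master host")
        # 7077 is the default port of a standalone Spark Master.
        return ["--master", f"spark://{host}:{port}"]
    if mode == "yarn-client":
        # The driver stays on the submitting machine; Yarn allocates the executors.
        return ["--master", "yarn", "--deploy-mode", "client"]
    raise ValueError(f"unknown mode: {mode}")


# The host name below is a placeholder.
print(master_args("standalone", host="spark-master.example.com"))
```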
Select the version of the Hadoop distribution you are using along with Spark.
If the drop-down list does not contain your distribution, the distribution you want to connect to is not officially supported by Talend. In this situation, you can select Custom, then select the Spark version of the cluster to be connected and click the button to display the dialog box in which you can do one of the following:
Select Import from existing version to import an officially supported distribution as base and then add other required jar files which the base distribution does not provide.
Select Import from zip to import the configuration zip for the custom distribution to be used. This zip file should contain the libraries of the different Hadoop/Spark elements and the index file of these libraries.
Note that custom versions are not officially supported by Talend. Talend and its community provide you with the opportunity to connect to custom versions from the Studio but cannot guarantee that the configuration of whichever version you choose will be easy. As such, you should only attempt to set up such a connection if you have sufficient Hadoop and Spark experience to handle any issues on your own.
Configure the connection information to the principal services of the cluster to be used.
If you are using the Yarn client mode, you need to enter the addresses of the following services in their corresponding fields (if you leave the check box of a service clear, then at runtime, the configuration of this parameter in the Hadoop cluster to be used is ignored):
In the Resource manager field, enter the address of the ResourceManager service of the Hadoop cluster to be used.
Select the Set resourcemanager scheduler address check box and enter the Scheduler address in the field that appears.
Select the Set jobhistory address check box and enter the location of the JobHistory server of the Hadoop cluster to be used. This allows the metrics information of the current Job to be stored in that JobHistory server.
Select the Set staging directory check box and enter the directory defined in your Hadoop cluster for temporary files created by running programs. Typically, this directory can be found under the yarn.app.mapreduce.am.staging-dir property in configuration files such as yarn-site.xml or mapred-site.xml of your distribution.
If you are accessing a Hadoop cluster that runs with Kerberos security, select the Use Kerberos authentication check box, then enter the Kerberos principal names for the ResourceManager service and the JobHistory service in the fields that are displayed. This enables you to use your user name to authenticate against the credentials stored in Kerberos. These principals can be found in the configuration files of your distribution. For example, in a CDH4 distribution, the Resource manager principal is set in the yarn-site.xml file and the Job history principal in the mapred-site.xml file.
If you need to use a Kerberos keytab file to log in, select Use a keytab to authenticate. A keytab file contains pairs of Kerberos principals and encrypted keys. You need to enter the principal to be used in the Principal field and the access path to the keytab file itself in the Keytab field.
Note that the user that executes a keytab-enabled Job is not necessarily the one a principal designates but must have the right to read the keytab file being used. For example, the user name you are using to execute a Job is user1 and the principal to be used is guest; in this situation, ensure that user1 has the right to read the keytab file to be used.
The User name field is available when you are not using Kerberos to authenticate. In the User name field, enter the login user name for your distribution. If you leave it empty, the user name of the machine hosting the Studio will be used.
Since the Job needs to upload jar files to the HDFS system of the cluster to be used, you must ensure that this user name is the same as the one you have entered in tHDFSConfiguration, the component used to provide HDFS connection information to Spark.
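The values for several of these fields can usually be read from the configuration files of the cluster. The following sketch parses the standard Hadoop XML configuration format. Apart from yarn.app.mapreduce.am.staging-dir, which is named above, the property names used here (yarn.resourcemanager.address, yarn.resourcemanager.scheduler.address, mapreduce.jobhistory.address, yarn.resourcemanager.principal and mapreduce.jobhistory.principal) are the standard Hadoop names and are assumed to be the ones your distribution uses. All host names, the realm and the sample values are placeholders.

```python
import xml.etree.ElementTree as ET

# Placeholder content in the format of yarn-site.xml and mapred-site.xml.
SAMPLE_CONFIG = """
<configuration>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>rm.example.com:8032</value>
  </property>
  <property>
    <name>yarn.resourcemanager.scheduler.address</name>
    <value>rm.example.com:8030</value>
  </property>
  <property>
    <name>mapreduce.jobhistory.address</name>
    <value>history.example.com:10020</value>
  </property>
  <property>
    <name>yarn.app.mapreduce.am.staging-dir</name>
    <value>/user</value>
  </property>
  <property>
    <name>yarn.resourcemanager.principal</name>
    <value>yarn/_HOST@EXAMPLE.COM</value>
  </property>
</configuration>
""".strip()


def read_hadoop_properties(xml_text):
    """Return the name -> value pairs of a Hadoop XML configuration document."""
    root = ET.fromstring(xml_text)
    properties = {}
    for prop in root.findall("property"):
        name = prop.findtext("name")
        if name:
            properties[name.strip()] = (prop.findtext("value") or "").strip()
    return properties


config = read_hadoop_properties(SAMPLE_CONFIG)
print(config["yarn.app.mapreduce.am.staging-dir"])
# prints /user
```

In practice you would pass the content of the actual yarn-site.xml or mapred-site.xml file of your cluster instead of SAMPLE_CONFIG.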
If you are using the Standalone mode, you need to set the following parameters:
In the Spark host field, enter the URI of the Spark Master of the Hadoop cluster to be used.
In the Spark home field, enter the location of the Spark executable installed in the Hadoop cluster to be used.
If you need to run the current Job on Windows, it is recommended to specify where the winutils.exe program to be used is stored.
If you know where to find your winutils.exe file and you want to use it, select the Define the Hadoop home directory check box and enter the directory where your winutils.exe is stored.
Otherwise, leave this check box clear; the Studio then generates one by itself and automatically uses it for this Job.
If the Spark cluster cannot recognize the machine on which the Job is launched, select the Define the driver hostname or IP address check box and enter the host name or the IP address of this machine. This allows the Spark master and its workers to recognize this machine and so find the Job and its driver.
Note that in this situation, you also need to add the name and the IP address of this machine to its hosts file.
Select the Set Tuning properties check box to optimize the allocation of the resources to be used to run this Job. These properties are not mandatory for the Job to run successfully, but they are useful when Spark is bottlenecked by any resource issue in the cluster such as CPU, bandwidth or memory:
Driver memory and Driver core: enter the allocation size of memory and the number of cores to be used by the driver of the current Job.
Executor memory: enter the allocation size of memory to be used by each Spark executor.
Core per executor: select this check box and in the displayed field, enter the number of cores to be used by each executor. If you leave this check box clear, the default allocation defined by Spark is used, for example, all available cores are used by one single executor in the Standalone mode.
Set Web UI port: if you need to change the default port of the Spark Web UI, select this check box and enter the port number you want to use.
Broadcast factory: select the broadcast implementation to be used to cache variables on each worker machine.
Customize Spark serializer: if you need to import an external Spark serializer, select this check box and in the field that is displayed, enter the fully qualified class name of the serializer to be used.
Yarn resource allocation: select how you want Yarn to allocate resources among executors.
Auto: you leave Yarn to manage the allocation by itself.
Fixed: you need to enter the number of executors to be used in the Num executors field that is displayed.
Dynamic: Yarn adapts the number of executors to suit the workload. You need to define the scale of this dynamic allocation by defining the initial number of executors to run in the Initial executors field, the lowest number of executors in the Min executors field and the largest number of executors in the Max executors field.
This feature is available in the Yarn client mode only.
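The tuning fields above have close counterparts among the standard Spark properties. The following sketch builds such a property set. The mapping between the Studio fields and the Spark property names is an assumption made for illustration, not a description of what the Studio generates, and the function name tuning_properties and its parameters are hypothetical.

```python
def tuning_properties(driver_memory, driver_cores, executor_memory,
                      executor_cores=None, allocation="auto",
                      num_executors=None, initial=None, minimum=None,
                      maximum=None):
    """Build a dict of Spark properties from tuning values (assumed mapping)."""
    props = {
        "spark.driver.memory": driver_memory,
        "spark.driver.cores": str(driver_cores),
        "spark.executor.memory": executor_memory,
    }
    if executor_cores is not None:
        # Left unset, Spark applies its own default number of cores per executor.
        props["spark.executor.cores"] = str(executor_cores)
    if allocation == "fixed":
        if num_executors is None:
            raise ValueError("fixed allocation needs a number of executors")
        props["spark.executor.instances"] = str(num_executors)
    elif allocation == "dynamic":
        if None in (initial, minimum, maximum):
            raise ValueError("dynamic allocation needs initial, min and max")
        if not minimum <= initial <= maximum:
            raise ValueError("expected min <= initial <= max")
        props["spark.dynamicAllocation.enabled"] = "true"
        props["spark.dynamicAllocation.initialExecutors"] = str(initial)
        props["spark.dynamicAllocation.minExecutors"] = str(minimum)
        props["spark.dynamicAllocation.maxExecutors"] = str(maximum)
    elif allocation != "auto":
        raise ValueError(f"unknown allocation mode: {allocation}")
    return props


# Example values only; suitable sizes depend on your cluster.
props = tuning_properties("2g", 1, "4g", executor_cores=2,
                          allocation="dynamic", initial=2, minimum=1, maximum=8)
for name, value in sorted(props.items()):
    print(f"{name}={value}")
```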
In the Spark "scratch" directory field, enter the directory of the local system in which the Studio stores temporary files, such as the jar files to be transferred. If you launch the Job on Windows, the default disk is C:, so if you leave /tmp in this field, the directory used is C:/tmp.
Add any Spark properties you need to use to override their default counterparts used by the Studio.
For example, if you are using the Yarn client mode with a CDH distribution, you need to specify the Yarn classpath of your cluster for the Job. The property to be added is spark.hadoop.yarn.application.classpath. Please contact the administrator of your cluster to obtain related information.
The following value from a Cloudera cluster is presented for demonstration purposes:
If you want the Spark application logs of this Job to be persistent in the file system, add the related properties to this Advanced properties table. For example, the properties to be set for the Yarn client mode are:
The value of the spark.eventLog.enabled property should be true; for the values of the other two properties, contact the administrator of the Spark cluster to be used.
For further information about the valid Spark properties, see Spark documentation at https://spark.apache.org/docs/latest/configuration.
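As a minimal sketch of such a set of logging properties, the following assumes that the two additional properties are the standard Spark properties spark.eventLog.dir and spark.yarn.historyServer.address. The function name event_log_properties is hypothetical and both values are placeholders to be replaced with the ones provided by your administrator.

```python
def event_log_properties(event_log_dir, history_server_address):
    """Return the properties that make Spark application logs persistent."""
    return {
        "spark.eventLog.enabled": "true",
        # Directory in which Spark writes the event logs.
        "spark.eventLog.dir": event_log_dir,
        # Address of the Spark history server that displays these logs.
        "spark.yarn.historyServer.address": history_server_address,
    }


# Placeholder values only.
logging_props = event_log_properties(
    "hdfs://namenode.example.com:8020/user/spark/applicationHistory",
    "history.example.com:18080",
)
for name, value in logging_props.items():
    print(f"{name}={value}")
```

Each name and value pair would be entered as one row of the Advanced properties table.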
Double-click tHDFSConfiguration to open its Component view. Note that tHDFSConfiguration is used because the Spark Yarn client mode is used to run Spark Jobs in this scenario.
Spark uses this component to connect to the HDFS system to which the jar files dependent on the Job are transferred.
In the Version area, select the Hadoop distribution you need to connect to and its version.
In the NameNode URI field, enter the location of the machine hosting the NameNode service of the cluster.
In the Username field, enter the user name used to connect to the HDFS system to be used. Note that this user name must be the same as the one you have entered in the Spark Configuration tab.
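Before running the Job, the two constraints above can be checked with a few lines of code. This sketch assumes a plain HDFS cluster whose NameNode URI uses the hdfs scheme; the function name check_hdfs_settings, the host name and the user names are hypothetical.

```python
from urllib.parse import urlparse


def check_hdfs_settings(namenode_uri, hdfs_username, spark_username):
    """Return a list of problems found in the HDFS connection settings."""
    problems = []
    parsed = urlparse(namenode_uri)
    # Assumption: a plain HDFS NameNode URI such as hdfs://host:port.
    if parsed.scheme != "hdfs" or not parsed.hostname:
        problems.append("NameNode URI should look like hdfs://host:port")
    # The Job uploads its jar files to HDFS with the Spark user name,
    # so both user names must match.
    if hdfs_username != spark_username:
        problems.append("tHDFSConfiguration and Spark user names differ")
    return problems


print(check_hdfs_settings("hdfs://namenode.example.com:8020", "user1", "user1"))
# prints []
```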