This scenario shows how to create a Spark Batch Job by converting an existing MapReduce Job, so that the data about movies and movie directors is processed in the Spark environment.
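This scenario demonstrates how to convert the MapReduce Job into a Spark Batch Job and how to edit the converted Job to finalize it.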
Converting an existing MapReduce Job into a Spark Batch Job lets you reuse your existing assets instead of building Spark Jobs from scratch.
You have launched your Talend Studio and opened the Integration perspective.
You have created the aggregate_movie_director_mr MapReduce Job described in Joining movie and director information using a MapReduce Job and run it successfully.
Proceed as follows to convert the Job:
In the Repository tree view, expand the Job Designs node, the Big Data Batch node and then the getting_started folder and the mapreduce folder.
Right-click the aggregate_movie_director_mr Job and from the contextual menu, select Duplicate.
The [Duplicate] window is opened.
In the Input new name field, enter aggregate_movie_director_spark_batch as the name of the duplicate.
From the [Framework] list, select Spark and click OK to validate the changes.
The aggregate_movie_director_spark_batch Job is displayed in the mapreduce folder in the Repository.
Right-click the getting_started folder and select Create folder from the contextual menu.
In the [New Folder] wizard, name the new folder spark_batch and click Finish to create it.
Drag and drop the aggregate_movie_director_spark_batch Job into the spark_batch folder.
This new Spark Batch Job is now ready for further editing.
You then update the components where necessary to finalize a data transformation process that runs in the Spark framework.
You have ensured that the client machine on which the Talend Jobs are executed can recognize the host names of the nodes of the Hadoop cluster to be used. For this purpose, add the IP address/hostname mapping entries for the services of that Hadoop cluster in the hosts file of the client machine.
For example, if the host name of the Hadoop Namenode server is talend-cdh550.weave.local and its IP address is 192.168.x.x, the mapping entry reads 192.168.x.x talend-cdh550.weave.local.
The Hadoop cluster to be used has been properly configured and is running.
The Cloudera CDH V5.5 cluster used in this use case integrates Spark by default.
The administrator of the cluster has given read/write rights and permissions to the username to be used for the access to the related data and directories in HDFS.
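The hosts-file prerequisite can be checked from the client machine before you run the Job. The following Java class is a minimal sketch, not part of Talend Studio: the class and method names are hypothetical, and it assumes the standard hosts-file layout (an IP address followed by one or more host names, with # starting a comment). It reports which IP address, if any, the hosts file maps to a given host name such as talend-cdh550.weave.local.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

public class HostsEntryCheck {

    // Returns the IP address that the hosts-file content maps to the given
    // host name, or Optional.empty() if no entry lists that name.
    static Optional<String> findMapping(String hostsContent, String hostname) {
        for (String rawLine : hostsContent.split("\\R")) {
            // Everything after '#' is a comment.
            int hash = rawLine.indexOf('#');
            String line = (hash >= 0 ? rawLine.substring(0, hash) : rawLine).trim();
            if (line.isEmpty()) {
                continue;
            }
            // First field: IP address; remaining fields: host names and aliases.
            String[] fields = line.split("\\s+");
            for (int i = 1; i < fields.length; i++) {
                if (fields[i].equalsIgnoreCase(hostname)) {
                    return Optional.of(fields[0]);
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        // Linux/macOS location by default; on Windows pass
        // C:\Windows\System32\drivers\etc\hosts as the first argument.
        Path hostsFile = Paths.get(args.length > 0 ? args[0] : "/etc/hosts");
        String host = args.length > 1 ? args[1] : "talend-cdh550.weave.local";
        String content = Files.readString(hostsFile); // Java 11+
        System.out.println(findMapping(content, host)
                .map(ip -> "Mapped: " + ip + " " + host)
                .orElse("No hosts entry for " + host));
    }
}
```

Run it once for each cluster host name the Job needs to reach; a missing entry means the corresponding mapping line still has to be added to the hosts file.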
Once the aggregate_movie_director_spark_batch Job has been created by converting its MapReduce version, you need to edit the Job to finalize it.
In the Repository, double-click the aggregate_movie_director_spark_batch Job to open it in the workspace.
A tHDFSConfiguration component has been added automatically and inherits the configuration for the connection to HDFS from the original MapReduce Job.
The warning icons on some components indicate that these components, used in the original Job, do not exist in the current Job framework, Spark Batch. In this example, they are tHDFSInput and tHDFSOutput.
Click tHDFSInput to select it, then click OK in the [Warning] pop-up window to close it.
Press Delete on your keyboard to remove tHDFSInput.
Click in the Job workspace, type tFileInputDelimited and select this component from the list that appears.
tFileInputDelimited is added to the workspace.
Do the same to replace tHDFSOutput with tFileOutputDelimited.
Expand the Hadoop cluster node under the Metadata node in the Repository and then the my_cdh Hadoop connection node and its child node to display the movies schema metadata node you have set up under the HDFS folder as explained in Preparing file metadata.
Drop this schema metadata node onto the new tFileInputDelimited component in the workspace of the Job.
Right-click this tFileInputDelimited component, then from the contextual menu, select Row > Main and click tMap to connect it to tMap.
Right-click tMap, then from the contextual menu, select Row > out1 and click the new tFileOutputDelimited component to connect tMap to it.
Double-click the new tFileOutputDelimited component to open its Component view.
In the Folder field, enter or browse to the directory in which to write the result. In this scenario, it is /user/ychen/output_data/spark_batch/out, which receives the records that contain the names of the movie directors.
Select the Merge result to single file check box to merge the part- files that Spark, like MapReduce, typically writes into the output folder into one single file.
The Merge file path field is displayed.
In the Merge file path field, enter or browse to the file into which the part- files are to be merged.
In this scenario, this file is /user/ychen/output_data/spark_batch/out/merged.
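Conceptually, Merge result to single file combines the part- files of the output folder into the single file given in Merge file path. The following Java class is only a local-filesystem analogue of that idea, under the assumption that concatenating the part- files in file-name order is what the merge amounts to; Talend Studio performs the actual merge on HDFS, and the MergePartFiles class is hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MergePartFiles {

    // Concatenates every "part-*" file in dir, sorted by file name, into mergedFile.
    // Other files (for example a _SUCCESS marker or the merged file itself) are skipped.
    static void merge(Path dir, Path mergedFile) throws IOException {
        List<Path> parts;
        try (Stream<Path> entries = Files.list(dir)) {
            parts = entries
                    .filter(p -> p.getFileName().toString().startsWith("part-"))
                    .sorted()
                    .collect(Collectors.toList());
        }
        // With no open options, newOutputStream creates or truncates the target,
        // so running the merge again overwrites the previous merged file.
        try (OutputStream out = Files.newOutputStream(mergedFile)) {
            for (Path part : parts) {
                Files.copy(part, out);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java MergePartFiles <output-folder> <merged-file>
        merge(Paths.get(args[0]), Paths.get(args[1]));
    }
}
```

Because the part- file names are zero-padded (part-00000, part-00001, ...), sorting them by name keeps the original partition order in the merged file.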
Double-click the other tFileOutputDelimited component, which receives the reject link from tMap, to open its Component view.
In the Folder field, set the directory to /user/ychen/output_data/spark_batch/reject.
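The Job now has two output flows from tMap: the main flow written to /user/ychen/output_data/spark_batch/out and the reject flow written to /user/ychen/output_data/spark_batch/reject. As a rough illustration of how rows could be split between them, the following Java sketch assumes that tMap performs an inner join of movies with directors on a director ID and sends movies without a matching director to the reject flow. The record layouts, field names, separator and sample values are all hypothetical; the actual mapping is the one defined in the original MapReduce Job.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class MovieDirectorJoin {

    // Hypothetical row layouts; the real columns come from the schema metadata.
    record Movie(String id, String title, String directorId) {}
    record Director(String id, String name) {}

    // The two output flows: joined rows (main/out1) and unmatched movies (reject).
    record Result(List<String> out, List<Movie> reject) {}

    static Result join(List<Movie> movies, List<Director> directors) {
        // Lookup table keyed on director ID; the first row wins on duplicate IDs.
        Map<String, Director> byId = directors.stream()
                .collect(Collectors.toMap(Director::id, Function.identity(), (first, dup) -> first));
        List<String> out = new ArrayList<>();
        List<Movie> reject = new ArrayList<>();
        for (Movie movie : movies) {
            Director director = byId.get(movie.directorId());
            if (director != null) {
                // Inner-join match: emit a semicolon-delimited output row.
                out.add(movie.title() + ";" + director.name());
            } else {
                // No matching director: the row goes to the reject flow.
                reject.add(movie);
            }
        }
        return new Result(out, reject);
    }

    public static void main(String[] args) {
        List<Movie> movies = List.of(
                new Movie("1", "Movie A", "d1"),
                new Movie("2", "Movie B", "d9"));
        List<Director> directors = List.of(new Director("d1", "Director X"));
        Result result = join(movies, directors);
        System.out.println("out:    " + result.out());
        System.out.println("reject: " + result.reject());
    }
}
```

Keeping the unmatched rows in a separate flow, rather than dropping them, is what lets you inspect them afterwards in the reject folder. The sketch uses Java records, so it needs Java 16 or later.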
In the Run view, click the Spark Configuration tab to verify that the Hadoop/Spark connection metadata has been properly inherited from the original Job.
The connection to a given Hadoop/Spark distribution is always defined in this Spark Configuration tab; it applies to the whole Spark Batch Job and is effective on a per-Job basis.
If you are not sure that the Spark cluster is able to resolve the hostname of the machine where the Job is executed, select the Define the driver hostname or IP address check box and in the field that is displayed, enter the IP address of this machine.
If you leave this check box clear, the Spark cluster looks for the Spark driver at 127.0.0.1, that is to say, on a machine within the cluster itself.
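To find an IP address to enter in the field displayed by the Define the driver hostname or IP address check box, you can list the addresses of the machine that runs the Job. The following Java class is a hypothetical helper, not a Talend feature: it returns the first IPv4 address of an active, non-loopback network interface, which assumes that this address is reachable from the cluster. On machines with several interfaces (VPN adapters, container bridges), check which address the cluster nodes can actually reach. For reference, in a plain Spark application the driver address is set with the spark.driver.host property.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Optional;

public class DriverAddress {

    // Returns the first IPv4 address of an interface that is up and is not the
    // loopback interface, skipping link-local (169.254.x.x) addresses.
    static Optional<String> firstNonLoopbackIPv4() throws SocketException {
        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
        if (interfaces == null) {
            return Optional.empty(); // older JDKs may return null when no interface exists
        }
        for (NetworkInterface nif : Collections.list(interfaces)) {
            if (!nif.isUp() || nif.isLoopback()) {
                continue;
            }
            for (InetAddress addr : Collections.list(nif.getInetAddresses())) {
                if (addr instanceof Inet4Address
                        && !addr.isLoopbackAddress()
                        && !addr.isLinkLocalAddress()) {
                    return Optional.of(addr.getHostAddress());
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws SocketException {
        System.out.println(firstNonLoopbackIPv4()
                .map(ip -> "Candidate driver address: " + ip)
                .orElse("No non-loopback IPv4 address found"));
    }
}
```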
Press F6 to run the Job.
The Run view is automatically opened in the lower part of the Studio and shows the execution progress of this Job.
Once the Job has finished, you can check, for example in the web console of your HDFS system, that the output has been written to HDFS.