Scenario: aggregating data from two relations using COGROUP

In this scenario, a four-component Job is designed to aggregate two relations on top of a given Hadoop cluster.

The two relations used in this scenario consist of the following sample data:

  1. Alice,turtle,17
    Alice,goldfish,17
    Alice,cat,17
    Bob,dog,18
    Bob,cat,18
    John,dog,19
    Mary,goldfish,16
    Bill,dog,20

    This relation is composed of three columns that read owner, pet and age (of the owners).

  2. Cindy,Alice
    Mark,Alice
    Paul,Bob
    Paul,Jane
    John,Mary
    William,Bill

    This relation provides a list of students' names alongside their friends' names, some of whom are the pet owners listed in the first relation. Therefore, the schema of this relation contains two columns: student and friend.

Before replicating this scenario, you need to write the sample data into the HDFS system of the Hadoop cluster to be used. To do this, you can use tHDFSOutput. For further information about this component, see tHDFSOutput.

The data used in this scenario is inspired by the examples that the Pig documentation uses to explain the GROUP and COGROUP operators. For related information, see Apache's documentation for Pig.
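
In Pig Latin terms, the Job to be built performs a COGROUP of the two relations on their matching columns. As a minimal sketch (the aliases owners and students are illustrative, not names generated by the Studio):

  grouped = COGROUP owners BY owner, students BY friend;

Each record of grouped holds a grouping key followed by one bag of matching tuples per input relation.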

Linking the components

  1. In the Integration perspective of the Studio, create an empty Job from the Job Designs node in the Repository tree view.

    For further information about how to create a Job, see Talend Studio User Guide.

  2. In the workspace, enter the name of the component to be used and select this component from the list that appears. In this scenario, the components are two tPigLoad components, a tPigCoGroup component and a tPigStoreResult component. One of the two tPigLoad components is used as the main loading component to connect to the Hadoop cluster to be used.

  3. Connect the main tPigLoad component to tPigCoGroup using the Row > Main link.

  4. Do the same to connect the second tPigLoad component to tPigCoGroup. The Lookup label appears over this link.

  5. Repeat the operation to connect tPigCoGroup to tPigStoreResult.

Reading data into the Pig flow

Reading the owner-pet sample data

  1. Double-click the main tPigLoad component to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor and click the [+] button three times to add three rows.

  3. In the Column column, rename the new rows to owner, pet and age, respectively, and in the Type column of the age row, select Integer.

  4. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  5. In the Mode area, select Map/Reduce so that the Job runs on the remote Hadoop cluster.

  6. In the Distribution and the Version lists, select the Hadoop distribution you are using. In this example, Hortonworks Data Platform V2.1.0 (Baikal) is selected.

  7. In the Load function list, select PigStorage. Then, the corresponding parameters to be set appear.

  8. In the NameNode URI and the Resource manager fields, enter the locations of those services, respectively.

  9. Select the Set Resourcemanager scheduler address check box and enter the URI of this service in the field that is displayed. This allows you to use the Scheduler service defined in your Hadoop cluster. If this service is not defined in your cluster, you can ignore this step.

  10. In the User name field, enter the name of a user with the appropriate rights to write data to the cluster. In this example, it is hdfs.

  11. In the Input file URI field, enter the path pointing to the relation you need to read data from. As explained previously, the relation to be read here is the one containing the owner and pet sample data.

  12. In the Field separator field, enter the separator of the data to be read. In this example, it is a semicolon (;).
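
These settings correspond roughly to the following Pig Latin LOAD statement (the HDFS path and the owners alias are placeholders, not values from the scenario):

  owners = LOAD '/user/hdfs/input/owner_pet' USING PigStorage(';')
           AS (owner:chararray, pet:chararray, age:int);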

Loading the student-friend sample data

  1. Double-click the second tPigLoad component to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. Click the [+] button twice to add two rows and in the Column column, rename them to student and friend, respectively.

  4. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  5. In the Mode area, select Map/Reduce.

    This component reuses the Hadoop connection you have configured in the main tPigLoad component. Therefore, the Distribution and the Version fields have been automatically filled with the values from that main loading component.

  6. In the Load function field, select the PigStorage function to read the source data.

  7. In the Input file URI field, enter the directory where the source data is stored. As explained previously, this data is from the second relation containing the student and friend sample data.
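
Assuming the same semicolon separator as the first relation, this second load corresponds roughly to (the path and the students alias are again placeholders):

  students = LOAD '/user/hdfs/input/student_friend' USING PigStorage(';')
             AS (student:chararray, friend:chararray);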

Aggregating the relations

  1. Double-click tPigCoGroup to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. Click the [+] button five times to add five rows and in the Column column, rename them to owner_friend, age, pet_number, pet and student, respectively.

  4. In the Type column of the age row, select Integer.

  5. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  6. In the Group by table, click the [+] button once to add one row.

  7. Then set the grouping condition in this Group by table to aggregate the two input relations. In each column representing an input relation, click the newly added row and select the column to be used in the grouping condition. In this scenario, the owner column from the owner-pet relation and the friend column from the student-friend relation are selected because they have common records. Based on these columns, the two relations are aggregated into bags.

    The bags regarding the record Alice might read as follows:

    Alice,{(Alice,turtle,17),(Alice,goldfish,17),(Alice,cat,17)},{(Cindy,Alice),(Mark,Alice)}
  8. In the Output mapping table, the output schema you defined previously has been automatically fed into the Column column. You need to complete this table to define how the grouped bags are aggregated into the schema of the output relation. The following list provides more details about how this aggregation is configured for this scenario; a Pig Latin sketch of the whole aggregation follows the list.

    • The owner_friend column is used to receive the literal records incoming from the columns that are used as the grouping condition. For this reason, select the EMPTY function from the Function drop-down list so that the incoming records stay as is. Then select row1 from the Source schema list and owner from the Expression list to read the records from the corresponding input column. You could equally select row2 and friend; the records received are the same because the owner column and the friend column are joined when they are used as the grouping condition.

      Note that the label row1 is the ID of the input link and thus may be different in your scenario.

    • The age column is used to receive the age data. As shown in the example bags in the previous step, the age of an owner appears repeatedly in one of the bags after the grouping. Select the AVG function from the Function list to average the repeated values so that each age appears only once in the final result. Then select row1 from the Source schema list and age from the Expression list.

    • The pet_number column is used to receive how many pets an owner has. Select the COUNT function from the Function list to perform this calculation. Then select row1 from the Source schema list and pet from the Expression list.

    • The pet column and the student column are used to receive the grouped records from the input pet and student columns, respectively. Select EMPTY for both of them; then, for each, select the corresponding input schema from the Source schema list and the corresponding column from the Expression list.
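
Taken together, the grouping condition and the output mapping amount to Pig Latin along the following lines. This is a sketch only: row1 and row2 stand for the IDs of the two input links, and the exact code the component generates may differ.

  grouped = COGROUP row1 BY owner, row2 BY friend;
  result = FOREACH grouped GENERATE
      group AS owner_friend,         -- EMPTY: the grouping key passed through as is
      AVG(row1.age) AS age,          -- AVG: collapses the repeated ages into one value
      COUNT(row1.pet) AS pet_number, -- COUNT: the number of pets per owner
      row1.pet AS pet,               -- EMPTY: the grouped bag of pets
      row2.student AS student;       -- EMPTY: the grouped bag of students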

Writing the aggregated data

  1. Double-click tPigStoreResult to open its Component view.

  2. If this component does not have the same schema as the preceding component, a warning icon appears. In this case, click the Sync columns button to retrieve the schema from the preceding component; once done, the warning icon disappears.

  3. In the Result folder URI field, enter the path in HDFS pointing to the location you want to write the result in.

  4. Select the Remove result directory if exists check box.

  5. From the Store function list, select PigStorage.

  6. In the Field separator field, enter the separator you want to use. In this scenario, enter a comma (,).
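
In Pig Latin, this step corresponds roughly to a STORE statement such as (the output path is a placeholder):

  STORE result INTO '/user/hdfs/output/cogroup_result' USING PigStorage(',');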

Executing the Job

Then you can press F6 to run this Job.

Once done, check the result from the HDFS system you are using.

You can read, for example, that the pet owner Alice is 17 years old, has 3 pets (a turtle, a goldfish and a cat), and that two of her friends are Cindy and Mark.
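
With the comma separator configured above, the output row for Alice might read as follows (PigStorage serializes bags between curly braces; the exact layout may differ in your run):

  Alice,17,3,{(turtle),(goldfish),(cat)},{(Cindy),(Mark)}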