Scenario: analyzing people's activities using a Storm topology - 6.1

Talend Components Reference Guide

EnrichVersion
6.1
EnrichProdName
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Data Integration
Talend Data Management Platform
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Open Studio for Big Data
Talend Open Studio for Data Integration
Talend Open Studio for Data Quality
Talend Open Studio for ESB
Talend Open Studio for MDM
Talend Real-Time Big Data Platform
task
Data Governance
Data Quality and Preparation
Design and Development
EnrichPlatform
Talend Studio

In this scenario, a four-component Storm Job (a topology) is designed to receive messages about the activities of some given people and analyze the popularity of those activities.

This Job subscribes to the related topic created by the Kafka topic producer, which means you need to install a Kafka cluster in your messaging system to maintain the feeds of messages. For further information about the Kafka messaging service, see Apache's documentation about Kafka.

On the other hand, since this Job runs on top of Storm, you need to ensure that your Storm system is ready for use. For further information about Storm, see Apache's documentation about Storm.

Note that when you use the Storm system installed in the Hortonworks Data Platform 2.1 (HDP 2.1), ensure that the names of the Storm DRPC (distributed remote procedure call) servers have been properly defined in the Custom storm.yaml section of the Config tab of Storm in Ambari's web console. For example, if you need to use two Storm DRPC servers named Server1 and Server2, you must define them in the Custom storm.yaml section as follows: [Server1,Server2].
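
For reference, this list corresponds to Storm's drpc.servers configuration key, so the equivalent entry in a hand-edited storm.yaml file would look like the following (using the two example server names above):

    drpc.servers:
      - "Server1"
      - "Server2"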

To replicate this scenario, proceed as follows.

Producing the sample messages

In real-world practice, the system that produces messages to Kafka is completely decoupled from the system that consumes them. In this scenario, however, Kafka itself is used to produce the sample messages. You need to perform the following operations to produce these messages:

  1. Create the Kafka topic to be used to categorize the messages. The following command is used for demonstration purposes only. If you need further information about the creation of a Kafka topic, see Apache's documentation for Kafka.

    /usr/lib/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic activities --partitions 1 --replication-factor 1

    This command creates a topic named activities, using the Kafka brokers managed by the Zookeeper service on the localhost machine.

  2. Publish the message you want to analyze to the activities topic you have just created. In this scenario, Kafka is used to perform this publication using, for example, the following command:

    echo 'Ryan|M|Pool
    Remy|M|Drink
    Remy|M|Hiking
    Irene|F|Drink
    Pierre|M|Movie
    Irene|F|Pool
    Thomas|M|Drink
    Ryan|M|Cooking
    Wang|F|Cooking
    Chen|M|Drink' | /usr/lib/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic activities

    This command publishes 10 simple messages:

    Ryan|M|Pool
    Remy|M|Drink
    Remy|M|Hiking
    Irene|F|Drink
    Pierre|M|Movie
    Irene|F|Pool
    Thomas|M|Drink
    Ryan|M|Cooking
    Wang|F|Cooking
    Chen|M|Drink

    As explained previously, you can use your actual message producer system instead to perform the publication.
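
If you use your own producer system, it could look like the following minimal sketch, which uses the Kafka Java producer API (the kafka-clients library) to publish the same ten messages. The broker address and the ActivityProducer class name are assumptions made for this illustration only:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Minimal sketch: publish the sample messages to the activities topic,
    // assuming a Kafka broker listening on localhost:9092.
    public class ActivityProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");

            String[] messages = {
                "Ryan|M|Pool", "Remy|M|Drink", "Remy|M|Hiking", "Irene|F|Drink",
                "Pierre|M|Movie", "Irene|F|Pool", "Thomas|M|Drink",
                "Ryan|M|Cooking", "Wang|F|Cooking", "Chen|M|Drink"
            };

            KafkaProducer<String, String> producer =
                    new KafkaProducer<String, String>(props);
            for (String message : messages) {
                producer.send(new ProducerRecord<String, String>("activities", message));
            }
            producer.close();
        }
    }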

Linking the components

  1. In the Integration perspective of the Studio, create an empty Storm Job from the Job Designs node in the Repository tree view.

    For further information about how to create a Storm Job, see Talend Big Data Getting Started Guide.

  2. In the workspace, enter the name of the component to be used and select this component from the list that appears. In this scenario, the components are tKafkaInput, tJavaStorm, tAggregateRow, and tLogRow.

  3. Connect them using Row > Main links.

Configuring the connection

  1. Click Run to open its view and then click Storm configuration to set up the connection to the Storm system to be used.

  2. In the Storm node roles area, indicate the locations of the Nimbus server and the DRPC endpoint of the Storm cluster to be used:

    • Local mode: select this check box to build the Storm environment within the Studio. In this situation, the Storm Job you are designing is run locally within the Studio and you do not need to define any specific Nimbus and DRPC endpoint addresses.

    • Nimbus host and Port: in these two fields, enter the location of the Storm Nimbus server and its port number, respectively.

    • DRPC endpoint and Port: in these two fields, enter the location of the Storm DRPC endpoint and its port number, respectively.

  3. In the Topology name field, enter the name you want the Storm system to use for the Storm Job, or topology in Storm terms, you are designing. It is recommended to use the same name as the Storm Job itself so that you can easily recognize this Job even within the Storm system.

  4. Select the Kill existing topology check box to make the Storm system stop any topology that has the same name as the Job you are designing and want to run.

    If you clear this check box and a topology of the same name is already running in the Storm system, the current Job will fail when you send it to Storm to run.

  5. Select the Submit topology check box to submit the current Job to the Storm system. This feature is used when you need to send a new topology to Storm or used together with the Kill existing topology feature when you need to update a running topology in the Storm system.

    If you clear this check box, when you run the current Job, the submission of this Job is ignored while the other configuration information is still taken into account, for example, killing an existing topology.

  6. Select the Monitor topology after submission check box to monitor the current Storm Job in the console of the Run view.

    If you clear this check box, you cannot read the monitoring information in the console.

  7. In the Stop monitoring after timeout reached field, enter, without double quotation marks, a numeric value to set the timeout after which the monitoring of the running topology stops. The default value is -1, which means no timeout is applied.

  8. Select the Kill topology on quitting Talend Job check box to allow the Studio to kill, if it is still running, the current Job from the Storm system when you stop this Job from the Studio.

    If you clear this check box, the topology for this Job continues to run in the Storm system even after you kill the Job within the Studio or shut down the Studio.

  9. If you need to use any other Storm properties specific to your situation, add them to the Storm configuration table.
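
For illustration, the key/value pairs you add to this table play the same role as entries in a topology configuration built by hand. The following minimal sketch assumes a pre-1.0 Storm release (backtype.storm packages) and uses example property values that are not part of this scenario:

    import backtype.storm.Config;

    // Sketch: each row of the Storm configuration table becomes a key/value
    // entry in the configuration submitted along with the topology.
    public class StormConfigSketch {
        public static void main(String[] args) {
            Config conf = new Config();
            conf.put(Config.TOPOLOGY_WORKERS, 2);   // e.g. topology.workers = 2
            conf.put(Config.TOPOLOGY_DEBUG, false); // e.g. topology.debug = false
            System.out.println(conf);
        }
    }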

Receiving the message from the Kafka channel

  1. Double-click tKafkaInput to open its Component view.

  2. In the Zookeeper host field, enter the location of the Zookeeper service used to coordinate the Kafka cluster to be used.

  3. In the Port field, enter the port number of this Zookeeper service.

  4. In the Topic name field, enter the name of the topic in which you need to receive messages. In this scenario, the topic is activities.
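
Conceptually, tKafkaInput acts as a Kafka consumer subscribed through this Zookeeper service. For illustration only, the following sketch performs the same subscription by hand with Kafka's classic high-level consumer API; the group.id value and the ActivityConsumer class name are assumptions for this example:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    // Sketch: read messages from the activities topic through the
    // Zookeeper service that coordinates the Kafka cluster.
    public class ActivityConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "activities-demo");

            ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put("activities", 1); // one stream for the activities topic

            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    consumer.createMessageStreams(topicCount);
            ConsumerIterator<byte[], byte[]> it =
                    streams.get("activities").get(0).iterator();
            while (it.hasNext()) {
                System.out.println(new String(it.next().message()));
            }
        }
    }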

Extracting information from the message

  1. Double-click tJavaStorm to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. On the output side (right), click the [+] button three times to add three rows and in the Column column, rename them to firstname, gender and activity, respectively. These columns correspond to the information you can extract from the sample message.

  4. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  5. In the Bolt code area, enter the main method of the bolt to be executed. In this scenario, the code is as follows:

    // Split each pipe-delimited message received from tKafkaInput and
    // emit the parts as the firstname, gender and activity columns.
    String[] tokens = input.get_str().split("\\|");
    collector.emit(new Values(
            tokens[0],
            tokens[1],
            tokens[2]
    ));
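
To see where this code runs, the following is a hand-written sketch of an equivalent Storm bolt, assuming a pre-1.0 Storm release (backtype.storm packages). The ActivitySplitBolt class name and the use of getString(0) to read the tuple are assumptions, since tJavaStorm generates its own surrounding class:

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    // Equivalent hand-written bolt: split each pipe-delimited message into
    // the three schema columns and emit them to the downstream component.
    public class ActivitySplitBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String[] tokens = input.getString(0).split("\\|");
            collector.emit(new Values(tokens[0], tokens[1], tokens[2]));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("firstname", "gender", "activity"));
        }
    }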

Aggregating the extracted information

  1. Double-click tAggregateRow to open its Component view. This component allows you to find out the most popular activity recorded in the received messages.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. On the output side (right), click the [+] button three times to add three rows and in the Column column, rename them to activity, gender and popularity, respectively.

  4. In the Type column of the popularity row of the output side, select Double.

  5. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  6. In the Group by table, add two rows by clicking the [+] button twice and configure these two rows as follows to group the outputted data:

    • Output column: select the columns from the output schema to be used as the conditions to group the outputted data. In this example, they are activity and gender.

    • Input column position: select the columns from the input schema to send data to the output columns you have selected in the Output column column. In this scenario, they are activity and gender.

  7. In the Operations table, add one row by clicking the [+] button once and configure this row as follows to calculate the popularity of each activity:

    • Output column: select the column from the output schema to carry the calculated results. In this scenario, it is popularity.

    • Function: select the function to be used to process the incoming data. In this scenario, select count. It counts the frequency of each activity in the received messages.

    • Input column position: select the column from the input schema to provide the data to be processed. In this scenario, it is activity.
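
As a plain-Java illustration of what this configuration computes (this is not the code tAggregateRow generates), counting the sample rows grouped by activity and gender can be written as follows; the PopularitySketch class name is an assumption:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Illustration of the configured aggregation: group the rows by
    // (activity, gender) and count the occurrences of each group.
    public class PopularitySketch {
        public static void main(String[] args) {
            String[] rows = {
                "Ryan|M|Pool", "Remy|M|Drink", "Remy|M|Hiking", "Irene|F|Drink",
                "Pierre|M|Movie", "Irene|F|Pool", "Thomas|M|Drink",
                "Ryan|M|Cooking", "Wang|F|Cooking", "Chen|M|Drink"
            };
            Map<String, Double> popularity = new LinkedHashMap<String, Double>();
            for (String row : rows) {
                String[] t = row.split("\\|");
                String key = t[2] + "|" + t[1]; // group by activity, then gender
                popularity.merge(key, 1.0, Double::sum); // count stored as a Double
            }
            // Prints, among others: Drink|M=3.0 and Drink|F=1.0
            for (Map.Entry<String, Double> e : popularity.entrySet()) {
                System.out.println(e.getKey() + "=" + e.getValue());
            }
        }
    }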

Executing the Job

Then you can run this Job.

The tLogRow component is used to present the execution result of the Job.

Press F6 to run this Job.

Once done, the Run view is opened automatically, where you can check the execution result.

You can read that the activity Drink is the most popular with 3 occurrences for the gender M (Male) and 1 occurrence for the gender F (Female) in the messages.
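
Based on the ten sample messages, the aggregated values to expect (the exact layout of the console output depends on the tLogRow settings) are:

    activity|gender|popularity
    Pool|M|1.0
    Drink|M|3.0
    Hiking|M|1.0
    Drink|F|1.0
    Movie|M|1.0
    Pool|F|1.0
    Cooking|M|1.0
    Cooking|F|1.0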

The Storm topology continues to run, waiting for messages to appear on the Kafka message broker until you kill the Job. In this scenario, because the Kill topology on quitting Talend Job check box is selected, the Storm topology will be stopped and removed from the cluster when this Job is stopped.