tKafkaInput - 6.1

Talend Components Reference Guide


Function

The tKafkaInput component reads messages from a Kafka topic and transmits them to the components that follow in the Job you are designing.

Purpose

tKafkaInput is a generic message consumer that transmits the messages it reads from a Kafka topic to the Job that runs transformations over these messages.

If you have subscribed to one of the Talend solutions with Big Data, this component is available in the following types of Jobs:

  • Standard: see tKafkaInput properties.

  • Spark Streaming: see tKafkaInput properties in Spark Streaming Jobs.

    Warning

    The streaming version of this component is available in the Palette of the studio on the condition that you have subscribed to Talend Real-time Big Data Platform or Talend Data Fabric.

  • Storm: see tKafkaInput properties in Storm Jobs.

    Warning

    The streaming version of this component is available in the Palette of the studio on the condition that you have subscribed to Talend Real-time Big Data Platform or Talend Data Fabric.

tKafkaInput properties

Component family

Internet/Kafka

 

Basic settings

Schema and Edit schema

A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. The schema is either Built-In or stored remotely in the Repository.

Note that the schema of this component is read-only. It stores the messages sent from the message producer.

 

Output type

Select the type of the data to be sent to the next component.

Typically, String is recommended, because tKafkaInput can automatically translate Kafka byte[] messages into strings for the Job to process. However, if the format of the Kafka messages is unknown to tKafkaInput, such as Protobuf, you can select byte[] and then use a Custom code component such as tJavaRow to deserialize the messages into strings so that the other components of the same Job can process them.
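For instance, if you select byte[] and the messages are actually plain UTF-8 text, the tJavaRow code could be as simple as the following minimal sketch; the column names payload and message are assumptions and must be adapted to the schemas defined in your Job:

    // Hedged sketch of tJavaRow code: decode the raw Kafka byte[] message into a String.
    // input_row.payload (byte[]) and output_row.message (String) are hypothetical column names.
    output_row.message = new String(input_row.payload, java.nio.charset.StandardCharsets.UTF_8);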

 

Use an existing connection

Select this check box and in the Component List click the relevant connection component to reuse the connection details you already defined.

 

Version

Select the version of the Kafka cluster to be used.

 

Zookeeper quorum list

Enter the address of the Zookeeper service of the Kafka cluster to be used.

This address should be in the form hostname:port, that is, the name and the client port of the Zookeeper node used by this Kafka cluster.

If you need to specify several addresses, separate them using a comma (,).
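For example, a two-node Zookeeper ensemble could be entered as follows; the host names are placeholders, and the value is entered within double quotation marks:

    "zookeeper1.example.com:2181,zookeeper2.example.com:2181"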

 

Reset offsets on consumer group

Select this check box to clear the offsets saved for the consumer group to be used so that this consumer group is handled as a new group that has not consumed any messages.

 

New consumer group starts from

Select the starting point from which the messages of a topic are consumed.

In Kafka, the increasing ID number of a message is called its offset. When a new consumer group starts, you can select beginning from this list to start consumption from the oldest message of the entire topic, or select latest to wait for a new message.

Note that the consumer group determines its starting point only from messages whose offsets have been committed.

Each consumer group has its own counter to remember the position of the messages it has consumed. For this reason, once a consumer group starts to consume messages of a given topic, it recognizes the latest message only with regard to the position where it stopped consuming, rather than with regard to the entire topic. Based on this principle, the following behaviors can be expected (see also the sketch after this list):

  • If you are resuming an existing consumer group, this option determines the starting point for this consumer group only if it does not already have a committed starting point. Otherwise, this consumer group starts from this committed starting point. For example, a topic has 100 messages. If an existing consumer group has successfully processed 50 messages, and has committed their offsets, then the same consumer group restarts from the offset 51.

  • If you create a new consumer group or reset an existing consumer group, which, in either case, means this group has not consumed any message of this topic, then when you start it from latest, this new group starts and waits for the offset 101.
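For reference, these options map onto standard Kafka consumer settings. The following minimal sketch shows the equivalent raw consumer configuration, assuming the Kafka 0.9 Java client (with the older 0.8 consumer, the auto.offset.reset values are smallest and largest); in the Studio, these values come from the component fields rather than from hand-written code, and the group name is a placeholder:

    import java.util.Properties;

    public class ConsumerGroupSettingsSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // address of the Kafka cluster
            props.put("group.id", "myConsumerGroup");         // Consumer group ID field
            props.put("auto.offset.reset", "earliest");       // beginning -> earliest, latest -> latest
            System.out.println(props);
        }
    }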

 Offset storage

Select the system to which you want to commit the offsets of the consumed messages.

 Enable dual commit

If you select Kafka as the offset storage system, the Enable dual commit check box is displayed. By default it is selected to let the Job commit the messages to both Zookeeper and Kafka. If you want the Job to commit only to Kafka, clear this check box.

 

Auto-commit offsets

Select this check box to make tKafkaInput automatically save its consumption state at the end of each given time interval. You need to define this interval in the Interval field that is displayed.

Note that the offsets are committed only at the end of each interval. If your Job stops in the middle of an interval, the message consumption state within this interval is not committed.

 Topic name

Enter the name of the topic from which tKafkaInput receives the feed of messages.

 

Consumer group ID

Enter the name of the consumer group to which you want the current consumer (the tKafkaInput component) to belong.

This consumer group will be created at runtime if it does not exist at that moment.

 

Stop after a maximum total duration (ms)

Select this check box and in the pop-up field, enter the duration (in milliseconds) at the end of which tKafkaInput stops running.

 

Stop after receiving a maximum number of messages

Select this check box and in the pop-up field, enter the maximum number of messages you want tKafkaInput to receive before it automatically stops running.

 

Stop after maximum time waiting between messages (ms)

Select this check box and in the pop-up field, enter the time (in milliseconds) that tKafkaInput waits for a new message. If tKafkaInput does not receive a new message before this waiting time elapses, it automatically stops running.

Advanced settings

Kafka properties

Add the Kafka consumer properties you need to customize to this table. For example, you can set a specific zookeeper.connection.timeout.ms value to avoid ZkTimeoutException.

For further information about the consumer properties you can define in this table, see the section describing the consumer configuration in Kafka's documentation at http://kafka.apache.org/documentation.html#consumerconfigs.
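For example, to avoid ZkTimeoutException, you could add a row along the following lines to this table; the timeout value is only an illustration, and, as with other Studio fields, both the property name and the value are typically entered within double quotation marks:

    "zookeeper.connection.timeout.ms"     "10000"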

 

Timeout precision (ms)

Enter the duration in milliseconds at the end of which a timeout exception is returned if no message is available for consumption.

The value -1 indicates that no timeout is set.

 

Load the offset with the message

Select this check box to output the offsets of the consumed messages to the next component. When selecting it, a read-only column called offset is added to the schema.

 

Custom encoding

You may encounter encoding issues when you process the stored data. In that situation, select this check box to display the Encoding list.

Select the encoding from the list or select Custom and define it manually.

 

tStatCatcher Statistics

Select this check box to gather the processing metadata at the Job level as well as at each component level.

Usage

This component is used as a start component and requires an output link. If the Kafka topic to be used does not exist yet, this component can be used along with the tKafkaCreateTopic component to read the topic created by the latter.

Related scenarios

No scenario is available for this component yet.

tKafkaInput properties in Spark Streaming Jobs

Component family

Messaging/Kafka

 

Basic settings

Schema and Edit schema

A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. The schema is either Built-In or stored remotely in the Repository.

Note that the schema of this component is read-only. It stores the message body sent from the message producer.

 

Broker list

Enter the addresses of the broker nodes of the Kafka cluster to be used.

Each address should be in the form hostname:port, that is, the name and the listening port of a broker node in this Kafka cluster.

If you need to specify several addresses, separate them using a comma (,).

 

Starting offset

Select the starting point from which the messages of a topic are consumed.

In Kafka, the sequential ID number of a message is called its offset. From this list, you can select From beginning to start consumption from the oldest message of the entire topic, or select From latest to start from the latest message that has been consumed by the same consumer group and whose offset is tracked by Spark within Spark checkpoints.

Note that in order to enable the component to remember the position of a consumed message, you need to activate the Spark Streaming checkpointing in the Spark Configuration tab in the Run view of the Job.

Each consumer group has its own counter to remember the position of the messages it has consumed. For this reason, once a consumer group starts to consume messages of a given topic, it recognizes the latest message only with regard to the position where it stopped consuming, rather than with regard to the entire topic. Based on this principle, the following behaviors can be expected:

  • For example, a topic has 100 messages. If a consumer group has stopped consuming at the message of offset 50, then when you select From latest, the same consumer group restarts from offset 51.

  • If you create a new consumer group or reset an existing consumer group, which, in either case, means this group has not consumed any message of this topic, then when you start it from latest, this new group starts and waits for the offset 101.

 Topic name

Enter the name of the topic from which tKafkaInput receives the feed of messages.

 

Set number of records per second to read from each Kafka partition

Enter this number within double quotation marks to limit the size of each batch to be sent for processing.

For example, if you put 100 and the batch value you define in the Spark configuration tab is 2 seconds, the size from a partition for each batch is 200 messages.

If you leave this check box clear, the component tries to read all the available messages in one second into one single batch before sending it, potentially causing the Job to hang when there is a huge quantity of messages.

Advanced settings

Kafka properties

Add the Kafka consumer properties you need to customize to this table. For example, you can set a specific zookeeper.connection.timeout.ms value to avoid ZkTimeoutException.

For further information about the consumer properties you can define in this table, see the section describing the consumer configuration in Kafka's documentation at http://kafka.apache.org/documentation.html#consumerconfigs.

 

Encoding

Select the encoding from the list or select Custom and define it manually.

This encoding is used by tKafkaInput to decode the input messages.

Usage in Spark Streaming Jobs

In a Talend Spark Streaming Job, it is used as a start component and requires an output link. The other components used along with it must be Spark Streaming components, too. They generate native Spark code that can be executed directly in a Spark cluster.

This component, along with the Spark Streaming component Palette it belongs to, appears only when you are creating a Spark Streaming Job.

Note that in this documentation, unless otherwise explicitly stated, a scenario presents only Standard Jobs, that is to say traditional Talend data integration Jobs.

In the implementation of the current component in Spark, the Kafka offsets are automatically managed by Spark itself, that is to say, instead of being committed to Zookeeper or Kafka, the offsets are tracked within Spark checkpoints. For further information about this implementation, see the Direct approach section in the Spark documentation: http://spark.apache.org/docs/latest/streaming-kafka-integration.html.
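For illustration, the direct approach essentially creates a Kafka input stream straight from the broker list and the topic, letting Spark track the offsets. The following minimal sketch, assuming the Spark 1.x Java API and the spark-streaming-kafka 0.8 connector, shows the general shape of such a call; it is not the code generated by the Studio, and the broker address and topic name are placeholders:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;

    import kafka.serializer.StringDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class DirectStreamSketch {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf().setAppName("DirectStreamSketch").setMaster("local[2]");
            // 5-second micro batches, matching the Batch size used later in this scenario.
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

            HashMap<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", "localhost:9092"); // Broker list field

            // Offsets are tracked by Spark itself within checkpoints, not committed to Zookeeper.
            JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                    jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                    kafkaParams, new HashSet<String>(Arrays.asList("twitter_live"))); // Topic name field

            messages.print();
            jssc.start();
            jssc.awaitTermination();
        }
    }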

Spark Connection

You need to use the Spark Configuration tab in the Run view to define the connection to a given Spark cluster for the whole Job. In addition, since the Job needs its dependent jar files to be available for execution, one and only one file-system-related component from the Storage family (such as tHDFSConfiguration) is required in the same Job so that Spark can use this component to connect to the file system to which these jar files are transferred.

This connection is effective on a per-Job basis.

Log4j

If you are using a subscription-based version of the Studio, the activity of this component can be logged using the log4j feature. For more information on this feature, see Talend Studio User Guide.

For more information on the log4j logging levels, see the Apache documentation at http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html.

Analyzing a Twitter flow in near real-time

In this scenario, you create a Spark Streaming Job to analyze, at the end of each 15-second interval, which hashtags are most used by Twitter users when they mention Paris in their Tweets over the previous 20 seconds.

An open source third-party program is used to receive and write Twitter streams in a given Kafka topic, twitter_live for example, and the Job you design in this scenario is used to consume the Tweets from this topic.

A row of Twitter raw data with hashtags reads like the example presented at https://dev.twitter.com/overview/api/entities-in-twitter-objects#hashtags.

Before replicating this scenario, you need to ensure that your Kafka system is up and running and you have proper rights and permissions to access the Kafka topic to be used. You also need a Twitter-streaming program to transfer Twitter streams into Kafka in near real-time. Talend does not provide this kind of program but some free programs created for this purpose are available in some online communities such as Github.

To replicate this scenario, proceed as follows:

Linking the components

  1. In the Integration perspective of the Studio, create an empty Spark Streaming Job from the Job Designs node in the Repository tree view.

    For further information about how to create a Spark Streaming Job, see Talend Big Data Getting Started Guide.

  2. In the workspace, enter the name of the component to be used and select this component from the list that appears. In this scenario, the components are tHDFSConfiguration, tKafkaInput, tWindow, tExtractJSONFields, tMap, tAggregateRow, tTop and tLogRow.

  3. Connect tKafkaInput, tWindow, tExtractJSONFields and tMap using the Row > Main link.

  4. Connect tMap to tAggregateRow using the Row > Main link and name this connection in the dialog box that is displayed. For example, name it hashtag.

  5. Connect tAggregateRow, tTop and tLogRow using the Row > Main link.

  6. Leave the tHDFSConfiguration component alone without any connection.

Setting up Spark connection

  1. Click Run to open its view and then click the Spark Configuration tab to display its view for configuring the Spark connection.


  2. Select the type of the Spark cluster you need to connect to.

    • Local: the Studio builds the Spark environment in itself at runtime to run the Job locally within the Studio. With this mode, each processor of the local machine is used as a Spark worker to perform the computations. This mode requires minimum parameters to be set in this configuration view.

      Note this local machine is the machine in which the Job is actually run. The Local mode is the default mode and you need to clear its check box to display the drop-down list for you to select the other modes.

    • Standalone: the Studio connects to a Spark-enabled cluster to run the Job from this cluster.

    • Yarn client: the Studio runs the Spark driver to orchestrate how the Job should be performed and then sends the orchestration to the Yarn service of a given Hadoop cluster so that the Resource Manager of this Yarn service requests execution resources accordingly.

  3. If you are using the Yarn client mode, the Property type list is displayed to allow you to select an established Hadoop connection from the Repository, on the condition that you have created this connection in the Repository. Then the Studio will reuse that set of connection information for this Job.

    For further information about how to create a Hadoop connection in the Repository, see the chapter describing the Hadoop cluster node of the Talend Studio User Guide.

  4. Select the version of the Hadoop distribution you are using along with Spark.

    If you cannot find the distribution corresponding to yours from this drop-down list, this means the distribution you want to connect to is not officially supported by Talend. In this situation, you can select Custom, then select the Spark version of the cluster to be connected to and click the [...] button to display the dialog box in which you can alternatively:

    1. Select Import from existing version to import an officially supported distribution as base and then add other required jar files which the base distribution does not provide.

    2. Select Import from zip to import the configuration zip for the custom distribution to be used. This zip file should contain the libraries of the different Hadoop/Spark elements and the index file of these libraries.

      Note that custom versions are not officially supported by Talend. Talend and its community provide you with the opportunity to connect to custom versions from the Studio but cannot guarantee that the configuration of whichever version you choose will be easy. As such, you should only attempt to set up such a connection if you have sufficient Hadoop and Spark experience to handle any issues on your own.

  5. Configure the connection information to the principal services of the cluster to be used.

    If you are using the Yarn client mode, you need to enter the addresses of the following different services in their corresponding fields (if you leave the check box of a service clear, then at runtime, the configuration of this parameter in the Hadoop cluster to be used will be ignored):

    • In the Resource manager field, enter the address of the ResourceManager service of the Hadoop cluster to be used.

    • Select the Set resourcemanager scheduler address check box and enter the Scheduler address in the field that appears.

    • Select the Set jobhistory address check box and enter the location of the JobHistory server of the Hadoop cluster to be used. This allows the metrics information of the current Job to be stored in that JobHistory server.

    • Select the Set staging directory check box and enter this directory defined in your Hadoop cluster for temporary files created by running programs. Typically, this directory can be found under the yarn.app.mapreduce.am.staging-dir property in the configuration files such as yarn-site.xml or mapred-site.xml of your distribution.

    • Select the Set memory check box to allocate proper memory volumes to the Map and the Reduce computations and the ApplicationMaster of YARN.

    • If you are accessing a Hadoop cluster running with Kerberos security, select this check box, then enter the Kerberos principal names for the ResourceManager service and the JobHistory service in the displayed fields. This enables you to use your user name to authenticate against the credentials stored in Kerberos. These principals can be found in the configuration files of your distribution. For example, in a CDH4 distribution, the Resource manager principal is set in the yarn-site.xml file and the Job history principal in the mapred-site.xml file.

      If you need to use a Kerberos keytab file to log in, select Use a keytab to authenticate. A keytab file contains pairs of Kerberos principals and encrypted keys. You need to enter the principal to be used in the Principal field and the access path to the keytab file itself in the Keytab field.

      Note that the user that executes a keytab-enabled Job is not necessarily the one a principal designates but must have the right to read the keytab file being used. For example, the user name you are using to execute a Job is user1 and the principal to be used is guest; in this situation, ensure that user1 has the right to read the keytab file to be used.

    • The User name field is available when you are not using Kerberos to authenticate. In the User name field, enter the login user name for your distribution. If you leave it empty, the user name of the machine hosting the Studio will be used.

      Since the Job needs to upload jar files to HDFS of the cluster to be used, you must ensure that this user name is the same as the one you have put in tHDFSConfiguration, the component used to provide HDFS connection information to Spark.

    If you are using the Standalone mode, you need to set the following parameters:

    • In the Spark host field, enter the URI of the Spark Master of the Hadoop cluster to be used.

    • In the Spark home field, enter the location of the Spark executable installed in the Hadoop cluster to be used.

  6. If you need to run the current Job on Windows, it is recommended to specify where the winutils.exe program to be used is stored.

    • If you know where to find your winutils.exe file and you want to use it, select the Define the Hadoop home directory check box and enter the directory where your winutils.exe is stored.

    • Otherwise, leave this check box clear; the Studio generates one by itself and automatically uses it for this Job.

  7. If the Spark cluster cannot recognize the machine in which the Job is launched, select the Define the driver hostname or IP address check box and enter the host name or the IP address of this machine. This allows the Spark master and its workers to recognize this machine to find the Job and thus its driver.

    Note that in this situation, you also need to add the name and the IP address of this machine to its hosts file.

  8. In the Batch size field, enter the time interval at the end of which the Job reviews the source data to identify changes and processes the new micro batches.

  9. If need be, select the Define a streaming timeout check box and in the field that is displayed, enter the time frame at the end of which the streaming Job automatically stops running.

  10. If you need the Job to be resilient to failure, select the Activate checkpointing check box to enable the Spark checkpointing operation. In the field that is displayed, you need to enter the directory in which Spark stores, in the file system of the cluster, the context data of the streaming computation such as the metadata and the generated RDDs of this computation.

    For further information about the Spark checkpointing operation, see http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing.

  11. Select the Set Tuning properties check box to optimize the allocation of the resources to be used to run this Job. These properties are not mandatory for the Job to run successfully, but they are useful when Spark is bottlenecked by any resource issue in the cluster such as CPU, bandwidth or memory:

    • Driver memory and Driver core: enter the allocation size of memory and the number of cores to be used by the driver of the current Job.

    • Executor memory: enter the allocation size of memory to be used by each Spark executor.

    • Core per executor: select this check box and in the displayed field, enter the number of cores to be used by each executor. If you leave this check box clear, the default allocation defined by Spark is used, for example, all available cores are used by one single executor in the Standalone mode.

    • Set Web UI port: if you need to change the default port of the Spark Web UI, select this check box and enter the port number you want to use.

    • Broadcast factory: select the broadcast implementation to be used to cache variables on each worker machine.

    • Customize Spark serializer: if you need to import an external Spark serializer, select this check box and in the field that is displayed, enter the fully qualified class name of the serializer to be used.

    • Yarn resource allocation: select how you want Yarn to allocate resources among executors.

      • Auto: you leave Yarn to manage the allocation by itself.

      • Fixed: you need to enter the number of executors to be used in the Num executors field that is displayed.

      • Dynamic: Yarn adapts the number of executors to suit the workload. You need to define the scale of this dynamic allocation by defining the initial number of executors to run in the Initial executors field, the lowest number of executors in the Min executors field and the largest number of executors in the Max executors field.

      This feature is available to the Yarn client mode only.

  12. In the Spark "scratch" directory field, enter the directory in which the Studio stores in the local system the temporary files such as the jar files to be transferred. If you launch the Job on Windows, the default disk is C:. So if you leave /tmp in this field, this directory is C:/tmp.

  13. Add any Spark properties you need to use to override their default counterparts used by the Studio.

    For example, if you are using the Yarn client mode with a CDH distribution, you need to specify the Yarn classpath of your cluster for the Job. The property to be added is spark.hadoop.yarn.application.classpath. Please contact the administrator of your cluster to obtain related information.

    The following value from a Cloudera cluster is presented for demonstration purposes:

    /etc/hadoop/conf,/usr/lib/hadoop/*,/usr/lib/hadoop/lib/*,/usr/lib/hadoop-hdfs/*,/usr/lib/hadoop-hdfs/lib/*,/usr/lib/hadoop-mapreduce/*,/usr/lib/hadoop-mapreduce/lib/*,/usr/lib/hadoop-yarn/*,/usr/lib/hadoop-yarn/lib/*,/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/*

    If you want the Spark application logs of this Job to be persistent in the file system, add the related properties to this Advanced properties table. For example, the properties to be set for the Yarn client mode are:

    • spark.yarn.historyServer.address

    • spark.eventLog.enabled

    • spark.eventLog.dir

    The value of the spark.eventLog.enabled property should be true; for the values of the other two properties, contact the administrator of the Spark cluster to be used.

    For further information about the valid Spark properties, see Spark documentation at https://spark.apache.org/docs/latest/configuration.
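    As an illustration only, the rows you could add to this table for that purpose might look like the following; the server address and the log directory are placeholders that must be replaced with the values provided by your cluster administrator, and, as with other Studio fields, the values are typically entered within double quotation marks:

    spark.yarn.historyServer.address    "http://sparkhistory.example.com:18080"
    spark.eventLog.enabled              "true"
    spark.eventLog.dir                  "hdfs://namenode.example.com:8020/user/spark/applicationHistory"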

Configuring the connection to the file system to be used by Spark

  1. Double-click tHDFSConfiguration to open its Component view. Note that tHDFSConfiguration is used because the Spark Yarn client mode is used to run Spark Jobs in this scenario.

    Spark uses this component to connect to the HDFS system to which the jar files dependent on the Job are transferred.

  2. In the Version area, select the Hadoop distribution you need to connect to and its version.

  3. In the NameNode URI field, enter the location of the machine hosting the NameNode service of the cluster.

  4. In the Username field, enter the authentication information used to connect to the HDFS system to be used. Note that this user name must be the same as the one you have entered in the Spark configuration tab.

Reading messages from a given Kafka topic

  1. Double-click tKafkaInput to open its Component view.

  2. In the Broker list field, enter the locations of the brokers of the Kafka cluster to be used, separating these locations with a comma (,). In this example, only one broker exists and its location is localhost:9092.

  3. From the Starting offset drop-down list, select the starting point from which the messages of a topic are consumed. In this scenario, select From latest, meaning to start from the latest message that has been consumed by the same consumer group and whose offset is tracked by Spark within its checkpoints.

  4. In the Topic name field, enter the name of the topic from which this Job consumes Twitter streams. In this scenario, the topic is twitter_live.

    This topic must exist in your Kafka system. For further information about how to create a Kafka topic, see the documentation from Apache Kafka or use the tKafkaCreateTopic component provided with the Studio. Note, however, that tKafkaCreateTopic is not available in Spark Jobs.

  5. Select the Set number of records per second to read from each Kafka partition check box. This limits the size of each micro batch to be sent for processing.

Configuring how frequently the Tweets are analyzed

  1. Double-click tWindow to open its Component view.

    This component is used to apply a Spark window to the input stream so that, every 15 seconds, this Job analyzes the Tweets of the last 20 seconds. Between every two window applications, this creates an overlap of one micro batch, that is, 5 seconds, as defined in the Batch size field in the Spark configuration tab (see the sketch after these steps).

  2. In the Window duration field, enter 20000, meaning 20 seconds.

  3. Select the Define the slide duration check box and in the field that is displayed, enter 15000, meaning 15 seconds.

The configuration of the window is then displayed above the icon of tWindow in the Job you are designing.
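Under the hood, this configuration corresponds to Spark Streaming's window operation. The following minimal sketch, written against the Spark 1.x Java API, shows the equivalent call; stream stands for any JavaDStream, and this is not the code generated by the Studio:

    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;

    public class WindowSketch {
        // Apply a 20-second window that slides every 15 seconds, as configured in tWindow.
        public static JavaDStream<String> windowed(JavaDStream<String> stream) {
            return stream.window(Durations.seconds(20), Durations.seconds(15));
        }
    }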

Extracting the hashtag field from the raw Tweet data

  1. Double-click tExtractJSONFields to open its Component view.

    As you can read from https://dev.twitter.com/overview/api/entities-in-twitter-objects#hashtags, the raw Tweet data uses the JSON format.

  2. Click Sync columns to retrieve the schema from its preceding component. This is actually the read-only schema of tKafkaInput, since tWindow does not impact the schema.

  3. Click the [...] button next to Edit schema to open the schema editor.

  4. Rename the single column of the output schema to hashtag. This column is used to carry the hashtag field extracted from the Tweet JSON data.

  5. Click OK to validate these changes.

  6. From the Read by list, select JsonPath.

  7. From the JSON field list, select the column of the input schema from which you need to extract fields. In this scenario, it is payload.

  8. In the Loop Jsonpath query field, enter the JSONPath expression pointing to the element over which the extraction loops. According to the JSON structure of a Tweet, as documented by Twitter, enter $.entities.hashtags to loop over the hashtags entity.

  9. In the Mapping table, in which the hashtag column of the output schema has been filled in automatically, enter the element on which the extraction is performed. In this example, this is the text attribute of each hashtags entity. Therefore, enter text within double quotation marks in the Json query column.
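    For reference, the relevant portion of a Tweet's JSON structure, as documented by Twitter, looks roughly like the following; the hashtag values and index positions shown here are only an illustration. The $.entities.hashtags query loops over this array and the "text" query extracts each hashtag:

    {
      "entities": {
        "hashtags": [
          { "text": "paris", "indices": [32, 38] },
          { "text": "travel", "indices": [45, 52] }
        ]
      }
    }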

Aligning each hashtag to lower case

  1. Double-click tMap to open its Map editor.

  2. In the table representing the output flow (on the right side), enter StringHandling.DOWNCASE(row2.hashtag) in the Expression column. This automatically creates the map between the hashtag column of the input schema and the hashtag column of the output schema.

    Note that row2 in this expression is the ID of the input link to tMap. It can be labeled differently in the Job you are designing.

  3. Click Apply to validate these changes and click OK to close this editor.

Counting the occurrences of each hashtag

  1. Double-click tAggregateRow to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. On the output side, click the [+] button two times to add two rows to the output schema table and rename these new schema columns to hashtag and count, respectively.

  4. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  5. In the Group by table, add one row by clicking the [+] button and select hashtag for both the Output column column and the Input column position column. This passes data from the hashtag column of the input schema to the hashtag column of the output schema.

  6. In the Operations table, add one row by clicking the [+] button.

  7. In the Output column column, select count, in the Function column, select count and in the Input column position column, select hashtag.

Selecting the 5 most used hashtags in each 20 seconds

  1. Double-click tTop to open its Component view.

  2. In the Number of line selected field, enter the number of rows to be output to the next component, counting down from the first row of the data sorted by tTop. In this example, it is 5, meaning the 5 most used hashtags in each 20 seconds.

  3. In the Criteria table, add one row by clicking the [+] button.

  4. In the Schema column column, select count, the column for which the data is sorted, in the sort num or alpha column, select num, which means the data to be sorted are numbers, and in the Order asc or desc column, select desc to arrange the data in descending order.

Executing the Job

Then you can run this Job.

The tLogRow component is used to present the execution result of the Job.

  1. Ensure that your Twitter streaming program is still running and keep writing the received Tweets into the given topic.

  2. Press F6 to run this Job.

Leave the Job running for a while; then, in the console of the Run view, you can see that the Job lists the 5 most used hashtags in each batch of Tweets mentioning Paris. According to the configuration of the size of each micro batch and of the Spark window, each of these Tweet batches contains the last 20 seconds' worth of Tweets received at the end of each 15-second interval.

Note that you can manage the level of the execution information to be output in this console by selecting the log4jLevel check box in the Advanced settings tab and then selecting the level of the information you want to display.

For more information on the log4j logging levels, see the Apache documentation at http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html.

tKafkaInput properties in Storm Jobs

Component family

Messaging/Input

 

Basic settings

Schema and Edit schema

A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. The schema is either Built-In or stored remotely in the Repository.

Note that the schema of this component is read-only. It stores the messages sent from the message producer.

 

Zookeeper host

Enter the address of the Zookeeper service of the Kafka system to be used.

 

Port

Enter the number of the client listening port of the Zookeeper service to be used.

 

Topic name

Enter the name of the topic from which tKafkaInput receives the feed of messages.

Usage

In a Talend Storm Job, it is used as a start component. The other components used along with it must be Storm components, too. They generate native Storm code that can be executed directly in a Storm system.

For further information about a Talend Storm Job, see the sections describing how to create and configure a Talend Storm Job of the Talend Big Data Getting Started Guide.

Note that in this documentation, unless otherwise explicitly stated, a scenario presents only Standard Jobs, that is to say traditional Talend data integration Jobs.

Storm Connection

You need to use the Storm Configuration tab in the Run view to define the connection to a given Storm system for the whole Job.

This connection is effective on a per-Job basis.

Scenario: analyzing people's activities using a Storm topology

In this scenario, a four-component Storm Job (a topology) is created to read messages about the activities of some given people from a Kafka topic and analyze the popularity of those activities.

This Job subscribes to the related topic created by the Kafka topic producer, which means you need to install a Kafka cluster in your messaging system to maintain the feeds of messages. For further information about the Kafka messaging service, see Apache's documentation about Kafka.

On the other hand, since this Job runs on top of Storm, you need to ensure that your Storm system is ready for use. For further information about Storm, see Apache's documentation about Storm.

Note that when you use the Storm system installed in the Hortonworks Data Platform 2.1 (HDP 2.1), ensure that the names of the Storm DRPC (distributed remote procedure call) servers have been properly defined in the Custom storm.yaml section of the Config tab of Storm in Ambari's web console. For example, if you need to use two Storm DRPC servers, Server1 and Server2, you must define them in the Custom storm.yaml section as follows: [Server1,Server2].

To replicate this scenario, proceed as follows.

Producing the sample messages

In real-world practice, the system that produces messages to Kafka is completely decoupled from the system that consumes them. In this scenario, however, Kafka's own command-line tools are used to produce the sample messages. You need to perform the following operations to produce these messages:

  1. Create the Kafka topic to be used to categorize the messages. The following command is used for demonstration purposes only. If you need further information about the creation of a Kafka topic, see Apache's documentation for Kafka.

    /usr/lib/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic activities --partitions 1 --replication-factor 1

    This command creates a topic named activities, using the Kafka brokers managed by the Zookeeper service on the localhost machine.

  2. Publish the messages you want to analyze to the activities topic you have just created. In this scenario, Kafka is used to perform this publication using, for example, the following command:

    echo 'Ryan|M|Pool
    Remy|M|Drink
    Remy|M|Hiking
    Irene|F|Drink
    Pierre|M|Movie
    Irene|F|Pool
    Thomas|M|Drink
    Ryan|M|Cooking
    Wang|F|Cooking
    Chen|M|Drink' | /usr/lib/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic activities

    This command publishes 10 simple messages:

    Ryan|M|Pool
    Remy|M|Drink
    Remy|M|Hiking
    Irene|F|Drink
    Pierre|M|Movie
    Irene|F|Pool
    Thomas|M|Drink
    Ryan|M|Cooking
    Wang|F|Cooking
    Chen|M|Drink

    As explained previously, you can use your actual message producer system instead to perform the publication.

Linking the components

  1. In the Integration perspective of the Studio, create an empty Storm Job from the Job Designs node in the Repository tree view.

    For further information about how to create a Storm Job, see Talend Big Data Getting Started Guide.

  2. In the workspace, enter the name of the component to be used and select this component from the list that appears. In this scenario, the components are tKafkaInput, tJavaStorm, tAggregateRow, and tLogRow.

  3. Connect them using Row > Main links.

Configuring the connection

  1. Click Run to open its view and then click Storm configuration to set up the connection to the Storm system to be used.

  2. In the Storm node roles area, indicate the locations of the Nimbus server and the DRPC endpoint of the Storm cluster to be used:

    • Local mode: select this check box to build the Storm environment within the Studio. In this situation, the Storm Job you are designing is run locally within the Studio and you do not need to define any specific Nimbus and DRPC endpoint addresses.

    • Nimbus host and Port: in these two fields, enter the location of the Storm Nimbus server and its port number, respectively.

    • DRPC endpoint and Port: in these two fields, enter the location of the Storm DRPC endpoint and its port number, respectively.

  3. In the Topology name field, enter the name you want the Storm system to use for the Storm Job, or topology in Storm terms, you are designing. It is recommended to use the same name as that of this Storm Job so that you can easily recognize this Job even within the Storm system.

  4. Select the Kill existing topology check box to make the Storm system stop any topology that has the same name as the Job you are designing and want to run.

    If you clear this check box and a topology of the same name is already running in the Storm system, the current Job will fail when you submit it to Storm.

  5. Select the Submit topology check box to submit the current Job to the Storm system. This feature is used when you need to send a new topology to Storm or used together with the Kill existing topology feature when you need to update a running topology in the Storm system.

    If you clear this check box, when you run the current Job, the submission of this Job is ignored while the other configuration information is still taken into account, for example, killing an existing topology.

  6. Select the Monitor topology after submission check box to monitor the current Storm Job in the console of the Run view.

    If you clear this check box, you cannot read the monitoring information in the console.

  7. In the Stop monitoring after timeout reached field, enter, without double quotation marks, the numeric value that determines when the monitoring of a running topology stops. The default value is -1, which means no timeout is applied.

  8. Select the Kill topology on quitting Talend Job check box to allow the Studio to kill, if it is still running, the current Job from the Storm system when you stop this Job from the Studio.

    If you clear this check box, the topology for this Job continues to run in the Storm system even after you kill the Job within the Studio or shut down the Studio.

  9. If you need to use any other Storm properties specific to your situation, add them to the Storm configuration table.

Receiving the message from the Kafka channel

  1. Double-click tKafkaInput to open its Component view.

  2. In the Zookeeper host field, enter the location of the Zookeeper service used to coordinate the Kafka cluster to be used.

  3. In the Port field, enter the port number of this Zookeeper service.

  4. In the Topic name field, enter the name of the topic in which you need to receive messages. In this scenario, the topic is activities.

Extracting information from the message

  1. Double-click tJavaStorm to open its Component view.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. On the output side (right), click the [+] button three times to add three rows and in the Column column, rename them to firstname, gender and activity, respectively. These columns correspond to the information you can extract from the sample message.

  4. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  5. In the Bolt code area, enter the main method of the bolt to be executed. In this scenario, the code is as follows:

    // Split the pipe-delimited message, for example "Ryan|M|Pool",
    // into the firstname, gender and activity fields of the output schema.
    String[] tokens = input.get_str().split("\\|");
    collector.emit(new Values(
            tokens[0],  // firstname
            tokens[1],  // gender
            tokens[2]   // activity
    ));

Aggregating the extracted information

  1. Double-click tAggregateRow to open its Component view. This component allows you to find out the most popular activity recorded in the received messages.

  2. Click the [...] button next to Edit schema to open the schema editor.

  3. On the output side (right), click the [+] button three times to add three rows and in the Column column, rename them to activity, gender and popularity, respectively.

  4. In the Type column of the popularity row of the output side, select Double.

  5. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.

  6. In the Group by table, add two rows by clicking the [+] button twice and configure these two rows as follows to group the outputted data:

    • Output column: select the columns from the output schema to be used as the conditions to group the outputted data. In this example, they are activity and gender.

    • Input column position: select the columns from the input schema to send data to the output columns you have selected in the Output column column. In this scenario, they are activity and gender.

  7. In the Operations table, add one row by clicking the [+] button once and configure this row as follows to calculate the popularity of each activity:

    • Output column: select the column from the output schema to carry the calculated results. In this scenario, it is popularity.

    • Function: select the function to be used to process the incoming data. In this scenario, select count. It counts the frequency of each activity in the received messages.

    • Input column position: select the column from the input schema to provide the data to be processed. In this scenario, it is activity.

Executing the Job

Then you can run this Job.

The tLogRow component is used to present the execution result of the Job.

Press F6 to run this Job.

Once done, the Run view is opened automatically, where you can check the execution result.

You can read that the activity Drink is the most popular with 3 occurrences for the gender M (Male) and 1 occurrence for the gender F (Female) in the messages.

The Storm topology continues to run, waiting for messages to appear on the Kafka message broker until you kill the Job. In this scenario, because the Kill topology on quitting Talend Job check box is selected, the Storm topology will be stopped and removed from the cluster when this Job is stopped.