tKafkaInput - 6.1

Composants Talend Guide de référence

EnrichVersion
6.1
EnrichProdName
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Data Integration
Talend Data Management Platform
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Open Studio for Big Data
Talend Open Studio for Data Integration
Talend Open Studio for Data Quality
Talend Open Studio for ESB
Talend Open Studio for MDM
Talend Real-Time Big Data Platform
task
Création et développement
Gouvernance de données
Qualité et préparation de données
EnrichPlatform
Studio Talend

Fonction

Le composant tKafkaInput transmet des messages à traiter aux composants qui suivent dans le Job que vous créez.

Objectif

Le tKafkaInput est un broker de messages générique transmettant des messages au Job exécutant les transformations sur ces messages.

Si vous avez souscrit à l'une des solutions Big Data de Talend, ce composant est disponible dans les types de Job suivants :

Propriétés du tKafkaInput

Famille du composant

Internet/Kafka

 

Basic settings

Schema et Edit schema

Un schéma est une description de lignes, il définit le nombre de champs (colonnes) qui sont traités et passés au composant suivant. Le schéma est soit local (Built-In), soit distant dans le Repository.

Notez que le schéma de ce composant est en lecture seule. Il stocke les messages envoyés du producteur de messages.

 

Output type

Sélectionnez le type de données à envoyer au composant suivant.

De manière générale, il est recommandé d'utiliser des types String, car le tKafkaInput peut traduire automatiquement les messages Kafka byte[] en chaînes de caractères à traiter par le Job. Cependant, si ce format de messages Kafka n'est pas connu par le tKafkaInput, comme le Protobuf, vous pouvez sélectionner byte[] et utiliser un composant Custom code comme le tJavaRow afin de désérialiser les messages en chaînes de caractères afin que les autres composants du même Job puissent traiter ces messages.

 

Use an existing connection

Cochez cette case et sélectionnez le composant de connexion adéquat à partir de la liste Component List pour réutiliser les paramètres d'une connexion que vous avez déjà définie.

 

Version

Sélectionnez la version du cluster Kafka à utiliser.

 

Zookeeper quorum list

Saisissez l'adresse du service Zookeeper pour le cluster Kafka à utiliser.

L'adresse doit se présenter sous la forme suivante : hostname:port. Ces informations contiennent le nom et le port du nœud hébergeant dans le cluster Kafka.

Si vous devez spécifier plusieurs adresses, séparez-les à l'aide d'une virgule (,).

 

Reset offsets on consumer group

Cochez cette case pour supprimer les offsets sauvegardés pour le groupe de consommateurs à utiliser, afin que ce groupe de consommateurs soit géré comme un nouveau groupe n'ayant consommé aucun message.

 

New consumer group starts from

Sélectionnez le point de départ duquel les messages d'un sujet sont consommés.

Dans Kafka, le numéro d'ID augmentant d'un message se nomme offset. Lorsqu'un nouveau groupe de consommateurs démarre, dans cette liste, vous pouvez sélectionner beginning pour commencer la consommation depuis le message le plus ancien du sujet entier ou sélectionner latest pour attendre un nouveau message.

Notez que le groupe de consommateurs prend en compte uniquement les messages dont l'offset a été commité comme point de départ.

Chaque groupe de consommateurs possède son propre compteur pour se rappeler la position d'un message consommé. Pour cette raison, une fois qu'un groupe de consommateurs à commencé à consommer des messages d'un sujet donné, un groupe de consommateurs reconnaît le message le plus récent en voyant simplement la position où son groupe a arrête la consommation, plutôt que le sujet complet. Partant de ce principe, les comportements suivants peuvent être attendus :

  • Si vous reprenez un groupe de consommateurs existant, cette option détermine le point de départ de ce groupe de consommateur uniquement s'il n'a pas déjà de point de départ commité. Sinon, ce groupe de consommateurs démarre du point de départ commité. Par exemple, un sujet contient 100 messages. Si un groupe de consommateurs existant a traité 50 messages et a commité leurs offsets, le même groupe de consommateurs reprend à partir de l'offset 51.

  • si vous créez un nouveau groupe de consommateurs ou en réinitialisez un existant, ce qui signifie que ce groupe n'a consommé aucun message de ce sujet, lorsque vous le démarrez depuis le dernier message, ce nouveau groupe démarre et attend l'offset 101.

 Offset storage

Sélectionnez le système dans lequel vous souhaitez commiter les offsets des messages consommés.

 Enable dual commit

Si vous sélectionnez Kafka comme système de stockage des offsets, la case Enable dual commit est disponible. Elle est cochée par défaut pour permettre au Job de commiter les messages dans Zookeeper et Kafka. Si vous souhaitez que le Job commite uniquement dans Kafka, décochez cette case.

 

Auto-commit offsets

Cochez cette case afin de permettre au tKafkaInput de sauvegarder automatiquement son état de consommation à la fin de chaque intervalle de temps donné. Vous devez définir cet intervalle dans le champ Interval affiché.

Notez que les offsets sont commités uniquement à la fin de chaque intervalle. Si votre Job s'arrête au milieu d'un intervalle, l'était de consommation du message dans cet intervalle n'est pas commité.

 Topic name

Saisissez le nom du sujet depuis lequel le tKafkaInput reçoit le flux de messages.

 

Consumer group ID

Saisissez le nom du groupe de consommateurs auquel vous souhaitez que le consommateur courant (le tKafkaInput) appartienne.

Ce groupe de consommateurs sera crééé lors de l'exécution s'il n'existe pas.

 

Stop after a maximum total duration (ms)

Cochez cette case et, dans le champ qui s'affiche, saisissez la durée (en millisecondes) à la fin de laquelle le tKafkaInput arrête de s'exécuter.

 

Stop after receiving a maximum number of messages

Cochez cette case et, dans le champ qui s'affiche, saisissez le nombre maximal de messages que vous souhaitez que le tKafkaInput reçoive, avant d'arrêter automatiquement son exécution.

 

Stop after maximum time waiting between messages (ms)

Cochez cette case et, dans le champ qui s'affiche, saisissez le temps (en millisecondes) durant lequel le tKafkaInput attend un nouveau message. Si le tKafkaInput ne reçoit pas de nouveau message lorsque ce temps d'attente est écoulé, il arrête de s'exécuter.

Advanced settings

Kafka properties

Ajoutez les propriétés de consommation Kafka nécessaires pour personnaliser cette table. Par exemple, configurez une valeur spécifique zookeeper.connection.timeout.ms pour éviter l'exception ZkTimeoutException.

Pour plus d'information concernant les propriétés de consommation à définir dans cette table, consultez la section décrivant la configuration du consommateur dans la documentation Kafka, à l'adresse suivante : http://kafka.apache.org/documentation.html#consumerconfigs (en anglais).

 

Timeout precision(ms)

Saisissez, en millisecondes, la durée à la suite de laquelle vous souhaitez retourner une exception de suspension si aucun message n'est disponible à la consommation.

La valeur -1 indique qu'aucune suspension n'est configurée.

 

Load the offset with the message

Cochez cette case pour écrire en sortie les offsets des messages consommés au composant suivant. Lorsque vous cochez cette case, une colonne offset en lecture seule est ajoutée au schéma.

 

Custom encoding

Il est possible de rencontrer des problèmes d'encodage lorsque vous traitez les données stockées. Dans ce cas, cochez cette case pour afficher la liste Encoding.

Sélectionnez l'encodage à partir de la liste ou sélectionnez Custom et définissez-le manuellement.

 

tStatCatcher Statistics

Cochez cette case pour collecter les métadonnées de traitement au niveau du Job ainsi qu'au niveau de chaque composant.

Utilisation

Ce composant est utilisé en tant que composant d'entrée et nécessite un lien de sortie. Lorsque le sujet Kafka à utiliser n'existe pas, il peut être utilisé avec le composant tKafkaCreateTopic pour lire le sujet créé par ce dernier.

Scénario associé

Aucun scénario n'est disponible pour ce composant.

Propriétés du tKafkaInput dans des Jobs Spark Streaming

Famille du composant

Messaging/Kafka

 

Basic settings

Schema et Edit schema

Un schéma est une description de lignes, il définit le nombre de champs (colonnes) qui sont traités et passés au composant suivant. Le schéma est soit local (Built-In), soit distant dans le Repository.

Le schéma de ce composant est en lecture seule. Il stocke le corps du message envoyé depuis le producteur du message.

 

Broker list

Saisissez les adresses des nœuds du broker du cluster Kafka à utiliser.

L'adresse doit se présenter sous la forme suivante : hostname:port. Ces informations contiennent le nom et le port du nœud hébergeant dans le cluster Kafka.

Si vous devez spécifier plusieurs adresses, séparez-les à l'aide d'une virgule (,).

 

Starting offset

Sélectionnez le point de départ duquel les messages d'un sujet sont consommés.

Dans Kafka, le numéro d'ID séquentiel d'un message se nomme offset. Dans cette liste, vous pouvez sélectionner From beginning pour commencer la consommation depuis le message le plus ancien du sujet entier ou sélectionner From latest pour commencer depuis le message le plus récent ayant été consommé par le même groupe de consommateurs et à partir duquel l'offset a été commité.

Notez que, pour permettre au composant de se souvenir de la position d'un message consommé, vous devez activer le point de contrôle Spark Streaming dans l'onglet Spark Configuration de la vue Run du Job.

Chaque groupe de consommateurs possède son propre compteur pour se rappeler la position d'un message consommé. Pour cette raison, une fois qu'un groupe de consommateurs à commencé à consommer des messages d'un sujet donné, un groupe de consommateurs reconnaît le message le plus récent en voyant simplement la position où son groupe a arrête la consommation, plutôt que le sujet complet. Partant de ce principe, les comportements suivants peuvent être attendus :

  • un sujet possède par exemple 100 messages. Si un groupe de consommateurs a arrêté la consommation du message à l'offset 50, lorsque vous sélectionnez From latest, le même groupe de consommateurs reprend à partir de l'offset 51.

  • si vous créez un nouveau groupe de consommateurs ou en réinitialisez un existant, ce qui signifie que ce groupe n'a consommé aucun message de ce sujet, lorsque vous le démarrez depuis le dernier message, ce nouveau groupe démarre et attend l'offset 101.

 Topic name

Saisissez le nom du sujet depuis lequel le tKafkaInput reçoit le flux de messages.

 

Set number of records per second to read from each Kafka partition

Saisissez ce nombre entre guillemet doubles afin de limiter la taille de chaque batch à envoyer pour traitement.

Par exemple, si vous saisissez 100 et que la valeur du batch définie dans l'onglet Spark configuration est 2 secondes, la taille de partition pour chaque batch est de 200 messages.

Si vous laissez cette case décochée, le composant essaye de lire tous les messages disponibles en une seconde dans un batch avant d'envoyer ce dernier, ce qui peut créer un échec du Job s'il gère d'énormes quantités de messages.

Advanced settings

Kafka properties

Ajoutez les propriétés de consommation Kafka nécessaires pour personnaliser cette table. Par exemple, configurez une valeur spécifique zookeeper.connection.timeout.ms pour éviter l'exception ZkTimeoutException.

Pour plus d'information concernant les propriétés de consommation à définir dans cette table, consultez la section décrivant la configuration du consommateur dans la documentation Kafka, à l'adresse suivante : http://kafka.apache.org/documentation.html#consumerconfigs (en anglais).

 

Encoding

Sélectionnez l'encodage à partir de la liste ou sélectionnez Custom et définissez-le manuellement.

Cet encodage est utilisé par le tKafkaInput pour décoder les messages d'entrée.

Utilisation dans des Jobs Spark Streaming

Dans un Job Talend Spark Streaming, il est utilisé en tant que composant d'entrée et nécessite un lien de sortie. Les autres composants utilisés avec celui-ci doivent également être des composants Spark Streaming. Ils génèrent nativement du code Spark pouvant être exécuté directement dans un cluster Spark.

Ce composant, ainsi que les composants Spark Streaming de la Palette à laquelle il appartient, s'affichent uniquement lorsque vous créez un Job Spark Streaming.

Notez que, dans cette documentation, sauf mention contraire , un scénario présente uniquement des Jobs de type Standard, c'est-à-dire des Jobs Talend traditionnels d'intégration de données.

Dans l'implémentation du composant courant dans Spark, les offsets Kafka sont automatiquement gérés par Spark, c'est-à-dire, au lieu d'être commités dans Zookeeper ou Kafka, les offsets sont suivis dans les points de contrôle Spark. Pour plus d'informations concernant cette implémentation, consultez la section relative à l'approche directe dans la documentation de Spark : http://spark.apache.org/docs/latest/streaming-kafka-integration.html (en anglais).

Spark Connection

Vous devez utiliser l'onglet Spark Configuration de la vue Run afin de définir la connexion à un cluster Spark donné pour le Job complet. De plus, puisque le Job attend ses fichiers .jar dépendants pour l'exécution, un (et un seul) composant relatif à un système de fichiers de la famille Storage est requis au sein du même Job, afin que Spark puisse utiliser ce composant pour se connecter au système de fichiers auquel les fichiers .jar dépendants du Job sont transférés :

Cette connexion fonctionne uniquement pour le Job dans lequel vous l'avez définie.

Log4j

Si vous utilisez une solution Talend soumise à souscription, l'activité de ce composant peut être journalisée avec la fonctionnalité log4j. Pour plus d'informations sur cette fonctionnalité, consultez le Guide utilisateur du Studio Talend.

Pour plus d'informations sur les niveaux de logs du log4j, consultez la documentation d'Apache : http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html (en anglais).

Analyser un flux Twitter en quasi temps réel

Dans ce scénario, vous créez un Job Spark Streaming pour analyser, après chaque intervalle de 15 secondes, les hashtags les plus utilisés par les utilisateurs de Twitter lorsqu'ils mentionnent Paris, au cours de la période des 20 secondes précédentes.

Un programme tiers open source est utilisé pour recevoir et écrire des flux Twitter dans un sujet Kafka donné, twitter_live par exemple et le Job que vous créez dans ce scénario est utilisé pour consommer les Tweets de ce sujet.

Une ligne de données brutes Twitter avec des hashtags se présente comme exemple à l'adresse suivante https://dev.twitter.com/overview/api/entities-in-twitter-objects#hashtags (en anglais).

Avant de reproduire ce scénario, vous devez vous assurer que votre système Kafka s'exécute correctement et que vous avez les droits et permissions appropriés pour accéder au sujet Kafka à utiliser. Vous avez besoin d'un programme de mise en flux de Twitter afin de transférer des flux Twitter dans Kafka en quasi temps réel. Talend ne fournit pas ce genre de programme mais certains programmes gratuits, créés à cet effet, sont disponibles sur des communautés en ligne, comme Github.

Pour reproduire ce scénario, procédez comme suit :

Relier les composants

  1. Dans la perspective Integration du Studio, créez un Job Spark Streaming depuis le nœud Job Designs dans la vue Repository.

    Pour plus d'informations concernant la création d'un Job Spark Streaming, consultez Guide de prise en main de Talend Big Data.

  2. Dans l'espace de modélisation graphique, saisissez le nom du composant à utiliser dans la liste qui s'affiche. Dans ce scénario, les composants utilisés sont tHDFSConfiguration, tKafkaInput, tWindow, tExtractJSONFields, tMap, tAggregateRow, tTop et tLogRow.

  3. Reliez le tKafkaInput, le tWindow, le tExtractJSONFields et le tMap à l'aide d'un lien Row > Main.

  4. Reliez le tMap au tAggregateRow à l'aide d'un lien Row > Main et nommez cette connexion dans la boîte de dialogue qui s'ouvre. Par exemple, nommez-la hashtag.

  5. Reliez le tAggregateRow, le tTop et le tLogRow à l'aide d'un lien Row > Main.

  6. Ne connectez pas le tHDFSConfiguration.

Configurer la connexion à Spark

  1. Cliquez sur le bouton Run pour ouvrir sa vue, puis cliquez sur l'onglet Spark Configuration pour afficher sa vue et configurer la connexion à Spark.

    Cette vue doit ressembler à l'image ci-dessous :

  2. Sélectionnez le type de cluster Spark auquel vous connecter.

    • Local : le Studio construit l'environnement Spark en lui-même au moment de l'exécution locale du Job dans le Studio. Avec ce mode, chaque processeur de la machine locale est utilisé comme Worker Spark pour effectuer les calculs. Ce mode requiert la configuration d'un minimum de paramètres dans la vue de configuration.

      Notez que cette machine locale est la machine sur laquelle s'exécute le Job. Le mode Local est le mode par défaut. Vous devez décocher cette case pour afficher la liste déroulante dans laquelle sélectionner les autres modes.

    • Standalone : le Studio se connecte à un cluster compatible Spark pour exécuter le Job depuis ce cluster.

    • Yarn client : le Studio exécute le pilote Spark pour orchestrer comment le Job doit être exécuté puis envoie l'orchestration au service Yarn d'un cluster Hadoop donné, afin que le Resource Manager de ce service Yarn demande des ressources pour l'exécution.

  3. Si vous utilisez le mode Yarn client, la liste Property type s'affiche et vous permet de sélectionner une connexion à Hadoop établie depuis le Repository, si vous avez créé cette connexion dans le Repository. Le Studio réutilise l'ensemble des informations de connexion pour ce Job.

    Pour plus d'informations concernant la création d'une connexion Hadoop dans le Repository, consultez le chapitre décrivant le nœud Hadoop cluster dans le Guide utilisateur du Studio Talend.

  4. Sélectionnez la version de la distribution Hadoop à utiliser avec Spark.

    Si vous ne trouvez pas votre distribution dans la liste déroulante, cela signifie que la distribution à laquelle vous souhaitez vous connecter n'est pas officiellement supportée par Talend. Dans ce cas, vous pouvez sélectionner Custom puis la version de Spark, dans la liste Spark version, du cluster auquel vous connecter. Cliquez sur le bouton pour afficher une boîte de dialogue dans laquelle vous pouvez :

    1. Sélectionner Import from existing version pour importer une distribution officiellement supportée comme base et ajouter d'autres fichiers .jar requis que la distribution de base ne fournit pas.

    2. Sélectionnez Import from zip pour importer le .zip de configuration pour la distribution personnalisée à utiliser. Ce fichier .zip doit contenir les bibliothèques des différents éléments Hadoop/Spark et le fichier d'index de ces bibliothèques.

      Notez que les versions personnalisées ne sont pas officiellement supportées par Talend. Talend et sa communauté fournissent l'opportunité de vous connecter à des versions personnalisées depuis le Studio mais ne peuvent vous garantir la simplicité de la configuration de la version que vous choisissez. Il est recommandé de configurer ces connexions si vous avez une expérience suffisante de Hadoop et de Spark pour gérer par vous-mêmes les problèmes pouvant survenir.

  5. Configurez les informations de connexion aux principaux services du cluster à utiliser.

    Si vous utilisez le mode Yarn client, vous devez saisir les adresses des différents services dans les champs correspondants (si vous laissez décochée la case d'un service, lors de l'exécution, la configuration du paramètre en question du cluster Hadoop sera ignorée) :

    • Dans le champ Resource manager, saisissez l'adresse du service ResourceManager du cluster Hadoop à utiliser.

    • Cochez la case Set resourcemanager scheduler address et saisissez l'adresse de l'ordonnanceur (Scheduler) dans le champ qui apparaît.

    • Cochez la case Set jobhistory address et saisissez l'emplacement du serveur JobHistory du cluster Hadoop à utiliser. Cela permet de stocker les informations relatives aux métriques du Job courant sur le serveur JobHistory.

    • Cochez la case Set staging directory et saisissez le chemin d'accès au répertoire défini dans votre cluster Hadoop pour les fichiers temporaires créés par l'exécution de programmes. Ce répertoire se trouve sous la propriété yarn.app.mapreduce.am.staging-dir dans les fichiers de configuration, notamment les fichiers yarn-site.xml et mapred-site.xml de votre distribution.

    • Cochez la case Set memory pour allouer des volumes de mémoire aux calculs Map et Reduce et au service ApplicationMaster de YARN.

    • Si vous accédez au cluster Hadoop s'exécutant avec la sécurité Kerberos, cochez cette case. Saisissez les noms des principaux Kerberos pour le service du ResourceManager et le service du JobHistory dans les champs qui s'affichent. Cela vous permet d'utiliser votre identifiant pour vous authentifier par rapport aux informations stockées dans Kerberos. Ces principaux se trouvent dans les fichiers de configuration de votre distribution. Par exemple, dans une distribution CDH4, le Principal du ResourceManager est configuré dans le fichier yarn-site.xml et celui du JobHistory dans le fichier mapred-site.xml.

      Si vous souhaitez utiliser un fichier Kerberos keytab pour vous identifier, cochez la case Use a keytab to authenticate. Un fichier keytab contient des paires de principaux et clés cryptées Kerberos. Vous devez saisir le principal à utiliser dans le champ Principal et le chemin d'accès au fichier keytab dans le champ Keytab.

      Notez que l'utilisateur qui exécute un Job utilisant un keytab n'est pas forcément celui désigné par le principal mais qu'il doit avoir le droit de lecture pour le fichier keytab utilisé. Par exemple, le nom d'utilisateur que vous utilisez pour exécuter le Job est user1 et le principal à utiliser est guest. Dans cette situation, assurez-vous que user1 a les droits de lecture pour le fichier keytab à utiliser.

    • Le champ User name est disponible lorsque vous n'utilisez pas Kerberos pour vous authentifier. Dans ce champ, saisissez votre identifiant pour cette distribution. Si vous laissez le champ vide, le nom de la machine hébergeant le Studio sera utilisé.

      Comme le Job doit charger dans HDFS des fichiers .jar du cluster à utiliser, vous devez vous assurer que le nom d'utilisateur est le même que celui défini dans le tHDFSConfiguration, le composant utilisé pour fournir à Spark les informations de connexion à HDFS.

    Si vous utilisez le mode Standalone, vous devez configurer les paramètres suivants :

    • Dans le champ Spark host, saisissez l'URI du Spark Master du cluster Hadoop utilisé.

    • Dans le champ Spark home, saisissez l'emplacement où l'exécutable Spark est installé dans le cluster Hadoop utilisé.

  6. Si vous devez exécuter le Job courant sous Windows, il est recommandé de spécifier où le programme winutils.exe à utiliser est stocké.

    • Si vous savez où se trouve le fichier winutils.exe et que vous souhaitez l'utiliser, cochez la case Define the Hadoop home directory et saisissez le répertoire dans lequel est stocké winutils.exe.

    • Sinon, laissez cette case décochée. Le Studio en génère un par lui-même et l'utilise pour le Job.

  7. Si le cluster Spark ne peut reconnaître la machine sur laquelle le Job est lancé, cochez la case Define the driver hostname or IP address et saisissez le nom de l'hôte ou l'adresse IP de cette machine. Cela permet au Master et à son Worker Spark de reconnaître la machine où se trouve le Job et donc son pilote.

    Notez que, dans cette situation, vous devez également ajouter le nom et l'adresse IP de cette machine dans son fichier host.

  8. Dans le champ Batch size, saisissez l'intervalle de temps à la fin duquel le Job revoit la source de données pour identifier les modifications et traite les nouveaux micro-batches.

  9. Si nécessaire, cochez la case Define a streaming timeout et, dans le champ qui s'affiche, saisissez la période de temps à la fin de laquelle le Job Streaming s'arrête automatiquement.

  10. Si vous souhaitez que le Job résiste aux échecs, cochez la case Activate checkpointing pour activer l'opération Spark de point de contrôle. Dans le champ qui s'affiche, saisissez le répertoire dans lequel Spark stocke, dans le système de fichiers du cluster, les données de contexte des calculs de Streaming, comme les métadonnées et les RDD générés par ce calcul.

    Pour plus d'informations concernant les points de contrôle Spark, consultez http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#checkpointing (en anglais).

  11. Cochez la case Set Tuning properties pour optimiser l'allocation des ressources à utiliser pour exécuter le Job. Ces propriétés ne sont pas obligatoires pour que le Job s'exécute correctement, mais elles sont utiles lorsque Spark est congestionné par des problèmes de ressources dans le cluster, comme le processeur, la bande passante ou la mémoire :

    • Driver memory et Driver core : saisissez la taille de la mémoire et le nombre de cœurs à allouer au pilote du Job courant.

    • Executor memory : saisissez la taille de la mémoire à allouer à chaque exécuteur Spark.

    • Core per executor : cochez cette case et, dans le champ affiché, saisissez le nombre de cœurs à utiliser par chaque exécuteur. Si vous laissez cette case décochée, l'allocation définie par défaut par Spark est utilisée. Par exemple, tous les cœurs disponibles sont utilisés par un exécuteur en mode Standalone.

    • Set Web UI port : si vous devez modifier le port par défaut de l'application Web de Spark, cochez cette case et saisissez le numéro du port à utiliser.

    • Broadcast factory : sélectionnez l'implémentation du broadcast à utiliser pour mettre les variables en cache sur chaque machine de Worker.

    • Customize Spark serializer : si vous devez importer un sérialiseur Spark externe, cochez cette case et, dans le champ qui s'affiche, saisissez le nom de la classe complètement qualifié du sérialiseur à utiliser.

    • Yarn resource allocation : sélectionnez comment vous souhaitez que Yarn alloue des ressources parmi les exécuteurs.

      • Auto : laissez Yarn gérer l'allocation seul.

      • Fixed : vous devez saisir le nombre d'exécuteurs à utiliser dans le champ Num executors qui s'affiche.

      • Dynamic : Yarn adapte le nombre d'exécuteurs afin de s'adapter à la charge de travail. Vous devez définir l'échelle de cette allocation dynamique en définissant le nombre initial d'exécuteurs à exécuter dans le champ Initial executors, le nombre le plus faible d'exécuteurs dans le champ Min executors et le plus grand nombre d'exécuteurs dans le champ Max executors.

      Cette fonctionnalité est disponible en mode Yarn client uniquement.

  12. Dans le champ Spark "scratch" directory, saisissez le répertoire dans lequel le Studio stocke, dans le système local, les fichiers temporaires comme les fichiers .jar à transférer. Si vous lancez votre Job sous Windows, le disque par défaut est C:. Si vous laissez /tmp dans ce champ, le répertoire est C:/tmp.

  13. Ajoutez toute propriété Spark à utiliser pour écraser la propriété équivalente utilisée par le Studio.

    Si vous souhaitez que les logs de l'application Spark de ce Job soient persistants dans le système de fichiers, ajoutez les propriétés relatives dans la table Advanced properties. Par exemple, les propriétés à configurer pour le mode Yarn client sont :

    • spark.yarn.historyServer.address

    • spark.eventLog.enabled

    • spark.eventLog.dir

    La valeur de la propriété spark.eventlog.enabled property doit être true. Pour les valeurs des deux autres propriétés, contactez l'administrateur du cluster Spark à utiliser.

    Pour plus d'informations concernant les propriétés Spark valides, consultez la documentation Spark à l'adresse https://spark.apache.org/docs/latest/configuration (en anglais).

Configurer la connexion au système de fichiers à utiliser par Spark

  1. Double-cliquez sur le tHDFSConfiguration pour ouvrir sa vue Component. Notez que le tHDFSConfiguration est utilisé car le mode Spark Yarn client est utilisé pour exécuter des Jobs Spark dans ce scénario.

    Spark utilise ce composant pour se connecter au système HDFS auquel sont transférés les fichiers .jar dépendants du Job.

  2. Dans la zone Version, sélectionnez la distribution Hadoop et la version à laquelle vous connecter.

  3. Dans le champ NameNode URI, saisissez l'emplacement de la machine hébergeant le service NameNode du cluster.

  4. Dans le champ Username, saisissez les informations d'authentification utilisée pour vous connecter au système HDFS à utiliser. Notez que le nom d'utilisateur doit être le même que celui saisi dans l'onglet Spark configuration.

Lire des messages d'un sujet Kafka donné

  1. Double-cliquez sur le tKafkaInput pour ouvrir sa vue Component.

  2. Dans le champ Broker list, saisissez l'emplacement des brokers du cluster Kafka à utiliser, en les séparant à l'aide d'une virgule (,). Dans cet exemple, seul un broker existe et son emplacement est localhost:9092.

  3. Dans la liste Starting offset, sélectionnez le point de départ duquel les messages d'un sujet sont consommés. Dans ce scénario, sélectionnez From latest, ce qui signifie commencer par le message le plus récent ayant été consommé par le même groupe de consommateurs et à partir duquel l'offset a été commité.

  4. Dans le champ Topic name, saisissez le nom du sujet duquel le Job consomme Job des flux Twitter. Dans ce scénario, le sujet est twitter_live.

    Ce sujet doit exister dans votre système Kafka. Pour plus d'informations concernant la création d'un sujet Kafka, consultez la documentation d'Apache Kafka ou utilisez le composant tKafkaCreateTopic fourni avec le Studio. Notez que le tKafkaCreateTopic n'est pas disponible pour les Jobs Spark.

  5. Cochez la case Set number of records per second to read from each Kafka partition. Cela permet de limiter la taille de chaque micro-batch à envoyer pour traitement.

Configurer la fréquence d'analyse des Tweets

  1. Double-cliquez sur le tWindow pour ouvrir sa vue Component.

    Ce composant est utilisé pour appliquer une fenêtre Spark sur un RDD d'entrée, afin que le Job analyse les Tweets des dernières 20 secondes après chaque intervalle de 15 secondes. Cela crée, entre l'application de deux fenêtres, un chevauchement d'un micro-batch, de 5 secondes, comme défini dans le champ Batch size dans l'onglet Spark configuration.

  2. Dans le champ Window duration, saisissez 20000 millisecondes, soit 20 secondes.

  3. Cochez la case Define the slide duration et, dans le champ qui s'affiche, saisissez 15000 millisecondes, soit 15 secondes.

La configuration de la fenêtre est présentée au-dessus de l'icône du tWindow dans le Job que vous créez.

Extraire le champ du hashtag depuis des données brutes de Tweets

  1. Double-cliquez sur le tExtractJSONFields pour ouvrir sa vue Component.

    Comme vous pouvez lire à l'adresse suivante, https://dev.twitter.com/overview/api/entities-in-twitter-objects#hashtags (en anglais), les données brutes des Tweets utilisent le format JSON.

  2. Cliquez sur Sync columns afin de récupérer le schéma du composant précédent. Le schéma récupéré est celui en lecture seule du tKafkaInput, puisque le tWindow n'a pas d'impact sur le schéma.

  3. Cliquez sur le boutoun [...] à côté du champ Edit schema pour ouvrir l'éditeur du schéma.

  4. Renommez la seule colonne du schéma de sortie en hashtag. Cette colonne est utilisée pour contenir le champ hashtag extrait des données JSON des Tweets.

  5. Cliquez sur OK afin de valider les modifications.

  6. Dans la liste Read by, sélectionnez JsonPath.

  7. Dans la liste JSON field, sélectionnez la colonne du schéma d'entrée de laquelle extraire les champs. Dans ce scénario, cette colonne est payload.

  8. Dans le champ Loop Jsonpath query, saisissez le chemin JSON pointant vers l'élément sur lequel se base la boucle d'extraction se base. Selon la structure JSON d'un Tweet, comme vous pouvez voir dans la documentation de Twitter, saisissez $.entities.hashtags pour effectuer une boucle sur l'entité hashtags.

  9. Dans la table Mapping, dans laquelle la colonne hashtag du schéma de sortie a été automatiquement renseignée, saisissez l'élément sur lequel effectuer l'extraction. Dans cet exemple, celui-ci est l'attribut text de chaque entité hashtags. Saisissez text entre guillemets doubles dans la colonne Json query.

Passer chaque hashtag en minuscules

  1. Double-cliquez sur le tMap pour ouvrir son Map editor.

  2. Dans la table du flux de sortie, à droite, saisissez StringHandling.DOWNCASE(row2.hashtag) dans la colonne Expression. Cela crée automatiquement le mapping entre la colonne hashtag du schéma d'entrée et la colonne hashtag du schéma de sortie.

    Notez que row2 dans cette expression est l'ID du lien d'entrée vers le tMap. Il peut être nommé différemment dans le Job que vous créez.

  3. Cliquez sur Apply afin de valider ces modifications et cliquez sur OK afin de fermer cet éditeur.

Compter les occurrences de chaque hashtag

  1. Double-cliquez sur le tAggregateRow pour ouvrir sa vue Component.

  2. Cliquez sur le bouton [...] à côté du champ Edit schema pour ouvrir son éditeur de schéma.

  3. Du côté de la sortie, cliquez deux fois sur le bouton [+] pour ajouter deux lignes au schéma de sortie, que vous nommez respectivement en hashtag et count.

  4. Cliquez sur OK afin de valider ces modifications et accepter la propagation proposée par la boîte de dialogue qui s'ouvre.

  5. Dans la table Group by, ajoutez une ligne en cliquant sur le bouton [+] et sélectionnez hashtag pour les colonnes Output column et Input column position. Cela permet de passer des données de la colonne hashtag du schéma d'entrée dans la colonne hashtag du schéma de sortie.

  6. Dans la table Operations, ajoutez une ligne en cliquant sur le bouton [+].

  7. Dans la colonne Output column, sélectionnez count. Dans la colonne Function, sélectionnez count et, dans la colonne Input column position, sélectionnez hashtag.

Sélectionner les 5 hashtags les plus utilisés par période de 20 secondes

  1. Double-cliquez sur le tTop pour ouvrir sa vue Component.

  2. Dans le champ Number of line selected, saisissez le nombre de lignes à passer au composant suivant, à partir de la première ligne de données triées par le tTop. Dans cet exemple, saisissez 5, ce qui signifie que les 5 hashtags les plus utilisés durant chaque période de 20 secondes.

  3. Dans la table Criteria, ajoutez une ligne en cliquant sur le bouton [+].

  4. Dans la colonne Schema column, sélectionnez count, la colonne de laquelle trier les données, dans la colonne sort num or alpha, sélectionnez num, ce qui signifie que les données à trier sont des nombres. Dans la colonne Order asc or desc, sélectionnez desc pour trier les données en ordre descendant.

Exécuter le Job

Exécutez votre Job.

Le composant tLogRow est utilisé pour présenter les résultats d'exécution du Job.

  1. Assurez-vous que votre programme de mise en flux de Twitter est toujours en cours d'exécution et continue à écrire les Tweets reçus dans le sujet donné.

  2. Appuyez sur F6 pour exécuter le Job.

Laissez le Job s'exécuter un moment, puis, dans la console de la vue Run, vous pouvez voir que le Job liste les 5 hashtags les plus utilisés in dans chaque batch de Tweets mentionnant Paris. Selon la configuration de la taille de chaque micro-batch et de la fenêtre Spark, chacun de ces batchs de Tweets contient les Tweets des 20 dernières secondes, reçus après chaque intervalle de 15 secondes.

Notez que vous pouvez gérer le niveau d'informations relatives à l'exécution à écrire en sortie dans la console. Cochez la case log4jLevel dans l'onglet Advanced settings et sélectionnez le niveau d'informations à afficher.

Pour plus d'informations sur les niveaux de logs du log4j, consultez la documentation d'Apache : http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/Level.html (en anglais).

Propriétés du tKafkaInput dans des Jobs Storm

Famille du composant

Messaging/Input

 

Basic settings

Schema et Edit schema

Un schéma est une description de lignes, il définit le nombre de champs (colonnes) qui sont traités et passés au composant suivant. Le schéma est soit local (Built-In), soit distant dans le Repository.

Notez que le schéma de ce composant est en lecture seule. Il stocke les messages envoyés depuis le producteur de messages.

 

Zookeeper host

Saisissez l'adresse du service Zookeeper du système Kafka à utiliser.

 

Port

Saisissez le numéro du port d'écoute du service Zookeeper à utiliser.

 

Topic name

Saisissez le nom du sujet duquel le tKafkaInput reçoit le flux de messages.

Utilisation dans des Jobs Storm

Dans un Job Storm Talend, ce composant est utilisé comme composant de début. Les autres composants utilisés avec celui-ci doivent également être des composants Storm. Ils génèrent nativement du code Storm pouvant être exécuté directement dans un système Storm.

Pour plus d'informations concernant un Job Storm Talend, consultez les sections décrivant comment créer et configurer un Job Storm Talend dans le Guide de prise en main de Talend Big Data.

Notez que, dans cette documentation, sauf lorsqu'explicitement indiqué, un scénario présente uniquement des Jobs de type Standard, c'est-à-dire des Jobs Talend traditionnels d'intégration de données.

Storm Connection

Vous devez utiliser l'onglet Storm Configuration dans la vue Run pour définir la connexion à un système Storm donné pour le Job complet.

Cette connexion fonctionne uniquement pour le Job dans lequel vous l'avez définie.

Scénario : Analyser les activités de personnes à l'aide d'un Topology Storm

Dans ce scénario, un Job Storm (Topology) comprenant quatre composants est créé pour transmettre des messages concernant les activités de personnes données au Topology que vous créez, afin d'analyser la popularités de ces activités.

Ce Job souscrit à un sujet créé par le producteur de sujets Kafka, ce qui signifie que vous devez installer le cluster Kafka dans votre système de messaging pour maintenir le flux des messages. Pour plus d'informations concernant le service de messaging Kafka, consultez la documentation Apache Kafka.

Ce Job s'exécute sur Storm, vous devez donc vous assurer que votre système Storm est prêt à être utilisé. Pour plus d'informations concernant Storm, consultez la documentation Apache Storm.

Notez que, lorsque vous utilisez le système Storm installé dans Hortonwork Data Platform 2.1 (HDP2.1), vous devez vous assurer que les noms de serveurs DRPC (appel de procédures à distance distribué) de Storm sont bien définis dans la zone Custom storm.yaml dans l'onglet Config de Storm, dans la console Web Ambari. Par exemple, si vous utilisez deux serveurs DRPC Storm, Server1 et Server2, vous devez les définir dans la zone Custom storm.yaml comme suit : [Server1,Server2].

Pour reproduire ce scénario, procédez comme suit.

Produire les messages d'exemple

Dans un cas d'usage réel, le système produisant les messages vers Kafka est complètement découplé. Dans ce scénario d'exemple, Kafka lui-même est utilisé pour produire les messages d'exemple. Effectuez les opérations suivantes pour produire ces messages :

  1. Créez le sujet Kafka à utiliser pour catégoriser les messages. La commande suivante est utilisée à des fins de démonstration uniquement. Pour plus d'informations concernant la création d'un sujet Kafka, consultez la documentation Apache Kafka.

    /usr/lib/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic activities --partitions 1 --replication-factor 1

    Cette commande crée un sujet nommé activities, utilisant les brokers Kafka gérés par le service Zookeeper sur la machine localhost.

  2. Publiez le message que vous souhaitez analyser dans le sujet activities que vous venez de créer. Dans ce scénario, Kafka est utilisé pour effectuer cette publication, en utilisant par exemple la commande suivante :

    echo 'Ryan|M|Pool
    Remy|M|Drink
    Remy|M|Hiking
    Irene|F|Drink
    Pierre|M|Movie
    Irene|F|Pool
    Thomas|M|Drink
    Ryan|M|Cooking
    Wang|F|Cooking
    Chen|M|Drink | /usr/lib/kafka/bin/kafka-console-producer.sh --broker-list  localhost:9092 --topic  activities

    Cette commande publie dix messages simples :

    Ryan|M|Pool
    Remy|M|Drink
    Remy|M|Hiking
    Irene|F|Drink
    Pierre|M|Movie
    Irene|F|Pool
    Thomas|M|Drink
    Ryan|M|Cooking
    Wang|F|Cooking
    Chen|M|Drink

    Comme expliqué précédemment, vous pouvez utiliser votre producteur de messages actuel pour effectuer la publication.

Relier les composants

  1. Dans la perspective Integration du Studio, créez un Job vide Storm à partir du nœud Job Designs dans la vue Repository.

    Pour plus d'informations concernant la création d'un Job Storm, consultez Guide de prise en main de Talend Big Data.

  2. Dans l'espace de modélisation graphique, saisissez le nom du composant à utiliser et sélectionnez le composant dans la liste qui s'affiche. Dans ce scénario, les composants sont un tKafkaInput, un tJavaStorm, un tAggregateRow et un tLogRow.

  3. Reliez-les à l'aide de liens Row > Main.

Configurer la connexion

  1. Cliquez sur Run afin d'ouvrir cette vue. Cliquez sur l'onglet Storm configuration afin de configurer la connexion au système Storm à utiliser.

  2. Dans la zone Storm node roles, renseignez les informations du serveur Nimbus ainsi que l'endpoint du DRPC du cluster Storm à utiliser :

    • Local mode : cochez cette case pour construire l'environnement Storm dans le Studio. Dans ce cas, le Job Storm en cours de création est exécuté localement dans le Studio. Vous n'avez pas besoin de définir d'adresse d'endpoint spécifique Nimbus ou DRPC.

    • Nimbus host et Port : dans ces deux champs, saisissez l'adresse du serveur Nimbus Storm et son numéro de port, respectivement.

    • DRPC endpoint et Port : dans ces deux champs, saisissez l'adresse de l'endpoint DRPC de Storm et son numéro de port, respectivement.

  3. Dans le champ Topology name, saisissez le nom que vous souhaitez que le système Storm utilise pour le Job Storm, ou Topology en termes Storm. Il est recommandé d'utiliser le même nom que pour le Job Storm, afin de reconnaître le Job facilement, même au sein du système Storm.

  4. Cochez la case Kill existing topology pour faire s'arrêter tout Topology dans le système Storm ayant le même nom que le Job en cours de création que vous souhaitez exécuter.

    Si vous décochez cette case et que le Job du même nom est déjà en cours d'exécution dans le système Storm, le Job courant échoue lorsqu'il est envoyé dans Storm pour exécution.

  5. Cochez la case Submit topology afin de soumettre le Job courant au système Storm. Cette fonctionnalité est utilisée lorsque vous devez envoyer un nouveau Topology à Storm ou utilisée avec la fonctionnalité Kill existing topology lorsque vous devez mettre à jour un Topology en cours d'exécution dans le système Storm.

    Si vous décochez cette case lors de l'exécution du Job, la soumission de ce Job est ignorée, mais les autres informations de configuration sont prises en compte, par exemple l'arrêt d'un Topology existant.

  6. Cochez la case Monitor topology after submission afin de monitorer le Job Storm dans la console de la vue Run.

    Si vous décochez cette case, vous ne pouvez lire les informations de monitoring dans la console.

  7. Dans le champ Stop monitoring after timeout reached, saisissez, sans guillemet double, la valeur numérique pour indiquer l'arrêt du monitoring lorsqu'un Topology en cours d'exécution atteint son délai avant suspension. La valeur par défaut est -1, ce qui signifie qu'aucune suspension n'est appliquée.

  8. Cochez la case Kill topology on quitting Talend Job pour permettre au Studio d'arrêter, s'il est toujours en cours d'exécution, le Job dans le système Storm lorsque vous l'arrêtez dans le Studio.

    Si vous décochez cette case, le Topology de ce Job continue à s'exécuter dans le système Storm même si vous l'arrêtez dans le Studio ou si vous arrêtez le Studio.

  9. Si vous souhaitez utiliser d'autres propriétés Storm spécifiques, ajoutez-les à la table Storm configuration.

Recevoir un message du canal Kafka

  1. Double-cliquez sur le tKafkaInput pour ouvrir sa vue Component.

  2. Dans le champ Zookeeper host, saisissez l'adresse du service Zookeeper utilisé pour coordonner le cluster Kafka à utiliser.

  3. Dans le champ Port, saisissez le numéro du port de ce service Zookeeper.

  4. Dans le champ Topic name, saisissez le nom du sujet dans lequel vous devez recevoir des messages. Dans ce scénario, le sujet est activities.

Extraire des informations d'un message

  1. Double-cliquez sur le tJavaStorm pour ouvrir sa vue Component.

  2. Cliquez sur le bouton [...] à côté du champ Edit schema pour ouvrir l'éditeur du schéma.

  3. Du côté de la sortie, à droite, cliquez trois fois sur le bouton [+] pour ajouter trois lignes et, dans la colonne Column, renommez-les respectivement firstname, gender et activity. Ces colonnes correspondent aux informations que vous pouvez extraire d'un message d'exemple.

  4. Cliquez sur OK afin de valider ces modifications et acceptez la propagation proposée par la boîte de dialogue qui s'ouvre.

  5. Dans la zone Bolt code, saisissez la méthode principale du Bolt à exécuter. Dans ce scénario, le code est le suivant :

    String[] tokens = input.get_str().split("\\|");
    collector.emit(new Values(
    		tokens[0],
    		tokens[1],
    		tokens[2]
    	));

Regrouper les informations extraites

  1. Double-cliquez sur le tAggregateRow pour ouvrir sa vue Component. Ce composant vous permet de voir quelle est l'activité la plus populaire dans les messages reçus.

  2. Cliquez sur le bouton [...] à côté du champ Edit schema pour ouvrir l'éditeur du schéma.

  3. Du côté de la sortie, à droite, cliquez trois fois sur le bouton [+] pour ajouter trois lignes. Dans la colonne Column, renommez-les respectivement activity, gender et popularity.

  4. Dans la colonne Type de la ligne popularity du schéma de sortie, sélectionnez Double.

  5. Cliquez sur OK afin de valider ces modifications et acceptez la propagation proposée par la boîte de dialogue qui s'ouvre.

  6. Dans la table Group by, ajoutez deux lignes en cliquant deux fois sur le bouton [+] et en configurant ces lignes comme suit afin de regrouper les données de sortie.

    • Dans la colonne Output column, sélectionnez les colonnes du schéma de sortie à utiliser comme conditions pour grouper les données de sortie. Dans cet exemple, mes colonnes à utiliser sont activity et gender.

    • Dans la colonne Input column position, sélectionnez les colonnes du schéma d'entrée permettant d'envoyer les données aux colonnes de sortie sélectionnées dans la colonne Output column. Dans ce scénario, ces colonnes sont activity et gender.

  7. Dans la table Operations, ajoutez une ligne en cliquant sur le bouton [+] et configurez-la comme suit afin de calculer la popularité de chaque activité :

    • dans la colonne Output column, sélectionnez la colonne du schéma de sortie qui contiendra les résultats calculés. Dans ce scénario, la colonne est popularity.

    • dans la colonne Function, sélectionnez la fonction à utiliser pour traiter les données entrantes. Dans ce scénario, sélectionnez count. La fonction compte la fréquence de chaque activité dans les messages reçus.

    • dans la colonne Input column position, sélectionnez la colonne du schéma d'entrée afin de fournir les données à traiter. Dans ce scénario, la colonne est activity.

Exécuter le Job

Vous pouvez exécuter le Job.

Le composant tLogRow est utilisé pour présenter les résultats d'exécution du Job.

Appuyez sur F6 pour exécuter le Job

La vue Run s'ouvre automatiquement. Vous pouvez examiner les résultats d'exécution.

Vous pouvez constater que Drink est l'activité la plus populaire dans les messages avec trois occurrences parmi les personnes de sexe masculin M et une occurrence parmi les personnes de sexe féminin F.

Le Topology Storm continue à s'exécuter, attendant l'arrivée de messages dans le broker de messages Kafka, jusqu'à ce que vous arrêtiez le Job. Dans ce scénario, puisque la case Kill topology on quitting Talend Job est cochée, le Topology Storm est arrêté et supprimé du cluster lorsque le Job est arrêté.