Scénario : Analyser les activités de personnes à l'aide d'un Topology Storm - 6.1

Talend Data Fabric Guide de prise en main

EnrichVersion
6.1
EnrichProdName
Talend Data Fabric
task
Création et développement
Gouvernance de données
Qualité et préparation de données
EnrichPlatform
Studio Talend

Dans ce scénario, un Job Storm (Topology) comprenant quatre composants est créé pour transmettre des messages concernant les activités de personnes données au Topology que vous créez, afin d'analyser la popularités de ces activités.

Ce Job souscrit à un sujet créé par le producteur de sujets Kafka, ce qui signifie que vous devez installer le cluster Kafka dans votre système de messaging pour maintenir le flux des messages. Pour plus d'informations concernant le service de messaging Kafka, consultez la documentation Apache Kafka.

Ce Job s'exécute sur Storm, vous devez donc vous assurer que votre système Storm est prêt à être utilisé. Pour plus d'informations concernant Storm, consultez la documentation Apache Storm.

Notez que, lorsque vous utilisez le système Storm installé dans Hortonwork Data Platform 2.1 (HDP2.1), vous devez vous assurer que les noms de serveurs DRPC (appel de procédures à distance distribué) de Storm sont bien définis dans la zone Custom storm.yaml dans l'onglet Config de Storm, dans la console Web Ambari. Par exemple, si vous utilisez deux serveurs DRPC Storm, Server1 et Server2, vous devez les définir dans la zone Custom storm.yaml comme suit : [Server1,Server2].

Pour reproduire ce scénario, procédez comme suit.

Produire les messages d'exemple

Dans un cas d'usage réel, le système produisant les messages vers Kafka est complètement découplé. Dans ce scénario d'exemple, Kafka lui-même est utilisé pour produire les messages d'exemple. Effectuez les opérations suivantes pour produire ces messages :

  1. Créez le sujet Kafka à utiliser pour catégoriser les messages. La commande suivante est utilisée à des fins de démonstration uniquement. Pour plus d'informations concernant la création d'un sujet Kafka, consultez la documentation Apache Kafka.

    /usr/lib/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic activities --partitions 1 --replication-factor 1

    Cette commande crée un sujet nommé activities, utilisant les brokers Kafka gérés par le service Zookeeper sur la machine localhost.

  2. Publiez le message que vous souhaitez analyser dans le sujet activities que vous venez de créer. Dans ce scénario, Kafka est utilisé pour effectuer cette publication, en utilisant par exemple la commande suivante :

    echo 'Ryan|M|Pool
    Remy|M|Drink
    Remy|M|Hiking
    Irene|F|Drink
    Pierre|M|Movie
    Irene|F|Pool
    Thomas|M|Drink
    Ryan|M|Cooking
    Wang|F|Cooking
    Chen|M|Drink | /usr/lib/kafka/bin/kafka-console-producer.sh --broker-list  localhost:9092 --topic  activities

    Cette commande publie dix messages simples :

    Ryan|M|Pool
    Remy|M|Drink
    Remy|M|Hiking
    Irene|F|Drink
    Pierre|M|Movie
    Irene|F|Pool
    Thomas|M|Drink
    Ryan|M|Cooking
    Wang|F|Cooking
    Chen|M|Drink

    Comme expliqué précédemment, vous pouvez utiliser votre producteur de messages actuel pour effectuer la publication.

Relier les composants

  1. Dans la perspective Integration du Studio, créez un Job vide Storm à partir du nœud Job Designs dans la vue Repository.

    Pour plus d'informations concernant la création d'un Job Storm, consultez Prise en main d'un Job Storm.

  2. Dans l'espace de modélisation graphique, saisissez le nom du composant à utiliser et sélectionnez le composant dans la liste qui s'affiche. Dans ce scénario, les composants sont un tKafkaInput, un tJavaStorm, un tAggregateRow et un tLogRow.

  3. Reliez-les à l'aide de liens Row > Main.

Configurer la connexion

  1. Cliquez sur Run afin d'ouvrir cette vue. Cliquez sur l'onglet Storm configuration afin de configurer la connexion au système Storm à utiliser.

  2. Dans la zone Storm node roles, renseignez les informations du serveur Nimbus ainsi que l'endpoint du DRPC du cluster Storm à utiliser :

    • Local mode : cochez cette case pour construire l'environnement Storm dans le Studio. Dans ce cas, le Job Storm en cours de création est exécuté localement dans le Studio. Vous n'avez pas besoin de définir d'adresse d'endpoint spécifique Nimbus ou DRPC.

    • Nimbus host et Port : dans ces deux champs, saisissez l'adresse du serveur Nimbus Storm et son numéro de port, respectivement.

    • DRPC endpoint et Port : dans ces deux champs, saisissez l'adresse de l'endpoint DRPC de Storm et son numéro de port, respectivement.

  3. Dans le champ Topology name, saisissez le nom que vous souhaitez que le système Storm utilise pour le Job Storm, ou Topology en termes Storm. Il est recommandé d'utiliser le même nom que pour le Job Storm, afin de reconnaître le Job facilement, même au sein du système Storm.

  4. Cochez la case Kill existing topology pour faire s'arrêter tout Topology dans le système Storm ayant le même nom que le Job en cours de création que vous souhaitez exécuter.

    Si vous décochez cette case et que le Job du même nom est déjà en cours d'exécution dans le système Storm, le Job courant échoue lorsqu'il est envoyé dans Storm pour exécution.

  5. Cochez la case Submit topology afin de soumettre le Job courant au système Storm. Cette fonctionnalité est utilisée lorsque vous devez envoyer un nouveau Topology à Storm ou utilisée avec la fonctionnalité Kill existing topology lorsque vous devez mettre à jour un Topology en cours d'exécution dans le système Storm.

    Si vous décochez cette case lors de l'exécution du Job, la soumission de ce Job est ignorée, mais les autres informations de configuration sont prises en compte, par exemple l'arrêt d'un Topology existant.

  6. Cochez la case Monitor topology after submission afin de monitorer le Job Storm dans la console de la vue Run.

    Si vous décochez cette case, vous ne pouvez lire les informations de monitoring dans la console.

  7. Dans le champ Stop monitoring after timeout reached, saisissez, sans guillemet double, la valeur numérique pour indiquer l'arrêt du monitoring lorsqu'un Topology en cours d'exécution atteint son délai avant suspension. La valeur par défaut est -1, ce qui signifie qu'aucune suspension n'est appliquée.

  8. Cochez la case Kill topology on quitting Talend Job pour permettre au Studio d'arrêter, s'il est toujours en cours d'exécution, le Job dans le système Storm lorsque vous l'arrêtez dans le Studio.

    Si vous décochez cette case, le Topology de ce Job continue à s'exécuter dans le système Storm même si vous l'arrêtez dans le Studio ou si vous arrêtez le Studio.

  9. Si vous souhaitez utiliser d'autres propriétés Storm spécifiques, ajoutez-les à la table Storm configuration.

Recevoir un message du canal Kafka

  1. Double-cliquez sur le tKafkaInput pour ouvrir sa vue Component.

  2. Dans le champ Zookeeper host, saisissez l'adresse du service Zookeeper utilisé pour coordonner le cluster Kafka à utiliser.

  3. Dans le champ Port, saisissez le numéro du port de ce service Zookeeper.

  4. Dans le champ Topic name, saisissez le nom du sujet dans lequel vous devez recevoir des messages. Dans ce scénario, le sujet est activities.

Extraire des informations d'un message

  1. Double-cliquez sur le tJavaStorm pour ouvrir sa vue Component.

  2. Cliquez sur le bouton [...] à côté du champ Edit schema pour ouvrir l'éditeur du schéma.

  3. Du côté de la sortie, à droite, cliquez trois fois sur le bouton [+] pour ajouter trois lignes et, dans la colonne Column, renommez-les respectivement firstname, gender et activity. Ces colonnes correspondent aux informations que vous pouvez extraire d'un message d'exemple.

  4. Cliquez sur OK afin de valider ces modifications et acceptez la propagation proposée par la boîte de dialogue qui s'ouvre.

  5. Dans la zone Bolt code, saisissez la méthode principale du Bolt à exécuter. Dans ce scénario, le code est le suivant :

    String[] tokens = input.get_str().split("\\|");
    collector.emit(new Values(
    		tokens[0],
    		tokens[1],
    		tokens[2]
    	));

Regrouper les informations extraites

  1. Double-cliquez sur le tAggregateRow pour ouvrir sa vue Component. Ce composant vous permet de voir quelle est l'activité la plus populaire dans les messages reçus.

  2. Cliquez sur le bouton [...] à côté du champ Edit schema pour ouvrir l'éditeur du schéma.

  3. Du côté de la sortie, à droite, cliquez trois fois sur le bouton [+] pour ajouter trois lignes. Dans la colonne Column, renommez-les respectivement activity, gender et popularity.

  4. Dans la colonne Type de la ligne popularity du schéma de sortie, sélectionnez Double.

  5. Cliquez sur OK afin de valider ces modifications et acceptez la propagation proposée par la boîte de dialogue qui s'ouvre.

  6. Dans la table Group by, ajoutez deux lignes en cliquant deux fois sur le bouton [+] et en configurant ces lignes comme suit afin de regrouper les données de sortie.

    • Dans la colonne Output column, sélectionnez les colonnes du schéma de sortie à utiliser comme conditions pour grouper les données de sortie. Dans cet exemple, mes colonnes à utiliser sont activity et gender.

    • Dans la colonne Input column position, sélectionnez les colonnes du schéma d'entrée permettant d'envoyer les données aux colonnes de sortie sélectionnées dans la colonne Output column. Dans ce scénario, ces colonnes sont activity et gender.

  7. Dans la table Operations, ajoutez une ligne en cliquant sur le bouton [+] et configurez-la comme suit afin de calculer la popularité de chaque activité :

    • dans la colonne Output column, sélectionnez la colonne du schéma de sortie qui contiendra les résultats calculés. Dans ce scénario, la colonne est popularity.

    • dans la colonne Function, sélectionnez la fonction à utiliser pour traiter les données entrantes. Dans ce scénario, sélectionnez count. La fonction compte la fréquence de chaque activité dans les messages reçus.

    • dans la colonne Input column position, sélectionnez la colonne du schéma d'entrée afin de fournir les données à traiter. Dans ce scénario, la colonne est activity.

Exécuter le Job

Vous pouvez exécuter le Job.

Le composant tLogRow est utilisé pour présenter les résultats d'exécution du Job.

Appuyez sur F6 pour exécuter le Job

La vue Run s'ouvre automatiquement. Vous pouvez examiner les résultats d'exécution.

Vous pouvez constater que Drink est l'activité la plus populaire dans les messages avec trois occurrences parmi les personnes de sexe masculin M et une occurrence parmi les personnes de sexe féminin F.

Le Topology Storm continue à s'exécuter, attendant l'arrivée de messages dans le broker de messages Kafka, jusqu'à ce que vous arrêtiez le Job. Dans ce scénario, puisque la case Kill topology on quitting Talend Job est cochée, le Topology Storm est arrêté et supprimé du cluster lorsque le Job est arrêté.