Lire des données Avro depuis ConsumerRecord - Cloud - 8.0

Kafka

Version
Cloud
8.0
Language
Français (France)
Product
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Open Studio for Big Data
Talend Real-Time Big Data Platform
Module
Studio Talend
Content
Création et développement > Systèmes tiers > Composants Messaging (Intégration) > Composants Kafka
Gouvernance de données > Systèmes tiers > Composants Messaging (Intégration) > Composants Kafka
Qualité et préparation de données > Systèmes tiers > Composants Messaging (Intégration) > Composants Kafka

Pourquoi et quand exécuter cette tâche

Configurez le Job de lecture.

Procédure

  1. Depuis le Job de lecture, double-cliquez sur le composant tKafkaInput pour ouvrir sa vue Basic settings et configurez les paramètres suivants :
    1. Dans la liste déroulante Output type, sélectionnez ConsumerRecord.
      Lorsque vous utilisez ConsumerRecord, les enregistrements Avro sont classifiés comme Object dans le Studio Talend, comme suit :
    2. Dans la liste déroulante Version, sélectionnez la version du cluster Kafka à utiliser.
    3. Dans le champ Broker list, saisissez l'adresse des nœuds du broker du cluster Kafka à utiliser.
    4. Dans le champ Topic name, saisissez le nom du topic duquel le tKafkaInput reçoit le flux de messages.
    5. Dans le champ Consumer group id, saisissez le nom du groupe de consommation auquel vous souhaitez que le tKafkaInput appartienne.
  2. Double-cliquez sur le tJavaRow pour ouvrir sa vue Basic settings et configurez les paramètres suivants :
    1. Cliquez sur le bouton [...] à côté du champ Edit schema pour ouvrir l'éditeur du Schema.
    2. Cliquez sur le bouton [+] pour ajouter une colonne et nommez cette colonne. Par exemple :
    3. Cliquez sur OK pour valider ces modifications et acceptez la propagation proposée par la boîte de dialogue qui s'ouvre.
    4. Dans le champ Code, saisissez le code Java pour extraire le contenu. Par exemple :
      org.apache.kafka.clients.consumer.ConsumerRecord record = (org.apache.kafka.clients.consumer.ConsumerRecord) input_row.record;
      
      output_row.topic = record.topic();
      output_row.partition = record.partition();
      output_row.offset = record.offset();
      output_row.timestamp = record.timestamp();
      output_row.timestampType = record.timestampType();
      
      output_row.header1 = record.headers().lastHeader("header1").value();
      output_row.header2 = record.headers().lastHeader("header2").value();
      
      output_row.key = (byte[]) record.key();
      output_row.value = (byte[]) record.value();
  3. Double-cliquez sur le tLogRow pour ouvrir sa vue Basic settings et configurez le paramètre suivant :
    1. Dans la zone Mode, sélectionnez Table (print values in cells of a table) pour afficher un résultat plus lisible.

Résultats

Le Job de lecture est configuré.