Reading Avro data from ConsumerRecord - Cloud - 8.0

Kafka

Version: Cloud 8.0
Language: English
Products: Talend Big Data, Talend Big Data Platform, Talend Data Fabric, Talend Open Studio for Big Data, Talend Real-Time Big Data Platform
Module: Talend Studio

About this task

Configure the reading Job.

Procedure

  1. From the reading Job, double-click the tKafkaInput component to open its Basic settings view and specify the following parameters:
    1. From the Output type drop-down list, select ConsumerRecord.
      When you use ConsumerRecord, the Avro records are classified as Object in Talend Studio.
    2. From the Version drop-down list, select the version of the Kafka cluster to be used.
    3. In the Broker list field, enter the address of the broker nodes of the Kafka cluster to be used.
    4. In the Topic name field, enter the name of the topic from which tKafkaInput receives the feed of messages.
    5. In the Consumer group id field, enter the name of the consumer group to which you want tKafkaInput to belong.
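  The Basic settings above map to standard Kafka consumer properties. As a sketch of that mapping (the broker address, group id, and topic name below are illustrative placeholders, not values from this procedure), the equivalent plain-Java consumer configuration might look like:

  ```java
  import java.util.Properties;

  public class KafkaInputSettingsSketch {

      // Builds consumer properties mirroring the tKafkaInput Basic settings.
      // Broker address and group id here are placeholders.
      static Properties consumerProperties() {
          Properties props = new Properties();
          props.setProperty("bootstrap.servers", "localhost:9092"); // Broker list
          props.setProperty("group.id", "my-consumer-group");       // Consumer group id
          // With the ConsumerRecord output type, key and value stay raw bytes.
          props.setProperty("key.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          props.setProperty("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
          // The Topic name setting is not a property: it is passed to
          // KafkaConsumer.subscribe(...) when the consumer is created.
          return props;
      }

      public static void main(String[] args) {
          Properties props = consumerProperties();
          System.out.println(props.getProperty("bootstrap.servers"));
          System.out.println(props.getProperty("group.id"));
      }
  }
  ```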
  2. Double-click the tJavaRow component to open its Basic settings view and specify the following parameters:
    1. Click the […] button next to Edit schema to open the Schema dialog box.
    2. Click the [+] button to add the output columns and name them, for example topic, partition, offset, timestamp, timestampType, header1, header2, key, and value, matching the columns used in the Java code of the next step.
    3. Click OK to validate these changes and, when prompted, accept propagating them to the next component.
    4. In the Code field, enter the Java code to extract the content. For example:
      // Cast the incoming Object back to a Kafka ConsumerRecord.
      org.apache.kafka.clients.consumer.ConsumerRecord record = (org.apache.kafka.clients.consumer.ConsumerRecord) input_row.record;
      
      // Record metadata.
      output_row.topic = record.topic();
      output_row.partition = record.partition();
      output_row.offset = record.offset();
      output_row.timestamp = record.timestamp();
      output_row.timestampType = record.timestampType();
      
      // Header values; note that lastHeader(...) returns null if the header is absent.
      output_row.header1 = record.headers().lastHeader("header1").value();
      output_row.header2 = record.headers().lastHeader("header2").value();
      
      // The key and value are the raw Avro-encoded bytes.
      output_row.key = (byte[]) record.key();
      output_row.value = (byte[]) record.value();
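      The key and value extracted by this code are still raw Avro-encoded bytes; in a real Job you would typically decode them with Avro's GenericDatumReader and the writer schema. As a minimal, self-contained illustration of what those bytes contain (not a replacement for the Avro API), the sketch below decodes a long from Avro's zigzag varint form, which is how Avro serializes int and long fields inside record.value():

      ```java
      public class AvroVarintSketch {

          // Decodes an Avro zigzag-varint long starting at offset 0 of the buffer.
          // Avro writes int/long as a zigzag-encoded value split into 7-bit groups,
          // least significant group first, with the high bit marking continuation.
          static long decodeLong(byte[] buf) {
              long raw = 0;
              int shift = 0;
              int i = 0;
              byte b;
              do {
                  b = buf[i++];
                  raw |= (long) (b & 0x7F) << shift;
                  shift += 7;
              } while ((b & 0x80) != 0);
              // Undo the zigzag mapping: 0, 1, 2, 3, ... -> 0, -1, 1, -2, ...
              return (raw >>> 1) ^ -(raw & 1);
          }

          public static void main(String[] args) {
              System.out.println(decodeLong(new byte[] {0x02}));              // 1
              System.out.println(decodeLong(new byte[] {0x01}));              // -1
              System.out.println(decodeLong(new byte[] {(byte) 0x80, 0x01})); // 64
          }
      }
      ```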
  3. Double-click the tLogRow component to open its Basic settings view and specify the following parameter:
    1. Select Table (print values in cells of a table) in the Mode area for better readability of the result.

Results

The reading Job is configured.