About this task
Configure the reading Job.
Procedure
-
From the reading Job, double-click the tKafkaInput
component to open its Basic settings view and specify the
following parameters:
-
From the Output type drop-down list, select
ConsumerRecord.
When you use ConsumerRecord, the Avro records are classified as
Object in Talend Studio; their content is extracted with Java
code in a later step of this procedure.
-
From the Version drop-down list, select the
version of the Kafka cluster to be used.
-
In the Broker list field, enter the address of
the broker nodes of the Kafka cluster to be used.
-
In the Topic name field, enter the name of the
topic from which tKafkaInput receives the feed of
messages.
-
In the Consumer group id field, enter the name
of the consumer group to which you want
tKafkaInput to belong.
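For orientation, the Basic settings fields above correspond to standard Kafka consumer properties: Broker list maps to bootstrap.servers, Consumer group id to group.id, and Topic name to the topic that is subscribed to. The sketch below shows this mapping with plain JDK code; the broker addresses, group name, and topic are placeholder values, not values from this procedure:

```java
import java.util.Properties;

public class ReadingJobSettingsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Broker list: comma-separated addresses of the broker nodes.
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        // Consumer group id: the group tKafkaInput belongs to.
        props.put("group.id", "reading-job-group");
        // With the ConsumerRecord output type, keys and values arrive as
        // raw byte arrays, matching the (byte[]) casts in the tJavaRow code.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // Topic name: the topic the feed of messages is read from.
        String topic = "my-topic";

        System.out.println(props.getProperty("group.id") + " -> " + topic);
    }
}
```

The component manages the consumer itself; this sketch only illustrates which Kafka properties the fields stand for.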
-
Double-click the tJavaRow component to open its
Basic settings view and specify the following
parameters:
-
Click the […] button next to Edit
schema to open the Schema dialog
box.
-
Click the [+] button to add the columns and name them, one per
field to extract. For example, to match the Java code entered later in
this procedure: topic, partition, offset, timestamp, timestampType,
header1, header2, key, and value.
-
Click OK to validate these changes and accept
the propagation prompted by the pop-up dialog box.
-
In the Code field, enter the Java code to
extract the content. For example:
// Cast the Object from tKafkaInput back to a Kafka ConsumerRecord.
org.apache.kafka.clients.consumer.ConsumerRecord record = (org.apache.kafka.clients.consumer.ConsumerRecord) input_row.record;
output_row.topic = record.topic();
output_row.partition = record.partition();
output_row.offset = record.offset();
output_row.timestamp = record.timestamp();
output_row.timestampType = record.timestampType();
// lastHeader(...) returns the last header added with the given key.
output_row.header1 = record.headers().lastHeader("header1").value();
output_row.header2 = record.headers().lastHeader("header2").value();
// With the ConsumerRecord output type, key and value are raw byte arrays.
output_row.key = (byte[]) record.key();
output_row.value = (byte[]) record.value();
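Note that record.headers().lastHeader("header1") returns null when a message carries no header with that key, so the one-line header extraction above throws a NullPointerException on such messages. A defensive variant of those two lines, written in the same tJavaRow fragment style (the empty-array fallback is an assumption, not part of this procedure):

```java
// Guard against absent headers: lastHeader(...) returns null when no
// header with that key exists on the record.
org.apache.kafka.common.header.Header h1 = record.headers().lastHeader("header1");
output_row.header1 = (h1 != null) ? h1.value() : new byte[0];
org.apache.kafka.common.header.Header h2 = record.headers().lastHeader("header2");
output_row.header2 = (h2 != null) ? h2.value() : new byte[0];
```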
-
Double-click the tLogRow component to open its
Basic settings view and specify the following
parameter:
-
Select Table (print values in cells of a table)
in the Mode area for better readability of the
result.
Results
The reading Job is configured.