Skip to main content Skip to complementary content

Creating a Bean Jar

Procedure

  1. From the repository tree view, right-click Code > Custom Bean Jars and select Create Bean Jar from the contextual menu.
  2. In the New Bean Jar dialog box, enter Kafka_Avro_Beans in the Name field and click Finish.
  3. Right-click Kafka_Avro_Beans under the Custom Bean Jars node and select Create Bean from the contextual menu.
  4. The New Bean wizard opens. In the Name field, enter KafkaAvroSerializerBean and click Finish.
  5. Enter the following code in the design workspace.
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    
    public class KafkaAvroSerializerBean extends KafkaAvroSerializer {
    
    	@Override
    	public void configure(KafkaAvroSerializerConfig config) {
    		this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000);
    		this.strategyUsesSchema(true);
    		this.autoRegisterSchema = true;
    //		this.useSchemaReflection = true;
    //		this.normalizeSchema = true;
    //		this.useLatestVersion = true; 
    //		this.avroReflectionAllowNull = true;
    	}
    }
  6. Press Ctrl+S to save your bean.
  7. Right-click Kafka_Avro_Beans under the Custom Bean Jars node again and create a Bean with the name KafkaAvroDeserializerBean and the following content:
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    
    public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer {
    
    	@Override
    	public void configure(KafkaAvroDeserializerConfig config) {
    		this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000);
    		this.useSpecificAvroReader = false;
    	}
    }
  8. Repeat this operation to create a Bean inside Kafka_Avro_Beans with the name KafkaAvroConverterBean and the following content:
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    
    import org.apache.avro.Schema;
    
    import org.apache.avro.generic.GenericData.Record;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.io.JsonEncoder;
    
    
    public class KafkaAvroConverterBean {
    
    	// AVRO schema
    	static Schema  schema = new Schema.Parser().parse( 
    			"{" 
    	         + "\"doc\": \"Sample schema to help you get started.\"," 
    	         + "\"fields\": [" 
    		     + "{"
    			 + "\"doc\": \"The int type is a 32-bit signed integer.\"," 
    		     + "\"name\": \"my_field1\","
    			 + "\"type\": \"int\"" 
    			 + "}," 
    			 + "{"
    			 + "\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\","
    			 + "\"name\": \"my_field2\"," 
    			 + "\"type\": \"int\"" 
    			 + "}," 
    			 + "{"
    			 + "\"doc\": \"The string is a unicode character sequence.\"," 
    			 + "\"name\": \"my_field3\","
    			 + "\"type\": \"string\"" 
    			 + "}" 
    			 + "]," 
    			 + "\"name\": \"AvroSample\","
    			 + "\"namespace\": \"talend\"," 
    			 + "\"type\": \"record\"" 
    			 + "}");
    	
    	
    	public static String avroToJsonString(Object body) throws RuntimeException {
    		try {
    			ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DatumWriter<Record> writer = new GenericDatumWriter<Record>(schema);
                JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false);
                writer.write((Record)body, encoder);
                encoder.flush();
                baos.flush();
                return new String(baos.toByteArray(), StandardCharsets.UTF_8);
    	    } catch (Exception e) {
    		      throw new RuntimeException(
    		          String.format("Error coverting Avro to Json of schema %s", schema), e);
    	    } 
    	}
    	
    	public static Record jsonStringToAvro(Object jsonString) throws RuntimeException {
    	    try {
    	      DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
    	      return (Record)reader.read(null, DecoderFactory.get().jsonDecoder(schema, (String)jsonString));
    	    } catch (Exception e) {
    		      throw new RuntimeException(
    		          String.format("Error coverting json %s to Avro of schema %s", jsonString, schema), e);
    	    }
    	}
    }
  9. Right-click Kafka_Avro_Beans under the Custom Bean Jars node and select Edit Bean Jar Libraries from the contextual menu.
  10. Add the following dependencies.
  11. Create a Route for the message exchange with Confluent Kafka message broker.
  12. Right-click the Route you just created and select Setup Code dependencies from the contextual menu.
  13. In the Setup Code dependencies dialog box, select Kafka_Avro_Beans in the Custom Bean Jars tab and click OK.

Did this page help you?

If you find any issues with this page or its content – a typo, a missing step, or a technical error – let us know how we can improve!