Accéder au contenu principal Passer au contenu complémentaire

Créer un JAR d'un bean personnalisé

Procédure

  1. Dans la vue Repository, cliquez-droit sur Code > Custom Bean Jars et sélectionnez Create Bean Jar dans le menu contextuel.
  2. Dans la boîte de dialogue New Bean Jar, saisissez Kafka_Avro_Beans dans le champ Name et cliquez sur Finish.
  3. Cliquez-droit sur Kafka_Avro_Beans dans le nœud Custom Bean Jars et sélectionnez Create Bean dans le menu contextuel.
  4. L'assistant New Bean s'ouvre. Dans le champ Name, saisissez KafkaAvroSerializerBean et cliquez sur Finish.
  5. Saisissez le code suivant dans l'espace de modélisation graphique.
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    
    public class KafkaAvroSerializerBean extends KafkaAvroSerializer {
    
    	@Override
    	public void configure(KafkaAvroSerializerConfig config) {
    		this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000);
    		this.strategyUsesSchema(true);
    		this.autoRegisterSchema = true;
    //		this.useSchemaReflection = true;
    //		this.normalizeSchema = true;
    //		this.useLatestVersion = true; 
    //		this.avroReflectionAllowNull = true;
    	}
    }
  6. Appuyez sur Ctrl+S pour sauvegarder votre bean.
  7. Cliquez-droit sur Kafka_Avro_Beans dans le nœud Custom Bean Jars à nouveau et créez un Bean du nom KafkaAvroDeserializerBean, avec la contenu suivant :
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    
    public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer {
    
    	@Override
    	public void configure(KafkaAvroDeserializerConfig config) {
    		this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000);
    		this.useSpecificAvroReader = false;
    	}
    }
  8. Répétez cette opération pour créer un Bean dans Kafka_Avro_Beans, du nom KafkaAvroConverterBean, avec le contenu suivant :
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    
    import org.apache.avro.Schema;
    
    import org.apache.avro.generic.GenericData.Record;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.io.JsonEncoder;
    
    
    public class KafkaAvroConverterBean {
    
    	// AVRO schema
    	static Schema  schema = new Schema.Parser().parse( 
    			"{" 
    	         + "\"doc\": \"Sample schema to help you get started.\"," 
    	         + "\"fields\": [" 
    		     + "{"
    			 + "\"doc\": \"The int type is a 32-bit signed integer.\"," 
    		     + "\"name\": \"my_field1\","
    			 + "\"type\": \"int\"" 
    			 + "}," 
    			 + "{"
    			 + "\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\","
    			 + "\"name\": \"my_field2\"," 
    			 + "\"type\": \"int\"" 
    			 + "}," 
    			 + "{"
    			 + "\"doc\": \"The string is a unicode character sequence.\"," 
    			 + "\"name\": \"my_field3\","
    			 + "\"type\": \"string\"" 
    			 + "}" 
    			 + "]," 
    			 + "\"name\": \"AvroSample\","
    			 + "\"namespace\": \"talend\"," 
    			 + "\"type\": \"record\"" 
    			 + "}");
    	
    	
    	public static String avroToJsonString(Object body) throws RuntimeException {
    		try {
    			ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DatumWriter<Record> writer = new GenericDatumWriter<Record>(schema);
                JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false);
                writer.write((Record)body, encoder);
                encoder.flush();
                baos.flush();
                return new String(baos.toByteArray(), StandardCharsets.UTF_8);
    	    } catch (Exception e) {
    		      throw new RuntimeException(
    		          String.format("Error coverting Avro to Json of schema %s", schema), e);
    	    } 
    	}
    	
    	public static Record jsonStringToAvro(Object jsonString) throws RuntimeException {
    	    try {
    	      DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
    	      return (Record)reader.read(null, DecoderFactory.get().jsonDecoder(schema, (String)jsonString));
    	    } catch (Exception e) {
    		      throw new RuntimeException(
    		          String.format("Error coverting json %s to Avro of schema %s", jsonString, schema), e);
    	    }
    	}
    }
  9. Cliquez-droit sur Kafka_Avro_Beans dans le nœud Custom Bean Jars et sélectionnez Edit Bean Jar Libraries dans le menu contextuel.
  10. Ajoutez les dépendances suivantes.
  11. Créez une Route pour l'échange de messages avec le broker de message Confluent Kafka.
  12. Cliquez-droit sur la Route créée et sélectionnez Setup Code dependencies dans le menu contextuel.
  13. Dans la boîte de dialogue Setup Code dependencies, sélectionnez Kafka_Avro_Beans dans l'onglet Custom Bean Jars et cliquez sur OK.

Cette page vous a-t-elle aidé ?

Si vous rencontrez des problèmes sur cette page ou dans son contenu – une faute de frappe, une étape manquante ou une erreur technique – faites-le-nous savoir.