メイン コンテンツをスキップする 補完的コンテンツへスキップ

Bean Jarを作成

手順

  1. リポジトリーツリービューで[Code] (コード) > [Custom Bean Jars] (カスタムBean Jar)を右クリックし、コンテキストメニューから[Create Bean Jar] (Bean Jarを作成)を選択します。
  2. [New Bean Jar] (新規Bean Jar)ダイアログボックスの[Name] (名前)フィールドにKafka_Avro_Beansと入力し、[Finish] (終了)をクリックします。
  3. [Custom Bean Jars] (カスタムBean Jar)の下にあるKafka_Avro_Beansを右クリックし、コンテキストメニューで[Create Bean] (Beanを作成)を選択します。
  4. [New Bean] (新規Bean)ウィザードが開きます。[Name] (名前)フィールドにKafkaAvroSerializerBeanと入力し、[Finish] (終了)をクリックします。
  5. デザインワークスペースに次のコードを入力します。
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    
    public class KafkaAvroSerializerBean extends KafkaAvroSerializer {
    
    	@Override
    	public void configure(KafkaAvroSerializerConfig config) {
    		this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000);
    		this.strategyUsesSchema(true);
    		this.autoRegisterSchema = true;
    //		this.useSchemaReflection = true;
    //		this.normalizeSchema = true;
    //		this.useLatestVersion = true; 
    //		this.avroReflectionAllowNull = true;
    	}
    }
  6. [Ctrl]+[S]を押し、Beanを保存します。
  7. [Custom Bean Jars] (カスタムBean Jar)ノードの下にあるKafka_Avro_Beansを再び右クリックし、KafkaAvroDeserializerBeanという名前と次の内容でBeanを作成します:
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    
    public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer {
    
    	@Override
    	public void configure(KafkaAvroDeserializerConfig config) {
    		this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000);
    		this.useSpecificAvroReader = false;
    	}
    }
  8. この操作を繰り返し、Kafka_Avro_Beans内にKafkaAvroConverterBeanという名前と次の内容でBeanを作成します:
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    
    import org.apache.avro.Schema;
    
    import org.apache.avro.generic.GenericData.Record;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DatumWriter;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;
    import org.apache.avro.io.JsonEncoder;
    
    
    public class KafkaAvroConverterBean {
    
    	// AVRO schema
    	static Schema  schema = new Schema.Parser().parse( 
    			"{" 
    	         + "\"doc\": \"Sample schema to help you get started.\"," 
    	         + "\"fields\": [" 
    		     + "{"
    			 + "\"doc\": \"The int type is a 32-bit signed integer.\"," 
    		     + "\"name\": \"my_field1\","
    			 + "\"type\": \"int\"" 
    			 + "}," 
    			 + "{"
    			 + "\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\","
    			 + "\"name\": \"my_field2\"," 
    			 + "\"type\": \"int\"" 
    			 + "}," 
    			 + "{"
    			 + "\"doc\": \"The string is a unicode character sequence.\"," 
    			 + "\"name\": \"my_field3\","
    			 + "\"type\": \"string\"" 
    			 + "}" 
    			 + "]," 
    			 + "\"name\": \"AvroSample\","
    			 + "\"namespace\": \"talend\"," 
    			 + "\"type\": \"record\"" 
    			 + "}");
    	
    	
    	public static String avroToJsonString(Object body) throws RuntimeException {
    		try {
    			ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DatumWriter<Record> writer = new GenericDatumWriter<Record>(schema);
                JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false);
                writer.write((Record)body, encoder);
                encoder.flush();
                baos.flush();
                return new String(baos.toByteArray(), StandardCharsets.UTF_8);
    	    } catch (Exception e) {
    		      throw new RuntimeException(
    		          String.format("Error coverting Avro to Json of schema %s", schema), e);
    	    } 
    	}
    	
    	public static Record jsonStringToAvro(Object jsonString) throws RuntimeException {
    	    try {
    	      DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
    	      return (Record)reader.read(null, DecoderFactory.get().jsonDecoder(schema, (String)jsonString));
    	    } catch (Exception e) {
    		      throw new RuntimeException(
    		          String.format("Error coverting json %s to Avro of schema %s", jsonString, schema), e);
    	    }
    	}
    }
  9. [Custom Bean Jars] (カスタムBean Jar)の下にあるKafka_Avro_Beansを右クリックし、コンテキストメニューで[Edit Bean Jar Libraries] (Bean Jarライブラリーを編集)を選択します。
  10. 次の依存項目を追加します。
  11. Confluent Kafkaメッセージブローカーとのメッセージ交換用のルートを作成します。
  12. 作成したルートを右クリックし、コンテキストメニューから[Setup Code dependencies] (コード依存項目を設定)を選択します。
  13. [Setup Code dependencies] (コード依存項目を設定)ダイアログボックスの[Custom Bean Jars] (カスタムBean Jar)タブでKafka_Avro_Beansを選択し、OKをクリックします。

このページは役に立ちましたか?

このページまたはコンテンツに、タイポ、ステップの省略、技術的エラーなどの問題が見つかった場合は、お知らせください。改善に役立たせていただきます。