Skip to main content Skip to complementary content

Security configuration (optional)

Procedure

  1. Modify the Bean KafkaAvroSerializerBean inside Kafka_Avro_Beans with the following content:
    package org.example.local_project.beansjar.kafka_avro_beans;
    
    import java.util.HashMap;
    import java.util.Map;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    
    /**
     * Avro serializer bean that creates its own Schema Registry client from the
     * basic-auth properties found in the serializer configuration.
     */
    public class KafkaAvroSerializerBean extends KafkaAvroSerializer {

        /**
         * Overrides the inherited configuration step without delegating to the
         * superclass: reads the two basic-auth properties from {@code config},
         * builds a {@link CachedSchemaRegistryClient} with them, and enables
         * automatic schema registration.
         *
         * @param config serializer configuration that supplies the Schema
         *               Registry URLs and the basic-auth properties
         */
        @Override
        public void configure(KafkaAvroSerializerConfig config) {
            final String credentialsSource = config.getString("basic.auth.credentials.source");
            final String userInfo = config.getPassword("basic.auth.user.info").value();

            // Only the authentication settings are forwarded to the registry client.
            final Map<String, String> registryAuth = new HashMap<>();
            registryAuth.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, credentialsSource);
            registryAuth.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, userInfo);

            this.schemaRegistry =
                    new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000, registryAuth);

            // NOTE(review): any return value is discarded here — confirm this call is required.
            this.strategyUsesSchema(true);
            this.autoRegisterSchema = true;

            // Further optional switches, left disabled:
            // this.useSchemaReflection = true;
            // this.normalizeSchema = true;
            // this.useLatestVersion = true;
            // this.avroReflectionAllowNull = true;
        }
    }
  2. Modify the Bean KafkaAvroDeserializerBean inside Kafka_Avro_Beans with the following content:
    package org.example.local_project.beansjar.kafka_avro_beans;
     
    import java.util.HashMap;
    import java.util.Map;
    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientConfig;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
     
    /**
     * Avro deserializer bean that creates its own Schema Registry client from
     * the basic-auth properties found in the deserializer configuration.
     */
    public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer {

        /**
         * Overrides the inherited configuration step without delegating to the
         * superclass: forwards the basic-auth credentials source and user info
         * from {@code config} to a new {@link CachedSchemaRegistryClient}, and
         * turns the specific Avro reader off.
         *
         * @param config deserializer configuration that supplies the Schema
         *               Registry URLs and the basic-auth properties
         */
        @Override
        public void configure(KafkaAvroDeserializerConfig config) {
            final Map<String, String> authSettings = new HashMap<>();

            final String source = config.getString("basic.auth.credentials.source");
            authSettings.put(SchemaRegistryClientConfig.BASIC_AUTH_CREDENTIALS_SOURCE, source);

            final String credentials = config.getPassword("basic.auth.user.info").value();
            authSettings.put(SchemaRegistryClientConfig.USER_INFO_CONFIG, credentials);

            final CachedSchemaRegistryClient client =
                    new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000, authSettings);
            this.schemaRegistry = client;

            this.useSpecificAvroReader = false;
        }
    }
  3. Create two context variables for the Route: API_KEY of String type and API_SECRET of Password type, set to the API key and API secret issued by Confluent.
    For more information on context parameters, see Using contexts and variables.
    For more information about API KEY and API SECRET, see Confluent documentation.
  4. Create two context parameters: SCHEMA_REGISTRY_KEY of String type and SCHEMA_REGISTRY_SECRET of Password type, set to the Schema Registry key and Schema Registry secret issued by Confluent.
  5. In the Advanced settings tab of the cKafka components SendMessageToKafka and ReceiveMessageFromKafka, add the following parameters in the Kafka Properties field.
    Name Value

    "additionalProperties.basic.auth.user.info"

    "RAW(" + context.SCHEMA_REGISTRY_KEY + ":" + context.SCHEMA_REGISTRY_SECRET+")"

    "additionalProperties.basic.auth.credentials.source"

    "USER_INFO"

    "additionalProperties.security.protocol"

    "SASL_SSL"

    "additionalProperties.sasl.jaas.config"

    "org.apache.kafka.common.security.plain.PlainLoginModule required username='"+ context.API_KEY +"' password='" + context.API_SECRET + "';"

    "additionalProperties.sasl.mechanism"

    "PLAIN"

    For more information on the security configuration, see the Confluent Kafka security documentation https://docs.confluent.io/platform/current/security/auth-overview.html.

Did this page help you?

If you find any issues with this page or its content – a typo, a missing step, or a technical error – let us know how we can improve!