Propriétés du tKafkaInput Standard - Cloud - 8.0

Kafka

Version
Cloud
8.0
Language
Français (France)
Product
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Open Studio for Big Data
Talend Real-Time Big Data Platform
Module
Studio Talend
Content
Création et développement > Systèmes tiers > Composants Messaging (Intégration) > Composants Kafka
Gouvernance de données > Systèmes tiers > Composants Messaging (Intégration) > Composants Kafka
Qualité et préparation de données > Systèmes tiers > Composants Messaging (Intégration) > Composants Kafka

Ces propriétés sont utilisées pour configurer le tKafkaInput s'exécutant dans le framework de Jobs Standard.

Le composant tKafkaInput Standard appartient à la famille Internet.

Le composant de ce framework est disponible dans tous les produits Talend avec Big Data et dans Talend Data Fabric.

Paramètres simples

Schema et Edit schema

Un schéma est une description de lignes. Il définit le nombre de champs (colonnes) à traiter et à passer au composant suivant. Lorsque vous créez un Job Spark, évitez le mot réservé line lors du nommage des champs.

Notez que le schéma de ce composant est en lecture seule. Il stocke les messages envoyés du producteur de messages.

Output type

Sélectionnez dans la liste déroulante le type de données à envoyer au composant suivant :
  • String : le composant envoie des messages sérialisés en chaînes de caractères.
  • byte[] : le composant envoie des messages sérialisés en tableaux d'octets.
  • ConsumerRecord : le composant envoie des messages sérialisés en paires de clé/valeur. La clé et la valeur du message peuvent être sérialisés comme Avro.

De manière générale, il est recommandé d'utiliser des types String, car le tKafkaInput peut traduire automatiquement les messages Kafka byte[] en chaînes de caractères à traiter par le Job. Cependant, si ce format de messages Kafka n'est pas connu par le tKafkaInput, comme le Protobuf, vous pouvez sélectionner byte[] et utiliser un composant Custom code comme le tJavaRow afin de désérialiser les messages en chaînes de caractères afin que les autres composants du même Job puissent traiter ces messages.

Use an existing connection

Cochez cette case et sélectionnez le composant de connexion adéquat dans la liste Component list pour réutiliser les paramètres d'une connexion que vous avez déjà définie.

Version

Sélectionnez la version du cluster Kafka à utiliser.

Zookeeper quorum list

Saisissez l'adresse du service Zookeeper pour le cluster Kafka à utiliser.

L'adresse doit se présenter sous la forme suivante : hostname:port. Ces informations contiennent le nom et le port du nœud hébergeant dans le cluster Kafka.

Si vous devez spécifier plusieurs adresses, séparez-les à l'aide d'une virgule (,).

Ce champ est disponible uniquement pour Kafka 0.8.2.0.

Broker list

Saisissez les adresses des nœuds du broker du cluster Kafka à utiliser.

L'adresse doit se présenter sous la forme suivante : hostname:port. Ces informations contiennent le nom et le port du nœud hébergeant dans le cluster Kafka.

Si vous devez spécifier plusieurs adresses, séparez-les à l'aide d'une virgule (,).

Ce champ est disponible depuis Kafka 0.9.0.1.

Topic name

Saisissez le nom du topic depuis lequel le tKafkaInput reçoit le flux de messages.

Consumer group ID

Saisissez le nom du groupe de consommateurs auquel vous souhaitez que le consommateur courant (le tKafkaInput) appartienne.

Ce groupe de consommateurs sera créé lors de l'exécution s'il n'existe pas.

Reset offsets on consumer group

Cochez cette case pour supprimer les offsets sauvegardés pour le groupe de consommateurs à utiliser, afin que ce groupe de consommateurs soit géré comme un nouveau groupe n'ayant consommé aucun message.

New consumer group starts from

Sélectionnez le point de départ duquel les messages d'un topic sont consommés.

Dans Kafka, le numéro d'ID augmentant d'un message se nomme offset. Lorsqu'un nouveau groupe de consommateurs démarre, dans cette liste, vous pouvez sélectionner beginning pour commencer la consommation depuis le message le plus ancien du topic entier ou sélectionner latest pour attendre un nouveau message.

Notez que le groupe de consommateurs prend en compte uniquement les messages dont l'offset a été commité comme point de départ.

Chaque groupe de consommateurs possède son propre compteur pour se rappeler la position d'un message consommé. Pour cette raison, une fois qu'un groupe de consommateurs a commencé à consommer des messages d'un topic donné, un groupe de consommateurs reconnaît le message le plus récent en voyant simplement la position où son groupe a arrêté la consommation, plutôt que le topic complet. Partant de ce principe, les comportements suivants peuvent être attendus :

  • Si vous reprenez un groupe de consommateurs existant, cette option détermine le point de départ de ce groupe de consommateurs uniquement s'il n'a pas déjà de point de départ commité. Sinon, ce groupe de consommateurs démarre du point de départ commité. Par exemple, un topic contient 100 messages. Si un groupe de consommateurs existant a traité 50 messages et a commité leurs offsets, le même groupe de consommateurs reprend à partir de l'offset 51.

  • Si vous créez un nouveau groupe de consommateurs ou en réinitialisez un existant, ce qui signifie que ce groupe n'a consommé aucun message de ce topic, lorsque vous le démarrez depuis le dernier message, ce nouveau groupe démarre et attend l'offset 101.

Auto-commit offsets

Cochez cette case afin de permettre au tKafkaInput de sauvegarder automatiquement son état de consommation à la fin de chaque intervalle de temps donné. Vous devez définir cet intervalle dans le champ Interval affiché.

Notez que les offsets sont commités uniquement à la fin de chaque intervalle. Si votre Job s'arrête au milieu d'un intervalle, l'était de consommation du message dans cet intervalle n'est pas commité.

Stop after a maximum total duration (ms)

Cochez cette case et, dans le champ qui s'affiche, saisissez la durée (en millisecondes) à la fin de laquelle le tKafkaInput arrête de s'exécuter.

Stop after receiving a maximum number of messages

Cochez cette case et, dans le champ qui s'affiche, saisissez le nombre maximal de messages que vous souhaitez que le tKafkaInput reçoive, avant d'arrêter automatiquement son exécution.

Stop after maximum time waiting between messages (ms)

Cochez cette case et, dans le champ qui s'affiche, saisissez le temps (en millisecondes) durant lequel le tKafkaInput attend un nouveau message. Si le tKafkaInput ne reçoit pas de nouveau message lorsque ce temps d'attente est écoulé, il arrête de s’exécuter.

Use SSL/TLS

Cochez cette case pour activer la connexion chiffrée SSL ou TLS.

Cette case est disponible depuis Kafka 0.9.0.1.

Set keystore

Cochez cette case pour activer la connexion chiffrée SSL ou TLS via un composant tSetKeystore.

Utilisez le composant tSetKeystore dans le même Job afin de spécifier les informations de chiffrement.

Cette case est disponible uniquement lorsque vous cochez la case Use SSL/TLS.

Remarque : Cette option est disponible si vous avez installé la mise à jour mensuelle 8.0.1-R2022-05 du Studio ou une plus récente fournie par Talend. Pour plus d'informations, contactez votre administrateur ou administratrice.

Use Kerberos authentication

Si le cluster Kafka à utiliser est sécurisé par Kerberos, cochez cette case pour afficher les paramètres associés à définir :

  • JAAS configuration path : saisissez le chemin d'accès ou parcourez votre système jusqu'au fichier de configuration JAAS à utiliser par le Job pour authentification en tant que client à Kafka.

    Le fichier JAAS décrit comment les clients, les Jobs Kafka en termes Talend peuvent se connecter aux nœuds du broker Kafka, en utilisant soit le mode kinit, soit le mode keytab. Il doit être stocké sur la machine où sont exécutés les Jobs.

    Talend , Kerberos ou Kafka ne fournissent pas ce fichier JAAS. Vous devez le créer en suivant les explications dans Configuring Kafka client (en anglais), selon la stratégie de sécurité de votre entreprise.

  • Kafka brokers principal name : saisissez le membre primaire du Principal Kerberos défini pour les brokers lorsque vous avez créé le cluster de brokers. Par exemple, dans ce Principal kafka/kafka1.hostname.com@EXAMPLE.COM, le membre primaire à utiliser pour renseigner ce champ est kafka.

  • Set kinit command path : Kerberos utilise un chemin par défaut pour son exécutable kinit. Si vous avez modifié ce chemin, cochez cette case et saisissez votre chemin d'accès personnalisé.

    Si vous laissez cette case décochée, le chemin par défaut est utilisé.

  • Set Kerberos configuration path : Kerberos utilise un chemin par défaut vers son fichier de configuration, le fichier krb5.conf (ou krb5.ini sous Windows) pour Kerberos 5 par exemple. Si vous avez modifié ce chemin, cochez cette case et saisissez le chemin d'accès personnalisé au fichier de configuration Kerberos.

    Si vous laissez cette case décochée, une stratégie donnée est appliquée par Kerberos pour tenter de trouver les informations de configuration nécessaires. Pour plus d'informations concernant cette stratégie, consultez la section Locating the krb5.conf Configuration File dans Kerberos requirements (en anglais).

Pour plus d'informations concernant la manière dont est sécurisé un cluster Kafka via Kerberos, consultez Authenticating using SASL (en anglais).

Cette case est disponible depuis Kafka 0.9.0.1.

Paramètres avancés

Kafka properties

Ajoutez les propriétés de consommation Kafka nécessaires pour personnaliser cette table. Par exemple, configurez une valeur spécifique zookeeper.connection.timeout.ms pour éviter l'exception ZkTimeoutException.

Vous pouvez également configurer les propriétés de sécurité, comme le chiffrement SSL avec ssl.truststore.location ou ssl.keystore.location.

Pour plus d'informations concernant les propriétés de consommation à définir dans cette table, consultez la section décrivant la configuration du consommateur dans la documentation Kafka, à l'adresse suivante : http://kafka.apache.org/documentation.html#consumerconfigs (en anglais).

Apply security properties after advanced Kafka properties

Cochez cette case pour prendre en compte d'abord les propriétés de sécurité configurées dans la table Kafka properties, dans la vue Advanced settings, plutôt que les propriétés de sécurité configurées dans le composant tSetKeyStore lorsque la case Use SSL/TLS est cochée, dans la vue Basic settings.

Timeout precision(ms)

Saisissez, en millisecondes, la durée à la suite de laquelle vous souhaitez retourner une exception de suspension si aucun message n'est disponible à la consommation.

La valeur -1 indique qu'aucun délai avant suspension n'est configuré.

Use schema registry

Cochez cette case pour utiliser Confluent Schema Registry et afficher les paramètres associés à définir 
  • URL : saisissez l'URL de l'instance de Schema Registry.
  • Basic authentication : cochez cette case et saisissez vos identifiants dans les champs Username et Password.
  • Set schema registry keystore : cochez cette case pour activer la connexion chiffrée SSL ou TLS. Utilisez le composant tSetKeystore dans le même Job afin de spécifier les informations de chiffrement. Cette case n'est pas disponible lorsque vous avez déjà configuré un tSetKeystore dans la vue Basic settings du composant, car la configuration du SSL Kafka est réutilisée pour le registre du schéma.
  • Key deserializer et Value deserializer : sélectionnez le format de schéma à utiliser pour la clé et la valeur, dans la liste déroulante. Le sérialiseur Custom par défaut est org.apache.kafka.common.serialization.ByteArraySerializer.

Pour plus d'informations concernant Schema Registry, consultez la documentation Confluent (en anglais).

Cette option est disponible uniquement lorsque vous sélectionnez ConsumerRecord dans la liste déroulante Output Type de la vue Basic settings.

Remarque : Cette option est disponible si vous avez installé la mise à jour mensuelle 8.0.1-R2022-01 du Studio ou une plus récente fournie par Talend. Pour plus d'informations, contactez votre administrateur ou administratrice.

Load the offset with the message

Cochez cette case pour écrire en sortie les offsets des messages consommés au composant suivant. Lorsque vous cochez cette case, une colonne offset en lecture seule est ajoutée au schéma.

Cette propriété est disponible uniquement lorsque vous sélectionnez String ou byte[] dans la liste déroulante Output type dans la vue Basic settings.

Custom encoding

Il est possible de rencontrer des problèmes d'encodage lorsque vous traitez les données stockées. Dans ce cas, cochez cette case pour afficher la liste Encoding.

Sélectionnez l'encodage à partir de la liste ou sélectionnez Custom et définissez-le manuellement.

Cette propriété est disponible uniquement lorsque vous sélectionnez String ou byte[] dans la liste déroulante Output type dans la vue Basic settings.

Statistiques du tStatCatcher

Cochez cette case pour collecter les métadonnées de traitement au niveau du Job ainsi qu'au niveau de chaque composant.

Variables globales

ERROR_MESSAGE

Message d'erreur généré par le composant lorsqu'une erreur survient. Cette variable est une variable After et retourne une chaîne de caractères.

Utilisation

Usage rule

Ce composant est utilisé en tant que composant de début et nécessite un lien de sortie. Lorsque le topic Kafka à utiliser n'existe pas, il peut être utilisé avec le composant tKafkaCreateTopic pour lire le topic créé par ce dernier.