tKafkaOutputの標準プロパティ - Cloud - 8.0

Kafka

Version
Cloud
8.0
Language
日本語
Product
Talend Big Data
Talend Big Data Platform
Talend Data Fabric
Talend Real-Time Big Data Platform
Module
Talend Studio
Content
ジョブデザインと開発 > サードパーティーシステム > メッセージングコンポーネント > Kafka
データガバナンス > サードパーティーシステム > メッセージングコンポーネント > Kafka
データクオリティとプレパレーション > サードパーティーシステム > メッセージングコンポーネント > Kafka
Last publication date
2024-03-07

これらのプロパティは、標準ジョブのフレームワークで実行されているtKafkaOutputを設定するために使われます。

標準tKafkaOutputコンポーネントは、インターネットファミリーに属しています。

このフレームワークのコンポーネントは、すべてのビッグデータ対応のTalend製品およびTalend Data Fabricで使用できます。

基本設定

[Input type] (入力タイプ)

ドロップダウンリストで、Kafkaに読み取らせるメッセージのタイプとして次のいずれかを選択します。
  • byte[]: コンポーネントはシリアライズされたメッセージをバイト配列に受信します。
  • ProducerRecord: コンポーネントは、キー/値のペアとしてシリアライズされたメッセージを受信します。メッセージキーとメッセージ値はAvroとしてシリアライズできます。

これらのプロパティは、Kafka 2.2.1以降で利用できます。

[Schema] (スキーマ)[Edit schema] (スキーマを編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

このコンポーネントのスキーマは読み取り専用です。公開するメッセージを保管します。

[Use an existing connection] (既存の接続を使用)

定義済みの接続の詳細を再利用する場合は、このチェックボックスをオンにして、[Component List] (コンポーネントリスト)ドロップダウンリストから、目的の接続コンポーネントを選択します。

[Version] (バージョン)

使うKafkaクラスターのバージョンを選択します。

Talendが提供する 8.0.1-R2024-02以降のTalend Studioマンスリーアップデートをインストール済みである場合、Kafka 2.4.x以前のバージョンは非推奨となります。

[Broker list] (ブローカーリスト)

使用するKafkaクラスターのブローカーノードのアドレスを入力します。

このアドレスの形式はhostname:portです。この情報は、このKafkaクラスター内のホスティングノードの名前とポートです。

複数のアドレスを指定する必要がある場合は、コンマ(,)で区切ります。

トピック名

メッセージを公開するトピックの名前を入力します。このトピックは既に存在している必要があります。

このプロパティは、[Input type] (入力タイプ)ドロップダウンリストでByte[]が選択されている場合のみ利用できます。

[Compress the data] (データを圧縮)

[Compress the data] (データの圧縮)チェックボックスをオンにすると、出力データが圧縮されます。

[Use SSL/TLS] (SSL/TLSを使用)

SSLまたはTLS暗号化接続を有効にする場合は、このチェックボックスを選択します。

このチェックボックスはKafka 0.9.0.1以降で使えます。

[Set keystore] (キーストアを設定)

このチェックボックスを選択すると、tSetKeystoreコンポーネント経由のSSLまたはTLS暗号化接続が有効になります。

次に、同じジョブ内のtSetKeyStoreコンポーネントを使用して暗号化情報を指定する必要があります。

このチェックボックスは、[Use SSL/TLS] (SSL/TLSを使用)チェックボックスをオンにすると使用できます。

注: このオプションは、Talendが提供する8.0.1-R2022-05以降のTalend Studioマンスリーアップデートをインストール済みである場合に利用できます。詳細は管理者にお問い合わせください。

[Use Kerberos authentication] (Kerberos認証を使用)

使用するKafkaクラスターをKerberosで保護する場合は、このチェックボックスをオンにして、定義する関連パラメーターを表示します。

  • JAAS configuration path (JAAS設定パス): パスを入力するか、Kafkaへのクライアントの認証を実行するジョブによって使われるJAAS設定ファイルを参照します。

    このJAASファイルには、kinitモードまたはkeytabモードのいずれかを使用して、クライアント(TalendによるKafka関連ジョブ)をKafkaブローカーノードに接続する方法が指定されています。JAASファイルは、これらのジョブが実行されるマシンに保管されている必要があります。

    Talend、Kerberos、またはKafkaはこのJAASファイルを提供していません。ユーザーの組織のセキュリティ戦略に応じて、Configuring Kafka clientの説明に従ってJAASファイルを作成する必要があります。

  • Kafka brokers principal name (Kafkaブローカーのプリンシパル名): ブローカークラスターの作成時にブローカーのために定義したKerberosのプリンシパルの主要部分を入力します。たとえばkafka/kafka1.hostname.com@EXAMPLE.COMというプリンシパルの場合、このフィールドへの入力に使用する主要部分はkafkaになります。

  • Set kinit command path (kinitコマンドパス): Kerberosは、そのkinit実行可能ファイルへのデフォルトのパスを使用します。このパスを変更した場合は、このチェックボックスをオンにしてカスタムアクセスパスを入力します。

    このチェックボックスをオフにする場合は、デフォルトのパスが使用されます。

  • Set Kerberos configuration path (Kerberos設定パス): Kerberosは、たとえば、Kerberos 5の設定ファイルであるkrb5.confファイル(または、Windowsの場合はkrb5.ini)へのデフォルトのパスを使用します。このパスを変更した場合は、このチェックボックスをオンにして、Kerberos設定ファイルへのカスタムアクセスパスを入力します。

    このチェックボックスをオフにすると、必要な設定情報を見つけるために、Kerberosによって所定の戦略が適用されます。この戦略の詳細は、Kerberos requirementsLocating the krb5.conf Configuration Fileをご覧ください。

KafkaクラスターをKerberosで保護する方法は、SASLを使用した認証をご覧ください。

このチェックボックスはKafka 0.9.0.1以降で使えます。

詳細設定

[Kafka properties] (Kafkaのプロパティ)

カスタマイズする必要がある新しいKafkaプロデューサープロパティをこのテーブルに追加します。

このテーブルで定義できる新しいプロデューサーのプロパティに関する詳細は、Kafkaの公式ドキュメンテーションで新しいプロデューサーの設定を説明しているセクションをご覧ください。

[Set Headers] (ヘッダーを設定)

送信するメッセージにヘッダーを追加するには、このチェックボックスを選択します。

この機能は、Kafka 1.1.0以降で使えます。

[Use schema registry] (スキーマレジストリーを使用)

このチェックボックスを選択すると、Confluent Schema Registryが使用され、定義する次の関連パラメーターが表示されます。
  • URL: スキーマレジストリインスタンスURLを入力します。
  • [Basic authentication] (基本認証): このチェックボックスをオンにして[Username] (ユーザー名)[Password] (パスワード)に認証情報を入力します。
  • [Set schema registry keystore] (スキーマレジストリーのキーストアを設定): このチェックボックスを選択すると、SSLまたはTLS暗号化接続が有効になります。次に、同じジョブ内のtSetKeystoreコンポーネントを使用して暗号化情報を指定する必要があります。Kafka SSL設定がスキーマレジストリーに再利用されるため、このチェックボックスは、コンポーネントの[Basic settings] (基本設定)ビューでtSetKeystoreが既に設定されている場合には利用できません。
  • [Key deserializer] (キーデシリアライザー)[Value deserializer] (値デシリアライザー): キーと値に使用するスキーマ形式をドロップダウンリストから選択します。デフォルトのカスタムシリアライザーはorg.apache.kafka.common.serialization.ByteArraySerializerです。

スキーマレジストリーの詳細は、Confluentのドキュメンテーションをご覧ください。

このオプションは、[Basic settings] (基本設定)ビューで[Input type] (入力タイプ)ドロップダウンリストからProducerRecordが選択されている場合のみ利用できます。

注: このオプションは、Talendが提供する8.0.1-R2022-01以降のTalend Studioマンスリーアップデートをインストール済みである場合に利用できます。詳細は管理者にお問い合わせください。

[tStatCatcher Statistics] (tStatCatcher統計)

このチェックボックスを選択すると、ジョブレベルおよび各コンポーネントレベルで処理メタデータが収集されます。

グローバル変数

ERROR_MESSAGE

エラー発生時にコンポーネントによって生成されるエラーメッセージ。これはAfter変数で、文字列を返します。この変数は、[Die on error] (エラー発生時に強制終了)チェックボックスがオンになっている場合のみ機能します。

NB_LINE

処理された行数。これはAfter変数で、整数を返します。

NB_ERRORS

The number of rows processed with errors. これはAfter変数で、整数を返します。

NB_SUCCESS

正しく処理された行数。これはAfter変数で、整数を返します。

使用方法

使用ルール

このコンポーネントは、終了コンポーネントです。受信データをシリアライズされたバイト配列に変換するには、tJavaRowまたはtJavaコンポーネントが必要です。

次のサンプルは、この変換を実行するステートメントを作成する方法を示しています。

output_row.serializedValue = input_row.users.getBytes();

このコードでは、output_row変数はtKafkaOutputに出力されるデータのスキーマを、output_row.serializedValueはそのスキーマの単一の読み取り専用カラムを、input_row変数は受信データのスキーマを、input_row.usersgetBytes()メソッドによってバイト配列に変換されるusersという名前の入力カラムを表します。